Source code for node_action_server_ros_iot_bridge

#!/usr/bin/env python

# ROS Node - Action Server - IoT ROS Bridge

"""
This node is responsible for running an Action Server which serves as a bridge between
Ros and IoT.

Any task like getting data from a MQTT topic to pushing data on spreadsheets is
achieved by this Action Server. All the necessary parameters are first loaded from the
parameter server. Then whenever a client sends a goal to this Action Server, a new thread
is started to process the goal asynchronously. New thread is an essential requirement to process
multiple goals at once.
"""

from __future__ import print_function

import threading
import rospy
import actionlib

from pkg_ros_iot_bridge.msg import msgRosIotAction
from pkg_ros_iot_bridge.msg import msgRosIotResult

from pkg_ros_iot_bridge.msg import msgMqttSub    # Message Class for MQTT Subscription Messages

from pyiot import iot                            # Custom Python Module to perform MQTT Tasks


[docs]class RosIotBridgeActionServer(object): """ Initialize the Action Server and load all necessary parameters from the parameter server. This class can subscribe and publish to any MQTT topic. If a MQTT topic is subscribed, then any data published in that topic is published to another ROS topic "/ros_iot_bridge/mqtt/sub" so that other ROS nodes can access that data for their use. It can also get and post using HTTP protocol to update google spreadsheets. """ # pylint: disable=too-many-instance-attributes # Constructor def __init__(self): # Initialize the Action Server self._as = actionlib.ActionServer('/action_ros_iot', msgRosIotAction, self.on_goal, self.on_cancel, auto_start=False) ''' * self.on_goal - It is the function pointer which points to a function which will be called when the Action Server receives a Goal. * self.on_cancel - It is the function pointer which points to a function which will be called when the Action Server receives a Cancel Request. ''' # Read and Store IoT Configuration data from Parameter Server param_config_iot = rospy.get_param('config_pyiot') self._config_mqtt_server_url = param_config_iot['mqtt']['server_url'] self._config_mqtt_server_port = param_config_iot['mqtt']['server_port'] self._config_mqtt_sub_topic = param_config_iot['mqtt']['topic_sub'] self._config_mqtt_pub_topic = param_config_iot['mqtt']['topic_pub'] self._config_mqtt_qos = param_config_iot['mqtt']['qos'] self._config_mqtt_sub_cb_ros_topic = param_config_iot['mqtt']['sub_cb_ros_topic'] self._config_spread_sheet_id = param_config_iot['google_apps']['spread_sheet_id'] self._config_sheet_id_eyantra = param_config_iot['google_apps']['spread_sheet_id_eyantra'] print(param_config_iot) # Initialize ROS Topic Publication # Incoming message from MQTT Subscription will be published on a ROS # Topic (/ros_iot_bridge/mqtt/sub). # ROS Nodes can subscribe to this ROS Topic (/ros_iot_bridge/mqtt/sub) to get # messages from MQTT Subscription. self._handle_ros_pub = rospy.Publisher(self._config_mqtt_sub_cb_ros_topic, msgMqttSub, queue_size=10) # Subscribe to MQTT Topic (eyrc/isPicuTP/iot_to_ros) which is defined in config_pyiot.yaml. # self.mqtt_sub_callback() function will be called when there is a message from # MQTT Subscription. ret = iot.mqtt_subscribe_thread_start(self.mqtt_sub_callback, self._config_mqtt_server_url, self._config_mqtt_server_port, self._config_mqtt_sub_topic, self._config_mqtt_qos) if ret == 0: rospy.loginfo("MQTT Subscribe Thread Started") else: rospy.logerr("Failed to start MQTT Subscribe Thread") # Start the Action Server self._as.start() rospy.loginfo("Started ROS-IoT Bridge Action Server.") # This is a callback function for MQTT Subscriptions
[docs] def mqtt_sub_callback(self, _client, _userdata, message): """ This function is called whenever something is published to the subscribed MQTT topic and publish the same data to a ROS topic so that other ROS nodes can access it :param _client: Client :param _userdata: User Data :param message: Message published on the MQTT topic :type message: msgMqttSub :return: None """ payload = str(message.payload.decode("utf-8")) print("[MQTT SUB CB] Message: ", payload) print("[MQTT SUB CB] Topic: ", message.topic) msg_mqtt_sub = msgMqttSub() msg_mqtt_sub.timestamp = rospy.Time.now() msg_mqtt_sub.topic = message.topic msg_mqtt_sub.message = payload self._handle_ros_pub.publish(msg_mqtt_sub)
# This function will be called when Action Server receives a Goal
[docs] def on_goal(self, goal_handle): """ This function is called whenever a new goal is received by the Action Server :param goal_handle: Unique ID of the goal :return: None """ goal = goal_handle.get_goal() rospy.loginfo("Received new goal from Client") rospy.loginfo(goal) # Validate incoming goal parameters if goal.protocol == "mqtt" or goal.protocol == "http": if goal.mode == "pub" or goal.mode == "sub" or goal.mode == "POST": goal_handle.set_accepted() # Start a new thread to process new goal from the client (Asynchronous Processing) # 'self.process_goal' - is the function pointer which points to a function that # will process incoming Goals thread = threading.Thread(name="worker", target=self.process_goal, args=(goal_handle,)) thread.start() else: goal_handle.set_rejected() return else: goal_handle.set_rejected() return
# This function is called is a separate thread to process Goal.
[docs] def process_goal(self, goal_handle): """ This function is called to create a separate thread to process the Goal. :param goal_handle: Unique ID of the goal :return: None """ # pylint: disable=too-many-branches result = msgRosIotResult() goal_id = goal_handle.get_goal_id() rospy.loginfo("Processing goal : " + str(goal_id.id)) goal = goal_handle.get_goal() # Goal Processing if goal.protocol == "mqtt": rospy.logwarn("MQTT") if goal.mode == "pub": rospy.logwarn("MQTT PUB Goal ID: " + str(goal_id.id)) rospy.logwarn(goal.topic + " > " + goal.message) ret = iot.mqtt_publish(self._config_mqtt_server_url, self._config_mqtt_server_port, goal.topic, goal.message, self._config_mqtt_qos) if ret == 0: rospy.loginfo("MQTT Publish Successful.") result.flag_success = True else: rospy.logerr("MQTT Failed to Publish") result.flag_success = False elif goal.mode == "sub": rospy.logwarn("MQTT SUB Goal ID: " + str(goal_id.id)) rospy.logwarn(goal.topic) ret = iot.mqtt_subscribe_thread_start(self.mqtt_sub_callback, self._config_mqtt_server_url, self._config_mqtt_server_port, goal.topic, self._config_mqtt_qos) if ret == 0: rospy.loginfo("MQTT Subscribe Thread Started") result.flag_success = True else: rospy.logerr("Failed to start MQTT Subscribe Thread") result.flag_success = False # To handle HTTP goals elif goal.protocol == "http": rospy.logwarn("HTTP") if goal.mode == "POST": # In case of HTTP request, goal.topic represents the spreadsheet if goal.topic == 'eyantra': ret = iot.http_post(self._config_sheet_id_eyantra, goal.message) else: ret = iot.http_post(self._config_spread_sheet_id, goal.message) if ret == 'success': rospy.loginfo("Spreadsheet updated successfully") result.flag_success = True else: rospy.loginfo("Error in updating spreadsheet") result.flag_success = False rospy.loginfo("Send goal result to client") if result.flag_success is True: rospy.loginfo("Succeeded") goal_handle.set_succeeded(result) else: rospy.loginfo("Goal Failed. Aborting.") goal_handle.set_aborted(result) rospy.loginfo("Goal ID: " + str(goal_id.id) + " Goal Processing Done.")
# This function will be called when Goal Cancel request is send to the Action Server
[docs] def on_cancel(self, goal_handle): """ This function will be called when Goal Cancel request is send to the Action Server. :param goal_handle: Unique ID of the goal :return: None """ # pylint: disable=no-self-use rospy.loginfo("Received cancel request.") goal_id = goal_handle.get_goal_id() rospy.loginfo("Goal with goal id ", goal_id, " has been cancelled")
# Main
[docs]def main(): """ Main function to create an object of class RosIotBridgeActionServer to initialize the Action Server. :return: None """ rospy.init_node('node_action_server_ros_iot_bridge') _action_server = RosIotBridgeActionServer() rospy.spin()
if __name__ == '__main__': main()