Source code for robot_brain

#!/usr/bin/env python3
"""
.. module:: robot_brain.py
   :platform: Unix
   :synopsis: Script for the initialization of the robots required data.  
.. moduleauthor:: yeshwanth guru krishnakumar

Robot brain data for control and planning executed in the script .
 
"""
import random
import rospy
import threading
import smach_ros
import smach
from smach import StateMachine,State
from std_msgs.msg import Bool
from robot_brain2 import sensory2
from Expo_assignment_1 import architecture_name_mapper as anm
from expo_assignment_1.msg import Point, ControlGoal, PlanGoal
from actionlib import SimpleActionServer
from expo_assignment_1.msg import ControlFeedback, ControlResult
from expo_assignment_1.msg import Point, PlanFeedback, PlanResult
from expo_assignment_1.srv import SetPose,GetPoseResponse, SetPoseResponse
from expo_assignment_1.srv import GetPose

import expo_assignment_1 

LOOP_SLEEP_TIME = 0.5

LOG_TAG = anm.NODE_SURVILANCE_BRAIN


[docs]class ControllingAction: def __init__(self): self._as = SimpleActionServer(anm.ACTION_CONTROLLER, expo_assignment_1.msg.ControlAction, execute_cb=self.execute_callback, auto_start=False) self._as.start() self.log_message(f'{anm.ACTION_CONTROLLER} Action Server initialized. It will navigate through the plan with a delay between each via point spanning in [{self.get_random_motion_time()[0]}, {self.get_random_motion_time()[1]}).')
[docs] def execute_callback(self, goal): if not self.check_goal(goal): return feedback = ControlFeedback() self.log_message('Server is controlling...') self.navigate_through_points(goal.via_points, feedback) self.set_succeeded(feedback)
[docs] def check_goal(self, goal): if not (goal and goal.via_points): self.log_message('No via points provided! This service will be aborted!') self._as.set_aborted() return False return True
[docs] def navigate_through_points(self, points, feedback): for point in points: if self._as.is_preempt_requested(): self.log_message('Service has been canceled by the client!') self._as.set_preempted() return delay = self.get_random_motion_time()[0] + random.random() * (self.get_random_motion_time()[1] - self.get_random_motion_time()[0]) rospy.sleep(delay) feedback.reached_point = point self._as.publish_feedback(feedback) self.log_message(f'Reaching point ({point.x}, {point.y}).') self.set_current_robot_position(point) self.log_message('All points navigated.')
[docs] def set_current_robot_position(self, point): self.log_message(f'Set current robot position to the `{anm.SERVER_SET_POSE}` node.') try: service = rospy.ServiceProxy(anm.SERVER_SET_POSE, SetPose) service(point) except rospy.ServiceException as e: self.log_message(f'Server cannot set current robot position: {e}')
[docs] def set_succeeded(self, feedback): result = ControlResult() result.reached_point = feedback.reached_point self.log_message('Motion control successes.') self._as.set_succeeded(result)
[docs] def get_random_motion_time(self): return rospy.get_param(anm.PARAM_CONTROLLER_TIME, [0.1, 2.0])
[docs] def log_message(self, message): rospy.loginfo(str(anm.tag_log(message, LOG_TAG)))
[docs]class PlanningAction(object): """ A class that creates a motion plan with a number of points spanning in a given range and each point generated with a delay spanning in another given range. The plan is executed from a start point to a target point within a given environment size. :ivar _random_plan_points: A list of two integers indicating the minimum and maximum number of points in the motion plan. :vartype _random_plan_points: list[int] :ivar _random_plan_time: A list of two floats indicating the minimum and maximum delay time between each point generation in the motion plan. :vartype _random_plan_time: list[float] :ivar _environment_size: A list of two floats indicating the maximum x and y values of the environment where the motion plan is executed. :vartype _environment_size: list[float] :ivar _as: A SimpleActionServer object that handles the motion planning action. :vartype _as: SimpleActionServer """ def __init__(self): self._random_plan_points = rospy.get_param(anm.PARAM_PLANNER_POINTS, [2, 8]) self._random_plan_time = rospy.get_param(anm.PARAM_PLANNER_TIME, [0.1, 1]) self._environment_size = rospy.get_param(anm.PARAM_ENVIRONMENT_SIZE) self._as = SimpleActionServer(anm.ACTION_PLANNER, expo_assignment_1.msg.PlanAction,execute_cb=self.execute_callback, auto_start=False) self._as.start() log_msg = (f'`{anm.ACTION_PLANNER}` Action Server initialised. It will create random path with a number of point ' f'spanning in [{self._random_plan_points[0]}, {self._random_plan_points[1]}). Each point will be generated ' f'with a delay spanning in [{self._random_plan_time[0]}, {self._random_plan_time[1]}).')
[docs] def execute_callback(self, goal): start_point = _get_pose_client() target_point = goal.target if start_point is None or target_point is None: rospy.logerr(anm.tag_log('Cannot have `None` start point nor target_point. This service will be aborted!.', LOG_TAG)) self._as.set_aborted() return if not(self._is_valid(start_point) and self._is_valid(target_point)): rospy.logerr(anm.tag_log(f'Start point ({start_point.x}, {start_point.y}) or target point ({target_point.x}, \ {target_point.y}) point out of the environment. This service will be aborted!.', LOG_TAG)) self._as.set_aborted() return feedback = PlanFeedback(via_points=[start_point]) self._as.publish_feedback(feedback) for i in range(random.randint(self._random_plan_points[0], self._random_plan_points[1] + 1)): if self._as.is_preempt_requested(): rospy.loginfo(anm.tag_log('Server has been cancelled by the client!', LOG_TAG)) self._as.set_preempted() return new_point = Point() new_point.x = random.uniform(0, self._environment_size[0]) new_point.y = random.uniform(0, self._environment_size[1]) feedback.via_points.append(new_point) if i < self._random_plan_points[1]: self._as.publish_feedback(feedback) rospy.sleep(random.uniform(self._random_plan_time[0], self._random_plan_time[1])) else: feedback.via_points.append(target_point) result = PlanResult(via_points=feedback.via_points) self._as.set_succeeded(result) log_msg = 'Motion plan succeeded with plan: ' log_msg += ''.join('(' + str(point.x) + ', ' + str(point.y) + '), ' for point in result.via_points) rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
def _is_valid(self, point): return 0.0 <= point.x <= self._environment_size[0] and 0.0 <= point.y <= self._environment_size[1]
def _get_pose_client(): service = rospy.ServiceProxy(anm.SERVER_GET_POSE, GetPose) response = service() pose = response.pose rospy.loginfo(anm.tag_log(f'Retrieving current robot position from the `{anm.NODE_ROBOTS_CONDITION}` node as: ({pose.x}, {pose.y}).', LOG_TAG)) return pose
[docs]class RobotState: """ This class represents the state of the robot in terms of its position and battery status. """ def __init__(self): """ Constructor for the `RobotState` class. Initializes the `_pose` attribute to `None`, `_battery_low` attribute to `False`, and `_randomness` attribute to the value of `anm.PARAM_RANDOM_ACTIVE` parameter. If `_randomness` is `True`, it initializes `_random_battery_time` attribute to the value of `anm.PARAM_BATTERY_TIME` parameter. Then, it creates two services: `anm.SERVER_GET_POSE` and `anm.SERVER_SET_POSE`, which can be used to get and set the robot's position, respectively. It also starts a thread that continuously checks if the battery level is low or not and publishes the battery status to the `anm.TOPIC_BATTERY_LOW` topic. Args: None Returns: None """ self._pose, self._battery_low, self._randomness = None, False, rospy.get_param(anm.PARAM_RANDOM_ACTIVE, True) if self._randomness: self._random_battery_time = rospy.get_param(anm.PARAM_BATTERY_TIME, [15.0, 60.0]) rospy.Service(anm.SERVER_GET_POSE, GetPose, self.get_pose) rospy.Service(anm.SERVER_SET_POSE, SetPose, self.set_pose) th = threading.Thread(target=self.is_battery_low_) th.start() log_msg = (f'Initialise node `{anm.NODE_ROBOTS_CONDITION}` with services `{anm.SERVER_GET_POSE}` and ' f'`{anm.SERVER_SET_POSE}`, and topic {anm.TOPIC_BATTERY_LOW}.') rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
[docs] def set_pose(self, request): """ Service callback method for setting the robot's position. Args: request: An object of `SetPoseRequest` type, which contains the new pose for the robot. Returns: A response object of `SetPoseResponse` type. """ self._pose = request.pose if request.pose is not None else rospy.logerr(anm.tag_log('Cannot set an unspecified robot position', LOG_TAG)) return SetPoseResponse()
[docs] def get_pose(self, request): """ Service callback method for getting the robot's current position. Args: request: An object of `GetPoseRequest` type, which contains no arguments. Returns: A response object of `GetPoseResponse` type, which contains the current pose of the robot. """ if self._pose is None: rospy.logerr(anm.tag_log('Cannot get an unspecified robot position', LOG_TAG)) response = GetPoseResponse() response.pose = self._pose return response
[docs] def is_battery_low_(self): """ A helper method that continuously checks if the battery level is low or not and publishes the battery status to the `anm.TOPIC_BATTERY_LOW` topic. Args: None Returns: None """ publisher = rospy.Publisher(anm.TOPIC_BATTERY_LOW, Bool, queue_size=1, latch=True) self.battery_notifier_(publisher) if self._randomness else None
[docs] def battery_notifier_(self, publisher): """ A helper method that publishes the current battery status to the `publisher` object. Args: publisher: A `rospy.Publisher` object that is used to publish the battery status. Returns: None """ delay = 0 while not rospy.is_shutdown(): publisher.publish(Bool(self._battery_low)) if self._battery_low: print("Robot battery is low after",delay,"seconds") else: print("Robot battery is full",delay,"seconds") delay = random.uniform(self._random_battery_time[0], self._random_battery_time[1]) if self._randomness else None rospy.sleep(delay) self._battery_low = not self._battery_low
[docs]class ChooseCorridor(smach.State): """ A state that chooses a random point in the environment and sends a planning goal to the planner action server to find a path to that point through via-points. Parameters: interface_cortex (CortexInterface): The interface with the other nodes of the architecture. ontology_cortex (CortexOntology): The ontology interface with knowledge of the environment and robot's current position. Outputs: robot_position (string): The robot's current position in the environment. target (string): The target location for the robot to navigate to. random_plan (list of Points): The via-points planned by the planner to reach the target location. Outcome: 'Robot_recharging': The robot's battery is low and it needs to go recharge. 'corridor_chosen': The planner has finished computing a plan through via-points to reach the target location. 'urgentroom_chosen': The target location is a room that requires an urgent transition. """ def __init__(self, interface_cortex, ontology_cortex): """ Initializes the ChooseCorridor state. Args: interface_cortex (CortexInterface): The interface with the other nodes of the architecture. ontology_cortex (CortexOntology): The ontology interface with knowledge of the environment and robot's current position. """ self._cortex = interface_cortex self._ontology = ontology_cortex self.environment_size = rospy.get_param('config/environment_size') smach.State.__init__(self, outcomes=['Robot_recharging', 'corridor_chosen', 'urgentroom_chosen'], output_keys=['robot_position', 'target', 'random_plan'])
[docs] def execute(self, userdata): """ Executes the ChooseCorridor state. Args: userdata (UserData): The data to be passed between states. Returns: string: The outcome of the state. """ # Define a random point to be reached through some via-points to be planned. goal = PlanGoal() goal.target = Point(x=random.uniform(0, self.environment_size[0]), y=random.uniform(0, self.environment_size[1])) robot_position, target = self._ontology.choose_target() userdata.robot_position = robot_position userdata.target = target # Invoke the planner action server. self._cortex.planner_client.send_goal(goal) while not rospy.is_shutdown(): # Acquire the mutex to assure data consistencies with the ROS subscription threads managed by `self._cortex`. self._cortex.mutex.acquire() try: # If the battery is low, then cancel the control action server and take the `battery_low` transition. if self._cortex.is_battery_low(): # Higher priority self._cortex.planner_client.cancel_goals() self._ontology.go_to_recharge(robot_position) return 'Robot_recharging' # If the target is a Rooom then planner cancels goals and transition TRANS_DECIDED_URGENT occurs if target == 'Room-R1' or target == 'Room-R2' or target == 'Room-R3' or target == 'Room-R4': self._cortex.planner_client.cancel_goals() return 'urgentroom_chosen' # If the planner finishes its computation, then take the TRANS_DECIDED_CORRIDOR transition if self._cortex.planner_client.is_done(): userdata.random_plan = self._cortex.planner_client.get_results().via_points return 'corridor_chosen' finally: # Release the mutex to unblock the `self._cortex` subscription threads if they are waiting. self._cortex.mutex.release() # Wait for a reasonably small amount of time to allow `self._cortex` processing stimulus (eventually). rospy.sleep(LOOP_SLEEP_TIME)
[docs]class emergency(smach.State): """ This state handles emergency situations in the robot navigation system. It generates a random goal to be reached through some via-points planned by an action server. If the battery is low or the target is a corridor, the planner goal is cancelled and the appropriate transition is taken. When the controller finishes its computation, the 'urgentroom_chosen' transition is taken. Required parameters: - interface_cortex: an object that provides interfaces with other nodes of the architecture - ontology_cortex: an object that provides the robot's knowledge of the environment Optional parameters: - None Input keys: - None Output keys: - robot_position: the robot's current position - target: the target to be reached - random_plan: the planned via-points to reach the random goal Outcomes: - 'Robot_recharging': if the battery is low and the robot needs to go to the charging station - 'urgentroom_chosen': when the controller finishes its computation - 'corridor_chosen': if the target is a corridor and the robot needs to go there """ def __init__(self, interface_cortex, ontology_cortex): """ Constructor for the emergency state. @param interface_cortex: an object that provides interfaces with other nodes of the architecture @type interface_cortex: object @param ontology_cortex: an object that provides the robot's knowledge of the environment @type ontology_cortex: object @return: None """ super(emergency, self).__init__( outcomes=['Robot_recharging', 'urgentroom_chosen', 'corridor_chosen'], output_keys=['robot_position', 'target', 'random_plan'] ) # Get a reference to the interfaces with the other nodes of the architecture. self._cortex = interface_cortex self._ontology = ontology_cortex # Get the environment size from ROS parameters. self.environment_size = rospy.get_param('config/environment_size')
[docs] def execute(self, userdata): """ Execute function for the emergency state. @param userdata: data input/output that serves as a communication channel between states @type userdata: UserData @return: the outcome of the state machine (one of the defined outcomes) """ # Define a random point to be reached through some via-points to be planned. goal = PlanGoal() goal.target = Point(x=random.uniform(0, self.environment_size[0]), y=random.uniform(0, self.environment_size[1])) robot_position, target = self._ontology.choose_target() userdata.robot_position = robot_position userdata.target = target # Invoke the planner action server. self._cortex.planner_client.send_goal(goal) while not rospy.is_shutdown(): # Acquire the mutex to assure data consistencies with the ROS subscription threads managed by `self._cortex`. self._cortex.mutex.acquire() try: # If the battery is low, then cancel the control action server and take the `battery_low` transition. if self._cortex.is_battery_low(): # Higher priority self._cortex.planner_client.cancel_goals() self._ontology.go_to_recharge(robot_position) return 'Robot_recharging' #if the target is a corridor then planner cancels goal and transition TRANS_DECIDED_CORRIDOR occurs if target=='Room-C1' or target=='Room-C2' or target=='Room-E': self._cortex.planner_client.cancel_goals() return 'corridor_chosen' # If the controller finishes its computation, then take the `went_random_pose` transition, which is related to the `repeat` transition. if self._cortex.planner_client.is_done(): userdata.random_plan = self._cortex.planner_client.get_results().via_points return 'urgentroom_chosen' finally: # Release the mutex to unblock the `self._cortex` subscription threads if they are waiting. self._cortex.mutex.release() # Wait for a reasonably small amount of time to allow `self._cortex` processing stimulus (eventually). rospy.sleep(LOOP_SLEEP_TIME)
[docs]class emergencylocation(smach.State): """ This class defines a state in a SMACH state machine that sends a robot to a target location using a given plan. If the robot's battery is low, the state will cancel the plan and send the robot to recharge. :param interface_cortex: An instance of the interface_cortex class used to communicate with the robot's control system. :type interface_cortex: interface_cortex :param ontology_cortex: An instance of the ontology_cortex class used to interact with the robot's knowledge base. :type ontology_cortex: ontology_cortex :return: The outcome of the state. Either 'Robot_recharging' if the robot needs to recharge, or 'robot_moved_urgent' if the robot reaches the target. :raises: Any exceptions thrown by the underlying code. """ def __init__(self, interface_cortex, ontology_cortex): """ Initialize the state. :param interface_cortex: An instance of the interface_cortex class used to communicate with the robot's control system. :type interface_cortex: interface_cortex :param ontology_cortex: An instance of the ontology_cortex class used to interact with the robot's knowledge base. :type ontology_cortex: ontology_cortex """ super(emergencylocation, self).__init__( outcomes=['Robot_recharging', 'robot_moved_urgent'], input_keys=['random_plan', 'robot_position', 'target'], output_keys=['crobot_position'] ) self.interface_cortex = interface_cortex self.ontology_cortex = ontology_cortex
[docs] def execute(self, userdata): """ Execute the state. :param userdata: The user data provided to the state. :type userdata: smach.UserData :return: The outcome of the state. Either 'Robot_recharging' if the robot needs to recharge, or 'robot_moved_urgent' if the robot reaches the target. """ plan = userdata.random_plan goal = ControlGoal(via_points=plan) self.interface_cortex.controller_client.send_goal(goal) robot_position = userdata.robot_position target = userdata.target while not rospy.is_shutdown(): with self.interface_cortex.mutex: if self.interface_cortex.is_battery_low(): self.interface_cortex.controller_client.cancel_goals() self.ontology_cortex.go_to_recharge(robot_position) return 'Robot_recharging' if self.interface_cortex.controller_client.is_done(): self.ontology_cortex.move_location(target, robot_position) rospy.sleep(anm.MONITOR_TIME) return 'robot_moved_urgent' rospy.sleep(LOOP_SLEEP_TIME)
[docs]class Recharging(State): """ This class defines a state in a SMACH state machine that sends a robot to recharge if its battery is low. :param interface_cortex: An instance of the interface_cortex class used to communicate with the robot's control system. :type interface_cortex: interface_cortex :param ontology_cortex: An instance of the ontology_cortex class used to interact with the robot's knowledge base. :type ontology_cortex: ontology_cortex :return: The outcome of the state, which is always 'recharged'. :raises: Any exceptions thrown by the underlying code. """ def __init__(self, interface_cortex, ontology_cortex): """ Initialize the state. :param interface_cortex: An instance of the interface_cortex class used to communicate with the robot's control system. :type interface_cortex: interface_cortex :param ontology_cortex: An instance of the ontology_cortex class used to interact with the robot's knowledge base. :type ontology_cortex: ontology_cortex """ super().__init__(outcomes=['recharged']) self._cortex = interface_cortex self._ontology = ontology_cortex
[docs] def execute(self, userdata): """ Execute the state. :param userdata: The user data provided to the state. :type userdata: smach.UserData :return: The outcome of the state, which is always 'recharged'. """ while not rospy.is_shutdown(): self._cortex.mutex.acquire() try: if not self._cortex.is_battery_low(): self._cortex.reset_states() return 'recharged' finally: self._cortex.mutex.release() rospy.sleep(LOOP_SLEEP_TIME)
if __name__ == '__main__': rospy.init_node(anm.NODE_SURVILANCE_BRAIN, log_level=rospy.INFO) server = ControllingAction() server = PlanningAction() # Instantiate the node manager class RobotState() # Define the cortex cortex = sensory2() # Initialize robot position robot_pose_param = rospy.get_param(anm.PARAM_INITIAL_POSE, [0, 0]) cortex.init_robot_pose(Point(x=robot_pose_param[0], y=robot_pose_param[1])) rospy.spin()