~/my_interfaces/action/ActionSetLed.action :
# Goal Service Request int32 data --- # Result Service Request string result --- # Feedback Topic bool feedback |
my_action_client.py :
# !/usr/bin/env/ python3 import rclpy from rclpy.node import Node from rclpy.action import ActionClient from my_interfaces.action import ActionSetLed class ActionClientLed(Node): def __init__(self): super().__init__('action_client_node_led') self.action_client_led = ActionClient(self, ActionSetLed, 'action_led') def send_goal(self, data): goal_msg = ActionSetLed.Goal() goal_msg.data = data self.action_client_led.wait_for_server() self._send_goal_future = self.action_client_led.send_goal_async(goal_msg, feedback_callback=self.feedback_callback) self._send_goal_future.add_done_callback(self.goal_response_callback) def goal_response_callback(self, future): goal_handle = future.result() if not goal_handle.accepted: self.get_logger().info('Goal rejected') return self.get_logger().info('Goal accepted') self._get_result_future = goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_result_callback) def get_result_callback(self, future): result = future.result().result self.get_logger().info('Result: {0}'.format(result.result)) rclpy.shutdown() def feedback_callback(self, feedback_msg): feedback = feedback_msg.feedback self.get_logger().info('Received feedback: {0}'.format(feedback.feedback)) def main(args=None): rclpy.init(args=args) action_client_led = ActionClientLed() action_client_led.send_goal(5) rclpy.spin(action_client_led) # action_client_led.destroy_node() # rclpy.shutdown() if __name__ == '__main__': main() |
my_action_server.py :
# !/usr/bin/env/ python3 import rclpy from rclpy.node import Node from rclpy.action import ActionServer from rclpy.qos import QoSProfile from my_interfaces.action import ActionSetLed import RPi.GPIO as GPIO import time # setup for RPi4B LED26 = 26 GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) GPIO.setup(LED26, GPIO.OUT) class ActionServerLed(Node): def __init__(self): super().__init__('action_server_node_led') self.action_server_led = ActionServer( self, ActionSetLed, 'action_led', self.execute_callback ) def execute_callback(self, goal_handle): self.get_logger().info('Executing goal ...') feedback_msg = ActionSetLed.Feedback() feedback_msg.feedback = True for i in range(0, goal_handle.request.data): GPIO.output(LED26, GPIO.HIGH) time.sleep(1.0) GPIO.output(LED26, GPIO.LOW) time.sleep(2.0) self.get_logger().info('Feedback: {0}'.format(feedback_msg.feedback)) goal_handle.publish_feedback(feedback_msg) goal_handle.succeed() result = ActionSetLed.Result() result.result = "Succeed" return result def main(args=None): rclpy.init(args=args) action_server_led = ActionServerLed() try: rclpy.spin(action_server_led) except KeyboardInterrupt: action_server_led.get_logger().info('keyboard Interrupt') finally: action_server_led.destroy_node() rclpy.shutdown() GPIO.cleanup() if __name__ == '__main__': main() |
setup.py :
import os from glob import glob from setuptools import setup package_name = 'my_pkg' setup( name=package_name, version='0.0.0', packages=[package_name], data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), (os.path.join('share', package_name), glob('launch/*.launch.py')), ], install_requires=['setuptools'], zip_safe=True, maintainer='ubuntu', maintainer_email='ubuntu@todo.todo', description='TODO: Package description', license='TODO: License declaration', tests_require=['pytest'], entry_points={ 'console_scripts': [ 'led_pub_node = my_pkg.my_pub:main', 'led_sub_node = my_pkg.my_sub:main', 'client_node_led = my_pkg.my_client:main', 'service_node_led = my_pkg.my_service:main', 'action_client_node_led = my_pkg.my_action_client:main', 'action_server_node_led = my_pkg.my_action_server:main', ], }, ) |
package.xml :
<?xml version="1.0"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <package format="3"> <name>my_pkg</name> <version>0.0.0</version> <description>TODO: Package description</description> <maintainer email="ubuntu@todo.todo">ubuntu</maintainer> <license>TODO: License declaration</license> <depend>rclpy</depend> <exec_depend>ros2launch</exec_depend> <exec_depend>my_interfaces</exec_depend> <test_depend>ament_copyright</test_depend> <test_depend>ament_flake8</test_depend> <test_depend>ament_pep257</test_depend> <test_depend>python3-pytest</test_depend> <export> <build_type>ament_python</build_type> </export> </package> |
[Reference]
[1] https://docs.ros.org/en/foxy/Tutorials/Actions/Writing-a-Py-Action-Server-Client.html
'ROS > ROS2FoxyUbuntu20.04RPi4B' 카테고리의 다른 글
Motor Drive Control on RPi4B using ROS2 Service (0) | 2022.05.12 |
---|---|
Motor Drive Control on RPi4B using ROS2 Topic (0) | 2022.05.12 |
LED Control on RPi4B using ROS2 Service (0) | 2022.05.11 |
LED Control on RPi4B using ROS2 Topic (0) | 2022.05.10 |
Ubuntu 20.04 on RPi4B (0) | 2022.05.04 |