ROS/ROS2FoxyUbuntu20.04RPi4B

LED Control on RPi4B using ROS2 Action

leohycho 2022. 5. 11. 18:02

~/my_interfaces/action/ActionSetLed.action :

# Goal Service Request
int32 data
---
# Result Service Request
string result
---
# Feedback Topic
bool feedback

my_action_client.py :

# !/usr/bin/env/ python3

import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient

from my_interfaces.action import ActionSetLed

class ActionClientLed(Node):

    def __init__(self):
        super().__init__('action_client_node_led')        
        self.action_client_led = ActionClient(self, ActionSetLed, 'action_led')             
        
    def send_goal(self, data):
        goal_msg = ActionSetLed.Goal()
        goal_msg.data = data

        self.action_client_led.wait_for_server()
        self._send_goal_future = self.action_client_led.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected')
            return
        self.get_logger().info('Goal accepted')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)
    
    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.result))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.feedback))      


def main(args=None):
    rclpy.init(args=args)

    action_client_led = ActionClientLed()
    action_client_led.send_goal(5)
    rclpy.spin(action_client_led)
    
    # action_client_led.destroy_node()
    # rclpy.shutdown()

if __name__ == '__main__':
    main()

my_action_server.py :

# !/usr/bin/env/ python3

import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from rclpy.qos import QoSProfile

from my_interfaces.action import ActionSetLed

import RPi.GPIO as GPIO
import time

# setup for RPi4B
LED26 = 26

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(LED26, GPIO.OUT)


class ActionServerLed(Node):

    def __init__(self):
        super().__init__('action_server_node_led')            
        self.action_server_led = ActionServer(
            self, ActionSetLed, 'action_led', self.execute_callback
            )

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal ...')

        feedback_msg = ActionSetLed.Feedback()
        feedback_msg.feedback = True   

        for i in range(0, goal_handle.request.data):
            GPIO.output(LED26, GPIO.HIGH)
            time.sleep(1.0)
            GPIO.output(LED26, GPIO.LOW)
            time.sleep(2.0)

            self.get_logger().info('Feedback: {0}'.format(feedback_msg.feedback))
            goal_handle.publish_feedback(feedback_msg)

        goal_handle.succeed()
        
        result = ActionSetLed.Result()
        result.result = "Succeed"

        return result
                
        
def main(args=None):
    rclpy.init(args=args)

    action_server_led = ActionServerLed()

    try:
        rclpy.spin(action_server_led)
    except KeyboardInterrupt:
        action_server_led.get_logger().info('keyboard Interrupt')
    finally:
        action_server_led.destroy_node()
        rclpy.shutdown()
        GPIO.cleanup()

if __name__ == '__main__':
    main()

setup.py :

import os
from glob import glob

from setuptools import setup

package_name = 'my_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py')),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='ubuntu',
    maintainer_email='ubuntu@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'led_pub_node = my_pkg.my_pub:main',
            'led_sub_node = my_pkg.my_sub:main',
            'client_node_led = my_pkg.my_client:main',
            'service_node_led = my_pkg.my_service:main',
            'action_client_node_led = my_pkg.my_action_client:main',
            'action_server_node_led = my_pkg.my_action_server:main',
        ],
    },
)

package.xml :

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>my_pkg</name>
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="ubuntu@todo.todo">ubuntu</maintainer>
  <license>TODO: License declaration</license>

  <depend>rclpy</depend>
  <exec_depend>ros2launch</exec_depend>
  <exec_depend>my_interfaces</exec_depend>

  <test_depend>ament_copyright</test_depend>
  <test_depend>ament_flake8</test_depend>
  <test_depend>ament_pep257</test_depend>
  <test_depend>python3-pytest</test_depend>

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>

 

 

[Reference]

[1] https://docs.ros.org/en/foxy/Tutorials/Actions/Writing-a-Py-Action-Server-Client.html