ROS/ROS2FoxyUbuntu20.04RPi4B

LED Control on RPi4B using ROS2 Topic

leohycho 2022. 5. 10. 12:28

my_pub.py

# !/usr/bin/env/ python3

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import String


class LedPublisher(Node):

    def __init__(self):
        super().__init__('led_pub_node')
        qos_profile = QoSProfile(depth=10) # queue_size = 10
        self.publisher_led = self.create_publisher(String, 'topic_led', qos_profile)
        timer_period = 3.0  # seconds
        self.timer = self.create_timer(timer_period, self.publisher_led_callback)
        
    def publisher_led_callback(self):
        msg = String()
        msg.data = "led_on"
        self.publisher_led.publish(msg)
        clock_time_pub = self.get_clock().now().to_msg().sec
        self.get_logger().info(f'{msg.data} message is published at {clock_time_pub}')        

def main(args=None):
    rclpy.init(args=args)

    led_publisher = LedPublisher()
    
    try:
        rclpy.spin(led_publisher)
    except KeyboardInterrupt:
        led_publisher.get_logger().info('keyboard Interrupt')
    finally:        
        led_publisher.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

 

my_sub.py

# !/usr/bin/env/ python3

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import String

import RPi.GPIO as GPIO
import time

# setup for RPi4B
LED26 = 26

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(LED26, GPIO.OUT)


class LedSubscriber(Node):

    def __init__(self):
        super().__init__('led_sub_node')        
        qos_profile = QoSProfile(depth = 10) # queue_size = 10
        self.subscription_led = self.create_subscription(
            String, 'topic_led', self.sub_callback, qos_profile
        )
        self.subscription_led  # prevent unused variable warning

    def sub_callback(self, msg):
        if msg.data == 'led_on':
            GPIO.output(LED26, GPIO.HIGH)
            time.sleep(1.0)
            GPIO.output(LED26, GPIO.LOW)
            clock_time_sub = self.get_clock().now().to_msg().sec
            self.get_logger().info(f'{msg.data} message is subscribed at {clock_time_sub}')                    
        
def main(args=None):
    rclpy.init(args=args)

    led_subscriber = LedSubscriber()

    try:
        rclpy.spin(led_subscriber)
    except KeyboardInterrupt:
        led_subscriber.get_logger().info('keyboard Interrupt')
    finally:
        led_subscriber.destroy_node()
        rclpy.shutdown()
        GPIO.cleanup()

if __name__ == '__main__':
    main()

 

setup.py

import os
from glob import glob

from setuptools import setup

package_name = 'my_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py')),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='ubuntu',
    maintainer_email='ubuntu@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'led_pub_node = my_pkg.my_pub:main',
            'led_sub_node = my_pkg.my_sub:main',
        ],
    },
)

 

colcon build --symlink-install --package-select my_pkg

ros2 run my_pkg led_pub_node

ros2 run my_pkg led_sub_node