my_pub.py
# !/usr/bin/env/ python3 import rclpy from rclpy.node import Node from rclpy.qos import QoSProfile from std_msgs.msg import String class LedPublisher(Node): def __init__(self): super().__init__('led_pub_node') qos_profile = QoSProfile(depth=10) # queue_size = 10 self.publisher_led = self.create_publisher(String, 'topic_led', qos_profile) timer_period = 3.0 # seconds self.timer = self.create_timer(timer_period, self.publisher_led_callback) def publisher_led_callback(self): msg = String() msg.data = "led_on" self.publisher_led.publish(msg) clock_time_pub = self.get_clock().now().to_msg().sec self.get_logger().info(f'{msg.data} message is published at {clock_time_pub}') def main(args=None): rclpy.init(args=args) led_publisher = LedPublisher() try: rclpy.spin(led_publisher) except KeyboardInterrupt: led_publisher.get_logger().info('keyboard Interrupt') finally: led_publisher.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() |
my_sub.py
# !/usr/bin/env/ python3 import rclpy from rclpy.node import Node from rclpy.qos import QoSProfile from std_msgs.msg import String import RPi.GPIO as GPIO import time # setup for RPi4B LED26 = 26 GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) GPIO.setup(LED26, GPIO.OUT) class LedSubscriber(Node): def __init__(self): super().__init__('led_sub_node') qos_profile = QoSProfile(depth = 10) # queue_size = 10 self.subscription_led = self.create_subscription( String, 'topic_led', self.sub_callback, qos_profile ) self.subscription_led # prevent unused variable warning def sub_callback(self, msg): if msg.data == 'led_on': GPIO.output(LED26, GPIO.HIGH) time.sleep(1.0) GPIO.output(LED26, GPIO.LOW) clock_time_sub = self.get_clock().now().to_msg().sec self.get_logger().info(f'{msg.data} message is subscribed at {clock_time_sub}') def main(args=None): rclpy.init(args=args) led_subscriber = LedSubscriber() try: rclpy.spin(led_subscriber) except KeyboardInterrupt: led_subscriber.get_logger().info('keyboard Interrupt') finally: led_subscriber.destroy_node() rclpy.shutdown() GPIO.cleanup() if __name__ == '__main__': main() |
setup.py
import os from glob import glob from setuptools import setup package_name = 'my_pkg' setup( name=package_name, version='0.0.0', packages=[package_name], data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), (os.path.join('share', package_name), glob('launch/*.launch.py')), ], install_requires=['setuptools'], zip_safe=True, maintainer='ubuntu', maintainer_email='ubuntu@todo.todo', description='TODO: Package description', license='TODO: License declaration', tests_require=['pytest'], entry_points={ 'console_scripts': [ 'led_pub_node = my_pkg.my_pub:main', 'led_sub_node = my_pkg.my_sub:main', ], }, ) |
colcon build --symlink-install --package-select my_pkg
ros2 run my_pkg led_pub_node
ros2 run my_pkg led_sub_node
'ROS > ROS2FoxyUbuntu20.04RPi4B' 카테고리의 다른 글
Motor Drive Control on RPi4B using ROS2 Service (0) | 2022.05.12 |
---|---|
Motor Drive Control on RPi4B using ROS2 Topic (0) | 2022.05.12 |
LED Control on RPi4B using ROS2 Action (0) | 2022.05.11 |
LED Control on RPi4B using ROS2 Service (0) | 2022.05.11 |
Ubuntu 20.04 on RPi4B (0) | 2022.05.04 |