my_pub_motor643.py :
# !/usr/bin/env/ python3 import rclpy from rclpy.node import Node from rclpy.qos import QoSProfile from std_msgs.msg import String import time class PublisherMotor643(Node): def __init__(self): super().__init__('pub_node_motor643') qos_profile = QoSProfile(depth=10) # queue_size = 10 self._publisher = self.create_publisher(String, 'topic_motor643', qos_profile) msg = String() msg.data = "on" self._publisher.publish(msg) now = time.localtime() datetime_pub = time.strftime('%c', now) self.get_logger().info(f'The message {msg.data} is published at {datetime_pub}') def main(args=None): rclpy.init(args=args) publisher_motor643 = PublisherMotor643() try: rclpy.spin(publisher_motor643) except KeyboardInterrupt: publisher_motor643.get_logger().info('keyboard Interrupt') finally: publisher_motor643.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() |
my_sub_motor643.py :
# !/usr/bin/env/ python3 import rclpy from rclpy.node import Node from rclpy.qos import QoSProfile from std_msgs.msg import String import RPi.GPIO as GPIO import time import cv2 # setup for RPi4B PWMA = 18 AIN1 = 22 AIN2 = 27 PWMB = 23 BIN1 = 25 BIN2 = 24 def motor_back(speed): L_Motor.ChangeDutyCycle(speed) GPIO.output(AIN2,False)#AIN2 GPIO.output(AIN1,True) #AIN1 R_Motor.ChangeDutyCycle(speed) GPIO.output(BIN2,False)#BIN2 GPIO.output(BIN1,True) #BIN1 def motor_go(speed): L_Motor.ChangeDutyCycle(speed) GPIO.output(AIN2,True)#AIN2 GPIO.output(AIN1,False) #AIN1 R_Motor.ChangeDutyCycle(speed) GPIO.output(BIN2,True)#BIN2 GPIO.output(BIN1,False) #BIN1 def motor_stop(): L_Motor.ChangeDutyCycle(0) GPIO.output(AIN2,False)#AIN2 GPIO.output(AIN1,False) #AIN1 R_Motor.ChangeDutyCycle(0) GPIO.output(BIN2,False)#BIN2 GPIO.output(BIN1,False) #BIN1 def motor_right(speed): L_Motor.ChangeDutyCycle(speed) GPIO.output(AIN2,True)#AIN2 GPIO.output(AIN1,False) #AIN1 R_Motor.ChangeDutyCycle(0) GPIO.output(BIN2,False)#BIN2 GPIO.output(BIN1,True) #BIN1 def motor_left(speed): L_Motor.ChangeDutyCycle(0) GPIO.output(AIN2,False)#AIN2 GPIO.output(AIN1,True) #AIN1 R_Motor.ChangeDutyCycle(speed) GPIO.output(BIN2,True)#BIN2 GPIO.output(BIN1,False) #BIN1 GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) GPIO.setup(AIN2,GPIO.OUT) GPIO.setup(AIN1,GPIO.OUT) GPIO.setup(PWMA,GPIO.OUT) GPIO.setup(BIN1,GPIO.OUT) GPIO.setup(BIN2,GPIO.OUT) GPIO.setup(PWMB,GPIO.OUT) L_Motor= GPIO.PWM(PWMA,100) L_Motor.start(0) R_Motor = GPIO.PWM(PWMB,100) R_Motor.start(0) speedSet = 30 # Speed 50 -> 40 -> 30 because it is hard to drive class SubscriberMotor643(Node): def __init__(self): super().__init__('sub_node_motor643') qos_profile = QoSProfile(depth = 10) # queue_size = 10 self._subscription = self.create_subscription( String, 'topic_motor643', self.sub_callback, qos_profile ) def sub_callback(self, msg): now = time.localtime() datetime_sub = time.strftime('%c', now) self.get_logger().info(f'The message {msg.data} is subscribed at {datetime_sub}') if msg.data == 'on': camera = cv2.VideoCapture(-1) camera.set(3, 640) camera.set(4, 480) # filepath = "/home/ubuntu/PPiCar/video/train" # i = 0 # carState = "stop" while( camera.isOpened() ): keyValue = cv2.waitKey(10) if keyValue == ord('q'): break elif keyValue == 82: # print("go") carState = "go" motor_go(speedSet) elif keyValue == 84: # print("stop") carState = "stop" motor_stop() elif keyValue == 81: # print("left") carState = "left" motor_left(speedSet) elif keyValue == 83: # print("right") carState = "right" motor_right(speedSet) _, image = camera.read() image = cv2.flip(image,-1) cv2.imshow('Original', image) cv2.imwrite("/home/ubuntu/RPiCar/test.png", image) # height, _, _ = image.shape # save_image = image[int(height/2):,:,:] # save_image = cv2.cvtColor(save_image, cv2.COLOR_BGR2YUV) # save_image = cv2.GaussianBlur(save_image, (3,3), 0) # save_image = cv2.resize(save_image, (200,66)) # cv2.imshow('Save', save_image) # if carState == "left": # cv2.imwrite("%s_%05d_%03d.png" % (filepath, i, 45), save_image) # i += 1 # elif carState == "right": # cv2.imwrite("%s_%05d_%03d.png" % (filepath, i, 135), save_image) # i += 1 # elif carState == "go": # cv2.imwrite("%s_%05d_%03d.png" % (filepath, i, 90), save_image) # i += 1 cv2.destroyAllWindows() def main(args=None): rclpy.init(args=args) subscriber_motor643 = SubscriberMotor643() try: rclpy.spin(subscriber_motor643) except KeyboardInterrupt: subscriber_motor643.get_logger().info('keyboard Interrupt') finally: subscriber_motor643.destroy_node() rclpy.shutdown() GPIO.cleanup() if __name__ == '__main__': main() |
pip3 install opencv-python
pip3 install numpy==1.20.2
and install library needed for OpenCV
'ROS > ROS2FoxyUbuntu20.04RPi4B' 카테고리의 다른 글
LED Control on RPi4B using ROS2 Topic by C++ (0) | 2022.05.25 |
---|---|
Motor Drive Control on RPi4B using ROS2 Service (0) | 2022.05.12 |
Motor Drive Control on RPi4B using ROS2 Topic (0) | 2022.05.12 |
LED Control on RPi4B using ROS2 Action (0) | 2022.05.11 |
LED Control on RPi4B using ROS2 Service (0) | 2022.05.11 |