ROS/ROS2FoxyUbuntu20.04RPi4B

Motor Drive Control on RPi4B using ROS2 Service

leohycho 2022. 5. 12. 17:55

my_client_motor354.py :

# !/usr/bin/env/ python3

import rclpy
from rclpy.node import Node

from my_interfaces.srv import SetMotor

class ClientMotor354(Node):

    def __init__(self):
        super().__init__('client_node_motor354')        
        self._client = self.create_client(SetMotor, 'service_motor354')
        while not self._client.wait_for_service(timeout_sec=0.1):
            self.get_logger().info('The service is not available, Waiting again...')
        self.request =SetMotor.Request()     
        
    def send_request(self):        
        self.request.speed = 50 # %
        self.request.start = "on"
        self.future = self._client.call_async(self.request)
                

def main(args=None):
    rclpy.init(args=args)

    client_motor354 = ClientMotor354()
    client_motor354.send_request()
    
    while rclpy.ok():
        rclpy.spin_once(client_motor354)
        if client_motor354.future.done():
            try:
                response = client_motor354.future.result()
                client_motor354.get_logger().info(f'result : {response.end}')
            except Exception as e:
                client_motor354.get_logger().info(
                    'Service call failed %r' % (e,)
                )
            break

    client_motor354.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

 

my_service_motor354.py :

# !/usr/bin/env/ python3

import rclpy
from rclpy.node import Node

from my_interfaces.srv import SetMotor

import RPi.GPIO as GPIO
import time

# setup for RPi4B
SW1 = 5
SW2 = 6
SW3 = 13
SW4 = 19

PWMA = 18
AIN1 = 22
AIN2 = 27

PWMB = 23
BIN1 = 25
BIN2 = 24

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(SW1,GPIO.IN,pull_up_down=GPIO.PUD_DOWN)
GPIO.setup(SW2,GPIO.IN,pull_up_down=GPIO.PUD_DOWN)
GPIO.setup(SW3,GPIO.IN,pull_up_down=GPIO.PUD_DOWN)
GPIO.setup(SW4,GPIO.IN,pull_up_down=GPIO.PUD_DOWN)

GPIO.setup(PWMA,GPIO.OUT)
GPIO.setup(AIN1,GPIO.OUT)
GPIO.setup(AIN2,GPIO.OUT)

GPIO.setup(PWMB,GPIO.OUT)
GPIO.setup(BIN1,GPIO.OUT)
GPIO.setup(BIN2,GPIO.OUT)

L_Motor = GPIO.PWM(PWMA,500)
L_Motor.start(0)

R_Motor = GPIO.PWM(PWMB,500)
R_Motor.start(0)

def motor_forward(speed):
    GPIO.output(AIN1,0)
    GPIO.output(AIN2,1)
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN1,0)
    GPIO.output(BIN2,1)
    R_Motor.ChangeDutyCycle(speed)

def motor_back(speed):
    GPIO.output(AIN1,1)
    GPIO.output(AIN2,0)
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN1,1)
    GPIO.output(BIN2,0)
    R_Motor.ChangeDutyCycle(speed)
    
def motor_left(speed):
    GPIO.output(AIN1,1)
    GPIO.output(AIN2,0)
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN1,0)
    GPIO.output(BIN2,1)
    R_Motor.ChangeDutyCycle(speed)
    
def motor_right(speed):
    GPIO.output(AIN1,0)
    GPIO.output(AIN2,1)
    L_Motor.ChangeDutyCycle(speed)
    GPIO.output(BIN1,1)
    GPIO.output(BIN2,0)
    R_Motor.ChangeDutyCycle(speed)

def motor_stop():
    GPIO.output(AIN1,0)
    GPIO.output(AIN2,1)
    L_Motor.ChangeDutyCycle(0)
    GPIO.output(BIN1,0)
    GPIO.output(BIN2,1)
    R_Motor.ChangeDutyCycle(0)

class ServiceMotor354(Node):
  
    def __init__(self):
        super().__init__('service_node_motor354')            
        self._service = self.create_service(
            SetMotor, 'service_motor354', self.service_motor354_callback
            )

    def service_motor354_callback(self, request, response):
        self.request_speed = request.speed
        self.request_start = request.start

        now = time.localtime()
        datetime_sub = time.strftime('%c', now)
        self.get_logger().info(f'requests are arrived at {datetime_sub}')   

        count = 0   
    
        while (self.request_start == "on"):
            count += 1
            if GPIO.input(SW1) == 1:
                motor_forward(self.request_speed)                
            elif GPIO.input(SW2) == 1:
                motor_right(self.request_speed)                
            elif GPIO.input(SW3) == 1:
                motor_left(self.request_speed)                
            elif GPIO.input(SW4) == 1:
                motor_back(self.request_speed)                
            else:
                motor_stop()                       
            time.sleep(0.1)
            if count == 100:
                break
                
        response.end = "Reqested work is done"                           

        return response    
        
def main(args=None):
    rclpy.init(args=args)

    service_motor354 = ServiceMotor354()

    try:
        rclpy.spin_once(service_motor354)
    except KeyboardInterrupt:
        service_motor354.get_logger().info('keyboard Interrupt')
    finally:
        service_motor354.destroy_node()
        rclpy.shutdown()
        GPIO.cleanup()

if __name__ == '__main__':
    main()

 

my_interfaces/srv/SetMotor.srv :

# Request
int32 speed
string start
---
# Response
string end