공부중

[Turtlebot3] 키보드 조작 이동 코드 본문

인공지능...?/터틀봇 프로젝트

[Turtlebot3] 키보드 조작 이동 코드

복습 2024. 7. 28. 19:39
728x90

ROS 2를 사용하여 원격으로 로봇을 제어하기 위해 토픽을 사용하는 방법

 

기존에는 아래의 코드를 사용했지만 키보드를 사용하기 위해서는 루트 권한이 필요하다고 함. 

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import keyboard  # 키보드 라이브러리 임포트

class RobotDriver(Node):
    def __init__(self):
        super().__init__('robot_driver')
        self.publisher = self.create_publisher(Twist, 'cmd_vel', 10)
        self.speed = 0.0
        self.angle = 0.0

    def update(self):
        msg = Twist()
        msg.linear.x = self.speed
        msg.angular.z = self.angle
        self.publisher.publish(msg)

    def check_keyboard(self):
        if keyboard.is_pressed('up'):
            self.speed = 0.5
        elif keyboard.is_pressed('down'):
            self.speed = -0.5
        elif keyboard.is_pressed('left'):
            self.angle = 0.5
        elif keyboard.is_pressed('right'):
            self.angle = -0.5
        elif keyboard.is_pressed('space'):
            self.speed = 0
            self.angle = 0

        self.update()  # 속도나 각도가 변경될 때마다 메시지를 업데이트하고 발행

def main(args=None):
    rclpy.init(args=args)
    node = RobotDriver()
    try:
        while rclpy.ok():
            node.check_keyboard()
            rclpy.spin_once(node, timeout_sec=0.1)  # 실시간 반응성을 위해 짧은 타임아웃 사용
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()




# pip install keyboard

 

ROS 2 Topic을 사용한 원격 제어 키보드 입력을 직접 처리하는 대신, ROS 2의 토픽을 사용하여 원격으로 로봇을 제어할 수 있습니다. 이 방법은 ROS 2 시스템의 분산 능력을 활용하며, 다양한 입력 소스 (웹 인터페이스, GUI, 다른 노드 등)에서 로봇을 제어할 수 있게 해줍니다.

 

따라서 아래의 방법을 시도함. 

 

1. ROS 2 노드 설계

 

필요한 패키지 설치

sudo apt install ros-<distro>-demo-nodes-cpp  # ROS 2 C++ 데모 노드
sudo apt install ros-<distro>-demo-nodes-py   # ROS 2 Python 데모 노드
# 여기서 <distro>는 ROS 2 배포판의 이름 (예: foxy, galactic 등)입니다.

 

나는 현재 foxy를 사용하므로 아래의 코드를 사용한다. 

sudo apt install ros-foxy-demo-nodes-py

 

이미 설치 되어 있었다. 

 

 

2. 코드 설명

클래스 TeleopNode

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import sys
import select
import tty
import termios

class TeleopNode(Node):
    def __init__(self):
        super().__init__('teleop_twist_keyboard')
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        self.settings = termios.tcgetattr(sys.stdin)
        self.key_map = {
            'w': (1.0, 0.0),
            's': (-1.0, 0.0),
            'a': (0.0, 1.0),
            'd': (0.0, -1.0),
            ' ': (0.0, 0.0)  # Stop
        }

 

1. __init__ 메서드

  • 노드 초기화: super().__init__('teleop_twist_keyboard')를 호출하여 Node 클래스를 초기화하고, 노드 이름을 'teleop_twist_keyboard'로 설정합니다.
  • 퍼블리셔 생성: self.create_publisher(Twist, 'cmd_vel', 10)를 사용하여 cmd_vel 토픽에 메시지를 발행하기 위한 퍼블리셔를 생성합니다. Twist 메시지 타입을 사용하며, QoS(Quality of Service)의 큐 크기를 10으로 설정합니다.
  • 터미널 설정 저장: self.settings = termios.tcgetattr(sys.stdin)을 통해 터미널의 현재 설정을 저장합니다. 이는 나중에 터미널 설정을 원래대로 복원하는 데 사용됩니다.
  • 키 매핑: self.key_map에서 키보드의 각 키('w', 's', 'a', 'd', ' ')에 대한 선형 및 각속도 명령을 설정합니다.

이때 ' '는 스페이스 바이다.

 

self.key_map 딕셔너리에서 각 키는 로봇에게 전달할 Twist 메시지의 선형 및 각속도 값을 포함하고 있습니다. Twist 메시지는 로봇의 속도를 제어하기 위해 사용되며, 다음과 같이 구성됩니다:

  • linear.x: 로봇의 전진/후진 속도 (m/s)
  • angular.z: 로봇의 회전 속도 (rad/s)

각 키에 대한 값은 튜플 (linear.x, angular.z)로 설정됩니다. 여기서:

 

 

    def get_key(self):
        tty.setraw(sys.stdin.fileno())
        rlist, _, _ = select.select([sys.stdin], [], [], 0.1)
        if rlist:
            key = sys.stdin.read(1)
        else:
            key = ''
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.settings)
        return key

 

 

2. get_key 메서드

  • 키 입력 감지: tty.setraw(sys.stdin.fileno())를 호출하여 터미널을 원시 모드로 설정합니다. 이 모드에서는 입력한 키 값이 바로 프로그램으로 전달됩니다.
  • 키 읽기: select.select([sys.stdin], [], [], 0.1)를 사용하여 최대 0.1초 동안 키 입력을 대기합니다. 입력이 감지되면 sys.stdin.read(1)을 호출하여 키 값을 읽습니다.
  • 터미널 설정 복원: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.settings)를 호출하여 터미널 설정을 원래대로 복원합니다.

 

def publish_velocity(self):
    try:
        key = self.get_key()  # 키보드에서 키 입력을 받습니다.
        if key in self.key_map:  # 받은 키가 key_map 딕셔너리에 있는지 확인합니다.
            linear, angular = self.key_map[key]  # 해당 키에 매핑된 선형 및 각속도 값을 가져옵니다.
            twist = Twist()
            twist.linear.x = linear  # 선형 속도 값을 Twist 메시지의 linear.x에 설정합니다.
            twist.angular.z = angular  # 각속도 값을 Twist 메시지의 angular.z에 설정합니다.
            self.publisher_.publish(twist)  # 설정된 Twist 메시지를 cmd_vel 토픽으로 발행합니다.
    except Exception as e:
        self.get_logger().error('Exception caught: ' + str(e))

 

3. publish_velocity 메서드

  • 키 입력 처리: get_key 메서드를 호출하여 키 값을 얻고, 이 키 값이 key_map에 있다면 해당 키에 매핑된 선형 및 각속도를 Twist 메시지로 설정합니다.
  • 메시지 발행: 설정된 Twist 메시지를 cmd_vel 토픽으로 발행합니다.

 

 

    def run(self):
        while rclpy.ok():
            self.publish_velocity()

 

4. run 메서드

  • 메인 루프: rclpy.ok()가 True인 동안 publish_velocity 메서드를 호출하여 반복적으로 키 입력을 체크하고, 입력에 따라 속도 메시지를 발행합니다.

 

def main(args=None):
    rclpy.init(args=args)
    teleop_node = TeleopNode()
    try:
        teleop_node.run()
    except KeyboardInterrupt:
        pass
    finally:
        teleop_node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

main 함수

  • 노드 실행: rclpy.init(args=args)로 ROS 2 파이썬 클라이언트 라이브러리를 초기화하고, TeleopNode 인스턴스를 생성한 후 run 메서드를 호출하여 노드를 실행합니다.
  • 예외 처리: try-except 블록을 사용하여 KeyboardInterrupt (일반적으로 Ctrl+C)가 발생할 경우 프로그램을 깔끔하게 종료합니다.

 

 

 

전체 코드 

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import sys
import select
import tty
import termios

class TeleopNode(Node):
    def __init__(self):
        super().__init__('teleop_twist_keyboard')
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        self.settings = termios.tcgetattr(sys.stdin)
        self.key_map = {
            'w': (1.0, 0.0),
            's': (-1.0, 0.0),
            'a': (0.0, 1.0),
            'd': (0.0, -1.0),
            ' ': (0.0, 0.0)  # Stop
        }

    def get_key(self):
        tty.setraw(sys.stdin.fileno())
        rlist, _, _ = select.select([sys.stdin], [], [], 0.1)
        if rlist:
            key = sys.stdin.read(1)
        else:
            key = ''
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.settings)
        return key

    def publish_velocity(self):
        try:
            key = self.get_key()
            if key in self.key_map:
                linear, angular = self.key_map[key]
                twist = Twist()
                twist.linear.x = linear
                twist.angular.z = angular
                self.publisher_.publish(twist)
        except Exception as e:
            self.get_logger().error('Exception caught: ' + str(e))

    def run(self):
        while rclpy.ok():
            self.publish_velocity()

def main(args=None):
    rclpy.init(args=args)
    teleop_node = TeleopNode()
    try:
        teleop_node.run()
    except KeyboardInterrupt:
        pass
    finally:
        teleop_node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

 

 

그런데 이 코드는 ctrl + c를 해도 터미널이 멈추지 않는다. 
왜냐하면 tty.setraw 함수로 인해 터미널의 입력 설정이 변경되기 때문이다. 따라서 아래와 같이 문제를 해결한다. 

 

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import sys
import select
import tty
import termios

class TeleopNode(Node):
    def __init__(self):
        super().__init__('teleop_twist_keyboard')
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        self.settings = termios.tcgetattr(sys.stdin)
        self.key_map = {
            'w': (1.0, 0.0),
            's': (-1.0, 0.0),
            'a': (0.0, 1.0),
            'd': (0.0, -1.0),
            ' ': (0.0, 0.0)  # Stop
        }

    def get_key(self):
        original_settings = termios.tcgetattr(sys.stdin)
        try:
            tty.setraw(sys.stdin.fileno())
            rlist, _, _ = select.select([sys.stdin], [], [], 0.01)  # 시간 감소
            if rlist:
                key = sys.stdin.read(1)
                if key == '\x03':  # Ctrl+C
                    raise KeyboardInterrupt
            else:
                key = ''
        finally:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, original_settings)
        return key



    def publish_velocity(self):
        try:
            key = self.get_key()
            if key in self.key_map:
                linear, angular = self.key_map[key]
                twist = Twist()
                twist.linear.x = linear
                twist.angular.z = angular
                self.publisher_.publish(twist)
        except Exception as e:
            self.get_logger().error('Exception caught: ' + str(e))

    def run(self):
        try:
            while rclpy.ok():
                self.publish_velocity()
        except KeyboardInterrupt:
            self.get_logger().info('Keyboard interrupt received, shutting down.')
            self.destroy_node()
            rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)
    teleop_node = TeleopNode()
    try:
        teleop_node.run()
    except KeyboardInterrupt:
        pass
    finally:
        teleop_node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

 

 

 

 

 

 

그냥 완벽한 키보드 작동 코드

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import sys
import select
import tty
import termios

class TeleopNode(Node):
    def __init__(self):
        super().__init__('teleop_twist_keyboard')
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        self.settings = termios.tcgetattr(sys.stdin)
        self.current_linear_speed = 0.0
        self.current_angular_speed = 0.0
        self.max_linear_speed = 2.0  # 최대 선형 속도
        self.min_linear_speed = -2.0  # 최소 선형 속도
        self.speed_increment = 0.1  # 속도 증가량
        self.angular_increment = 0.1  # 각속도 증가량
        self.key_map = {
            'w': (self.speed_increment, 0.0),
            's': (-self.speed_increment, 0.0),
            'a': (0.0, self.angular_increment),
            'd': (0.0, -self.angular_increment),
            ' ': (0.0, 0.0)  # Stop
        }

    def get_key(self):
        original_settings = termios.tcgetattr(sys.stdin)
        try:
            tty.setraw(sys.stdin.fileno())
            rlist, _, _ = select.select([sys.stdin], [], [], 0.01)
            if rlist:
                key = sys.stdin.read(1)
                if key == '\x03':  # Ctrl+C
                    raise KeyboardInterrupt
            else:
                key = ''
        finally:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, original_settings)
        return key
    
    def publish_velocity(self):
        try:
            key = self.get_key()
            if key in self.key_map:
                linear_change, angular_change = self.key_map[key]
                
                if key == ' ':
                    # 스페이스바를 눌렀을 때 속도 및 각속도를 0.0으로 설정
                    self.current_linear_speed = 0.0
                    self.current_angular_speed = 0.0
                else:
                    # 새로운 속도 계산
                    new_linear_speed = self.current_linear_speed + linear_change
                    new_angular_speed = self.current_angular_speed + angular_change

                    # 최대/최소 속도 제한
                    self.current_linear_speed = max(min(new_linear_speed, self.max_linear_speed), self.min_linear_speed)
                    self.current_angular_speed = max(min(new_angular_speed, self.max_linear_speed), self.min_linear_speed)

                # 속도 값이 항상 float 타입이 되도록 보장
                self.current_linear_speed = float(self.current_linear_speed)
                self.current_angular_speed = float(self.current_angular_speed)

                twist = Twist()
                twist.linear.x = self.current_linear_speed
                twist.angular.z = self.current_angular_speed
                self.publisher_.publish(twist)
                self.log_speeds()
        except Exception as e:
            self.get_logger().error('Exception caught: ' + str(e))

    def log_speeds(self):
        self.get_logger().info(f'Current Speed: Linear={self.current_linear_speed:.3f} m/s, Angular={self.current_angular_speed:.3f} rad/s')

    def run(self):
        while rclpy.ok():
            self.publish_velocity()

def main(args=None):
    rclpy.init(args=args)
    teleop_node = TeleopNode()
    try:
        teleop_node.run()
    except KeyboardInterrupt:
        pass
    finally:
        teleop_node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

 

 

 

 

 

 

 

728x90