인공지능...?/터틀봇 프로젝트
[Turtlebot3] 키보드 조작 이동 코드
복습
2024. 7. 28. 19:39
728x90
ROS 2를 사용하여 원격으로 로봇을 제어하기 위해 토픽을 사용하는 방법
기존에는 아래의 코드를 사용했지만 키보드를 사용하기 위해서는 루트 권한이 필요하다고 함.
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import keyboard # 키보드 라이브러리 임포트
class RobotDriver(Node):
def __init__(self):
super().__init__('robot_driver')
self.publisher = self.create_publisher(Twist, 'cmd_vel', 10)
self.speed = 0.0
self.angle = 0.0
def update(self):
msg = Twist()
msg.linear.x = self.speed
msg.angular.z = self.angle
self.publisher.publish(msg)
def check_keyboard(self):
if keyboard.is_pressed('up'):
self.speed = 0.5
elif keyboard.is_pressed('down'):
self.speed = -0.5
elif keyboard.is_pressed('left'):
self.angle = 0.5
elif keyboard.is_pressed('right'):
self.angle = -0.5
elif keyboard.is_pressed('space'):
self.speed = 0
self.angle = 0
self.update() # 속도나 각도가 변경될 때마다 메시지를 업데이트하고 발행
def main(args=None):
rclpy.init(args=args)
node = RobotDriver()
try:
while rclpy.ok():
node.check_keyboard()
rclpy.spin_once(node, timeout_sec=0.1) # 실시간 반응성을 위해 짧은 타임아웃 사용
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
# pip install keyboard
ROS 2 Topic을 사용한 원격 제어 키보드 입력을 직접 처리하는 대신, ROS 2의 토픽을 사용하여 원격으로 로봇을 제어할 수 있습니다. 이 방법은 ROS 2 시스템의 분산 능력을 활용하며, 다양한 입력 소스 (웹 인터페이스, GUI, 다른 노드 등)에서 로봇을 제어할 수 있게 해줍니다.
따라서 아래의 방법을 시도함.
1. ROS 2 노드 설계
필요한 패키지 설치
sudo apt install ros-<distro>-demo-nodes-cpp # ROS 2 C++ 데모 노드
sudo apt install ros-<distro>-demo-nodes-py # ROS 2 Python 데모 노드
# 여기서 <distro>는 ROS 2 배포판의 이름 (예: foxy, galactic 등)입니다.
나는 현재 foxy를 사용하므로 아래의 코드를 사용한다.
sudo apt install ros-foxy-demo-nodes-py
이미 설치 되어 있었다.
2. 코드 설명
클래스 TeleopNode
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import sys
import select
import tty
import termios
class TeleopNode(Node):
def __init__(self):
super().__init__('teleop_twist_keyboard')
self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
self.settings = termios.tcgetattr(sys.stdin)
self.key_map = {
'w': (1.0, 0.0),
's': (-1.0, 0.0),
'a': (0.0, 1.0),
'd': (0.0, -1.0),
' ': (0.0, 0.0) # Stop
}
1. __init__ 메서드
- 노드 초기화: super().__init__('teleop_twist_keyboard')를 호출하여 Node 클래스를 초기화하고, 노드 이름을 'teleop_twist_keyboard'로 설정합니다.
- 퍼블리셔 생성: self.create_publisher(Twist, 'cmd_vel', 10)를 사용하여 cmd_vel 토픽에 메시지를 발행하기 위한 퍼블리셔를 생성합니다. Twist 메시지 타입을 사용하며, QoS(Quality of Service)의 큐 크기를 10으로 설정합니다.
- 터미널 설정 저장: self.settings = termios.tcgetattr(sys.stdin)을 통해 터미널의 현재 설정을 저장합니다. 이는 나중에 터미널 설정을 원래대로 복원하는 데 사용됩니다.
- 키 매핑: self.key_map에서 키보드의 각 키('w', 's', 'a', 'd', ' ')에 대한 선형 및 각속도 명령을 설정합니다.
이때 ' '는 스페이스 바이다.
self.key_map 딕셔너리에서 각 키는 로봇에게 전달할 Twist 메시지의 선형 및 각속도 값을 포함하고 있습니다. Twist 메시지는 로봇의 속도를 제어하기 위해 사용되며, 다음과 같이 구성됩니다:
- linear.x: 로봇의 전진/후진 속도 (m/s)
- angular.z: 로봇의 회전 속도 (rad/s)
각 키에 대한 값은 튜플 (linear.x, angular.z)로 설정됩니다. 여기서:
def get_key(self):
tty.setraw(sys.stdin.fileno())
rlist, _, _ = select.select([sys.stdin], [], [], 0.1)
if rlist:
key = sys.stdin.read(1)
else:
key = ''
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.settings)
return key
2. get_key 메서드
- 키 입력 감지: tty.setraw(sys.stdin.fileno())를 호출하여 터미널을 원시 모드로 설정합니다. 이 모드에서는 입력한 키 값이 바로 프로그램으로 전달됩니다.
- 키 읽기: select.select([sys.stdin], [], [], 0.1)를 사용하여 최대 0.1초 동안 키 입력을 대기합니다. 입력이 감지되면 sys.stdin.read(1)을 호출하여 키 값을 읽습니다.
- 터미널 설정 복원: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.settings)를 호출하여 터미널 설정을 원래대로 복원합니다.
def publish_velocity(self):
try:
key = self.get_key() # 키보드에서 키 입력을 받습니다.
if key in self.key_map: # 받은 키가 key_map 딕셔너리에 있는지 확인합니다.
linear, angular = self.key_map[key] # 해당 키에 매핑된 선형 및 각속도 값을 가져옵니다.
twist = Twist()
twist.linear.x = linear # 선형 속도 값을 Twist 메시지의 linear.x에 설정합니다.
twist.angular.z = angular # 각속도 값을 Twist 메시지의 angular.z에 설정합니다.
self.publisher_.publish(twist) # 설정된 Twist 메시지를 cmd_vel 토픽으로 발행합니다.
except Exception as e:
self.get_logger().error('Exception caught: ' + str(e))
3. publish_velocity 메서드
- 키 입력 처리: get_key 메서드를 호출하여 키 값을 얻고, 이 키 값이 key_map에 있다면 해당 키에 매핑된 선형 및 각속도를 Twist 메시지로 설정합니다.
- 메시지 발행: 설정된 Twist 메시지를 cmd_vel 토픽으로 발행합니다.
def run(self):
while rclpy.ok():
self.publish_velocity()
4. run 메서드
- 메인 루프: rclpy.ok()가 True인 동안 publish_velocity 메서드를 호출하여 반복적으로 키 입력을 체크하고, 입력에 따라 속도 메시지를 발행합니다.
def main(args=None):
rclpy.init(args=args)
teleop_node = TeleopNode()
try:
teleop_node.run()
except KeyboardInterrupt:
pass
finally:
teleop_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
main 함수
- 노드 실행: rclpy.init(args=args)로 ROS 2 파이썬 클라이언트 라이브러리를 초기화하고, TeleopNode 인스턴스를 생성한 후 run 메서드를 호출하여 노드를 실행합니다.
- 예외 처리: try-except 블록을 사용하여 KeyboardInterrupt (일반적으로 Ctrl+C)가 발생할 경우 프로그램을 깔끔하게 종료합니다.
전체 코드
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import sys
import select
import tty
import termios
class TeleopNode(Node):
def __init__(self):
super().__init__('teleop_twist_keyboard')
self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
self.settings = termios.tcgetattr(sys.stdin)
self.key_map = {
'w': (1.0, 0.0),
's': (-1.0, 0.0),
'a': (0.0, 1.0),
'd': (0.0, -1.0),
' ': (0.0, 0.0) # Stop
}
def get_key(self):
tty.setraw(sys.stdin.fileno())
rlist, _, _ = select.select([sys.stdin], [], [], 0.1)
if rlist:
key = sys.stdin.read(1)
else:
key = ''
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.settings)
return key
def publish_velocity(self):
try:
key = self.get_key()
if key in self.key_map:
linear, angular = self.key_map[key]
twist = Twist()
twist.linear.x = linear
twist.angular.z = angular
self.publisher_.publish(twist)
except Exception as e:
self.get_logger().error('Exception caught: ' + str(e))
def run(self):
while rclpy.ok():
self.publish_velocity()
def main(args=None):
rclpy.init(args=args)
teleop_node = TeleopNode()
try:
teleop_node.run()
except KeyboardInterrupt:
pass
finally:
teleop_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
그런데 이 코드는 ctrl + c를 해도 터미널이 멈추지 않는다.
왜냐하면 tty.setraw 함수로 인해 터미널의 입력 설정이 변경되기 때문이다. 따라서 아래와 같이 문제를 해결한다.
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import sys
import select
import tty
import termios
class TeleopNode(Node):
def __init__(self):
super().__init__('teleop_twist_keyboard')
self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
self.settings = termios.tcgetattr(sys.stdin)
self.key_map = {
'w': (1.0, 0.0),
's': (-1.0, 0.0),
'a': (0.0, 1.0),
'd': (0.0, -1.0),
' ': (0.0, 0.0) # Stop
}
def get_key(self):
original_settings = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
rlist, _, _ = select.select([sys.stdin], [], [], 0.01) # 시간 감소
if rlist:
key = sys.stdin.read(1)
if key == '\x03': # Ctrl+C
raise KeyboardInterrupt
else:
key = ''
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, original_settings)
return key
def publish_velocity(self):
try:
key = self.get_key()
if key in self.key_map:
linear, angular = self.key_map[key]
twist = Twist()
twist.linear.x = linear
twist.angular.z = angular
self.publisher_.publish(twist)
except Exception as e:
self.get_logger().error('Exception caught: ' + str(e))
def run(self):
try:
while rclpy.ok():
self.publish_velocity()
except KeyboardInterrupt:
self.get_logger().info('Keyboard interrupt received, shutting down.')
self.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
teleop_node = TeleopNode()
try:
teleop_node.run()
except KeyboardInterrupt:
pass
finally:
teleop_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
그냥 완벽한 키보드 작동 코드
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import sys
import select
import tty
import termios
class TeleopNode(Node):
def __init__(self):
super().__init__('teleop_twist_keyboard')
self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
self.settings = termios.tcgetattr(sys.stdin)
self.current_linear_speed = 0.0
self.current_angular_speed = 0.0
self.max_linear_speed = 2.0 # 최대 선형 속도
self.min_linear_speed = -2.0 # 최소 선형 속도
self.speed_increment = 0.1 # 속도 증가량
self.angular_increment = 0.1 # 각속도 증가량
self.key_map = {
'w': (self.speed_increment, 0.0),
's': (-self.speed_increment, 0.0),
'a': (0.0, self.angular_increment),
'd': (0.0, -self.angular_increment),
' ': (0.0, 0.0) # Stop
}
def get_key(self):
original_settings = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
rlist, _, _ = select.select([sys.stdin], [], [], 0.01)
if rlist:
key = sys.stdin.read(1)
if key == '\x03': # Ctrl+C
raise KeyboardInterrupt
else:
key = ''
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, original_settings)
return key
def publish_velocity(self):
try:
key = self.get_key()
if key in self.key_map:
linear_change, angular_change = self.key_map[key]
if key == ' ':
# 스페이스바를 눌렀을 때 속도 및 각속도를 0.0으로 설정
self.current_linear_speed = 0.0
self.current_angular_speed = 0.0
else:
# 새로운 속도 계산
new_linear_speed = self.current_linear_speed + linear_change
new_angular_speed = self.current_angular_speed + angular_change
# 최대/최소 속도 제한
self.current_linear_speed = max(min(new_linear_speed, self.max_linear_speed), self.min_linear_speed)
self.current_angular_speed = max(min(new_angular_speed, self.max_linear_speed), self.min_linear_speed)
# 속도 값이 항상 float 타입이 되도록 보장
self.current_linear_speed = float(self.current_linear_speed)
self.current_angular_speed = float(self.current_angular_speed)
twist = Twist()
twist.linear.x = self.current_linear_speed
twist.angular.z = self.current_angular_speed
self.publisher_.publish(twist)
self.log_speeds()
except Exception as e:
self.get_logger().error('Exception caught: ' + str(e))
def log_speeds(self):
self.get_logger().info(f'Current Speed: Linear={self.current_linear_speed:.3f} m/s, Angular={self.current_angular_speed:.3f} rad/s')
def run(self):
while rclpy.ok():
self.publish_velocity()
def main(args=None):
rclpy.init(args=args)
teleop_node = TeleopNode()
try:
teleop_node.run()
except KeyboardInterrupt:
pass
finally:
teleop_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
728x90