Action 서버
개요
ROS2 액션은 장시간 실행 작업(수초~수분)을 위한 3방향 통신 메커니즘입니다:
- Goal (클라이언트 → 서버): 달성할 목표
- Feedback (서버 → 클라이언트): 진행 상황 업데이트
- Result (서버 → 클라이언트): 최종 결과
- Cancel (클라이언트 → 서버): 중단 요청
PLEM의 표준 액션
1. FollowJointTrajectory
조인트 공간 궤적 실행 (표준 ROS2 Control 액션).
서버: joint_trajectory_controller
액션 이름: /joint_trajectory_controller/follow_joint_trajectory
패키지: joint_trajectory_controller (ROS2 Control 표준 컨트롤러)
런치: plem_bringup/launch/ros2_control.launch.py
활성화: ros2_control.launch.py 실행 시 자동 시작
다중 로봇 시스템에서는 네임스페이스가 추가됩니다:
/{robot_id}/joint_trajectory_controller/follow_joint_trajectory
메시지 구조:
# Goal
trajectory_msgs/JointTrajectory trajectory
# Result
int32 error_code
string error_string
# Feedback
trajectory_msgs/JointTrajectoryPoint desired
trajectory_msgs/JointTrajectoryPoint actual
trajectory_msgs/JointTrajectoryPoint error
CLI 테스트:
# 액션 서버 확인
ros2 action list | grep follow_joint_trajectory
# 간단한 궤적 전송
# 참고: 조인트 이름은 URDF에 정의된 실제 이름 사용 (예: Indy7의 경우 'joint0', 'joint1' 등)
# 스캐폴드로 생성된 패키지는 'joint1', 'joint2' 등을 사용할 수 있음
ros2 action send_goal /joint_trajectory_controller/follow_joint_trajectory \
control_msgs/action/FollowJointTrajectory \
"{trajectory: {joint_names: ['joint0', 'joint1', 'joint2', 'joint3', 'joint4', 'joint5'], \
points: [{positions: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], time_from_start: {sec: 2}}]}}"
2. SetMode
로봇 모드 제어 (BRAKED, TRAJECTORY, FREEDRIVE).
서버 노드: wim_rt_driver
액션 이름: /wim_rt_driver/set_mode
패키지: PLEM 런타임에 포함 (자동 시작)
런치: plem_bringup/launch/plem_launch.py에서 자동 시작
다중 로봇 시스템에서는 네임스페이스가 추가됩니다:
/{robot_id}/wim_rt_driver/set_mode
메시지 구조:
# Goal
wim_msgs/RobotMode target_robot_mode # 0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE (1=STOPPING은 내부용)
# Result
bool success
string message
# Feedback
wim_msgs/RobotMode current_robot_mode
wim_msgs/SafetyMode current_safety_mode
CLI 테스트:
# TRAJECTORY 모드로 전환 (mode: 2)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 2}}"
# FREEDRIVE 모드로 전환 (mode: 3)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 3}}"
# BRAKED 모드로 전환 (mode: 0)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 0}}"
3. PlanTrajectory
MoveIt/Pilz를 통한 모션 계획.
서버 노드: PLEM 모션 플래닝 서버
액션 이름: /plan_trajectory
패키지: PLEM 런타임에 포함 (MoveIt 실행 시 자동 시작)
런치: plem_bringup/launch/moveit.launch.py
활성화: moveit.launch.py 실행 시 자동 시작 (MoveIt 모션 플래너 필요)
다중 로봇 시스템에서는 네임스페이스가 추가됩니다:
/{robot_id}/plan_trajectory
메시지 구조:
# Goal
wim_msgs/MotionTarget[] targets # MOVEJ/MOVEL 타겟 시퀀스
# Result
trajectory_msgs/JointTrajectory trajectory
float64 duration
uint32 waypoint_count
bool success
string message
# Feedback
uint32 current_segment
uint32 total_segments
Python 클라이언트 예제
기본 액션 클라이언트
#!/usr/bin/env python3
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from wim_msgs.action import SetMode
class ModeControlClient(Node):
def __init__(self):
super().__init__('mode_control_client')
# 다중 로봇 시: f'/{robot_id}/wim_rt_driver/set_mode' 사용
self._action_client = ActionClient(self, SetMode, '/wim_rt_driver/set_mode')
def send_goal(self, target_mode):
"""
target_mode: 0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE (1=STOPPING은 내부용)
"""
goal_msg = SetMode.Goal()
goal_msg.target_robot_mode.mode = target_mode
self._action_client.wait_for_server()
self.get_logger().info(f'목표 전송: 모드={target_mode}')
send_goal_future = self._action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback)
send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('목표 거부됨')
return
self.get_logger().info('목표 승인됨')
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
if result.success:
self.get_logger().info(f'성공: {result.message}')
else:
self.get_logger().error(f'실패: {result.message}')
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(
f'피드백: 현재 모드={feedback.current_robot_mode.mode}, '
f'안전 모드={feedback.current_safety_mode.safety_mode}'
)
def main(args=None):
rclpy.init(args=args)
client = ModeControlClient()
# TRAJECTORY 모드로 전환
client.send_goal(target_mode=2)
rclpy.spin(client)
if __name__ == '__main__':
main()
궤적 실행 클라이언트
#!/usr/bin/env python3
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from builtin_interfaces.msg import Duration
class TrajectoryClient(Node):
def __init__(self):
super().__init__('trajectory_client')
self._action_client = ActionClient(
self,
FollowJointTrajectory,
'/joint_trajectory_controller/follow_joint_trajectory'
)
def send_trajectory(self, positions_list, durations):
"""
positions_list: 조인트 위치 리스트 (각 6개 값)
durations: 각 웨이포인트 도달 시간 (초)
"""
goal_msg = FollowJointTrajectory.Goal()
trajectory = JointTrajectory()
# 참고: 조인트 이름은 URDF 정의에 따라 다름
# Indy7 예시: 'joint0', 'joint1', etc. (0-indexed, no underscore)
# 스캐폴드 생성 패키지: 'joint1', 'joint2', etc. (1-indexed)
trajectory.joint_names = [
'joint0', 'joint1', 'joint2',
'joint3', 'joint4', 'joint5'
]
for positions, duration_sec in zip(positions_list, durations):
point = JointTrajectoryPoint()
point.positions = positions
point.time_from_start = Duration(sec=int(duration_sec))
trajectory.points.append(point)
goal_msg.trajectory = trajectory
self._action_client.wait_for_server()
self.get_logger().info(f'{len(positions_list)}개 웨이포인트 궤적 전송')
send_goal_future = self._action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback)
send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('궤적 거부됨')
return
self.get_logger().info('궤적 실행 시작')
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
if result.error_code == 0:
self.get_logger().info('궤적 실행 완료')
else:
self.get_logger().error(f'실행 실패: {result.error_string}')
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
# 선택 사항: 진행 상황 로깅
pass
def main(args=None):
rclpy.init(args=args)
client = TrajectoryClient()
# 예제: 2개 웨이포인트 궤적
positions_list = [
[0.0, 0.5, 1.0, 0.0, 0.0, 0.0], # 웨이포인트 1
[0.5, 0.5, 0.5, 0.5, 0.5, 0.5], # 웨이포인트 2
]
durations = [2.0, 4.0] # 2초, 4초
client.send_trajectory(positions_list, durations)
rclpy.spin(client)
if __name__ == '__main__':
main()
C++ 클라이언트 예제
비동기 액션 클라이언트
#include <rclcpp/rclcpp.hpp>
#include <rclcpp_action/rclcpp_action.hpp>
#include <wim_msgs/action/set_mode.hpp>
class ModeControlClient : public rclcpp::Node {
public:
using SetMode = wim_msgs::action::SetMode;
using GoalHandle = rclcpp_action::ClientGoalHandle<SetMode>;
ModeControlClient() : Node("mode_control_client") {
// 다중 로봇 시: "/{robot_id}/wim_rt_driver/set_mode" 사용
client_ = rclcpp_action::create_client<SetMode>(this, "/wim_rt_driver/set_mode");
}
void send_goal(int8_t target_mode) {
if (!client_->wait_for_action_server(std::chrono::seconds(5))) {
RCLCPP_ERROR(get_logger(), "액션 서버 사용 불가");
return;
}
auto goal = SetMode::Goal();
goal.target_robot_mode.mode = target_mode;
auto send_goal_options = rclcpp_action::Client<SetMode>::SendGoalOptions();
send_goal_options.feedback_callback =
[this](GoalHandle::SharedPtr, const std::shared_ptr<const SetMode::Feedback> feedback) {
RCLCPP_INFO(get_logger(), "현재 모드: %d", feedback->current_robot_mode.mode);
};
send_goal_options.result_callback =
[this](const GoalHandle::WrappedResult& result) {
if (result.code == rclcpp_action::ResultCode::SUCCEEDED) {
RCLCPP_INFO(get_logger(), "성공: %s", result.result->message.c_str());
} else {
RCLCPP_ERROR(get_logger(), "실패");
}
};
client_->async_send_goal(goal, send_goal_options);
}
private:
rclcpp_action::Client<SetMode>::SharedPtr client_;
};
int main(int argc, char** argv) {
rclcpp::init(argc, argv);
auto node = std::make_shared<ModeControlClient>();
node->send_goal(2); // TRAJECTORY 모드 (0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE)
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
액션 취소
Python 취소 예제
class CancellableClient(Node):
def __init__(self):
super().__init__('cancellable_client')
self._action_client = ActionClient(self, FollowJointTrajectory, '/joint_trajectory_controller/follow_joint_trajectory')
self._goal_handle = None
def send_goal(self, trajectory):
send_goal_future = self._action_client.send_goal_async(trajectory)
send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
self._goal_handle = future.result()
if self._goal_handle.accepted:
self.get_logger().info('목표 승인됨')
def cancel_goal(self):
if self._goal_handle:
self.get_logger().info('목표 취소 중...')
cancel_future = self._goal_handle.cancel_goal_async()
cancel_future.add_done_callback(self.cancel_done_callback)
def cancel_done_callback(self, future):
cancel_response = future.result()
if len(cancel_response.goals_canceling) > 0:
self.get_logger().info('목표 취소됨')
else:
self.get_logger().warn('취소 실패')
CLI 취소
ROS2 Humble CLI에서는 단일 명령으로 액션을 취소할 수 없습니다. 위의 Python 예시를 사용하거나, 로봇 모드를 BRAKED로 전환하여 궤적 실행을 중단하세요.
# 모드 전환으로 궤적 중단
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode "{target_mode: 0}"
에러 처리
결과 코드
| 코드 | 의미 | 조치 |
|---|---|---|
SUCCEEDED | 목표 달성 | 정상 완료 |
CANCELED | 사용자 취소 | 취소 보고 |
ABORTED | 서버 에러 | 에러 로깅, 재시도 |
UNKNOWN | 예상치 못한 상태 | 조사 필요 |
일반적인 에러 시나리오
목표 거부:
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('목표 거부됨 - 원인:')
self.get_logger().error('- SafetyMode 확인 (NORMAL이어야 함)')
self.get_logger().error('- 파라미터 범위 확인')
self.get_logger().error('- 하드웨어 준비 상태 확인')
return
실행 중 타임아웃:
def get_result_callback(self, future):
result = future.result().result
if not result.success and 'timeout' in result.message.lower():
self.get_logger().error('타임아웃 발생 - 복구 시도')
# 모드를 BRAKED로 리셋
# 또는 안전 상태 확인
안전 위반:
# 주기적으로 안전 상태 확인
def feedback_callback(self, feedback_msg):
if feedback_msg.feedback.current_safety_mode.safety_mode != 0: # NORMAL
self.get_logger().warn('안전 위반 감지 - 목표 취소')
self.cancel_goal()
디버깅
액션 서버 상태 확인
# 사용 가능한 액션 서버
ros2 action list
# 액션 인터페이스 정보
ros2 action info /wim_rt_driver/set_mode
# 액션 타입 정의
ros2 interface show wim_msgs/action/SetMode
액션 트래픽 모니터링
# 목표 전송 모니터링
ros2 topic echo /wim_rt_driver/set_mode/_action/send_goal
# 피드백 모니터링
ros2 topic echo /wim_rt_driver/set_mode/_action/feedback
# 결과 모니터링
ros2 topic echo /wim_rt_driver/set_mode/_action/get_result
로깅
# 액션 서버 로그
ros2 topic echo /rosout | grep ModeControlActionServer
# 특정 노드 로그 레벨 설정
ros2 run rqt_console rqt_console
요약
핵심 요점:
- 표준 액션:
FollowJointTrajectory(궤적 실행),SetMode(모드 제어),PlanTrajectory(모션 계획) - 런치 구조:
plem_bringup/launch/ros2_control.launch.py: FollowJointTrajectory + SetMode 액션 서버plem_bringup/launch/moveit.launch.py: PlanTrajectory 액션 서버plem_bringup/launch/plem_launch.py: 모든 컴포넌트 자동 통합
- 패키지 위치:
- FollowJointTrajectory:
joint_trajectory_controller(ROS2 Control 표준) - SetMode/PlanTrajectory:
plem_ros2_adapter(PLEM 커스텀)
- FollowJointTrajectory:
- 호출 방법: CLI (
ros2 action send_goal), Python (ActionClient), C++ (rclcpp_action::Client) - 에러 처리: 목표 거부 확인, 결과 코드 검증, 안전 상태 모니터링
- 취소:
cancel_goal_async()(Python),async_cancel_goal()(C++) - 디버깅:
ros2 action list/info,/rosout로그, 피드백 토픽
다음 단계:
- ROS2 런치 시스템 - IPC 통신 및 런치 시스템 이해
- 안전 모드 - SafetyMode와 RobotMode 상태 관리