Skip to main content

Action Servers

Overview

ROS2 actions are a three-way communication mechanism for long-running tasks (seconds to minutes):

  • Goal (client → server): The objective to achieve
  • Feedback (server → client): Progress updates
  • Result (server → client): Final outcome
  • Cancel (client → server): Cancellation request

PLEM Standard Actions

1. FollowJointTrajectory

Joint-space trajectory execution (standard ROS2 Control action).

Server: joint_trajectory_controller Action name: /joint_trajectory_controller/follow_joint_trajectory Package: joint_trajectory_controller (ROS2 Control standard controller) Launch: plem_bringup/launch/ros2_control.launch.py Activation: Automatically starts when ros2_control.launch.py is executed

Multi-Robot Environments

In multi-robot systems, a namespace is added: /{robot_id}/joint_trajectory_controller/follow_joint_trajectory

Message structure:

# Goal
trajectory_msgs/JointTrajectory trajectory

# Result
int32 error_code
string error_string

# Feedback
trajectory_msgs/JointTrajectoryPoint desired
trajectory_msgs/JointTrajectoryPoint actual
trajectory_msgs/JointTrajectoryPoint error

CLI testing:

# Check action server
ros2 action list | grep follow_joint_trajectory

# Send simple trajectory
# Note: Use actual joint names defined in URDF (e.g., 'joint0', 'joint1' for Indy7)
# Scaffold-generated packages may use 'joint1', 'joint2', etc.
ros2 action send_goal /joint_trajectory_controller/follow_joint_trajectory \
control_msgs/action/FollowJointTrajectory \
"{trajectory: {joint_names: ['joint0', 'joint1', 'joint2', 'joint3', 'joint4', 'joint5'], \
points: [{positions: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], time_from_start: {sec: 2}}]}}"

2. SetMode

Robot mode control (BRAKED, TRAJECTORY, FREEDRIVE).

Server node: wim_rt_driver Action name: /wim_rt_driver/set_mode Package: Included in PLEM runtime (starts automatically) Launch: Automatically started in plem_bringup/launch/plem_launch.py

Multi-Robot Environments

In multi-robot systems, a namespace is added: /{robot_id}/wim_rt_driver/set_mode

Message structure:

# Goal
wim_msgs/RobotMode target_robot_mode # 0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE (1=STOPPING is internal)

# Result
bool success
string message

# Feedback
wim_msgs/RobotMode current_robot_mode
wim_msgs/SafetyMode current_safety_mode

CLI testing:

# Switch to TRAJECTORY mode (mode: 2)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 2}}"

# Switch to FREEDRIVE mode (mode: 3)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 3}}"

# Switch to BRAKED mode (mode: 0)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 0}}"

3. PlanTrajectory

Motion planning through MoveIt/Pilz.

Server node: PLEM motion planning server Action name: /plan_trajectory Package: Included in PLEM runtime (starts automatically when MoveIt is running) Launch: plem_bringup/launch/moveit.launch.py Activation: Automatically starts when moveit.launch.py is executed (requires MoveIt motion planner)

Multi-Robot Environments

In multi-robot systems, a namespace is added: /{robot_id}/plan_trajectory

Message structure:

# Goal
wim_msgs/MotionTarget[] targets # MOVEJ/MOVEL target sequence

# Result
trajectory_msgs/JointTrajectory trajectory
float64 duration
uint32 waypoint_count
bool success
string message

# Feedback
uint32 current_segment
uint32 total_segments

Python Client Examples

Basic Action Client

#!/usr/bin/env python3
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from wim_msgs.action import SetMode

class ModeControlClient(Node):
def __init__(self):
super().__init__('mode_control_client')
# For multi-robot: use f'/{robot_id}/wim_rt_driver/set_mode'
self._action_client = ActionClient(self, SetMode, '/wim_rt_driver/set_mode')

def send_goal(self, target_mode):
"""
target_mode: 0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE (1=STOPPING is internal)
"""
goal_msg = SetMode.Goal()
goal_msg.target_robot_mode.mode = target_mode

self._action_client.wait_for_server()
self.get_logger().info(f'Sending goal: mode={target_mode}')

send_goal_future = self._action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback)
send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('Goal rejected')
return

self.get_logger().info('Goal accepted')
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)

def get_result_callback(self, future):
result = future.result().result
if result.success:
self.get_logger().info(f'Success: {result.message}')
else:
self.get_logger().error(f'Failed: {result.message}')

def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(
f'Feedback: current mode={feedback.current_robot_mode.mode}, '
f'safety mode={feedback.current_safety_mode.safety_mode}'
)

def main(args=None):
rclpy.init(args=args)
client = ModeControlClient()

# Switch to TRAJECTORY mode
client.send_goal(target_mode=2)
rclpy.spin(client)

if __name__ == '__main__':
main()

Trajectory Execution Client

#!/usr/bin/env python3
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from builtin_interfaces.msg import Duration

class TrajectoryClient(Node):
def __init__(self):
super().__init__('trajectory_client')
self._action_client = ActionClient(
self,
FollowJointTrajectory,
'/joint_trajectory_controller/follow_joint_trajectory'
)

def send_trajectory(self, positions_list, durations):
"""
positions_list: List of joint positions (6 values each)
durations: Time to reach each waypoint (seconds)
"""
goal_msg = FollowJointTrajectory.Goal()

trajectory = JointTrajectory()
# Note: Joint names vary by URDF definition
# Indy7 example: 'joint0', 'joint1', etc. (0-indexed, no underscore)
# Scaffold-generated packages: 'joint1', 'joint2', etc. (1-indexed)
trajectory.joint_names = [
'joint0', 'joint1', 'joint2',
'joint3', 'joint4', 'joint5'
]

for positions, duration_sec in zip(positions_list, durations):
point = JointTrajectoryPoint()
point.positions = positions
point.time_from_start = Duration(sec=int(duration_sec))
trajectory.points.append(point)

goal_msg.trajectory = trajectory

self._action_client.wait_for_server()
self.get_logger().info(f'Sending trajectory with {len(positions_list)} waypoints')

send_goal_future = self._action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback)
send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('Trajectory rejected')
return

self.get_logger().info('Trajectory execution started')
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)

def get_result_callback(self, future):
result = future.result().result
if result.error_code == 0:
self.get_logger().info('Trajectory execution completed')
else:
self.get_logger().error(f'Execution failed: {result.error_string}')

def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
# Optional: Log progress
pass

def main(args=None):
rclpy.init(args=args)
client = TrajectoryClient()

# Example: 2-waypoint trajectory
positions_list = [
[0.0, 0.5, 1.0, 0.0, 0.0, 0.0], # Waypoint 1
[0.5, 0.5, 0.5, 0.5, 0.5, 0.5], # Waypoint 2
]
durations = [2.0, 4.0] # 2 seconds, 4 seconds

client.send_trajectory(positions_list, durations)
rclpy.spin(client)

if __name__ == '__main__':
main()

C++ Client Examples

Asynchronous Action Client

#include <rclcpp/rclcpp.hpp>
#include <rclcpp_action/rclcpp_action.hpp>
#include <wim_msgs/action/set_mode.hpp>

class ModeControlClient : public rclcpp::Node {
public:
using SetMode = wim_msgs::action::SetMode;
using GoalHandle = rclcpp_action::ClientGoalHandle<SetMode>;

ModeControlClient() : Node("mode_control_client") {
// For multi-robot: use "/{robot_id}/wim_rt_driver/set_mode"
client_ = rclcpp_action::create_client<SetMode>(this, "/wim_rt_driver/set_mode");
}

void send_goal(int8_t target_mode) {
if (!client_->wait_for_action_server(std::chrono::seconds(5))) {
RCLCPP_ERROR(get_logger(), "Action server unavailable");
return;
}

auto goal = SetMode::Goal();
goal.target_robot_mode.mode = target_mode;

auto send_goal_options = rclcpp_action::Client<SetMode>::SendGoalOptions();

send_goal_options.feedback_callback =
[this](GoalHandle::SharedPtr, const std::shared_ptr<const SetMode::Feedback> feedback) {
RCLCPP_INFO(get_logger(), "Current mode: %d", feedback->current_robot_mode.mode);
};

send_goal_options.result_callback =
[this](const GoalHandle::WrappedResult& result) {
if (result.code == rclcpp_action::ResultCode::SUCCEEDED) {
RCLCPP_INFO(get_logger(), "Success: %s", result.result->message.c_str());
} else {
RCLCPP_ERROR(get_logger(), "Failed");
}
};

client_->async_send_goal(goal, send_goal_options);
}

private:
rclcpp_action::Client<SetMode>::SharedPtr client_;
};

int main(int argc, char** argv) {
rclcpp::init(argc, argv);
auto node = std::make_shared<ModeControlClient>();
node->send_goal(2); // TRAJECTORY mode (0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE)
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}

Action Cancellation

Python Cancellation Example

class CancellableClient(Node):
def __init__(self):
super().__init__('cancellable_client')
self._action_client = ActionClient(self, FollowJointTrajectory, '/joint_trajectory_controller/follow_joint_trajectory')
self._goal_handle = None

def send_goal(self, trajectory):
send_goal_future = self._action_client.send_goal_async(trajectory)
send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
self._goal_handle = future.result()
if self._goal_handle.accepted:
self.get_logger().info('Goal accepted')

def cancel_goal(self):
if self._goal_handle:
self.get_logger().info('Canceling goal...')
cancel_future = self._goal_handle.cancel_goal_async()
cancel_future.add_done_callback(self.cancel_done_callback)

def cancel_done_callback(self, future):
cancel_response = future.result()
if len(cancel_response.goals_canceling) > 0:
self.get_logger().info('Goal canceled')
else:
self.get_logger().warn('Cancellation failed')

CLI Cancellation

ROS2 Humble CLI Limitations

ROS2 Humble CLI does not support canceling actions with a single command. Use the Python example above, or switch the robot mode to BRAKED to stop trajectory execution.

# Stop trajectory by switching mode
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode "{target_mode: 0}"

Error Handling

Result Codes

CodeMeaningAction
SUCCEEDEDGoal achievedNormal completion
CANCELEDUser canceledReport cancellation
ABORTEDServer errorLog error, retry
UNKNOWNUnexpected stateInvestigation required

Common Error Scenarios

Goal rejection:

def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('Goal rejected - check:')
self.get_logger().error('- SafetyMode (must be NORMAL)')
self.get_logger().error('- Parameter ranges')
self.get_logger().error('- Hardware ready state')
return

Execution timeout:

def get_result_callback(self, future):
result = future.result().result
if not result.success and 'timeout' in result.message.lower():
self.get_logger().error('Timeout occurred - attempting recovery')
# Reset mode to BRAKED
# Or check safety state

Safety violation:

# Periodically check safety state
def feedback_callback(self, feedback_msg):
if feedback_msg.feedback.current_safety_mode.safety_mode != 0: # NORMAL
self.get_logger().warn('Safety violation detected - canceling goal')
self.cancel_goal()

Debugging

Check Action Server Status

# Available action servers
ros2 action list

# Action interface info
ros2 action info /wim_rt_driver/set_mode

# Action type definition
ros2 interface show wim_msgs/action/SetMode

Monitor Action Traffic

# Monitor goal sending
ros2 topic echo /wim_rt_driver/set_mode/_action/send_goal

# Monitor feedback
ros2 topic echo /wim_rt_driver/set_mode/_action/feedback

# Monitor results
ros2 topic echo /wim_rt_driver/set_mode/_action/get_result

Logging

# Action server logs
ros2 topic echo /rosout | grep ModeControlActionServer

# Set log level for specific node
ros2 run rqt_console rqt_console

Summary

Key points:

  1. Standard actions: FollowJointTrajectory (trajectory execution), SetMode (mode control), PlanTrajectory (motion planning)
  2. Launch structure:
    • plem_bringup/launch/ros2_control.launch.py: FollowJointTrajectory + SetMode action servers
    • plem_bringup/launch/moveit.launch.py: PlanTrajectory action server
    • plem_bringup/launch/plem_launch.py: Automatic integration of all components
  3. Package locations:
    • FollowJointTrajectory: joint_trajectory_controller (ROS2 Control standard)
    • SetMode/PlanTrajectory: plem_ros2_adapter (PLEM custom)
  4. Invocation methods: CLI (ros2 action send_goal), Python (ActionClient), C++ (rclcpp_action::Client)
  5. Error handling: Check goal rejection, validate result codes, monitor safety state
  6. Cancellation: cancel_goal_async() (Python), async_cancel_goal() (C++)
  7. Debugging: ros2 action list/info, /rosout logs, feedback topics

Next steps: