Robotics - [ROS2 Foundation] Ros2 Rclpy Executor Callback Model

Rclpy Executors, Rate Object, Custom Node Destruction

Posted by Rico's Nerd Cluster on November 27, 2024

Rclpy MultiThreadedExecutor

rclpy.executors.MultiThreadedExecutor is implemented entirely in Python, not as a direct wrapper around the C++ rclcpp::executors::MultiThreadedExecutor. Internally it builds a concurrent.futures.ThreadPoolExecutor to run callbacks, so the threads it launches are ordinary Python threads that share the Global Interpreter Lock (GIL)

Here is the source code of Rclpy’s executors. In MultiThreadedExecutor,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from concurrent.futures import ThreadPoolExecutor
class MultiThreadedExecutor(Executor):
    ...
    def __init__(self, num_threads=None, *, context=None):
        ...
        self._executor = ThreadPoolExecutor(num_threads)   # <-- Python thread pool
    def _spin_once_impl(
        self,
        timeout_sec: Optional[Union[float, TimeoutObject]] = None,
        wait_condition: Callable[[], bool] = lambda: False
    ) -> None:
        handler, entity, node = self.wait_for_ready_callbacks(
                timeout_sec, None, wait_condition)
        self._executor.submit(handler)
        self._futures.append(handler)
        for future in self._futures:  # check for any exceptions
            if future.done():
                self._futures.remove(future)
                future.result()
1
- Note that if there's no `timeout_sec` specified, the `executor` could block indefinitely until a future becomes ready.
  • Callbacks that spend most time C extensions, such as those in rclpy._rclpy bindings, NumPy, OpenCV, can overlap and really run in parallel on different OS threads.
  • Both rclpy.executors.MultiThreadedExecutor and rclcpp::executors::MultiThreadedExecutor sit on top of the common C layer (rcl) and wait on the same DDS wait-sets.
    • The C++ version does its scheduling and callback invocation in C++ threads with no GIL
    • Python implementation will time-slice under the GIL and does not use multiple cores.

Threading Model (Subscriber, Service, etc.)

ROS 2 provides two main threading models for subscription callbacks: - By default, all callbacks run on a single thread.

1
2
3
4
5
rclpy.init()
node = rclpy.create_node("single_threaded")
executor = rclpy.executors.SingleThreadedExecutor()
executor.add_node(node)
executor.spin()  # All callbacks run in ONE thread
  • Multi-Threaded Executor (Parallel Processing of Callbacks) with Reentrant Callback Groups (More Control over Threads) When using MultiThreadedExecutor, you can explicitly declare that certain callbacks should run in parallel by using ReentrantCallbackGroup.
1
2
3
4
from rclpy.callback_groups import ReentrantCallbackGroup
group = ReentrantCallbackGroup()
node.create_subscription(Imu, "imu_data", imu_callback, 10, callback_group=group)
node.create_subscription(LaserScan, "scan", scan_callback, 10, callback_group=group)

rclpy.Rate

rclpy.Rate is a wrapper around a ROS Timer callback. Any callback in ROS 2 needs an executor to execute. rclpy.spin() is a wrapper around an executor, and it checks if any callback needs to be executed. So, rclpy.Rate() needs to communicate with a rclpy.spin() on a separate thread. This is a key difference in exeuction model from ROS1

Reference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import rclpy

rclpy.init()
node = rclpy.create_node('simple_node')

# Spin in a separate thread
thread = threading.Thread(target=rclpy.spin, args=(node, ), daemon=True)
thread.start()

rate = node.create_rate(2)

try:
    while rclpy.ok():
        print('Help me body, you are my only hope')
        rate.sleep()
except KeyboardInterrupt:
    pass

rclpy.shutdown()
thread.join()

Destruction

Sometimes, we want to publish / send service requests upon destructing a node. In that case, we need to disable the default SIGINT handler because it will shutdown our executor before destructing objects that publish / send service requests / roslog’s own publisher:

1
2
3
4
5
6
7
8
9
10
11
# We want our objects destructed first, so we can use adios services
rclpy.init(signal_handler_options=rclpy.SignalHandlerOptions.NO)
server = WebServerNode()
try:
    server.run()
except KeyboardInterrupt:
    pass                        # we caught SIGINT ourselves
finally:
    # executor is still spinning here
    server.destroy_node()
    rclpy.shutdown()            # now shut rclpy down

Issues

ValueError: generator already executing

The direct cause comes from Python’s generator machinery: you’ve got two threads both trying to drive the same spin‐loop generator in rclpy. For example:

  • if you spin a node on one thread

    1
    2
    
      t = Thread(target=lambda: rclpy.spin(self.cmc), daemon=True)
      t.start()
    
  • And call a service on another thread:

    1
    2
    
      fut = self.load_cli.call_async(req)
      rclpy.spin_until_future_complete(self.cmc, fut)
    
  • Under the hood, spin() calls spin_once(), but spin_until_future_complete will also invoke spin_once() on that same executor generator.

    • SingleThreadedExecutor is implemented around a Python generator (_wait_for_ready_callbacks)
    • self.executor.spin_until_future_complete() tries to re‑enter the same generator. Python generators are not re‑entrant, so the interpreter raises ValueError: generator already executing.
    • MultiThreadedExecutor doesn’t use that generator; it waits in C and dispatches callbacks through a thread‑pool, so the re‑entrancy never occurs—hence “it works with MultiThreadedExecutor.”

resp = fut.result() returns None

Future.result() returning None just means the Future was never filled. The rclpy Future returns None when no result has been set yet

  • fut = client.call_async(req) merely enqueues the request and returns a Future. That Future is populated only when the executor takes the response from DDS and runs the client’s response-handler callback.

The rclpy docs spell the solution out: “Execute work until the future is complete.” You must either call rclpy.spin_until_future_complete() or keep an executor spinning concurrently. - GitHub issue #1141 shows the exact symptom—Future is always None until an executor spins.

E.g., Use a MultiThreadedExecutor executor (recommended)

1
2
3
4
5
6
from rclpy.executors import MultiThreadedExecutor

self.executor = MultiThreadedExecutor()
self.executor.add_node(self.cmc)
# Spin in background
Thread(target=self.executor.spin, daemon=True).start()