ROS2 Runner

class codestral_ros2_gen.utils.ros2_runner.ROS2Runner(node_command, test_command, test_timeout=30)

Bases: object

ROS2Runner encapsulates methods to launch a ROS2 node, run pytest tests in the ROS2 environment, capture the test output for analysis, and terminate the node.

Parameters:
  • node_command (str)

  • test_command (str)

  • test_timeout (int)

node_command

Shell command to launch the ROS2 node.

Type:

str

test_command

Shell command to run tests (pytest).

Type:

str

test_timeout

How many seconds to wait for the process before timing out. Defaults to 30 seconds.

Type:

int

node_process

The process object for the ROS2 node.

Type:

subprocess.Popen

test_output

Captured output from the test command.

Type:

str

tests_passed

Number of tests that passed.

Type:

int

tests_failed

Number of tests that failed.

Type:

int

tests_skipped

Number of tests that were skipped.

Type:

int

__init__(node_command, test_command, test_timeout=30)

Initialize the ROS2Runner.

Parameters:
  • node_command (str) – Shell command to launch the ROS2 node.

  • test_command (str) – Shell command to run tests (pytest).

  • test_timeout (int) – How many seconds to wait for the process before timing out. Defaults to 30 seconds.

start_node()

Start the ROS2 node and verify it launched successfully.

If node_command contains "skip" or "SKIP", the node is not started.

Raises:

RuntimeError – If the node fails to start or register with ROS2.

Return type:

None
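The launch-and-verify behaviour described above can be sketched as follows. This is a minimal illustration, not the library's implementation: the function name start_process and the settle_time parameter are hypothetical, and the sketch only checks that the process is still alive after a short wait. It does not check registration with ROS2, which the real method is documented to do.

```python
import subprocess
import time


def start_process(command, settle_time=0.5):
    """Launch `command` in a shell and confirm it is still alive shortly after.

    Returns the Popen object, or None when the command contains
    "skip" or "SKIP" (hypothetical helper, for illustration only).
    """
    if "skip" in command or "SKIP" in command:
        return None  # nothing to launch

    process = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    time.sleep(settle_time)  # give the process a moment to fail fast
    if process.poll() is not None:
        # poll() returns the exit code once the process has terminated
        raise RuntimeError(
            f"Process exited early with code {process.returncode}: {command}"
        )
    return process
```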

run_tests()

Run pytest using the test_command and capture the output.

Returns:

The return code of the test command. If the command times out, it returns 1 by default.

Return type:

int
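One way to get the documented timeout behaviour is to wrap subprocess.run and map a timeout to return code 1. The sketch below assumes that approach. The name run_command_with_timeout is hypothetical, and unlike run_tests() it returns the captured output directly instead of storing it on the object.

```python
import subprocess


def run_command_with_timeout(command, timeout=30):
    """Run `command` in a shell; return (return_code, captured_output).

    A timeout is reported as return code 1, mirroring the documented behaviour.
    """
    try:
        completed = subprocess.run(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into the captured text
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        # Partial output may arrive as bytes or None, depending on the platform.
        partial = exc.stdout or ""
        if isinstance(partial, bytes):
            partial = partial.decode(errors="replace")
        return 1, partial
    return completed.returncode, completed.stdout
```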

kill_node()

Terminate the running ROS2 node cleanly, force-killing it if necessary. Uses psutil to find and terminate the actual ROS2 node process rather than only the shell that launched it.

Return type:

None
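The terminate-then-force-kill pattern can be illustrated with the standard library alone. Note that this sketch swaps in a different technique from the one documented: instead of using psutil to locate the node process, it signals the whole process group. That works only on POSIX systems and only if the process was started with start_new_session=True. The name stop_process_group and the grace_period parameter are hypothetical.

```python
import os
import signal
import subprocess


def stop_process_group(process, grace_period=5.0):
    """Send SIGTERM to the process group, then SIGKILL if it does not exit.

    Assumes `process` was started with start_new_session=True (POSIX only),
    so the shell and everything it spawned share one process group.
    """
    if process.poll() is not None:
        return  # already exited
    pgid = os.getpgid(process.pid)
    os.killpg(pgid, signal.SIGTERM)  # polite request first
    try:
        process.wait(timeout=grace_period)
    except subprocess.TimeoutExpired:
        os.killpg(pgid, signal.SIGKILL)  # force-kill the whole group
        process.wait()
```

Signalling the group reaches the child of the shell as well as the shell itself, which is the same concern the psutil-based lookup addresses.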

_get_tests_stat()

Parse the test output to get the number of tests passed, failed, and skipped.

Return type:

None
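As an illustration of this kind of parsing, the sketch below extracts the counts from pytest's final summary line with a regular expression. It is an assumption about how such parsing could be done, not the method's actual code: the name parse_pytest_counts is hypothetical, and it returns a tuple, whereas _get_tests_stat() returns None and presumably stores the counts in tests_passed, tests_failed, and tests_skipped.

```python
import re

# Matches fragments such as "3 passed" or "1 failed" in a pytest summary line.
_COUNT_PATTERN = re.compile(r"(\d+) (passed|failed|skipped)\b")


def parse_pytest_counts(output):
    """Return (passed, failed, skipped) from the last summary-like line."""
    counts = {"passed": 0, "failed": 0, "skipped": 0}
    # Scan from the end, because pytest prints its summary last.
    for line in reversed(output.splitlines()):
        matches = _COUNT_PATTERN.findall(line)
        if matches:
            for number, outcome in matches:
                counts[outcome] = int(number)
            break
    return counts["passed"], counts["failed"], counts["skipped"]


sample = "======= 1 failed, 3 passed, 2 skipped in 0.12s ======="
print(parse_pytest_counts(sample))  # (3, 1, 2)
```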

run()

Execute the full test run: start node, run tests, and terminate the node.

Returns:

A tuple containing the overall test success status, the captured test output, and a tuple with the numbers of tests passed, failed, and skipped.

Return type:

tuple[bool, str, tuple[int, int, int]]
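A caller might unpack the returned tuple as shown below. The helper summarize_run and the sample values are hypothetical. Only the tuple shape is taken from the documented return type.

```python
def summarize_run(result):
    """Format the (success, output, (passed, failed, skipped)) tuple as one line."""
    success, _output, (passed, failed, skipped) = result
    status = "PASS" if success else "FAIL"
    return f"{status}: {passed} passed, {failed} failed, {skipped} skipped"


# Hypothetical result, shaped like the documented return value of run().
example_result = (False, "captured pytest output", (3, 1, 2))
print(summarize_run(example_result))  # FAIL: 3 passed, 1 failed, 2 skipped
```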