Network scanner - Scan Operation

class codestral_ros2_gen.network_scan.scan_operation.ScanOperation(targets, scanner=None, timeout=5.0, packet_size=64, sending_interval=0.05, send_buff_size=65536, recv_buff_size=131072, logger=None)

Bases: object

Context manager for a single network scan operation.

This class manages the lifecycle of a network scan: configuring the sockets, creating NetworkHost objects, sending ICMP packets, and collecting responses. It sends packets over a synchronous, blocking socket and receives them with non-blocking asyncio operations.

Parameters:
  • targets (str)

  • timeout (float)

  • packet_size (int)

  • sending_interval (float)

  • send_buff_size (int)

  • recv_buff_size (int)

  • logger (Logger | None)
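The lifecycle described above can be illustrated with a small stand-in class. This is a sketch only: MiniScan, its events list, and the placement of each step are hypothetical and are not taken from the ScanOperation source. In particular, the documentation does not say whether ScanOperation is entered with "with" or "async with"; the sketch assumes a plain "with", and it omits host creation.

```python
import asyncio


class MiniScan:
    """Hypothetical stand-in that mirrors the documented lifecycle only."""

    def __init__(self, targets: str, timeout: float = 5.0):
        self.targets = targets
        self.timeout = timeout
        self.events = []  # records the order of lifecycle steps

    def __enter__(self):
        # Assumption: sockets are configured when the context is entered.
        self.events.append("configure_sockets")
        return self

    def __exit__(self, exc_type, exc, tb):
        # Sockets are released even if execute() raised.
        self.events.append("close_sockets")
        return False  # do not swallow exceptions

    async def execute(self) -> dict:
        self.events.append("send_packets")  # blocking send in the documented class
        await asyncio.sleep(0)  # placeholder for awaiting replies
        self.events.append("collect_responses")
        self.events.append("prepare_results")
        # The result layout here is illustrative, not the real schema.
        return {self.targets: {"state": "unknown"}}


async def main():
    with MiniScan("192.168.1.10", timeout=1.0) as scan:
        results = await scan.execute()
    return results, scan.events
```

Running asyncio.run(main()) drives the whole sequence; the context manager guarantees the cleanup step runs after the scan finishes or fails.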

__init__(targets, scanner=None, timeout=5.0, packet_size=64, sending_interval=0.05, send_buff_size=65536, recv_buff_size=131072, logger=None)

Initialize the ScanOperation.

Parameters:
  • targets (str) – The network targets to scan.

  • scanner (Optional[object]) – An optional scanner object with configuration attributes.

  • timeout (float) – The timeout for the scan operation.

  • packet_size (int) – The size of the ICMP packets to send.

  • sending_interval (float) – The interval between sending packets.

  • send_buff_size (int) – The size of the send buffer.

  • recv_buff_size (int) – The size of the receive buffer.

  • logger (Optional[logging.Logger]) – The logger to use for logging messages.

_configure_sockets()

Configure send and receive sockets with optimal settings for Linux.

This method sets up the send and receive sockets with appropriate options for ICMP communication.
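A minimal sketch of what such a socket setup might look like, using only the standard socket module. The helper names tune_socket and open_icmp_sockets are hypothetical, and the specific options shown (buffer sizes plus blocking mode) are an assumption based on the constructor parameters; the options the real method sets are not documented here.

```python
import socket


def tune_socket(sock, send_buff_size=65536, recv_buff_size=131072, blocking=True):
    """Apply kernel buffer sizes and blocking mode to an existing socket."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buff_size)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buff_size)
    sock.setblocking(blocking)
    return sock


def open_icmp_sockets(send_buff_size=65536, recv_buff_size=131072):
    """Open a blocking send socket and a non-blocking receive socket.

    Raw ICMP sockets need root or CAP_NET_RAW on Linux.
    """
    send_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    try:
        recv_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    except OSError:
        send_sock.close()  # avoid leaking the first socket
        raise
    tune_socket(send_sock, send_buff_size, recv_buff_size, blocking=True)
    tune_socket(recv_sock, send_buff_size, recv_buff_size, blocking=False)
    return send_sock, recv_sock
```

On Linux the kernel may report a larger buffer than requested through getsockopt, so the configured values are best treated as lower bounds.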

_close_sockets()

Properly clean up and close sockets.

This method ensures that the sockets are closed properly, clearing any remaining data and shutting down the sockets.

Return type:

None

_create_hosts()

Create NetworkHost objects for the given targets.

This method parses the network targets and creates NetworkHost objects for each target IP address.

Return type:

None
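The accepted format of the targets string is not specified in this reference. As an illustration, the sketch below assumes a comma-separated mix of single IPv4 addresses and CIDR blocks and expands it with the standard ipaddress module. The function name expand_targets is hypothetical, and the real method builds NetworkHost objects rather than plain strings.

```python
import ipaddress
from typing import List


def expand_targets(targets: str) -> List[str]:
    """Expand 'a.b.c.d, w.x.y.z/nn' style input into individual addresses."""
    addresses = []
    for token in targets.split(","):
        token = token.strip()
        if not token:
            continue
        if "/" in token:
            # strict=False tolerates host bits set in the network address.
            network = ipaddress.ip_network(token, strict=False)
            # hosts() leaves out the network and broadcast addresses
            # for ordinary prefixes such as /24 or /30.
            addresses.extend(str(ip) for ip in network.hosts())
        else:
            # Raises ValueError for anything that is not a valid address.
            addresses.append(str(ipaddress.ip_address(token)))
    # Drop duplicates while keeping the first-seen order.
    return list(dict.fromkeys(addresses))
```

For example, expand_targets("192.168.1.0/30, 10.0.0.1") yields the two usable hosts of the /30 followed by the single address.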

async execute()

Execute the scan operation and return results.

This method orchestrates the sending of ICMP packets and the collection of responses. It logs the progress and handles any errors that occur during the scan.

Returns:

A dictionary containing the scan results for each target IP address.

Return type:

Dict[str, dict]

_send_packets()

Send ICMP echo request packets to all hosts.

This method sends ICMP echo request packets to each target IP address and logs the progress.

Return type:

None
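For reference, an ICMP echo request is an 8-byte header (type 8, code 0, checksum, identifier, sequence number) followed by a payload, protected by the 16-bit one's-complement Internet checksum. The sketch below builds such a packet with the struct module. The function names are hypothetical, and it assumes that packet_size means the total ICMP message length including the header, which this reference does not confirm.

```python
import struct

ICMP_ECHO_REQUEST = 8  # ICMP type for echo request
ICMP_HEADER_SIZE = 8   # type, code, checksum, identifier, sequence


def internet_checksum(data: bytes) -> int:
    """16-bit one's-complement sum over the data, complemented."""
    if len(data) % 2:
        data += b"\x00"  # pad to a whole number of 16-bit words
    total = sum(struct.unpack(f"!{len(data) // 2}H", data))
    while total >> 16:  # fold carries back into the low 16 bits
        total = (total & 0xFFFF) + (total >> 16)
    return ~total & 0xFFFF


def build_echo_request(identifier: int, sequence: int, packet_size: int = 64) -> bytes:
    """Build an echo request whose total length is packet_size bytes (assumed)."""
    payload = bytes(max(packet_size - ICMP_HEADER_SIZE, 0))  # zero-filled payload
    # The checksum is computed with the checksum field set to zero.
    header = struct.pack("!BBHHH", ICMP_ECHO_REQUEST, 0, 0, identifier, sequence)
    checksum = internet_checksum(header + payload)
    header = struct.pack("!BBHHH", ICMP_ECHO_REQUEST, 0, checksum, identifier, sequence)
    return header + payload
```

A receiver can validate a packet by running the same checksum over the whole message: a correct packet sums to zero.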

async _collect_responses()

Asynchronously collect ICMP responses with a timeout.

This method collects ICMP responses from the target IP addresses until the timeout is reached. It uses asyncio to achieve non-blocking behavior.

Raises:

RuntimeError – If the response collection fails.

Return type:

None
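One common way to collect replies until a timeout is to compute a single deadline up front and give each receive call only the time that remains. The sketch below shows that pattern with asyncio.wait_for. The function collect_until_deadline and its receive and pending parameters are hypothetical, and the real method may be structured differently.

```python
import asyncio


async def collect_until_deadline(receive, pending, timeout):
    """Gather replies from the addresses in 'pending' until the deadline.

    receive: coroutine function returning (packet, (ip, port)).
    pending: set of IP strings still awaiting a reply (left unmodified).
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    waiting = set(pending)
    responses = {}
    while waiting:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            packet, addr = await asyncio.wait_for(receive(), remaining)
        except asyncio.TimeoutError:
            break  # overall deadline reached while waiting
        ip = addr[0]
        if ip in waiting:  # ignore replies from hosts we did not probe
            waiting.discard(ip)
            responses[ip] = packet
    return responses
```

Using one shared deadline keeps the total wait bounded by timeout no matter how many packets arrive, and the loop exits early once every pending host has answered.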

async _receive_packet(sock, size)

Helper to get both packet and address with asyncio.

This method sets up a callback to receive packets asynchronously using asyncio’s event loop.

Parameters:
  • sock (socket.socket) – The socket to receive packets from.

  • size (int) – The size of the buffer to use for receiving packets.

Returns:

A tuple containing the received packet and the address of the sender.

Return type:

tuple
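A callback-based receive of this kind can be sketched with loop.add_reader and a future: the event loop calls the callback when the socket becomes readable, and the callback resolves the future with the result of recvfrom. This is an illustrative version under the assumption of a selector-based event loop (the default on Linux) and a non-blocking socket; the name receive_packet is hypothetical and the real helper may differ.

```python
import asyncio
import socket


async def receive_packet(sock: socket.socket, size: int):
    """Wait for one datagram and return (packet, sender_address)."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_readable():
        if future.done():
            return
        try:
            result = sock.recvfrom(size)
        except BlockingIOError:
            return  # spurious wakeup, keep waiting
        except OSError as exc:
            future.set_exception(exc)
        else:
            future.set_result(result)

    loop.add_reader(sock, on_readable)
    try:
        return await future
    finally:
        # Always unregister, including on cancellation or timeout.
        loop.remove_reader(sock)
```

Unregistering the reader in the finally block matters when the caller wraps the call in a timeout: cancellation would otherwise leave a stale callback attached to the socket.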

_prepare_results()

Prepare the scan results.

This method prepares the scan results by marking any remaining hosts as unresponsive and formatting the results.

Returns:

A dictionary containing the scan results for each target IP address.

Return type:

Dict[str, dict]
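The finalization step can be pictured as follows. Everything in this sketch is hypothetical: HostRecord stands in for NetworkHost, and the state names and dictionary keys are invented for illustration, since the reference only states that the result is a Dict[str, dict] keyed by target IP address.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass
class HostRecord:
    """Hypothetical stand-in for NetworkHost."""
    ip: str
    state: str = "pending"          # illustrative state name
    rtt_ms: Optional[float] = None  # illustrative field


def prepare_results(hosts: Iterable[HostRecord]) -> Dict[str, dict]:
    """Mark hosts that never replied and format one dict per IP."""
    results: Dict[str, dict] = {}
    for host in hosts:
        if host.state == "pending":
            # No reply arrived before the timeout.
            host.state = "unresponsive"
        results[host.ip] = {"state": host.state, "rtt_ms": host.rtt_ms}
    return results
```

Marking leftovers only at the end means every probed address appears in the result, whether or not it answered.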