import asyncio
import logging
from typing import Dict, Optional
import time
from .scan_operation import ScanOperation
from . import nscan_logger
if nscan_logger is not None:
from codestral_ros2_gen.utils.init_pkg_logger import init_pkg_logger
[docs]
class NetworkScanner:
    """
    NetworkScanner performs a complete network scan by:

    * Setting scan parameters.
    * Utilizing a ScanOperation context manager that configures both send and receive sockets.
    * Sending ICMP packets synchronously and then collecting responses asynchronously.
    * Formatting and reporting the results upon scan completion.

    Example usage:
        >>> from codestral_ros2_gen.network_scan.network_scanner import NetworkScanner
        >>> scanner = NetworkScanner()
        >>> hosts = scanner.scan("192.168.10.0/24")
        >>> print(scanner.format_results(hosts, show_all=False))
    """

    # NOTE(review): the example above calls ``format_results``, which is not
    # defined in the part of the class visible here -- confirm it exists.

    def __init__(
        self,
        timeout: float = 5.0,
        packet_size: int = 64,
        sending_interval: float = 0.05,
        send_buff_size: int = 65536,
        recv_buff_size: int = 131072,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initialize the network scanner with configurable parameters.

        Args:
            timeout: Timeout for response in seconds.
            packet_size: Size of the ICMP echo request packets.
            sending_interval: Interval between sending packets.
            send_buff_size: Buffer size for sending socket.
            recv_buff_size: Buffer size for receiving socket.
            logger: Optional logger instance. When omitted, the package
                logger is created via ``init_pkg_logger()`` and renamed to
                ``nscan_logger``.

        Raises:
            RuntimeError: If no logger is passed and ``nscan_logger`` is None.
        """
        self.timeout = timeout
        self.packet_size = packet_size
        self.sending_interval = sending_interval
        self.send_buff_size = send_buff_size
        self.recv_buff_size = recv_buff_size

        # time.time() value taken when scan() starts; None while no scan runs.
        self.scan_start: Optional[float] = None
        # Duration of the most recent scan() call in seconds. Initialised here
        # so the attribute can be read before the first scan has been run.
        self.scan_time: float = 0.0

        if logger is not None:
            self.logger = logger
        elif nscan_logger is not None:
            self.logger = init_pkg_logger()
            self.logger.name = nscan_logger
        else:
            raise RuntimeError("No logger provided and default logger is not set")

    def scan(self, targets: str) -> Dict[str, dict]:
        """
        Execute a network scan synchronously for the specified targets.

        This method uses asyncio.run to run an asynchronous scan operation which:

        - Configures the scan environment within a ScanOperation context.
        - Synchronously sends ICMP packets via a blocking send socket.
        - Asynchronously gathers responses via a non-blocking receive socket.

        Any exception raised by the scan is logged and suppressed; the elapsed
        time is stored in ``self.scan_time`` whether or not the scan succeeded.

        Args:
            targets: Network targets in CIDR format or as comma-separated IP addresses.

        Returns:
            A dictionary mapping host IP addresses to their scan results
            (state, response time, error), or an empty dictionary if the
            scan failed.
        """
        try:
            self.scan_start = time.time()
            self.logger.info(f"Starting network scan for {targets}")
            return asyncio.run(self._scan_async(targets))
        except Exception as e:
            self.logger.error(f"Scan failed: {e}")
            # suppress the exception and return an empty results dictionary
            return {}
        finally:
            # Runs on both the success and the failure path. scan_start is
            # re-read from the instance because the scanner is handed to
            # ScanOperation; the explicit None check replaces a truthiness
            # test on a timestamp.
            if self.scan_start is not None:
                self.scan_time = time.time() - self.scan_start
            else:
                self.scan_time = 0
            self.logger.info(f"Scan completed in {self.scan_time:.2f} seconds")
            self.scan_start = None

    async def _scan_async(self, targets: str) -> Dict[str, dict]:
        """
        Asynchronously execute the network scan operation.

        Opens a ScanOperation as an async context manager, passing it the
        scanner's stored parameters, the logger and the scanner itself, then
        awaits ``operation.execute()`` and returns its result.

        Args:
            targets: Network targets in CIDR or comma-separated IP format.

        Returns:
            A dictionary of scan results for each target host.
        """
        async with ScanOperation(
            targets=targets,
            scanner=self,
            timeout=self.timeout,
            packet_size=self.packet_size,
            sending_interval=self.sending_interval,
            send_buff_size=self.send_buff_size,
            recv_buff_size=self.recv_buff_size,
            logger=self.logger,
        ) as operation:
            return await operation.execute()