Architecture
Detailed software architecture of the AV2.
System Overview
The AV2 is built as a modular, single-process Python application with multiple threads for sensor acquisition and visualization.
Design Principles
Separation of Concerns: Each module handles one responsibility
Thread Safety: Shared state protected by locks
Configurability: Parameters in YAML/code, not hardcoded
Testability: Components can be tested in isolation
Module Hierarchy
utm-navigator/
│
├── runner.py # Main entry point and control loop
│
├── Sensors
│ ├── xsens_class.py # GPS/IMU interface
│ └── sensors/
│ ├── sensor_interface.py # Abstract base class
│ ├── lidar_interface.py # Velodyne LIDAR
│ └── camera_interface.py # RGB/RGBD cameras
│
├── Perception
│ └── perception/
│ ├── occupancy_grid.py # Bayesian grid mapping
│ ├── state.py # Thread-safe state container
│ ├── core_types.py # Data structures
│ ├── preprocessing/
│ │ ├── ipm_processor.py # Inverse perspective mapping
│ │ └── yolopv2_adapter.py # Deep learning detector
│ └── visualization/
│ ├── occupancy_visualizer.py
│ └── open3d_viewer.py
│
├── Planning
│ ├── osmnx_class.py # Route planning
│ └── planning/
│ └── dwa_planner.py # Dynamic Window Approach
│
├── Control
│ ├── pure_pursuit_controller.py
│ └── vehicle_actuator.py
│
└── Configuration
└── config/
└── dwa_config.yaml
Main Control Loop
The AutonomousRunner class in runner.py orchestrates all components:
class AutonomousRunner:
def __init__(self):
self.vehicle_state = VehicleState()
self.gps = None
self.navigator = None
self.controller = None
self.actuator = None
def setup_gps(self, require_rtk=True, rtk_timeout=60):
"""Initialize GPS/IMU and wait for fix."""
self.gps = XsensReceiver()
# Wait for RTK fix...
def setup_navigation(self, dest_lat, dest_lon):
"""Plan route to destination."""
self.navigator = Navigator()
self.waypoints = self.navigator.plan_route(
start=(current_lat, current_lon),
end=(dest_lat, dest_lon)
)
def setup_controller(self):
"""Initialize steering controller."""
self.controller = PurePursuitController(wheel_base=1.23)
self.actuator = VehicleActuator()
def control_loop(self):
"""Main control iteration."""
# 1. Get current state
position = self.gps.get_position()
heading = self.gps.get_heading()
speed = self.gps.get_speed()
# 2. Transform waypoints to vehicle frame
local_waypoints = self.transform_waypoints(
self.waypoints, position, heading
)
# 3. Compute steering
steering = self.controller.compute_steering(
local_waypoints, speed
)
# 4. Send commands
self.actuator.set_steering(steering)
self.actuator.set_throttle(target_throttle)
def run(self, update_hz=50):
"""Run control loop at specified rate."""
dt = 1.0 / update_hz
while self.running:
start = time.time()
self.control_loop()
elapsed = time.time() - start
if elapsed < dt:
time.sleep(dt - elapsed)
Thread-Safe State
VehicleState
Thread-safe container for vehicle state:
class VehicleState:
def __init__(self):
self._lock = threading.Lock()
self._position = (0.0, 0.0) # (x, y) in UTM
self._heading = 0.0 # radians from North
self._speed = 0.0 # m/s
def update(self, position, heading, speed):
with self._lock:
self._position = position
self._heading = heading
self._speed = speed
def get_state(self):
with self._lock:
return (self._position, self._heading, self._speed)
PerceptionState
Thread-safe container for perception data:
class PerceptionState:
def __init__(self):
self._lock = threading.Lock()
self._occupancy_grid = None
self._detections = []
self._lane_offset = 0.0
def update_grid(self, grid):
with self._lock:
self._occupancy_grid = grid.copy()
def get_grid(self):
with self._lock:
return self._occupancy_grid.copy() if self._occupancy_grid else None
Sensor Abstraction
All sensors implement a common interface:
class SensorInterface(ABC):
@abstractmethod
def connect(self) -> bool:
"""Connect to sensor hardware."""
pass
@abstractmethod
def disconnect(self) -> None:
"""Disconnect from sensor."""
pass
@abstractmethod
def is_connected(self) -> bool:
"""Check connection status."""
pass
@abstractmethod
def get_data(self) -> Any:
"""Get latest sensor data."""
pass
This allows easy substitution of sensors (e.g., different LIDAR models).
Configuration System
YAML Configuration
Complex parameters are stored in YAML files:
# config/dwa_config.yaml
vehicle:
max_speed: 5.0
max_yaw_rate: 1.0
robot_radius: 0.8
costs:
goal_cost_gain: 0.2
obstacle_cost_gain: 1.0
Loading configuration:
import yaml
with open('config/dwa_config.yaml') as f:
config = yaml.safe_load(f)
planner = DWAPlanner(
max_speed=config['vehicle']['max_speed'],
robot_radius=config['vehicle']['robot_radius']
)
Dataclasses
Configuration containers use dataclasses:
from dataclasses import dataclass
@dataclass
class DWAConfig:
max_speed: float = 5.0
max_yaw_rate: float = 1.0
max_accel: float = 2.0
robot_radius: float = 0.8
dt: float = 0.1
predict_time: float = 3.0
Error Handling
Sensor Failures
try:
position = self.gps.get_position()
if position is None:
raise SensorError("GPS position unavailable")
except SensorError as e:
self.logger.warning(f"Sensor error: {e}")
self.enter_safe_mode()
Safe Mode
When errors occur, the system enters safe mode:
Reduce speed to minimum
Apply brakes gradually
Alert operator
Attempt sensor recovery
Logging
The system uses Python’s logging module:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('av2')
logger.info("Starting autonomous navigation")
logger.warning("RTK fix lost, using standard GPS")
logger.error("LIDAR communication timeout")
CSV data logging for analysis:
class ControlLogger:
def __init__(self, filename):
self.file = open(filename, 'w')
self.writer = csv.writer(self.file)
self.writer.writerow(['time', 'x', 'y', 'heading', 'speed', 'steering'])
def log(self, state, steering):
self.writer.writerow([
time.time(),
state.x, state.y,
state.heading,
state.speed,
steering
])
Performance Considerations
Timing
Control loop target: 50-100 Hz
Perception processing: 10-30 Hz
Visualization: 10-30 Hz (optional)
Bottlenecks
YOLOPv2 inference: GPU-bound, ~50ms per frame
LIDAR processing: CPU-bound point cloud operations
Visualization: Can be disabled for performance
Optimization Tips
Use NumPy vectorized operations
Profile with
cProfileto find hotspotsDisable visualization during performance-critical tests
Consider TensorRT for YOLOPv2 optimization