Sensors

Software interfaces for AV2 sensors.

Overview

Each sensor has a dedicated interface class that handles:

  • Connection management

  • Data acquisition

  • Thread-safe access

  • Error handling

Sensor Interface Base Class

All sensors implement this abstract interface:

from abc import ABC, abstractmethod

class SensorInterface(ABC):
    @abstractmethod
    def connect(self) -> bool:
        """Connect to sensor. Returns True on success."""
        pass

    @abstractmethod
    def disconnect(self) -> None:
        """Disconnect from sensor."""
        pass

    @abstractmethod
    def is_connected(self) -> bool:
        """Check if sensor is connected."""
        pass

    @abstractmethod
    def get_data(self) -> Any:
        """Get latest sensor data."""
        pass

Xsens GPS/IMU Interface

The XsensReceiver class provides position, orientation, and velocity data.

Initialization

from xsens_class import XsensReceiver

# Auto-detect Xsens device
gps = XsensReceiver()

# Or specify port
gps = XsensReceiver(port='/dev/ttyUSB0')

Available Methods

# Position (WGS84)
lat, lon, alt = gps.get_position()

# Position (UTM meters)
x, y = gps.get_utm_position()

# Heading (radians from North, clockwise positive)
heading = gps.get_heading()

# Velocity (m/s)
speed = gps.get_speed()
vx, vy, vz = gps.get_velocity_enu()

# Orientation (Euler angles in degrees)
roll, pitch, yaw = gps.get_orientation()

# Orientation (quaternion)
qw, qx, qy, qz = gps.get_quaternion()

# Angular velocity (rad/s)
wx, wy, wz = gps.get_angular_velocity()

# Acceleration (m/s²)
ax, ay, az = gps.get_acceleration()

# RTK status
status = gps.get_rtk_status()  # "None", "Float", "Fixed"

Threading Model

The XsensReceiver runs a background thread for packet retrieval:

class XsensReceiver:
    def __init__(self):
        self._lock = threading.Lock()
        self._data_buffer = deque(maxlen=5)
        self._running = True
        self._thread = threading.Thread(target=self._receive_loop)
        self._thread.start()

    def _receive_loop(self):
        while self._running:
            packet = self.device.read_packet()
            with self._lock:
                self._data_buffer.append(packet)

    def get_position(self):
        with self._lock:
            if self._data_buffer:
                return self._data_buffer[-1].position
            return None

RTK Wait

Wait for RTK fix before starting navigation:

def wait_for_rtk(gps, timeout=60):
    start = time.time()
    while time.time() - start < timeout:
        status = gps.get_rtk_status()
        if status == "Fixed":
            return True
        time.sleep(1)
    return False

Velodyne LIDAR Interface

The VelodyneLIDAR class captures 3D point clouds.

Initialization

from sensors.lidar_interface import VelodyneLIDAR

lidar = VelodyneLIDAR(
    host='192.168.1.201',  # LIDAR IP
    port=2368              # Data port
)
lidar.connect()

Getting Point Cloud

# Get latest scan (Nx3 numpy array)
points = lidar.get_scan()

# Points format: [[x, y, z], [x, y, z], ...]
# Coordinates in LIDAR frame (meters)

# Get points with intensity
points, intensities = lidar.get_scan_with_intensity()

Implementation

class VelodyneLIDAR(SensorInterface):
    def __init__(self, host='192.168.1.201', port=2368):
        self.host = host
        self.port = port
        self._lock = threading.Lock()
        self._current_scan = None

    def connect(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind(('', self.port))
        self._running = True
        self._thread = threading.Thread(target=self._receive_loop)
        self._thread.start()
        return True

    def _receive_loop(self):
        while self._running:
            data, _ = self.socket.recvfrom(1206)
            points = velodyne_decoder.decode(data)
            with self._lock:
                self._current_scan = points

    def get_scan(self):
        with self._lock:
            return self._current_scan.copy() if self._current_scan else None

Camera Interfaces

RGB Camera

from sensors.camera_interface import RGBCamera

camera = RGBCamera(device_id=0)  # /dev/video0
camera.connect()

# Get frame
frame = camera.get_frame()  # numpy array (H, W, 3) BGR

# Get with timestamp
frame, timestamp = camera.get_frame_with_timestamp()

RGB-D Camera (RealSense)

from sensors.camera_interface import RGBDCamera

camera = RGBDCamera()
camera.connect()

# Get RGB frame
color = camera.get_color_frame()

# Get depth frame
depth = camera.get_depth_frame()  # (H, W) uint16, mm

# Get aligned frames
color, depth = camera.get_aligned_frames()

# Get intrinsics
intrinsics = camera.get_intrinsics()
# Returns: fx, fy, cx, cy

Camera Frame Dataclass

@dataclass
class CameraFrame:
    image: np.ndarray          # RGB image
    depth: Optional[np.ndarray]  # Depth map (if available)
    timestamp: float
    intrinsics: CameraIntrinsics

@dataclass
class CameraIntrinsics:
    fx: float  # Focal length x
    fy: float  # Focal length y
    cx: float  # Principal point x
    cy: float  # Principal point y
    width: int
    height: int

Camera Adapter

Standardizes output from different cameras:

from perception.camera_adapter import CameraAdapter

adapter = CameraAdapter(camera)
frame = adapter.get_standard_frame()
# Returns CameraFrame with consistent format

Sensor Fusion Example

Combining data from multiple sensors:

class SensorManager:
    def __init__(self):
        self.gps = XsensReceiver()
        self.lidar = VelodyneLIDAR()
        self.camera = RGBDCamera()

    def connect_all(self):
        self.gps.connect()
        self.lidar.connect()
        self.camera.connect()

    def get_synchronized_data(self):
        """Get data from all sensors."""
        timestamp = time.time()

        position = self.gps.get_position()
        heading = self.gps.get_heading()
        points = self.lidar.get_scan()
        frame = self.camera.get_frame()

        return {
            'timestamp': timestamp,
            'position': position,
            'heading': heading,
            'lidar': points,
            'camera': frame
        }