Skip to content

sensors

Sensor abstractions for non-video timestamped data such as IMU readings.

omgrab.sensors.sensor

Sensor abstraction for non-video timestamped data (e.g., IMU readings).

SensorDataUnavailableError

Bases: RuntimeError

Exception raised when sensor data is not available within the timeout.

Sensor

Bases: ABC

Source of timestamped data payloads (e.g., serialized sensor readings).

get_next_item(timeout_s=None) abstractmethod

Get the next data payload.

Parameters:

Name Type Description Default
timeout_s Optional[float]

Maximum time to wait in seconds, or None to block.

None

Returns:

Type Description
tuple[bytes, datetime]

Tuple of (data_bytes, timestamp).

Raises:

Type Description
SensorDataUnavailableError

If no data is available within the timeout.

setup() abstractmethod

Setup the sensor.

close() abstractmethod

Close the sensor.

__enter__()

Enter the context manager.

__exit__(exc_type, exc_value, traceback)

Exit the context manager.

omgrab.sensors.queue_reader_sensor

Queue-based sensor that reads timestamped bytes payloads from a queue.

QueueReaderSensor(data_queue)

Bases: Sensor

Sensor that reads timestamped bytes payloads from a queue.

Initialize the queue reader sensor.

Parameters:

Name Type Description Default
data_queue Queue[tuple[bytes, datetime]]

Queue providing (data_bytes, timestamp) tuples.

required

setup()

No-op for queue-based sensor.

close()

No-op for queue-based sensor.

get_next_item(timeout_s=None)

Get the next data payload from the queue.

flush_queue()

Flush all pending items from the queue.

Returns:

Type Description
int

Number of items flushed.