Skip to content

runtime

Recording lifecycle, monitoring, and device management.

omgrab.runtime.recording_manager

Recording manager that owns capture device and recording sessions.

RecordingConfig(device_ready_timeout_s=7.5, health_check_interval_s=1.0, session_join_timeout_s=10.0, chunk_length_s=60.0, max_queue_size=400) dataclass

Configuration for the recording manager.

Attributes:

Name Type Description
device_ready_timeout_s float

Max time to wait for the capture device to become ready before starting a recording.

health_check_interval_s float

Interval between device health checks during recording.

session_join_timeout_s float

Timeout for joining finished recording session threads during shutdown.

chunk_length_s float

Duration of each recording chunk in seconds.

max_queue_size int

Maximum size per encoder queue.

RecordingController

Bases: Protocol

Minimal interface for controlling recording start/stop.

Used by StateMachine to decouple from the full RecordingManager.

start_recording()

Start a recording. Returns True on success.

stop_recording()

Stop the active recording.

RecordingManager(devices, target_cameras, stream_names, stream_configs, spool_dir, output_dir, config=None, on_device_unhealthy=None, on_recording_complete=None, sensors=None, sensor_stream_names=None, sensor_stream_configs=None)

Manages recording sessions and capture device lifecycles.

Owns: - Capture devices (opened/closed per recording) - Active recording session (at most one at a time) - Finished sessions pending cleanup

The StateMachine calls start_recording/stop_recording; this class handles all the threading and device management internally.

Initialize the recording manager.

Parameters:

Name Type Description Default
devices list[CaptureDevice]

Capture devices to use for recording. All must be connected for recording to start, and all are monitored for health during recording.

required
target_cameras list[Camera]

List of cameras to capture from.

required
stream_names list[str]

List of stream names, one per camera.

required
stream_configs dict[str, VideoStreamConfig]

Configuration for each video stream.

required
spool_dir Path

Directory to write temporary video chunks.

required
output_dir Path

Directory to write merged final recordings.

required
config Optional[RecordingConfig]

Recording configuration.

None
on_device_unhealthy Optional[OnDeviceUnhealthyCallback]

Optional callback invoked when any device becomes unhealthy during recording.

None
on_recording_complete Optional[OnRecordingCompleteCallback]

Optional callback invoked when a recording has been merged. Called with the path to the merged MKV file.

None
sensors list[Sensor] | None

Optional list of sensors (e.g., IMU).

None
sensor_stream_names list[str] | None

Optional list of sensor stream names, one per sensor.

None
sensor_stream_configs dict[str, DataStreamConfig] | None

Optional configuration for each sensor stream.

None

is_recording property

True if there is an active session currently capturing.

active_recording_id property

Get the ID of the active recording, or None if not recording.

recording_started_at property

Get the start time of the active recording, or None if not recording.

set_on_device_unhealthy(callback)

Set the callback for device unhealthy events.

This allows setting the callback after construction, which is needed when there are circular dependencies (e.g., StateMachine needs RecordingManager and vice versa).

Parameters:

Name Type Description Default
callback Optional[OnDeviceUnhealthyCallback]

Callback to invoke when device becomes unhealthy during recording, or None to clear the callback.

required

set_on_recording_error(callback)

Set the callback for recording error events (e.g. encoder crash).

Parameters:

Name Type Description Default
callback Optional[OnRecordingErrorCallback]

Callback to invoke when a recording error occurs, or None to clear the callback.

required

start_recording()

Start a new recording session.

Opens the capture device, creates a new session with timestamp-based recording name, and starts threads.

Returns:

Type Description
bool

True if recording started successfully, False otherwise.

stop_recording()

Stop the current recording session.

Stops capture threads immediately, then merges chunks into a final MKV file. The session is moved to finished_sessions for later cleanup.

shutdown()

Shutdown the manager: stop active session and join all threads.

omgrab.runtime.recording_session

Recording session that owns capture threads and a parallel writer.

RecordingSession(recording_id, target_cameras, stream_names, stream_configs, spool_dir, start_chunk_callback=None, sensors=None, sensor_stream_names=None, sensor_stream_configs=None, chunk_length_s=60.0, max_queue_size=400, on_error=None)

A single recording session with isolated threads and parallel writer.

Owns: - Per-stream encoder queues (via ChunkedWriter) - Capture threads (one per camera/data source) - Encoder threads + mux thread (via ChunkedWriter)

Lifecycle: - start(): Start capture threads and parallel writer - stop(): Signal capture threads to stop; writer continues draining - join(): Wait for all threads to finish

Initialize the recording session.

Parameters:

Name Type Description Default
recording_id str

Unique ID for this recording (fixed for session lifetime).

required
target_cameras list[Camera]

List of cameras to capture from.

required
stream_names list[str]

List of stream names, one per camera.

required
stream_configs dict[str, VideoStreamConfig]

Configuration for each video stream.

required
spool_dir Path

Directory to write video files.

required
start_chunk_callback Optional[StartChunkCallback]

Optional callback to generate chunk IDs. When None, ChunkedWriter generates sequential 5-digit IDs.

None
sensors list[Sensor] | None

Optional list of sensors (e.g., IMU).

None
sensor_stream_names list[str] | None

Optional list of sensor stream names, one per sensor.

None
sensor_stream_configs dict[str, DataStreamConfig] | None

Optional configuration for each sensor stream.

None
chunk_length_s float

Duration of each recording chunk in seconds.

60.0
max_queue_size int

Maximum size per encoder queue.

400
on_error Optional[Callable[[], None]]

Optional callback invoked when the writer crashes. Called once (even if multiple streams fail). Propagates the error upward so the recording can be stopped.

None

stopped property

True if the session has been stopped.

recording_id property

Get the recording ID for this session.

recording_length_s property

Intended recording duration in seconds (capture start to stop signal).

is_alive property

True if any thread is still running.

start()

Start capture threads and parallel writer.

Raises:

Type Description
RuntimeError

If already started.

stop()

Stop the recording session.

Joins capture threads, then stops the parallel writer (which joins encoder threads and merges the final chunk). Blocks until complete.

join(timeout=None)

Wait for all threads to finish (including writer drain).

Parameters:

Name Type Description Default
timeout float | None

Maximum time to wait in seconds, or None for no timeout.

None

Returns:

Type Description
bool

True if all threads finished, False if timeout expired.

omgrab.runtime.network_monitor

Network connectivity monitor with hysteresis and backoff.

Simplified for SDK use: checks local network and internet reachability. No heartbeats or API probes.

Status

Bases: Enum

Discrete network health states in increasing order of quality.

Config(dns_timeout=1.0, tcp_timeout=1.0, internet_check_host='dns.google', internet_check_port=443, down_after_failures=3, up_after_successes=1, poll_ok_s=5.0, poll_min_s=1.0, poll_max_s=20.0, jitter_frac=0.1) dataclass

Configuration values controlling probes, hysteresis, and backoff.

Attributes:

Name Type Description
dns_timeout float

Timeout in seconds for DNS resolution probes.

tcp_timeout float

Timeout in seconds for TCP connection probes.

internet_check_host str

Hostname used for internet reachability checks.

internet_check_port int

TCP port used for internet reachability checks.

down_after_failures int

Consecutive probe failures before transitioning down.

up_after_successes int

Consecutive probe successes before transitioning up.

poll_ok_s float

Polling interval in seconds when the network is healthy.

poll_min_s float

Minimum polling interval in seconds during backoff.

poll_max_s float

Maximum polling interval in seconds during backoff.

jitter_frac float

Fraction of the polling interval added as random jitter.

Snapshot(status, detail, changed_at) dataclass

Immutable snapshot of the last stable network state.

Attributes:

Name Type Description
status Status

Current network health level.

detail str

Human-readable description of the last probe result.

changed_at float

Monotonic timestamp when the status last changed.

NetworkMonitor(cfg, iface=None)

Stateful network monitor with hysteresis and backoff.

Initialize the monitor with configuration and optional interface pinning.

Parameters:

Name Type Description Default
cfg Config

Configuration for network monitoring.

required
iface Optional[str]

Optional network interface to monitor.

None

snapshot property

Return the current stable snapshot.

wake()

Wake the monitor loop early.

register_on_change_callback(on_network_change)

Register a callback to be called on stable network state changes.

Note: Callbacks must be registered before the monitor is started.

Parameters:

Name Type Description Default
on_network_change Callable[[Snapshot], None]

Callback function to register.

required

Raises:

Type Description
RuntimeError

If the monitor has already started.

start()

Start the monitor thread.

shutdown()

Stop the monitor thread.

check_once()

Run a connectivity check: local network, then internet reachability.

omgrab.runtime.device_status

Centralized device status and system information manager.

This module provides a single source of truth for all device state, system metrics, and operational status. Can be queried by a display module or external consumers.

StateProvider

Bases: Protocol

Minimal interface for querying workflow state.

Used by DeviceStatusManager to decouple from the full StateMachine.

get_current_state()

Get the current workflow state as a string.

StorageInfo(total_bytes, used_bytes, available_bytes, used_percent) dataclass

Storage space information.

Attributes:

Name Type Description
total_bytes int

Total storage capacity in bytes.

used_bytes int

Used storage in bytes.

available_bytes int

Available storage in bytes.

used_percent float

Percentage of storage used (0-100).

CPUInfo(temperature_celsius, usage_percent, usage_per_core) dataclass

CPU metrics.

Attributes:

Name Type Description
temperature_celsius Optional[float]

CPU temperature in Celsius, or None if unavailable.

usage_percent float

Overall CPU usage percentage (0-100).

usage_per_core list[float]

Per-core usage percentages (0-100).

__str__()

Return human-readable CPU status.

MemoryInfo(total_bytes, used_bytes, available_bytes, used_percent) dataclass

Memory usage information.

Attributes:

Name Type Description
total_bytes int

Total RAM in bytes.

used_bytes int

Used RAM in bytes.

available_bytes int

Available RAM in bytes.

used_percent float

Percentage of RAM used (0-100).

__str__()

Return human-readable memory status.

NetworkInfo(status, wifi_ssid, wifi_signal_strength) dataclass

Network connectivity information.

Attributes:

Name Type Description
status str

Connection state, one of 'offline', 'network_only', or 'online'.

wifi_ssid Optional[str]

Connected WiFi network name, or None if not on WiFi.

wifi_signal_strength Optional[int]

Signal strength in dBm (e.g. -62), or None if unavailable.

RecordingInfo(is_recording, recording_id, duration_seconds) dataclass

Active recording information.

Attributes:

Name Type Description
is_recording bool

Whether a recording is currently in progress.

recording_id Optional[str]

Timestamp-based identifier for the active recording, or None.

duration_seconds Optional[float]

Elapsed recording time in seconds, or None if not recording.

BatteryInfo(percent, voltage_v, current_a, power_w, is_charging) dataclass

Battery status information.

Attributes:

Name Type Description
percent float

Battery percentage (0-100).

voltage_v float

Battery voltage in volts.

current_a float

Current draw in amps (positive when charging).

power_w float

Power consumption in watts.

is_charging bool

Whether the battery is currently charging.

__str__()

Return human-readable battery status.

DeviceStatus(device_id, software_version, uptime_seconds, state_machine_state, storage, cpu, memory, network, recording, device_healthy, device_error, battery) dataclass

Complete device status snapshot.

Aggregates all subsystem statuses into a single point-in-time reading, produced by DeviceStatusManager on each update cycle.

Attributes:

Name Type Description
device_id str

Unique identifier for this device.

software_version str

Currently running software version string.

uptime_seconds float

Seconds since the status manager started.

state_machine_state str

Current workflow state (e.g. 'idle', 'recording').

storage StorageInfo

Disk space metrics for the spool directory.

cpu CPUInfo

CPU temperature and usage metrics.

memory MemoryInfo

RAM usage metrics.

network NetworkInfo

Network connectivity and WiFi details.

recording RecordingInfo

Active recording state and duration.

device_healthy bool

Whether the device is operating normally.

device_error Optional[str]

Human-readable error description, or None if healthy.

battery Optional[BatteryInfo]

Battery metrics, or None if no battery hardware is present.

SystemMetricsProvider

Bases: Protocol

Interface for reading system-level hardware and OS metrics.

The default implementation reads from Linux /proc and /sys filesystems. Inject an alternative for testing or non-Linux development hosts.

get_cpu_info()

Get CPU temperature and usage information.

get_memory_info()

Get memory usage information.

get_storage_info(path)

Get storage space information for the given path.

get_wifi_info()

Get WiFi SSID and signal strength in dBm.

DeviceStatusManager(device_id, software_version, spool_base_dir, system_metrics, enable_battery_monitor=True, status_file=None)

Manages and aggregates device status information from multiple sources.

Initialize the device status manager.

Parameters:

Name Type Description Default
device_id str

Unique device identifier.

required
software_version str

Current software version.

required
spool_base_dir Path

Base directory for storage calculations.

required
system_metrics SystemMetricsProvider

Provider for system-level metrics (CPU, memory, storage, WiFi).

required
enable_battery_monitor bool

Whether to enable battery monitoring.

True
status_file Optional[Path]

Path to write status JSON file for external readers.

None

set_state_machine(state_machine)

Set the state machine reference.

set_recording_manager(recording_manager)

Set the recording manager reference.

set_network_monitor(network_monitor)

Set the network monitor reference.

start()

Start the background worker that updates status periodically.

shutdown()

Stop the background worker.

get_status()

Get a complete snapshot of device status.

Returns:

Type Description
DeviceStatus

DeviceStatus dataclass with all current information.

get_status_dict()

Get the cached status dictionary.

The status is updated every second by the background worker. This method returns the most recent cached value for fast access.

Returns:

Type Description
dict

Dictionary representation of device status.

omgrab.runtime.battery_monitor

Battery monitoring module for Waveshare UPS HAT B (INA219).

This module provides battery status information including voltage, current, power consumption, and battery percentage for devices with the Waveshare UPS HAT B.

Based on Waveshare's INA219 example code, adapted for this project's style.

BatteryStatus(voltage_v, current_a, power_w, percent) dataclass

Battery status information from the INA219 sensor.

Attributes:

Name Type Description
voltage_v float

Load voltage in volts.

current_a float

Current draw in amps (positive when charging).

power_w float

Power consumption in watts.

percent float

Estimated battery percentage (0-100).

__str__()

Return human-readable battery status.

BatteryMonitor(i2c_bus=1, i2c_addr=66, min_voltage=6.0, max_voltage=8.4)

Monitor battery status using INA219 chip on Waveshare UPS HAT B.

The INA219 measures voltage, current, and power. Battery percentage is calculated based on voltage (6V = 0%, 8.4V = 100% for typical 2S Li-ion).

Initialize the battery monitor.

Parameters:

Name Type Description Default
i2c_bus int

I2C bus number (default: 1).

1
i2c_addr int

I2C address of INA219 chip (default: 0x42 for UPS HAT B).

66
min_voltage float

Minimum battery voltage for 0% (default: 6.0V).

6.0
max_voltage float

Maximum battery voltage for 100% (default: 8.4V).

8.4

available property

True if battery monitoring hardware is available.

get_status()

Get current battery status.

If the battery hardware is not available, this method will periodically attempt to reconnect (every 30 seconds by default).

Returns:

Type Description
Optional[BatteryStatus]

BatteryStatus with current measurements, or None if battery monitoring

Optional[BatteryStatus]

is not available or read fails.

omgrab.runtime.gpio_manager

GPIO management for button monitoring and connection status.

ButtonConfig(long_press_threshold_s=5.0, double_press_window_s=0.5, check_interval_s=0.02) dataclass

Configuration for button press detection.

Attributes:

Name Type Description
long_press_threshold_s float

Hold duration to trigger WiFi reconfiguration.

double_press_window_s float

Window after a first tap in which a second tap counts as a double-press. Only used in IDLE state.

check_interval_s float

How often to poll the button GPIO (seconds).

button_monitor_loop(gpio_controller, state_machine_obj, config=None, on_wifi_setup_complete=None, on_button_press=None, on_wifi_hotspot_started=None, wifi_manager=None)

Monitor button presses and delegate to state machine.

Short press: Start/stop recording (handled by state machine) Double press (IDLE only): Camera preview (handled by state machine) Long press (5+ seconds): Trigger WiFi reconfiguration portal

Parameters:

Name Type Description Default
gpio_controller GPIOController

GPIO controller instance.

required
state_machine_obj StateMachine

State machine for managing workflow.

required
config Optional[ButtonConfig]

Button detection configuration.

None
on_wifi_setup_complete Optional[Callable[[bool], None]]

Optional callback invoked when WiFi setup completes.

None
on_button_press Optional[Callable[[], None]]

Optional callback invoked on any button press (rising edge).

None
on_wifi_hotspot_started Optional[Callable[[], None]]

Optional callback invoked when WiFi hotspot is started.

None
wifi_manager Optional[WifiManager]

Optional WifiManager instance for WiFi setup. If None, long-press and WiFi cancel are no-ops.

None

omgrab.runtime.wifi_connect

WiFi Connect manager with injectable container management functions.

The WifiManager class owns the WiFi setup lifecycle (state transitions, callbacks) but delegates container operations to injected functions. This allows the SDK to be independent of Docker while applications can provide Docker-specific implementations.

WifiManager(start_container, stop_container, is_running)

Manages WiFi setup lifecycle with injectable container functions.

The manager orchestrates the WiFi provisioning flow: 1. Check if the WiFi setup service is already running. 2. Start it via the injected function. 3. Wait for completion (via the start function's blocking behavior). 4. Invoke a callback when complete.

Container-specific logic (Docker API calls, process management, etc.) is provided by the caller via the constructor arguments.

Initialize the WiFi manager.

Parameters:

Name Type Description Default
start_container Callable[[bool, Optional[Callable[[bool], None]]], bool]

Function to start the WiFi setup service. Signature: (force_mode, callback) -> bool. Should return True if the service was started, False otherwise. The callback (if provided) should be invoked with True/False when the service completes.

required
stop_container Callable[[], bool]

Function to stop the WiFi setup service. Should return True if stopped, False otherwise.

required
is_running Callable[[], bool]

Function to check if the service is currently running. Should return True if running, False otherwise.

required

start_wifi_connect(force_mode=True, callback=None)

Start the WiFi setup service.

Parameters:

Name Type Description Default
force_mode bool

If True, forces the portal to start even if WiFi is connected.

True
callback Optional[Callable[[bool], None]]

Optional callback invoked when WiFi setup completes.

None

Returns:

Type Description
bool

True if the service was started, False otherwise.

stop_wifi_connect()

Stop the WiFi setup service if running.

Returns:

Type Description
bool

True if the service was stopped, False otherwise.

is_wifi_connect_running()

Check if the WiFi setup service is currently running.

Returns:

Type Description
bool

True if running, False otherwise.