runtime
Recording lifecycle, monitoring, and device management.
omgrab.runtime.recording_manager
Recording manager that owns capture device and recording sessions.
RecordingConfig(device_ready_timeout_s=7.5, health_check_interval_s=1.0, session_join_timeout_s=10.0, chunk_length_s=60.0, max_queue_size=400)
dataclass
Configuration for the recording manager.
Attributes:
| Name | Type | Description |
|---|---|---|
| `device_ready_timeout_s` | `float` | Max time to wait for the capture device to become ready before starting a recording. |
| `health_check_interval_s` | `float` | Interval between device health checks during recording. |
| `session_join_timeout_s` | `float` | Timeout for joining finished recording session threads during shutdown. |
| `chunk_length_s` | `float` | Duration of each recording chunk in seconds. |
| `max_queue_size` | `int` | Maximum size per encoder queue. |
RecordingController
RecordingManager(devices, target_cameras, stream_names, stream_configs, spool_dir, output_dir, config=None, on_device_unhealthy=None, on_recording_complete=None, sensors=None, sensor_stream_names=None, sensor_stream_configs=None)
Manages recording sessions and capture device lifecycles.
Owns:
- Capture devices (opened/closed per recording)
- Active recording session (at most one at a time)
- Finished sessions pending cleanup
The StateMachine calls start_recording/stop_recording; this class handles all the threading and device management internally.
Initialize the recording manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `devices` | `list[CaptureDevice]` | Capture devices to use for recording. All must be connected for recording to start, and all are monitored for health during recording. | required |
| `target_cameras` | `list[Camera]` | List of cameras to capture from. | required |
| `stream_names` | `list[str]` | List of stream names, one per camera. | required |
| `stream_configs` | `dict[str, VideoStreamConfig]` | Configuration for each video stream. | required |
| `spool_dir` | `Path` | Directory to write temporary video chunks. | required |
| `output_dir` | `Path` | Directory to write merged final recordings. | required |
| `config` | `Optional[RecordingConfig]` | Recording configuration. | `None` |
| `on_device_unhealthy` | `Optional[OnDeviceUnhealthyCallback]` | Optional callback invoked when any device becomes unhealthy during recording. | `None` |
| `on_recording_complete` | `Optional[OnRecordingCompleteCallback]` | Optional callback invoked when a recording has been merged. Called with the path to the merged MKV file. | `None` |
| `sensors` | `list[Sensor] \| None` | Optional list of sensors (e.g., IMU). | `None` |
| `sensor_stream_names` | `list[str] \| None` | Optional list of sensor stream names, one per sensor. | `None` |
| `sensor_stream_configs` | `dict[str, DataStreamConfig] \| None` | Optional configuration for each sensor stream. | `None` |
is_recording
property
True if there is an active session currently capturing.
active_recording_id
property
Get the ID of the active recording, or None if not recording.
recording_started_at
property
Get the start time of the active recording, or None if not recording.
set_on_device_unhealthy(callback)
Set the callback for device unhealthy events.
This allows setting the callback after construction, which is needed when there are circular dependencies (e.g., StateMachine needs RecordingManager and vice versa).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `callback` | `Optional[OnDeviceUnhealthyCallback]` | Callback to invoke when device becomes unhealthy during recording, or None to clear the callback. | required |
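The circular dependency described above is usually broken by building one object first and wiring the callback afterwards. The following is a minimal sketch of that pattern only. `FakeRecordingManager`, `FakeStateMachine`, `simulate_unhealthy_device`, and the zero-argument callback signature are all hypothetical stand-ins, since the real `OnDeviceUnhealthyCallback` type is not shown on this page.

```python
from typing import Callable, Optional


class FakeRecordingManager:
    """Stand-in for RecordingManager; only the callback wiring is modelled."""

    def __init__(self) -> None:
        self._on_device_unhealthy: Optional[Callable[[], None]] = None

    def set_on_device_unhealthy(self, callback: Optional[Callable[[], None]]) -> None:
        # Passing None clears the callback, as documented for the real setter.
        self._on_device_unhealthy = callback

    def simulate_unhealthy_device(self) -> None:
        # Hypothetical helper: fire the callback the way a health check might.
        if self._on_device_unhealthy is not None:
            self._on_device_unhealthy()


class FakeStateMachine:
    """Stand-in for StateMachine; it needs the manager at construction time."""

    def __init__(self, manager: FakeRecordingManager) -> None:
        self.manager = manager
        self.state = "recording"

    def handle_device_unhealthy(self) -> None:
        self.state = "error"


manager = FakeRecordingManager()      # built first, without a callback
machine = FakeStateMachine(manager)   # needs the manager to exist
manager.set_on_device_unhealthy(machine.handle_device_unhealthy)  # late wiring
manager.simulate_unhealthy_device()
print(machine.state)
```

Wiring through a setter keeps both constructors free of forward references, at the cost of a short window in which no callback is installed.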
set_on_recording_error(callback)
Set the callback for recording error events (e.g. encoder crash).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `callback` | `Optional[OnRecordingErrorCallback]` | Callback to invoke when a recording error occurs, or None to clear the callback. | required |
start_recording()
Start a new recording session.
Opens the capture device, creates a new session with a timestamp-based recording name, and starts its threads.
Returns:
| Type | Description |
|---|---|
| `bool` | True if recording started successfully, False otherwise. |
stop_recording()
Stop the current recording session.
Stops capture threads immediately, then merges chunks into a final MKV file. The session is moved to finished_sessions for later cleanup.
shutdown()
Shutdown the manager: stop active session and join all threads.
omgrab.runtime.recording_session
Recording session that owns capture threads and a parallel writer.
RecordingSession(recording_id, target_cameras, stream_names, stream_configs, spool_dir, start_chunk_callback=None, sensors=None, sensor_stream_names=None, sensor_stream_configs=None, chunk_length_s=60.0, max_queue_size=400, on_error=None)
A single recording session with isolated threads and parallel writer.
Owns:
- Per-stream encoder queues (via ChunkedWriter)
- Capture threads (one per camera/data source)
- Encoder threads + mux thread (via ChunkedWriter)
Lifecycle:
- start(): Start capture threads and parallel writer
- stop(): Signal capture threads to stop; writer continues draining
- join(): Wait for all threads to finish
Initialize the recording session.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `recording_id` | `str` | Unique ID for this recording (fixed for session lifetime). | required |
| `target_cameras` | `list[Camera]` | List of cameras to capture from. | required |
| `stream_names` | `list[str]` | List of stream names, one per camera. | required |
| `stream_configs` | `dict[str, VideoStreamConfig]` | Configuration for each video stream. | required |
| `spool_dir` | `Path` | Directory to write video files. | required |
| `start_chunk_callback` | `Optional[StartChunkCallback]` | Optional callback to generate chunk IDs. When None, ChunkedWriter generates sequential 5-digit IDs. | `None` |
| `sensors` | `list[Sensor] \| None` | Optional list of sensors (e.g., IMU). | `None` |
| `sensor_stream_names` | `list[str] \| None` | Optional list of sensor stream names, one per sensor. | `None` |
| `sensor_stream_configs` | `dict[str, DataStreamConfig] \| None` | Optional configuration for each sensor stream. | `None` |
| `chunk_length_s` | `float` | Duration of each recording chunk in seconds. | `60.0` |
| `max_queue_size` | `int` | Maximum size per encoder queue. | `400` |
| `on_error` | `Optional[Callable[[], None]]` | Optional callback invoked when the writer crashes. Called once (even if multiple streams fail). Propagates the error upward so the recording can be stopped. | `None` |
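To make the chunking parameters concrete, here is a small sketch of how sequential 5-digit chunk IDs and an expected chunk count could be derived from `chunk_length_s`. Both helper names are hypothetical, and the starting index and exact rounding used by ChunkedWriter are not documented here, so treat this as an illustration rather than the writer's actual behaviour.

```python
import math


def sequential_chunk_id(index: int) -> str:
    """Format a chunk index as a zero-padded 5-digit ID (hypothetical helper)."""
    return f"{index:05d}"


def expected_chunk_count(recording_length_s: float, chunk_length_s: float = 60.0) -> int:
    """Estimate how many chunks a recording of the given length spans.

    Assumes a trailing partial chunk is still written as its own chunk.
    """
    if chunk_length_s <= 0:
        raise ValueError("chunk_length_s must be positive")
    return math.ceil(recording_length_s / chunk_length_s)


# A 150 second recording with the default 60 second chunks spans three chunks.
ids = [sequential_chunk_id(i) for i in range(expected_chunk_count(150.0))]
print(ids)
```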
stopped
property
True if the session has been stopped.
recording_id
property
Get the recording ID for this session.
recording_length_s
property
Intended recording duration in seconds (capture start to stop signal).
is_alive
property
True if any thread is still running.
start()
Start capture threads and parallel writer.
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If already started. |
stop()
Stop the recording session.
Joins capture threads, then stops the parallel writer (which joins encoder threads and merges the final chunk). Blocks until complete.
join(timeout=None)
Wait for all threads to finish (including writer drain).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float \| None` | Maximum time to wait in seconds, or None for no timeout. | `None` |
Returns:
| Type | Description |
|---|---|
| `bool` | True if all threads finished, False if timeout expired. |
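A join over several threads with a single overall timeout is commonly implemented with a shared deadline, so that the total wait never exceeds the requested value. The sketch below shows that technique with plain `threading.Thread` objects. `join_all` is a hypothetical helper, not the session's actual implementation.

```python
import threading
import time
from typing import Iterable, Optional


def join_all(threads: Iterable[threading.Thread], timeout: Optional[float] = None) -> bool:
    """Join every thread, sharing one deadline; return True if all finished."""
    deadline = None if timeout is None else time.monotonic() + timeout
    for thread in threads:
        if deadline is None:
            thread.join()  # no timeout: block until this thread exits
        else:
            # Give each thread only the time left on the shared deadline.
            remaining = max(0.0, deadline - time.monotonic())
            thread.join(remaining)
        if thread.is_alive():
            return False  # deadline expired while this thread was running
    return True
```

Using `time.monotonic()` keeps the deadline immune to wall-clock adjustments, which matters on devices that sync their clock after boot.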
omgrab.runtime.network_monitor
Network connectivity monitor with hysteresis and backoff.
Simplified for SDK use: checks local network and internet reachability. No heartbeats or API probes.
Status
Bases: Enum
Discrete network health states in increasing order of quality.
Config(dns_timeout=1.0, tcp_timeout=1.0, internet_check_host='dns.google', internet_check_port=443, down_after_failures=3, up_after_successes=1, poll_ok_s=5.0, poll_min_s=1.0, poll_max_s=20.0, jitter_frac=0.1)
dataclass
Configuration values controlling probes, hysteresis, and backoff.
Attributes:
| Name | Type | Description |
|---|---|---|
| `dns_timeout` | `float` | Timeout in seconds for DNS resolution probes. |
| `tcp_timeout` | `float` | Timeout in seconds for TCP connection probes. |
| `internet_check_host` | `str` | Hostname used for internet reachability checks. |
| `internet_check_port` | `int` | TCP port used for internet reachability checks. |
| `down_after_failures` | `int` | Consecutive probe failures before transitioning down. |
| `up_after_successes` | `int` | Consecutive probe successes before transitioning up. |
| `poll_ok_s` | `float` | Polling interval in seconds when the network is healthy. |
| `poll_min_s` | `float` | Minimum polling interval in seconds during backoff. |
| `poll_max_s` | `float` | Maximum polling interval in seconds during backoff. |
| `jitter_frac` | `float` | Fraction of the polling interval added as random jitter. |
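The hysteresis and backoff fields above can be illustrated with a short sketch. It is simplified to two states (up/down), whereas the monitor's `Status` enum has more levels, and it assumes an exponential doubling policy between `poll_min_s` and `poll_max_s`. The `Hysteresis` class, `next_poll_delay`, the doubling rule, and the jitter formula are all assumptions for illustration, not the monitor's confirmed algorithm.

```python
import random
from dataclasses import dataclass
from typing import Callable


@dataclass
class Hysteresis:
    """Two-state (up/down) debouncer driven by consecutive probe results."""

    down_after_failures: int = 3
    up_after_successes: int = 1
    is_up: bool = True
    failures: int = 0   # consecutive failed probes
    successes: int = 0  # consecutive successful probes

    def observe(self, probe_ok: bool) -> bool:
        """Record one probe result and return the stable state."""
        if probe_ok:
            self.successes += 1
            self.failures = 0
            if not self.is_up and self.successes >= self.up_after_successes:
                self.is_up = True
        else:
            self.failures += 1
            self.successes = 0
            if self.is_up and self.failures >= self.down_after_failures:
                self.is_up = False
        return self.is_up


def next_poll_delay(
    consecutive_failures: int,
    poll_ok_s: float = 5.0,
    poll_min_s: float = 1.0,
    poll_max_s: float = 20.0,
    jitter_frac: float = 0.1,
    rand: Callable[[], float] = random.random,
) -> float:
    """Return the delay before the next probe, in seconds."""
    if consecutive_failures <= 0:
        base = poll_ok_s
    else:
        # Assumed policy: start at poll_min_s and double per failure, capped.
        base = min(poll_max_s, poll_min_s * 2 ** (consecutive_failures - 1))
    # Add up to jitter_frac of the interval so devices do not poll in lockstep.
    return base + base * jitter_frac * rand()
```

With the defaults, a single failed probe does not flip the stable state, while a single success is enough to recover, which favours reporting connectivity quickly once it returns.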
Snapshot(status, detail, changed_at)
dataclass
Immutable snapshot of the last stable network state.
Attributes:
| Name | Type | Description |
|---|---|---|
| `status` | `Status` | Current network health level. |
| `detail` | `str` | Human-readable description of the last probe result. |
| `changed_at` | `float` | Monotonic timestamp when the status last changed. |
NetworkMonitor(cfg, iface=None)
Stateful network monitor with hysteresis and backoff.
Initialize the monitor with configuration and optional interface pinning.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `cfg` | `Config` | Configuration for network monitoring. | required |
| `iface` | `Optional[str]` | Optional network interface to monitor. | `None` |
snapshot
property
Return the current stable snapshot.
wake()
Wake the monitor loop early.
register_on_change_callback(on_network_change)
Register a callback to be called on stable network state changes.
Note: Callbacks must be registered before the monitor is started.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `on_network_change` | `Callable[[Snapshot], None]` | Callback function to register. | required |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If the monitor has already started. |
start()
Start the monitor thread.
shutdown()
Stop the monitor thread.
check_once()
Run a connectivity check: local network, then internet reachability.
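The internet reachability half of such a check can be approximated with the standard `socket` module: resolve the host, then open a TCP connection. The sketch below covers only that stage and uses hypothetical helper names. How the monitor probes the local network, and how it enforces `dns_timeout`, is not described on this page.

```python
import socket


def resolve_host(host: str) -> bool:
    """Return True if the name resolves.

    socket.getaddrinfo takes no timeout argument, so a real monitor would
    have to bound this call some other way.
    """
    try:
        socket.getaddrinfo(host, None)
        return True
    except OSError:  # socket.gaierror is a subclass of OSError
        return False


def tcp_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection can be opened within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


def internet_reachable(host: str = "dns.google", port: int = 443,
                       tcp_timeout: float = 1.0) -> bool:
    """DNS first, then TCP; stop at the first failing stage."""
    return resolve_host(host) and tcp_reachable(host, port, tcp_timeout)
```

The default host and port mirror the `Config` defaults listed earlier. Distinguishing a DNS failure from a TCP failure is what would let a caller populate a descriptive `detail` string.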
omgrab.runtime.device_status
Centralized device status and system information manager.
This module provides a single source of truth for all device state, system metrics, and operational status. Can be queried by a display module or external consumers.
StateProvider
Bases: Protocol
Minimal interface for querying workflow state.
Used by DeviceStatusManager to decouple from the full StateMachine.
get_current_state()
Get the current workflow state as a string.
StorageInfo(total_bytes, used_bytes, available_bytes, used_percent)
dataclass
Storage space information.
Attributes:
| Name | Type | Description |
|---|---|---|
| `total_bytes` | `int` | Total storage capacity in bytes. |
| `used_bytes` | `int` | Used storage in bytes. |
| `available_bytes` | `int` | Available storage in bytes. |
| `used_percent` | `float` | Percentage of storage used (0-100). |
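As an illustration of where these four values can come from, the sketch below fills a look-alike dataclass from `shutil.disk_usage`. `StorageInfoSketch` and `read_storage` are hypothetical names, and whether the real provider uses `shutil` or computes the percentage this way is an assumption.

```python
import shutil
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class StorageInfoSketch:
    """Look-alike of StorageInfo, defined here so the example is self-contained."""

    total_bytes: int
    used_bytes: int
    available_bytes: int
    used_percent: float


def read_storage(path: Path) -> StorageInfoSketch:
    """Read disk usage for the filesystem that contains `path`."""
    usage = shutil.disk_usage(path)
    # Guard against a zero-sized filesystem to avoid dividing by zero.
    used_percent = 100.0 * usage.used / usage.total if usage.total else 0.0
    return StorageInfoSketch(usage.total, usage.used, usage.free, used_percent)


info = read_storage(Path("."))
print(f"{info.used_percent:.1f}% used, {info.available_bytes} bytes available")
```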
CPUInfo(temperature_celsius, usage_percent, usage_per_core)
dataclass
CPU metrics.
Attributes:
| Name | Type | Description |
|---|---|---|
| `temperature_celsius` | `Optional[float]` | CPU temperature in Celsius, or None if unavailable. |
| `usage_percent` | `float` | Overall CPU usage percentage (0-100). |
| `usage_per_core` | `list[float]` | Per-core usage percentages (0-100). |
__str__()
Return human-readable CPU status.
MemoryInfo(total_bytes, used_bytes, available_bytes, used_percent)
dataclass
Memory usage information.
Attributes:
| Name | Type | Description |
|---|---|---|
| `total_bytes` | `int` | Total RAM in bytes. |
| `used_bytes` | `int` | Used RAM in bytes. |
| `available_bytes` | `int` | Available RAM in bytes. |
| `used_percent` | `float` | Percentage of RAM used (0-100). |
__str__()
Return human-readable memory status.
NetworkInfo(status, wifi_ssid, wifi_signal_strength)
dataclass
Network connectivity information.
Attributes:
| Name | Type | Description |
|---|---|---|
| `status` | `str` | Connection state, one of 'offline', 'network_only', or 'online'. |
| `wifi_ssid` | `Optional[str]` | Connected WiFi network name, or None if not on WiFi. |
| `wifi_signal_strength` | `Optional[int]` | Signal strength in dBm (e.g. -62), or None if unavailable. |
RecordingInfo(is_recording, recording_id, duration_seconds)
dataclass
Active recording information.
Attributes:
| Name | Type | Description |
|---|---|---|
| `is_recording` | `bool` | Whether a recording is currently in progress. |
| `recording_id` | `Optional[str]` | Timestamp-based identifier for the active recording, or None. |
| `duration_seconds` | `Optional[float]` | Elapsed recording time in seconds, or None if not recording. |
BatteryInfo(percent, voltage_v, current_a, power_w, is_charging)
dataclass
Battery status information.
Attributes:
| Name | Type | Description |
|---|---|---|
| `percent` | `float` | Battery percentage (0-100). |
| `voltage_v` | `float` | Battery voltage in volts. |
| `current_a` | `float` | Current draw in amps (positive when charging). |
| `power_w` | `float` | Power consumption in watts. |
| `is_charging` | `bool` | Whether the battery is currently charging. |
__str__()
Return human-readable battery status.
DeviceStatus(device_id, software_version, uptime_seconds, state_machine_state, storage, cpu, memory, network, recording, device_healthy, device_error, battery)
dataclass
Complete device status snapshot.
Aggregates all subsystem statuses into a single point-in-time reading, produced by DeviceStatusManager on each update cycle.
Attributes:
| Name | Type | Description |
|---|---|---|
| `device_id` | `str` | Unique identifier for this device. |
| `software_version` | `str` | Currently running software version string. |
| `uptime_seconds` | `float` | Seconds since the status manager started. |
| `state_machine_state` | `str` | Current workflow state (e.g. 'idle', 'recording'). |
| `storage` | `StorageInfo` | Disk space metrics for the spool directory. |
| `cpu` | `CPUInfo` | CPU temperature and usage metrics. |
| `memory` | `MemoryInfo` | RAM usage metrics. |
| `network` | `NetworkInfo` | Network connectivity and WiFi details. |
| `recording` | `RecordingInfo` | Active recording state and duration. |
| `device_healthy` | `bool` | Whether the device is operating normally. |
| `device_error` | `Optional[str]` | Human-readable error description, or None if healthy. |
| `battery` | `Optional[BatteryInfo]` | Battery metrics, or None if no battery hardware is present. |
SystemMetricsProvider
Bases: Protocol
Interface for reading system-level hardware and OS metrics.
The default implementation reads from Linux /proc and /sys filesystems. Inject an alternative for testing or non-Linux development hosts.
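For a sense of what reading from /proc involves, the sketch below parses the text format of `/proc/meminfo` into byte counts. It takes the file contents as a string so it also runs on non-Linux hosts. The helper names are hypothetical, and treating "used" as `MemTotal - MemAvailable` is an assumption about how the default provider defines it.

```python
def parse_meminfo(text: str) -> dict[str, int]:
    """Parse /proc/meminfo-style text into a mapping of field name to bytes."""
    values: dict[str, int] = {}
    for line in text.splitlines():
        key, sep, rest = line.partition(":")
        fields = rest.split()
        if not sep or not fields:
            continue  # skip blank or malformed lines
        amount = int(fields[0])
        if len(fields) > 1 and fields[1] == "kB":
            amount *= 1024  # the kernel reports these fields in KiB
        values[key.strip()] = amount
    return values


def memory_summary(text: str) -> dict[str, float]:
    """Derive the four MemoryInfo-style values from meminfo text."""
    info = parse_meminfo(text)
    total = info["MemTotal"]
    available = info["MemAvailable"]
    used = total - available  # assumed definition of "used"
    return {
        "total_bytes": total,
        "used_bytes": used,
        "available_bytes": available,
        "used_percent": 100.0 * used / total,
    }


sample = "MemTotal:        8000000 kB\nMemFree:         1000000 kB\nMemAvailable:    6000000 kB\n"
print(memory_summary(sample))
```

Accepting text rather than a file path is one way to support the injection point mentioned above: a test can pass canned contents instead of touching the filesystem.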
DeviceStatusManager(device_id, software_version, spool_base_dir, system_metrics, enable_battery_monitor=True, status_file=None)
Manages and aggregates device status information from multiple sources.
Initialize the device status manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `device_id` | `str` | Unique device identifier. | required |
| `software_version` | `str` | Current software version. | required |
| `spool_base_dir` | `Path` | Base directory for storage calculations. | required |
| `system_metrics` | `SystemMetricsProvider` | Provider for system-level metrics (CPU, memory, storage, WiFi). | required |
| `enable_battery_monitor` | `bool` | Whether to enable battery monitoring. | `True` |
| `status_file` | `Optional[Path]` | Path to write status JSON file for external readers. | `None` |
set_state_machine(state_machine)
Set the state machine reference.
set_recording_manager(recording_manager)
Set the recording manager reference.
set_network_monitor(network_monitor)
Set the network monitor reference.
start()
Start the background worker that updates status periodically.
shutdown()
Stop the background worker.
get_status()
Get a complete snapshot of device status.
Returns:
| Type | Description |
|---|---|
| `DeviceStatus` | DeviceStatus dataclass with all current information. |
get_status_dict()
Get the cached status dictionary.
The status is updated every second by the background worker. This method returns the most recent cached value for fast access.
Returns:
| Type | Description |
|---|---|
| `dict` | Dictionary representation of device status. |
omgrab.runtime.battery_monitor
Battery monitoring module for Waveshare UPS HAT B (INA219).
This module provides battery status information including voltage, current, power consumption, and battery percentage for devices with the Waveshare UPS HAT B.
Based on Waveshare's INA219 example code, adapted for this project's style.
BatteryStatus(voltage_v, current_a, power_w, percent)
dataclass
Battery status information from the INA219 sensor.
Attributes:
| Name | Type | Description |
|---|---|---|
| `voltage_v` | `float` | Load voltage in volts. |
| `current_a` | `float` | Current draw in amps (positive when charging). |
| `power_w` | `float` | Power consumption in watts. |
| `percent` | `float` | Estimated battery percentage (0-100). |
__str__()
Return human-readable battery status.
BatteryMonitor(i2c_bus=1, i2c_addr=66, min_voltage=6.0, max_voltage=8.4)
Monitor battery status using INA219 chip on Waveshare UPS HAT B.
The INA219 measures voltage, current, and power. Battery percentage is calculated based on voltage (6V = 0%, 8.4V = 100% for typical 2S Li-ion).
Initialize the battery monitor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `i2c_bus` | `int` | I2C bus number (default: 1). | `1` |
| `i2c_addr` | `int` | I2C address of INA219 chip (default: 0x42 for UPS HAT B). | `66` |
| `min_voltage` | `float` | Minimum battery voltage for 0% (default: 6.0V). | `6.0` |
| `max_voltage` | `float` | Maximum battery voltage for 100% (default: 8.4V). | `8.4` |
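The voltage-to-percentage mapping described above (6.0 V = 0%, 8.4 V = 100%) can be written as a linear interpolation. The sketch below assumes the result is clamped to the 0-100 range. `voltage_to_percent` is a hypothetical helper, and the monitor's exact formula may differ.

```python
def voltage_to_percent(voltage_v: float, min_voltage: float = 6.0,
                       max_voltage: float = 8.4) -> float:
    """Map a battery voltage linearly onto 0-100%, clamping out-of-range input."""
    if max_voltage <= min_voltage:
        raise ValueError("max_voltage must be greater than min_voltage")
    fraction = (voltage_v - min_voltage) / (max_voltage - min_voltage)
    # Clamp: a sagging or freshly charged pack can read outside the nominal range.
    return max(0.0, min(100.0, fraction * 100.0))


print(voltage_to_percent(7.2))  # roughly the midpoint of the range
```

A linear map is only an approximation for Li-ion cells, whose discharge curve is flat in the middle and steep at both ends, so the reported percentage is best read as an estimate.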
available
property
True if battery monitoring hardware is available.
get_status()
Get current battery status.
If the battery hardware is not available, this method will periodically attempt to reconnect (every 30 seconds by default).
Returns:
| Type | Description |
|---|---|
| `Optional[BatteryStatus]` | BatteryStatus with current measurements, or None if battery monitoring is not available or read fails. |
omgrab.runtime.gpio_manager
GPIO management for button monitoring and connection status.
ButtonConfig(long_press_threshold_s=5.0, double_press_window_s=0.5, check_interval_s=0.02)
dataclass
Configuration for button press detection.
Attributes:
| Name | Type | Description |
|---|---|---|
| `long_press_threshold_s` | `float` | Hold duration to trigger WiFi reconfiguration. |
| `double_press_window_s` | `float` | Window after a first tap in which a second tap counts as a double-press. Only used in IDLE state. |
| `check_interval_s` | `float` | How often to poll the button GPIO (seconds). |
button_monitor_loop(gpio_controller, state_machine_obj, config=None, on_wifi_setup_complete=None, on_button_press=None, on_wifi_hotspot_started=None, wifi_manager=None)
Monitor button presses and delegate to state machine.
- Short press: Start/stop recording (handled by state machine)
- Double press (IDLE only): Camera preview (handled by state machine)
- Long press (5+ seconds): Trigger WiFi reconfiguration portal
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `gpio_controller` | `GPIOController` | GPIO controller instance. | required |
| `state_machine_obj` | `StateMachine` | State machine for managing workflow. | required |
| `config` | `Optional[ButtonConfig]` | Button detection configuration. | `None` |
| `on_wifi_setup_complete` | `Optional[Callable[[bool], None]]` | Optional callback invoked when WiFi setup completes. | `None` |
| `on_button_press` | `Optional[Callable[[], None]]` | Optional callback invoked on any button press (rising edge). | `None` |
| `on_wifi_hotspot_started` | `Optional[Callable[[], None]]` | Optional callback invoked when WiFi hotspot is started. | `None` |
| `wifi_manager` | `Optional[WifiManager]` | Optional WifiManager instance for WiFi setup. If None, long-press and WiFi cancel are no-ops. | `None` |
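The three press types can be told apart from two measurements: how long the button was held, and how long ago the previous tap ended. The sketch below classifies a single completed press under those assumptions. `ButtonTiming` and `classify_press` are hypothetical, and a real loop would also have to delay acting on a short press until the double-press window has expired, which is omitted here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ButtonTiming:
    """Subset of ButtonConfig, redefined so the example is self-contained."""

    long_press_threshold_s: float = 5.0
    double_press_window_s: float = 0.5


def classify_press(
    held_s: float,
    since_previous_tap_s: Optional[float],
    is_idle: bool,
    timing: ButtonTiming = ButtonTiming(),
) -> str:
    """Return 'long', 'double', or 'short' for one completed press."""
    if held_s >= timing.long_press_threshold_s:
        return "long"
    # Double presses are only recognised in the IDLE state.
    if (
        is_idle
        and since_previous_tap_s is not None
        and since_previous_tap_s <= timing.double_press_window_s
    ):
        return "double"
    return "short"


print(classify_press(0.1, 0.3, is_idle=True))
```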
omgrab.runtime.wifi_connect
WiFi Connect manager with injectable container management functions.
The WifiManager class owns the WiFi setup lifecycle (state transitions, callbacks) but delegates container operations to injected functions. This allows the SDK to be independent of Docker while applications can provide Docker-specific implementations.
WifiManager(start_container, stop_container, is_running)
Manages WiFi setup lifecycle with injectable container functions.
The manager orchestrates the WiFi provisioning flow:
1. Check if the WiFi setup service is already running.
2. Start it via the injected function.
3. Wait for completion (via the start function's blocking behavior).
4. Invoke a callback when complete.
Container-specific logic (Docker API calls, process management, etc.) is provided by the caller via the constructor arguments.
Initialize the WiFi manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `start_container` | `Callable[[bool, Optional[Callable[[bool], None]]], bool]` | Function to start the WiFi setup service. Signature: (force_mode, callback) -> bool. Should return True if the service was started, False otherwise. The callback (if provided) should be invoked with True/False when the service completes. | required |
| `stop_container` | `Callable[[], bool]` | Function to stop the WiFi setup service. Should return True if stopped, False otherwise. | required |
| `is_running` | `Callable[[], bool]` | Function to check if the service is currently running. Should return True if running, False otherwise. | required |
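The injection pattern described above can be sketched without Docker by passing three plain callables. In the example below, `InMemoryService` is a hypothetical fake that pretends the setup portal finishes immediately, and `WifiManagerSketch` is a stand-in that mirrors the documented constructor and method names. Its internal behaviour is an assumption, not the real class.

```python
from typing import Callable, Optional

CompletionCallback = Optional[Callable[[bool], None]]


class InMemoryService:
    """Hypothetical fake standing in for a container-managed setup service."""

    def __init__(self) -> None:
        self.running = False

    def start(self, force_mode: bool, callback: CompletionCallback) -> bool:
        if self.running:
            return False
        self.running = True
        # Pretend the portal ran to completion straight away and succeeded.
        self.running = False
        if callback is not None:
            callback(True)
        return True

    def stop(self) -> bool:
        was_running = self.running
        self.running = False
        return was_running

    def is_running(self) -> bool:
        return self.running


class WifiManagerSketch:
    """Stand-in mirroring WifiManager(start_container, stop_container, is_running)."""

    def __init__(
        self,
        start_container: Callable[[bool, CompletionCallback], bool],
        stop_container: Callable[[], bool],
        is_running: Callable[[], bool],
    ) -> None:
        self._start = start_container
        self._stop = stop_container
        self._is_running = is_running

    def start_wifi_connect(self, force_mode: bool = True,
                           callback: CompletionCallback = None) -> bool:
        if self._is_running():
            return False  # assumed: do not start a second instance
        return self._start(force_mode, callback)

    def stop_wifi_connect(self) -> bool:
        return self._stop() if self._is_running() else False

    def is_wifi_connect_running(self) -> bool:
        return self._is_running()


service = InMemoryService()
wifi = WifiManagerSketch(service.start, service.stop, service.is_running)
results: list[bool] = []
started = wifi.start_wifi_connect(callback=results.append)
print(started, results)
```

Because the manager only sees callables, an application can swap in Docker-backed functions while tests keep using a fake like the one above.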
start_wifi_connect(force_mode=True, callback=None)
Start the WiFi setup service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `force_mode` | `bool` | If True, forces the portal to start even if WiFi is connected. | `True` |
| `callback` | `Optional[Callable[[bool], None]]` | Optional callback invoked when WiFi setup completes. | `None` |
Returns:
| Type | Description |
|---|---|
| `bool` | True if the service was started, False otherwise. |
stop_wifi_connect()
Stop the WiFi setup service if running.
Returns:
| Type | Description |
|---|---|
| `bool` | True if the service was stopped, False otherwise. |
is_wifi_connect_running()
Check if the WiFi setup service is currently running.
Returns:
| Type | Description |
|---|---|
| `bool` | True if running, False otherwise. |