recording¶
Stream configuration, chunked MKV writing, and stream merging.
omgrab.recording.stream_configs
¶
Stream configuration dataclasses for video and data streams.
VideoStreamConfig(width, height, fps, codec='libx264', bitrate=2000000, input_pixel_format='rgb24', output_pixel_format='yuv420p', stream_options=dict(), metadata=dict())
dataclass
¶
Configuration for a video stream.
Attributes:
| Name | Type | Description |
|---|---|---|
width |
int
|
Frame width in pixels. |
height |
int
|
Frame height in pixels. |
fps |
float | Fraction
|
Target frames per second. |
codec |
str
|
Video codec name (e.g. 'libx264', 'libx265'). |
bitrate |
int
|
Target bitrate in bits per second. |
input_pixel_format |
str
|
Pixel format of incoming frames (e.g. 'rgb24'). |
output_pixel_format |
str
|
Pixel format for the encoded stream (e.g. 'yuv420p'). |
stream_options |
dict[str, str]
|
Additional codec options passed to the encoder. |
metadata |
dict[str, Any]
|
Arbitrary metadata stored in the MKV stream header. |
DataStreamConfig(codec='ass', metadata=dict())
dataclass
¶
Configuration for a non-video data stream stored as an MKV subtitle track.
Attributes:
| Name | Type | Description |
|---|---|---|
codec |
str
|
Subtitle codec name (e.g. 'ass'). |
metadata |
dict[str, Any]
|
Arbitrary metadata stored in the MKV stream header. |
omgrab.recording.chunked_writer
¶
Chunked video writer with per-stream parallel encoding.
ChunkedWriter(name, output_directory, stream_configs, start_chunk_callback=None, sensor_stream_configs=None, chunk_length_s=60.0, max_encoder_queue_size=200, on_error=None)
¶
Chunked video writer with per-stream parallel encoding.
Each stream gets its own encoder thread AND its own single-stream MKV container. Chunks are rotated after chunk_length_s seconds of recording time, measured from the first frame's timestamp.
All streams share a common timestamp origin per chunk so PTS values are directly comparable across .part files. When the first frame arrives on any stream, its timestamp becomes the origin for the first chunk. On rotation, the origin advances by exactly chunk_length_s.
At finalization (chunk boundary or stop), per-stream .mkv.part files are merged into a single multi-stream MKV via ffmpeg -c copy.
Rotation protocol (barrier with generation counter): 1. Any encoder detects a frame whose timestamp exceeds the current chunk's end boundary and sets rotation_requested. 2. All encoders flush their encoder, close their container, and enter the rotation barrier. 3. The last encoder to arrive merges the per-stream files, advances the chunk origin, creates new per-stream containers, and advances the generation. 4. All encoders resume with new encoder/container objects.
Initialize the chunked writer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Name for this writer (used in log messages and thread names). |
required |
output_directory
|
Path
|
Directory to write video files. |
required |
stream_configs
|
dict[str, VideoStreamConfig]
|
Dictionary mapping stream names to their config. |
required |
start_chunk_callback
|
Optional[StartChunkCallback]
|
Optional callback to generate chunk IDs. Signature: (name, started_at, file_extension) -> chunk_id. When None, sequential 5-digit IDs are generated automatically. |
None
|
sensor_stream_configs
|
Optional[dict[str, DataStreamConfig]]
|
Optional sensor stream configurations. |
None
|
chunk_length_s
|
float
|
Chunk duration in seconds. The first chunk starts when the first frame arrives; subsequent chunks advance by this amount. |
60.0
|
max_encoder_queue_size
|
int
|
Max queue size per encoder thread. |
200
|
on_error
|
Optional[Callable[[str], None]]
|
Optional callback invoked when an encoder thread crashes. Called with the stream name that crashed. Fired on a separate daemon thread to avoid deadlock. |
None
|
get_encoder_queue(stream_name)
¶
Get the input queue for a stream's encoder thread.
is_data_stream(stream_name)
¶
Check if a stream name refers to a data stream.
__enter__()
¶
Start the writer and return self for use as a context manager.
__exit__(exc_type, exc_value, traceback)
¶
Stop the writer on context manager exit.
start()
¶
Start all encoder threads.
Chunk creation is deferred until the first frame arrives, so the shared timestamp origin matches the actual frame domain rather than the wall clock.
stop()
¶
Signal all encoder threads to stop, join them, and merge the final chunk.
omgrab.recording.py_av_writer
¶
StreamEncoder
¶
VideoStreamEncoder(container, width, height, fps, codec, bitrate, input_pixel_format, output_pixel_format, options, timestamp_origin_s, metadata=None)
¶
Encodes video frames into packets without muxing.
All PTS values are calculated relative to a shared timestamp origin, ensuring synchronization across streams within the same clip.
Initialize the stream encoder.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
container
|
OutputContainer
|
The PyAV container (used only to add the stream). |
required |
width
|
int
|
Width of the video stream. |
required |
height
|
int
|
Height of the video stream. |
required |
fps
|
float | Fraction
|
Nominal frames per second. |
required |
codec
|
str
|
Codec name (e.g. 'libx264', 'ffv1'). |
required |
bitrate
|
int
|
Target bitrate. |
required |
input_pixel_format
|
str
|
Input pixel format (e.g. 'rgb24', 'gray16le'). |
required |
output_pixel_format
|
str
|
Output pixel format (e.g. 'yuv420p', 'gray16le'). |
required |
options
|
dict[str, str]
|
Codec options. |
required |
timestamp_origin_s
|
float
|
Shared timestamp origin in seconds since epoch. All PTS values are calculated relative to this origin. |
required |
metadata
|
Optional[dict[str, Any]]
|
Optional metadata dictionary to add to the stream. |
None
|
encode(frame, timestamp_s)
¶
Encode a frame and return the resulting packets.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
frame
|
Frame
|
Frame data to encode. |
required |
timestamp_s
|
float
|
Timestamp in seconds since epoch. |
required |
Returns:
| Type | Description |
|---|---|
list[Packet]
|
List of encoded packets (may be empty due to encoder buffering). |
flush()
¶
Flush buffered frames from the encoder.
Returns:
| Type | Description |
|---|---|
list[Packet]
|
List of remaining encoded packets. |
DataStreamEncoder(container, timestamp_origin_s, codec='ass', metadata=None)
¶
Constructs data packets for a subtitle stream without muxing.
All PTS values are calculated relative to a shared timestamp origin, ensuring synchronization across streams within the same clip.
Initialize the data stream encoder.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
container
|
OutputContainer
|
The PyAV container (used only to add the stream). |
required |
timestamp_origin_s
|
float
|
Shared timestamp origin in seconds since epoch. |
required |
codec
|
str
|
Subtitle codec name. |
'ass'
|
metadata
|
Optional[dict[str, Any]]
|
Optional metadata dictionary to add to the stream. |
None
|
encode(data, timestamp_s)
¶
Construct a data packet.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
bytes
|
Raw bytes to store. |
required |
timestamp_s
|
float
|
Timestamp in seconds since epoch. |
required |
Returns:
| Type | Description |
|---|---|
list[Packet]
|
A single-element list containing the packet. |
flush()
¶
No-op: raw packet muxing has no encoder to flush.
merge_stream_files(part_paths, output_path)
¶
Merge per-stream MKV part files into a single multi-stream MKV.
Uses ffmpeg -c copy for a fast packet-level remux (no re-encoding). The merged file is written atomically via a .tmp rename + fsync.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
part_paths
|
list[Path]
|
Paths to single-stream .mkv.part files to merge. |
required |
output_path
|
Path
|
Final output path (must end in .mkv). |
required |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If ffmpeg fails. |
merge_recording_chunks(recording_dir, output_path)
¶
Merge sequential MKV chunks from a recording directory into one file.
Uses ffmpeg concat demuxer for a lossless stream copy. The merged file is written atomically via a .tmp rename + fsync, then the chunk directory is cleaned up.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
recording_dir
|
Path
|
Directory containing numbered .mkv chunk files. |
required |
output_path
|
Path
|
Final output path (e.g. /data/output/2026-03-17T00:47:42Z.mkv). |
required |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If ffmpeg fails. |