recording

Stream configuration, chunked MKV writing, and stream merging.

omgrab.recording.stream_configs

Stream configuration dataclasses for video and data streams.

VideoStreamConfig(width, height, fps, codec='libx264', bitrate=2000000, input_pixel_format='rgb24', output_pixel_format='yuv420p', stream_options=dict(), metadata=dict()) dataclass

Configuration for a video stream.

Attributes:

width (int): Frame width in pixels.
height (int): Frame height in pixels.
fps (float | Fraction): Target frames per second.
codec (str): Video codec name (e.g. 'libx264', 'libx265').
bitrate (int): Target bitrate in bits per second.
input_pixel_format (str): Pixel format of incoming frames (e.g. 'rgb24').
output_pixel_format (str): Pixel format for the encoded stream (e.g. 'yuv420p').
stream_options (dict[str, str]): Additional codec options passed to the encoder.
metadata (dict[str, Any]): Arbitrary metadata stored in the MKV stream header.
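
To make the defaults in the signature above concrete, here is a minimal stand-in dataclass that mirrors the documented fields. It is a sketch, not the omgrab class: the name VideoStreamConfigSketch is hypothetical, and the 'preset' option is only an illustrative libx264 setting.

```python
from dataclasses import dataclass, field
from fractions import Fraction
from typing import Any, Dict, Union


@dataclass
class VideoStreamConfigSketch:
    """Hypothetical stand-in mirroring the documented fields and defaults."""

    width: int
    height: int
    fps: Union[float, Fraction]
    codec: str = "libx264"
    bitrate: int = 2_000_000
    input_pixel_format: str = "rgb24"
    output_pixel_format: str = "yuv420p"
    # default_factory gives every instance its own dict.
    stream_options: Dict[str, str] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)


# A colour stream that relies on the defaults; fps may be an exact Fraction.
rgb = VideoStreamConfigSketch(width=1280, height=720, fps=Fraction(30000, 1001))

# A 16-bit stream with a different codec and pixel formats.
depth = VideoStreamConfigSketch(
    width=640,
    height=480,
    fps=30.0,
    codec="ffv1",
    input_pixel_format="gray16le",
    output_pixel_format="gray16le",
    stream_options={"preset": "ultrafast"},  # illustrative only
)
```

Only width, height, and fps have no default, so a stream that matches the libx264/yuv420p defaults needs just those three values.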

DataStreamConfig(codec='ass', metadata=dict()) dataclass

Configuration for a non-video data stream stored as an MKV subtitle track.

Attributes:

codec (str): Subtitle codec name (e.g. 'ass').
metadata (dict[str, Any]): Arbitrary metadata stored in the MKV stream header.

omgrab.recording.chunked_writer

Chunked video writer with per-stream parallel encoding.

ChunkedWriter(name, output_directory, stream_configs, start_chunk_callback=None, sensor_stream_configs=None, chunk_length_s=60.0, max_encoder_queue_size=200, on_error=None)

Chunked video writer with per-stream parallel encoding.

Each stream gets its own encoder thread and its own single-stream MKV container. Chunks are rotated after chunk_length_s seconds of recording time, measured from the first frame's timestamp.

All streams share a common timestamp origin per chunk so PTS values are directly comparable across .part files. When the first frame arrives on any stream, its timestamp becomes the origin for the first chunk. On rotation, the origin advances by exactly chunk_length_s.
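
The origin arithmetic described above can be sketched in a few lines. This is an illustration under assumptions, not the writer's code: the function names are hypothetical, and treating "exceeds" as a strict greater-than comparison is a guess.

```python
def chunk_window(first_frame_timestamp_s, chunk_length_s, chunk_index):
    """Return (origin, end) of a chunk, in the clock of the frame timestamps.

    The first chunk's origin is the first frame's timestamp; each rotation
    advances the origin by exactly chunk_length_s.
    """
    origin = first_frame_timestamp_s + chunk_index * chunk_length_s
    return origin, origin + chunk_length_s


def needs_rotation(frame_timestamp_s, chunk_end_s):
    """A frame past the current chunk's end boundary triggers rotation.

    Assumption: strict '>' comparison; the source only says "exceeds".
    """
    return frame_timestamp_s > chunk_end_s


origin, end = chunk_window(1000.0, 60.0, chunk_index=0)
print(origin, end)                  # 1000.0 1060.0
print(needs_rotation(1059.9, end))  # False
print(needs_rotation(1060.5, end))  # True
```

Because the origin advances by a fixed step rather than being reset to the next frame's timestamp, chunk boundaries do not drift when frames arrive late.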

At finalization (chunk boundary or stop), per-stream .mkv.part files are merged into a single multi-stream MKV via ffmpeg -c copy.

Rotation protocol (barrier with generation counter):

1. Any encoder detects a frame whose timestamp exceeds the current chunk's end boundary and sets rotation_requested.
2. All encoders flush their encoder, close their container, and enter the rotation barrier.
3. The last encoder to arrive merges the per-stream files, advances the chunk origin, creates new per-stream containers, and advances the generation.
4. All encoders resume with new encoder/container objects.
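
A barrier with a generation counter can be sketched with threading.Condition. This is a generic illustration of the pattern, not the ChunkedWriter implementation; RotationBarrier, run_demo, and the callback are hypothetical names.

```python
import threading


class RotationBarrier:
    """Reusable barrier: the last thread to arrive runs a callback and
    bumps the generation, which releases every waiting thread."""

    def __init__(self, parties, on_last_arrival):
        self._parties = parties
        self._on_last_arrival = on_last_arrival
        self._cond = threading.Condition()
        self._waiting = 0
        self.generation = 0

    def wait(self):
        with self._cond:
            my_generation = self.generation
            self._waiting += 1
            if self._waiting == self._parties:
                # Last arrival: merge files, advance origin, open containers.
                self._on_last_arrival()
                self._waiting = 0
                self.generation += 1
                self._cond.notify_all()
                return
            # Waiting on the generation guards against spurious wakeups
            # and against a thread racing ahead into the next rotation.
            while self.generation == my_generation:
                self._cond.wait()


def run_demo(num_encoders=3):
    merged = []
    resumed = []
    barrier = RotationBarrier(num_encoders, lambda: merged.append("chunk merged"))

    def encoder(name):
        # A real encoder would flush and close its container here.
        barrier.wait()
        resumed.append(name)  # resumes with fresh encoder/container objects

    threads = [
        threading.Thread(target=encoder, args=(f"stream{i}",))
        for i in range(num_encoders)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return merged, sorted(resumed), barrier.generation
```

The callback runs exactly once per rotation, while the lock is held, so no encoder can resume before the merge and the new containers are ready.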

Initialize the chunked writer.

Parameters:

name (str, required): Name for this writer (used in log messages and thread names).
output_directory (Path, required): Directory to write video files.
stream_configs (dict[str, VideoStreamConfig], required): Dictionary mapping stream names to their config.
start_chunk_callback (Optional[StartChunkCallback], default None): Optional callback to generate chunk IDs. Signature: (name, started_at, file_extension) -> chunk_id. When None, sequential 5-digit IDs are generated automatically.
sensor_stream_configs (Optional[dict[str, DataStreamConfig]], default None): Optional sensor stream configurations.
chunk_length_s (float, default 60.0): Chunk duration in seconds. The first chunk starts when the first frame arrives; subsequent chunks advance by this amount.
max_encoder_queue_size (int, default 200): Max queue size per encoder thread.
on_error (Optional[Callable[[str], None]], default None): Optional callback invoked when an encoder thread crashes. Called with the stream name that crashed. Fired on a separate daemon thread to avoid deadlock.
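
As an illustration of the start_chunk_callback signature, (name, started_at, file_extension) -> chunk_id, here are two hypothetical callbacks. The sketch assumes started_at is a float of epoch seconds and chunk_id is a string; neither type is stated above. Starting the sequential counter at 0 is also an assumption.

```python
import itertools
from datetime import datetime, timezone


def make_sequential_callback():
    """Return a callback that yields zero-padded 5-digit IDs.

    Assumption: counting starts at 0.
    """
    counter = itertools.count()

    def callback(name, started_at, file_extension):
        return f"{next(counter):05d}"

    return callback


def timestamp_callback(name, started_at, file_extension):
    """Build an ID from the writer name and the chunk start time in UTC.

    Assumption: started_at is epoch seconds.
    """
    stamp = datetime.fromtimestamp(started_at, tz=timezone.utc)
    return f"{name}-{stamp:%Y%m%dT%H%M%SZ}"
```

A timestamp-based ID keeps chunk names unique across restarts, whereas a sequential counter restarts with each new writer.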

get_encoder_queue(stream_name)

Get the input queue for a stream's encoder thread.

is_data_stream(stream_name)

Check if a stream name refers to a data stream.

__enter__()

Start the writer and return self for use as a context manager.

__exit__(exc_type, exc_value, traceback)

Stop the writer on context manager exit.

start()

Start all encoder threads.

Chunk creation is deferred until the first frame arrives, so the shared timestamp origin matches the actual frame domain rather than the wall clock.

stop()

Signal all encoder threads to stop, join them, and merge the final chunk.

omgrab.recording.py_av_writer

StreamEncoder

Bases: Protocol

Common interface for all stream encoders (video and data).

encode(data, timestamp_s)

Encode data and return the resulting packets.

flush()

Flush any buffered data and return remaining packets.
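
The two-method interface above maps naturally onto typing.Protocol. The sketch below is a hypothetical mirror of it, not the omgrab definition: StreamEncoderSketch, PassthroughEncoder, and drain are illustrative names, and the packet type is left as a plain list.

```python
from typing import Any, List, Protocol, runtime_checkable


@runtime_checkable
class StreamEncoderSketch(Protocol):
    """Structural interface: anything with encode() and flush() qualifies."""

    def encode(self, data: Any, timestamp_s: float) -> List[Any]: ...

    def flush(self) -> List[Any]: ...


class PassthroughEncoder:
    """Toy encoder that emits one 'packet' per input and buffers nothing."""

    def encode(self, data, timestamp_s):
        return [(timestamp_s, data)]

    def flush(self):
        return []


def drain(encoder: StreamEncoderSketch, items):
    """Encode every (data, timestamp_s) pair, then flush what remains."""
    packets = []
    for data, timestamp_s in items:
        packets.extend(encoder.encode(data, timestamp_s))
    packets.extend(encoder.flush())
    return packets
```

Because the protocol is structural, video and data encoders need no common base class; a caller such as drain only relies on the two methods.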

VideoStreamEncoder(container, width, height, fps, codec, bitrate, input_pixel_format, output_pixel_format, options, timestamp_origin_s, metadata=None)

Encodes video frames into packets without muxing.

All PTS values are calculated relative to a shared timestamp origin, ensuring synchronization across streams within the same clip.

Initialize the stream encoder.

Parameters:

container (OutputContainer, required): The PyAV container (used only to add the stream).
width (int, required): Width of the video stream.
height (int, required): Height of the video stream.
fps (float | Fraction, required): Nominal frames per second.
codec (str, required): Codec name (e.g. 'libx264', 'ffv1').
bitrate (int, required): Target bitrate.
input_pixel_format (str, required): Input pixel format (e.g. 'rgb24', 'gray16le').
output_pixel_format (str, required): Output pixel format (e.g. 'yuv420p', 'gray16le').
options (dict[str, str], required): Codec options.
timestamp_origin_s (float, required): Shared timestamp origin in seconds since epoch. All PTS values are calculated relative to this origin.
metadata (Optional[dict[str, Any]], default None): Optional metadata dictionary to add to the stream.
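
How a PTS relative to timestamp_origin_s might be derived can be sketched as follows. This is a guess at the arithmetic, not the encoder's code: pts_from_timestamp is a hypothetical helper, and the 1/1000 time base (milliseconds, common for Matroska) is an assumption.

```python
from fractions import Fraction


def pts_from_timestamp(timestamp_s, timestamp_origin_s, time_base):
    """Convert an epoch timestamp to an integer PTS in time_base units.

    Assumption: the PTS is the rounded offset from the shared origin.
    """
    return round((timestamp_s - timestamp_origin_s) / time_base)


MILLISECONDS = Fraction(1, 1000)  # assumed stream time base

print(pts_from_timestamp(1000.0, 1000.0, MILLISECONDS))  # 0
print(pts_from_timestamp(1000.5, 1000.0, MILLISECONDS))  # 500
```

Since every stream subtracts the same origin, two frames captured at the same instant receive the same PTS even though they are written to different containers.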

encode(frame, timestamp_s)

Encode a frame and return the resulting packets.

Parameters:

frame (Frame, required): Frame data to encode.
timestamp_s (float, required): Timestamp in seconds since epoch.

Returns:

list[Packet]: List of encoded packets (may be empty due to encoder buffering).

flush()

Flush buffered frames from the encoder.

Returns:

list[Packet]: List of remaining encoded packets.

DataStreamEncoder(container, timestamp_origin_s, codec='ass', metadata=None)

Constructs data packets for a subtitle stream without muxing.

All PTS values are calculated relative to a shared timestamp origin, ensuring synchronization across streams within the same clip.

Initialize the data stream encoder.

Parameters:

container (OutputContainer, required): The PyAV container (used only to add the stream).
timestamp_origin_s (float, required): Shared timestamp origin in seconds since epoch.
codec (str, default 'ass'): Subtitle codec name.
metadata (Optional[dict[str, Any]], default None): Optional metadata dictionary to add to the stream.

encode(data, timestamp_s)

Construct a data packet.

Parameters:

data (bytes, required): Raw bytes to store.
timestamp_s (float, required): Timestamp in seconds since epoch.

Returns:

list[Packet]: A single-element list containing the packet.

flush()

No-op: raw packet muxing has no encoder to flush.

merge_stream_files(part_paths, output_path)

Merge per-stream MKV part files into a single multi-stream MKV.

Uses ffmpeg -c copy for a fast packet-level remux (no re-encoding). The merged file is written atomically: it goes to a .tmp path first and is moved into place with a rename and fsync, so the final path never holds a partial file.

Parameters:

part_paths (list[Path], required): Paths to single-stream .mkv.part files to merge.
output_path (Path, required): Final output path (must end in .mkv).

Raises:

RuntimeError: If ffmpeg fails.
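
A stream-copy merge of this kind could be assembled roughly as below. This is a sketch, not the function's actual command line: the helper names and the exact flag set are assumptions. Each -map N selects all streams from input N, and -f matroska forces the muxer because a .tmp suffix gives ffmpeg nothing to infer the format from.

```python
import os
from pathlib import Path


def temp_path_for(output_path):
    """Sibling path used while the merge is in progress (naming is assumed)."""
    output_path = Path(output_path)
    return output_path.with_name(output_path.name + ".tmp")


def build_merge_command(part_paths, tmp_path):
    """Build an ffmpeg argv that remuxes every input into one Matroska file."""
    cmd = ["ffmpeg", "-y"]
    for part in part_paths:
        cmd += ["-i", str(part)]
    for index in range(len(part_paths)):
        cmd += ["-map", str(index)]  # keep all streams of input `index`
    cmd += ["-c", "copy", "-f", "matroska", str(tmp_path)]
    return cmd


def publish_atomically(tmp_path, output_path):
    """Flush the finished file to disk, then move it into place."""
    with open(tmp_path, "r+b") as handle:
        os.fsync(handle.fileno())
    os.replace(tmp_path, output_path)  # atomic on the same filesystem
```

The command would typically be run with subprocess.run(cmd, check=False), raising RuntimeError on a non-zero return code before publish_atomically is called.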

merge_recording_chunks(recording_dir, output_path)

Merge sequential MKV chunks from a recording directory into one file.

Uses the ffmpeg concat demuxer for a lossless stream copy. The merged file is written atomically: it goes to a .tmp path first and is moved into place with a rename and fsync. The chunk directory is then cleaned up.

Parameters:

recording_dir (Path, required): Directory containing numbered .mkv chunk files.
output_path (Path, required): Final output path (e.g. /data/output/2026-03-17T00:47:42Z.mkv).

Raises:

RuntimeError: If ffmpeg fails.
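
The concat demuxer reads a small list file with one `file '...'` line per input. The sketch below shows how such a list and command might be built; it is an assumption about the approach, not the function's code. The helper names are hypothetical, and sorting by filename presumes zero-padded chunk IDs.

```python
from pathlib import Path


def list_chunks(recording_dir):
    """Chunk files in playback order (assumes zero-padded numeric names)."""
    return sorted(Path(recording_dir).glob("*.mkv"))


def build_concat_list(chunk_paths):
    """Render the concat demuxer list file, escaping single quotes."""
    lines = []
    for path in chunk_paths:
        escaped = str(path).replace("'", "'\\''")
        lines.append(f"file '{escaped}'")
    return "\n".join(lines) + "\n"


def build_concat_command(list_path, tmp_path):
    """Build an ffmpeg argv that stream-copies the listed chunks."""
    return [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0",  # -safe 0 permits absolute paths
        "-i", str(list_path),
        "-map", "0",                   # keep every stream, not one per type
        "-c", "copy",
        "-f", "matroska", str(tmp_path),
    ]
```

Without -map 0, ffmpeg's default stream selection keeps at most one stream of each type, which would drop the extra video and data tracks of a multi-stream chunk.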