
GStreamer Environment

High-performance screen capture and multimedia processing, about 6x faster than common alternatives in our benchmark.

Installation

pip install owa-env-gst
# Requires GStreamer dependencies - see installation guide

Requirements

  • OS: Windows (Linux/macOS support planned)
  • GPU: NVIDIA GPU required (our GStreamer implementation is NVIDIA-specific)

Components

Component                          Type      Description
gst/screen                         Listener  Real-time screen capture with callbacks
gst/screen_capture                 Runnable  On-demand screen capture
gst/omnimodal.appsink_recorder     Listener  Omnimodal recording with appsink
gst/omnimodal.subprocess_recorder  Runnable  Omnimodal recording via subprocess

Performance

Powered by GStreamer and the Windows API, our implementation is about 6x faster than the closest alternatives in our benchmark:

Library       Avg. Time per Frame  Relative Speed
owa.env.gst   5.7 ms               ⚡ 1× (Fastest)
pyscreenshot  33 ms                🚶‍♂️ 5.8× slower
PIL           34 ms                🚶‍♂️ 6.0× slower
MSS           37 ms                🚶‍♂️ 6.5× slower
PyQt5         137 ms               🐢 24× slower

📌 Tested on: Intel i5-11400, GTX 1650

owa.env.gst not only achieves higher FPS but also keeps CPU/GPU usage lower, which makes it well suited to screen recording. The same applies to ocap, since it imports owa.env.gst internally.
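The Relative Speed column follows directly from the per-frame times. A minimal sketch of that arithmetic, using only the numbers from the table (the variable names are illustrative):

```python
# Average time per frame in milliseconds, copied from the benchmark table.
avg_ms = {
    "owa.env.gst": 5.7,
    "pyscreenshot": 33,
    "PIL": 34,
    "MSS": 37,
    "PyQt5": 137,
}

baseline = avg_ms["owa.env.gst"]

# Slowdown relative to owa.env.gst, and the frame rate each time implies.
slowdown = {name: ms / baseline for name, ms in avg_ms.items()}
implied_fps = {name: 1000 / ms for name, ms in avg_ms.items()}

for name in avg_ms:
    print(f"{name:>12}: {slowdown[name]:4.1f}x, ~{implied_fps[name]:.0f} FPS")
```

At 5.7 ms per frame there is headroom well above 60 FPS, whereas 33 ms or more per frame caps a library at roughly 30 FPS.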

Benchmark Details

These performance measurements were generated using our comprehensive benchmark script: benchmark_screen_captures.py

The script tests multiple screen capture libraries under identical conditions to ensure fair comparison. You can run it yourself to verify performance on your hardware.
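For readers who want a feel for how an "average time per frame" figure can be obtained, here is a minimal timing harness. It is a sketch only and is not the contents of benchmark_screen_captures.py; `average_ms_per_frame` and `fake_grab` are hypothetical names, and `fake_grab` merely allocates a buffer in place of a real capture call.

```python
import time
from typing import Callable


def average_ms_per_frame(grab: Callable[[], object], n_frames: int = 100, warmup: int = 5) -> float:
    """Return the mean wall-clock time per call to `grab`, in milliseconds."""
    for _ in range(warmup):  # let caches and lazy initialisation settle
        grab()
    start = time.perf_counter()
    for _ in range(n_frames):
        grab()
    elapsed = time.perf_counter() - start
    return elapsed / n_frames * 1000


def fake_grab() -> bytes:
    """Stand-in for a real capture call: allocates a 1920x1080 RGBA buffer."""
    return bytes(1920 * 1080 * 4)


if __name__ == "__main__":
    print(f"{average_ms_per_frame(fake_grab):.2f} ms per frame")
```

Passing each library's capture function through the same harness, with the same frame count and warm-up, is what keeps such a comparison fair.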

Usage Examples

from owa.core import LISTENERS
import cv2

def process_frame(frame):
    cv2.imshow("Screen Capture", frame.frame_arr)
    cv2.waitKey(1)

screen = LISTENERS["gst/screen"]().configure(
    callback=process_frame,
    fps=60,
    show_cursor=True
)

with screen.session:
    input("Press Enter to stop")

# With performance metrics: the callback also receives a metrics object
def process_with_metrics(frame, metrics):
    print(f"FPS: {metrics.fps:.2f}, Latency: {metrics.latency*1000:.2f}ms")
    cv2.imshow("Screen", frame.frame_arr)
    cv2.waitKey(1)

screen = LISTENERS["gst/screen"]().configure(callback=process_with_metrics)

# On-demand capture with a runnable
from owa.core import RUNNABLES

capture = RUNNABLES["gst/screen_capture"]().configure(fps=60)

with capture.session:
    for i in range(10):
        frame = capture.grab()
        print(f"Frame {i}: {frame.frame_arr.shape}")

Known Limitations

Current Limitations

  • Windows only (Linux/macOS support planned)
  • NVIDIA GPU required (our GStreamer implementation is NVIDIA-specific)

Windows Graphics Capture API (WGC) Issues

When capturing a window with WGC (Windows Graphics Capture API, which is activated when you specify a window handle), the following issues have been observed:

  • FPS Limitation: Maximum FPS cannot exceed the refresh rate (Hz) of the physical monitor
  • Variable FPS with specific applications: When capturing Windows Terminal and Discord, the following behavior was reported:

    • When nothing changes in the window, FPS drops to 1-5
    • When something changes in the window (e.g. mouse movement), FPS immediately recovers to 60+

This phenomenon is likely due to WGC's optimization behavior.
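To check whether a capture is affected, it can help to measure the delivered frame rate inside the callback. The following is a small sliding-window counter, written as a sketch; `FpsMeter` is a hypothetical helper and not part of owa.env.gst. In a real callback you would call `meter.tick(time.monotonic())` once per frame.

```python
from collections import deque


class FpsMeter:
    """Sliding-window FPS estimate from frame arrival times (in seconds)."""

    def __init__(self, window_s=1.0):
        self.window_s = window_s
        self._stamps = deque()

    def tick(self, now):
        """Record a frame at time `now` and return the FPS over the window."""
        self._stamps.append(now)
        # Drop timestamps that have fallen out of the window.
        while now - self._stamps[0] >= self.window_s:
            self._stamps.popleft()
        return len(self._stamps) / self.window_s


meter = FpsMeter(window_s=1.0)

# Simulated arrival times: two seconds of steady 60 FPS ...
for i in range(121):
    active_fps = meter.tick(i / 60)

# ... followed by a static window that only delivers a frame every 0.5 s.
t = 2.0
for _ in range(6):
    t += 0.5
    idle_fps = meter.tick(t)

print(f"active: ~{active_fps:.0f} FPS, idle: ~{idle_fps:.0f} FPS")
```

A sudden fall from the configured rate to a handful of frames per second while the window is static matches the behavior described above.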

Implementation

See owa-env-gst source for detailed implementation.

API Reference

gst plugin 0.5.7

High-performance GStreamer-based screen capture and recording plugin

Author: OWA Development Team

Listeners

Usage: To use listener components, import LISTENERS from owa.core and call the configure() method with a callback function:

from owa.core import LISTENERS

# Configure a listener component (replace 'component_name' with actual name)
listener = LISTENERS["gst/component_name"]()
listener.configure(callback=my_callback, **your_other_arguments)

# Use the listener in a context manager
with listener.session as active_listener:
    # The listener is now running and will call my_callback when events occur
    pass  # Your main code here

Note: The callback argument is required. The on_configure() method shown in the documentation is an internal method called by configure().
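The relationship between configure(), on_configure() and session can be illustrated with a toy class. This is a self-contained sketch of the pattern only and makes no claim about how owa.core implements it; `ToyListener`, its `interval_s` parameter and the timer-driven loop are all assumptions made for illustration.

```python
import threading
import time
from contextlib import contextmanager


class ToyListener:
    """Toy stand-in for a listener. NOT the owa.core implementation."""

    def configure(self, *, callback, **kwargs):
        # Public entry point: keep the callback, then delegate to the hook.
        self._callback = callback
        self.on_configure(**kwargs)
        return self  # lets callers chain ToyListener().configure(...)

    def on_configure(self, *, interval_s=0.01):
        # Internal hook: component-specific setup lives here.
        self._interval_s = interval_s
        self._stop = threading.Event()

    def _run(self):
        tick = 0
        # wait() returns False on timeout, so this fires once per interval
        # until the stop event is set.
        while not self._stop.wait(self._interval_s):
            self._callback(tick)
            tick += 1

    @property
    @contextmanager
    def session(self):
        worker = threading.Thread(target=self._run, daemon=True)
        worker.start()
        try:
            yield self
        finally:
            self._stop.set()
            worker.join()


events = []
listener = ToyListener().configure(callback=events.append, interval_s=0.005)
with listener.session:
    time.sleep(0.1)
print(f"callback fired {len(events)} times")
```

The point of the split is that users only ever call configure(), while each component customises its setup by overriding on_configure().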

screen

Bases: GstPipelineRunner

High-performance GStreamer-based screen capture listener for real-time frame processing.

Captures screen content and delivers frames to a callback function. Can capture specific windows, monitors, or the entire screen.

Example:

from owa.core.registry import LISTENERS
import cv2
import numpy as np

# Define a callback to process frames
def process_frame(frame):
    # Display the frame
    cv2.imshow("Screen Capture", frame.frame_arr)
    cv2.waitKey(1)

# Create and configure the listener
screen = LISTENERS["gst/screen"]().configure(
    callback=process_frame,
    fps=30,
    show_cursor=True
)

# Run the screen capture
with screen.session:
    input("Press Enter to stop")

For performance metrics:

def process_with_metrics(frame, metrics):
    print(f"FPS: {metrics.fps:.2f}, Latency: {metrics.latency*1000:.2f} ms")
    cv2.imshow("Screen", frame.frame_arr)
    cv2.waitKey(1)

screen.configure(callback=process_with_metrics)

on_configure
on_configure(
    *,
    callback: Callable,
    show_cursor: bool = True,
    fps: float = 60,
    window_name: str | None = None,
    monitor_idx: int | None = None,
    additional_properties: dict | None = None,
) -> bool

Configure the GStreamer pipeline for screen capture.

Other Parameters:

Name                   Type         Description
callback               Callable     Function to call with each captured frame.
show_cursor            bool         Whether to show the cursor in the capture.
fps                    float        Frames per second.
window_name            str | None   (Optional) specific window to capture.
monitor_idx            int | None   (Optional) specific monitor index.
additional_properties  dict | None  (Optional) additional arguments to pass to the pipeline.

Source code in projects/owa-env-gst/owa/env/gst/screen/listeners.py
def on_configure(
    self,
    *,
    callback: Callable,
    show_cursor: bool = True,
    fps: float = 60,
    window_name: str | None = None,
    monitor_idx: int | None = None,
    additional_properties: dict | None = None,
) -> bool:
    """
    Configure the GStreamer pipeline for screen capture.

    Keyword Arguments:
        callback: Function to call with each captured frame
        show_cursor (bool): Whether to show the cursor in the capture.
        fps (float): Frames per second.
        window_name (str | None): (Optional) specific window to capture.
        monitor_idx (int | None): (Optional) specific monitor index.
        additional_properties (dict | None): (Optional) additional arguments to pass to the pipeline.
    """
    # Construct the pipeline description
    pipeline_description = screen_capture_pipeline(
        show_cursor=show_cursor,
        fps=fps,
        window_name=window_name,
        monitor_idx=monitor_idx,
        additional_properties=additional_properties,
    )
    logger.debug(f"Constructed pipeline: {pipeline_description}")
    super().on_configure(pipeline_description)

    wrapped_callback = build_screen_callback(callback)
    self.register_appsink_callback(wrapped_callback)
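The examples on this page pass callbacks that take either `(frame)` or `(frame, metrics)`. The implementation of build_screen_callback is not shown here, so the following is only a sketch of one way a wrapper could support both shapes, by inspecting the callback's signature. `wrap_by_arity` is a hypothetical name.

```python
import inspect
from typing import Any, Callable


def wrap_by_arity(callback: Callable[..., Any]) -> Callable[[Any, Any], Any]:
    """Return a two-argument wrapper that drops `metrics` for one-argument callbacks."""
    params = list(inspect.signature(callback).parameters.values())
    positional = [
        p for p in params
        if p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    ]
    accepts_varargs = any(p.kind is inspect.Parameter.VAR_POSITIONAL for p in params)

    if accepts_varargs or len(positional) >= 2:
        return lambda frame, metrics: callback(frame, metrics)
    return lambda frame, metrics: callback(frame)


seen = []
wrap_by_arity(lambda frame: seen.append(("one", frame)))("frame", "metrics")
wrap_by_arity(lambda frame, metrics: seen.append(("two", frame, metrics)))("frame", "metrics")
print(seen)
```

Deciding once at configuration time, rather than on every frame, keeps the per-frame path free of introspection overhead.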

omnimodal.appsink_recorder

Bases: GstPipelineRunner

High-performance screen recorder using GStreamer appsink for real-time processing.

This recorder captures screen content and saves it to a file while providing real-time frame notifications through a callback mechanism. It supports hardware acceleration and various output formats.

Examples:

Basic screen recording to file:

>>> def on_frame(screen_data):
...     print(f"Recording frame at {screen_data.utc_ns}")
>>>
>>> recorder = AppsinkRecorder()
>>> recorder.configure(
...     filesink_location="output.mkv",
...     callback=on_frame
... )
>>> recorder.start()

Recording with custom resolution:

>>> recorder.configure(
...     filesink_location="recording.mkv",
...     callback=my_callback,
...     width=1920,
...     height=1080
... )
on_configure
on_configure(
    filesink_location: str,
    *args: Any,
    callback: Callable,
    **kwargs: Any,
) -> None

Configure the appsink recorder with output location and callback.

Parameters:

Name               Type      Description                                                  Default
filesink_location  str       Path where the recording will be saved.                      required
*args              Any       Additional positional arguments for pipeline configuration.  ()
callback           Callable  Function to call for each recorded frame.                    required
**kwargs           Any       Additional keyword arguments for pipeline configuration.     {}

Returns:

Name  Type  Description
None  None  Configuration is applied to the recorder instance.

Source code in projects/owa-env-gst/owa/env/gst/omnimodal/appsink_recorder.py
def on_configure(self, filesink_location: str, *args: Any, callback: Callable, **kwargs: Any) -> None:
    """
    Configure the appsink recorder with output location and callback.

    Args:
        filesink_location: Path where the recording will be saved.
        *args: Additional positional arguments for pipeline configuration.
        callback: Function to call for each recorded frame.
        **kwargs: Additional keyword arguments for pipeline configuration.

    Returns:
        None: Configuration is applied to the recorder instance.
    """
    # if filesink_location does not exist, create it and warn the user
    if not Path(filesink_location).parent.exists():
        Path(filesink_location).parent.mkdir(parents=True, exist_ok=True)
        logger.warning(f"Output directory {filesink_location} does not exist. Creating it.")

    # convert to posix path. this is required for gstreamer executable.
    filesink_location = Path(filesink_location).as_posix()

    pipeline_description = appsink_recorder_pipeline(filesink_location, *args, **kwargs)
    logger.debug(f"Constructed pipeline: {pipeline_description}")
    super().on_configure(pipeline_description)

    identity = self.pipeline.get_by_name("ts")

    notified_shape = None

    def parse_shape_from_scale():
        """Parse the shape from the d3d11scale element."""
        scale = self.pipeline.get_by_name("d3d11scale0")
        # Get the source and sink capabilities
        sink_caps = scale.get_static_pad("sink").get_current_caps()
        src_caps = scale.get_static_pad("src").get_current_caps()
        if sink_caps and src_caps:
            sink_structure = sink_caps.get_structure(0)
            src_structure = src_caps.get_structure(0)
            return (sink_structure.get_value("width"), sink_structure.get_value("height")), (
                src_structure.get_value("width"),
                src_structure.get_value("height"),
            )
        logger.warning("Failed to get sink or source capabilities.")
        return None, None

    def buffer_probe_callback(pad: Gst.Pad, info: Gst.PadProbeInfo):
        """Callback function to handle buffer probe events."""

        nonlocal notified_shape
        buf = info.get_buffer()
        frame_time_ns = time.time_ns()

        clock = self.pipeline.get_clock()
        elapsed = clock.get_time() - self.pipeline.get_base_time()
        latency = elapsed - buf.pts

        # warn if latency is too high, e.g. > 100ms
        if latency > 100 * Gst.MSECOND:
            logger.warning(f"High latency: {latency / Gst.MSECOND:.2f}ms")

        original_shape, shape = parse_shape_from_scale()
        if notified_shape != (original_shape, shape):
            logger.success(f"Video's original shape: {original_shape}, rescaled shape: {shape}")
            notified_shape = (original_shape, shape)

        # Create ScreenCaptured with external video reference
        from owa.msgs.desktop.screen import MediaRef

        screen_captured = ScreenCaptured(
            utc_ns=frame_time_ns,
            source_shape=original_shape,
            shape=shape,
            media_ref=MediaRef(uri=filesink_location, pts_ns=buf.pts),
        )
        callback(screen_captured)
        return Gst.PadProbeReturn.OK

    identity.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, buffer_probe_callback)
    self.enable_fps_display()
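The latency check in buffer_probe_callback is plain nanosecond arithmetic: the pipeline's running time (clock time minus base time) minus the buffer's presentation timestamp. A worked example with made-up numbers, without requiring GStreamer; `frame_latency_ns` is a hypothetical helper, and `MSECOND` mirrors the value of Gst.MSECOND.

```python
NSECOND = 1
MSECOND = 1_000_000 * NSECOND  # same value as Gst.MSECOND; GStreamer clock times are in ns


def frame_latency_ns(clock_time_ns: int, base_time_ns: int, pts_ns: int) -> int:
    """Running time of the pipeline minus the buffer's presentation timestamp."""
    running_time_ns = clock_time_ns - base_time_ns
    return running_time_ns - pts_ns


# Hypothetical numbers: the pipeline started at clock time 5.000 s, the clock
# now reads 7.250 s, and the buffer is stamped at 2.130 s of running time.
latency = frame_latency_ns(7_250_000_000, 5_000_000_000, 2_130_000_000)
print(f"{latency / MSECOND:.2f} ms, warn={latency > 100 * MSECOND}")
```

With these numbers the buffer reaches the probe 120 ms after its timestamp, which is above the 100 ms threshold and would trigger the warning.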

Runnables

Usage: To use runnable components, import RUNNABLES from owa.core and call the configure() method (not on_configure()):

from owa.core import RUNNABLES

# Configure a runnable component (replace 'component_name' with actual name)
runnable = RUNNABLES["gst/component_name"]()
runnable.configure(**your_arguments)

# Use the runnable in a context manager
with runnable.session as active_runnable:
    # The runnable is now running in the background
    pass  # Your main code here

Note: The on_configure() method shown in the documentation is an internal method called by configure().

screen_capture

Bases: ScreenListener

High-performance screen capture runnable using GStreamer pipeline for continuous frame grabbing.

Captures screen frames continuously and makes the latest frame available through a thread-safe interface.

Example:

from owa.core.registry import RUNNABLES

screen_capture = RUNNABLES["gst/screen_capture"]().configure(fps=60)

with screen_capture.session:
    for _ in range(10):
        frame = screen_capture.grab()
        print(f"Shape: {frame.frame_arr.shape}")

on_configure
on_configure(*args: Any, **kwargs: Any) -> ScreenCapture

Configure and start the screen listener.

Parameters:

Name         Type   Description                                                        Default
*args        Any    Additional positional arguments for screen capture configuration.  ()
fps          float  Frames per second for capture.                                     required
window_name  str    Window to capture. If None, captures entire screen.                required
monitor_idx  int    Monitor index to capture.                                          required
**kwargs     Any    Additional keyword arguments for screen capture configuration.     {}

Returns:

Name           Type           Description
ScreenCapture  ScreenCapture  Configured screen capture instance.

Source code in projects/owa-env-gst/owa/env/gst/screen/runnable.py
def on_configure(self, *args: Any, **kwargs: Any) -> "ScreenCapture":
    """
    Configure and start the screen listener.

    Args:
        *args: Additional positional arguments for screen capture configuration.
        fps (float): Frames per second for capture.
        window_name (str, optional): Window to capture. If None, captures entire screen.
        monitor_idx (int, optional): Monitor index to capture.
        **kwargs: Additional keyword arguments for screen capture configuration.

    Returns:
        ScreenCapture: Configured screen capture instance.
    """
    self.queue = deque(maxlen=1)  # Holds the most recent frame
    self._event = threading.Event()

    def on_frame(frame):
        self.queue.append(frame)
        self._event.set()

    super().on_configure(callback=on_frame, *args, **kwargs)
    return self
grab
grab() -> ScreenCaptured

Get the most recent frame (blocks until frame is available).

Returns:

Name            Type            Description
ScreenCaptured  ScreenCaptured  Latest captured frame with timestamp.

Raises:

Type          Description
TimeoutError  If no frame is received within 1 second.

Source code in projects/owa-env-gst/owa/env/gst/screen/runnable.py
def grab(self) -> ScreenCaptured:
    """
    Get the most recent frame (blocks until frame is available).

    Returns:
        ScreenCaptured: Latest captured frame with timestamp.

    Raises:
        TimeoutError: If no frame is received within 1 second.
    """
    if not self._event.wait(timeout=1.0):
        raise TimeoutError("Timeout waiting for frame")
    self._event.clear()
    return self.queue[0]
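The combination of `deque(maxlen=1)` and `threading.Event` used by on_configure() and grab() can be tried in isolation. The sketch below reproduces the pattern with a hypothetical `LatestValue` class and integers in place of frames.

```python
import threading
from collections import deque


class LatestValue:
    """Keeps only the newest item; `grab` blocks until a new one arrives."""

    def __init__(self):
        self._queue = deque(maxlen=1)  # older items are discarded automatically
        self._event = threading.Event()

    def put(self, item):
        self._queue.append(item)
        self._event.set()

    def grab(self, timeout=1.0):
        if not self._event.wait(timeout=timeout):
            raise TimeoutError("Timeout waiting for item")
        self._event.clear()
        return self._queue[0]


holder = LatestValue()
for i in range(5):  # a fast producer overwrites items nobody grabbed
    holder.put(i)
print(holder.grab())  # 4
```

Because the event is cleared on every grab, a second call waits for a newer item instead of returning the same one again, so the rate of grab() calls is bounded by the capture rate.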

omnimodal.subprocess_recorder

SubprocessRecorder(*args, **kwargs)

Bases: SubprocessRunner

High-performance screen and audio recorder using GStreamer subprocess.

This recorder runs GStreamer as a subprocess to capture screen content and audio, providing excellent performance and stability for long recordings. Supports various output formats and hardware acceleration.

Examples:

Basic screen recording with audio:

>>> recorder = SubprocessRecorder()
>>> recorder.configure(
...     filesink_location="recording.mkv",
...     record_audio=True,
...     record_video=True,
...     fps=30
... )
>>> recorder.start()
>>> # ... recording runs in background ...
>>> recorder.stop()

Video-only recording with custom settings:

>>> recorder.configure(
...     filesink_location="video_only.mp4",
...     record_audio=False,
...     record_video=True,
...     fps=60,
...     show_cursor=False
... )
Source code in projects/owa-core/owa/core/runnable.py
def __init__(self, *args, **kwargs):
    """
    Initialize a new RunnableThread. Whole arguments are passed to threading.Thread.

    To configure the runnable, write your own on_configure method instead.
    """
    super().__init__(*args, **kwargs)
    self._stop_event = threading.Event()
on_configure
on_configure(
    filesink_location: str,
    record_audio: bool = True,
    record_video: bool = True,
    record_timestamp: bool = True,
    enable_fpsdisplaysink: bool = True,
    show_cursor: bool = True,
    fps: float = 60,
    window_name: Optional[str] = None,
    monitor_idx: Optional[int] = None,
    additional_properties: Optional[dict] = None,
) -> None

Prepare the GStreamer pipeline command for subprocess recording.

Parameters:

Name                   Type            Description                                      Default
filesink_location      str             Path where the recording will be saved.          required
record_audio           bool            Whether to include audio in the recording.       True
record_video           bool            Whether to include video in the recording.       True
record_timestamp       bool            Whether to include timestamp information.        True
enable_fpsdisplaysink  bool            Whether to enable FPS display during recording.  True
show_cursor            bool            Whether to show the cursor in the recording.     True
fps                    float           Frames per second for video recording.           60
window_name            Optional[str]   Specific window to record (optional).            None
monitor_idx            Optional[int]   Monitor index to record from (optional).         None
additional_properties  Optional[dict]  Additional pipeline properties (optional).       None

Returns:

Name  Type  Description
None  None  Configuration is stored internally for subprocess execution.

Source code in projects/owa-env-gst/owa/env/gst/omnimodal/subprocess_recorder.py
def on_configure(
    self,
    filesink_location: str,
    record_audio: bool = True,
    record_video: bool = True,
    record_timestamp: bool = True,
    enable_fpsdisplaysink: bool = True,
    show_cursor: bool = True,
    fps: float = 60,
    window_name: Optional[str] = None,
    monitor_idx: Optional[int] = None,
    additional_properties: Optional[dict] = None,
) -> None:
    """
    Prepare the GStreamer pipeline command for subprocess recording.

    Args:
        filesink_location: Path where the recording will be saved.
        record_audio: Whether to include audio in the recording.
        record_video: Whether to include video in the recording.
        record_timestamp: Whether to include timestamp information.
        enable_fpsdisplaysink: Whether to enable FPS display during recording.
        show_cursor: Whether to show the cursor in the recording.
        fps: Frames per second for video recording.
        window_name: Specific window to record (optional).
        monitor_idx: Monitor index to record from (optional).
        additional_properties: Additional pipeline properties (optional).

    Returns:
        None: Configuration is stored internally for subprocess execution.
    """

    # if filesink_location does not exist, create it and warn the user
    if not Path(filesink_location).parent.exists():
        Path(filesink_location).parent.mkdir(parents=True, exist_ok=True)
        logger.warning(f"Output directory {filesink_location} does not exist. Creating it.")

    # convert to posix path. this is required for gstreamer executable.
    filesink_location = Path(filesink_location).as_posix()

    pipeline_description = subprocess_recorder_pipeline(
        filesink_location=filesink_location,
        record_audio=record_audio,
        record_video=record_video,
        record_timestamp=record_timestamp,
        enable_fpsdisplaysink=enable_fpsdisplaysink,
        show_cursor=show_cursor,
        fps=fps,
        window_name=window_name,
        monitor_idx=monitor_idx,
        additional_properties=additional_properties,
    )

    super().on_configure(f"gst-launch-1.0.exe -e -v {pipeline_description}".split())
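The last two steps of on_configure(), converting the path with as_posix() and splitting the command string on whitespace, can be reproduced without GStreamer. In the sketch below the pipeline string is a placeholder built from the standard videotestsrc and filesink elements; the real description comes from subprocess_recorder_pipeline(), which is not shown here.

```python
from pathlib import PureWindowsPath

# as_posix() swaps backslashes for forward slashes, which the recorder does
# before handing the path to the GStreamer executable.
location = PureWindowsPath(r"C:\recordings\out.mkv").as_posix()
print(location)  # C:/recordings/out.mkv

# Placeholder pipeline for illustration only.
pipeline = f"videotestsrc ! filesink location={location}"
cmd = f"gst-launch-1.0.exe -e -v {pipeline}".split()
print(cmd)

# Caveat of str.split(): whitespace inside the path also becomes a separator.
spaced = PureWindowsPath(r"C:\my recordings\out.mkv").as_posix()
broken = f"gst-launch-1.0.exe -e -v videotestsrc ! filesink location={spaced}".split()
print(broken[-2:])  # ['location=C:/my', 'recordings/out.mkv']
```

Whether paths containing spaces are handled elsewhere, for example by quoting inside the pipeline builder, is not visible in the listing above, so choosing an output path without spaces is the cautious option.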