GStreamer Environment

For implementation details, browse the owa_env_gst source. The API documentation is still a work in progress.

Examples

  • Example of the gst/screen listener

    from owa.core.registry import LISTENERS
    import cv2
    import numpy as np
    
    # Components automatically available - no activation needed!
    
    # Define a callback to process frames
    def process_frame(frame):
        # Display the frame
        cv2.imshow("Screen Capture", frame.frame_arr)
        cv2.waitKey(1)
    
    # Create and configure the listener
    screen = LISTENERS["gst/screen"]().configure(
        callback=process_frame,
        fps=30,
        show_cursor=True
    )
    
    # Run the screen capture
    with screen.session:
        input("Press Enter to stop")
    

    For performance metrics:

    def process_with_metrics(frame, metrics):
        print(f"FPS: {metrics.fps:.2f}, Latency: {metrics.latency*1000:.2f} ms")
        cv2.imshow("Screen", frame.frame_arr)
        cv2.waitKey(1)
    
    screen.configure(callback=process_with_metrics)
    

  • Example of the gst/screen_capture runnable

    from owa.core.registry import RUNNABLES
    
    # Components automatically available - no activation needed!
    screen_capture = RUNNABLES["gst/screen_capture"]().configure(fps=60)
    
    with screen_capture.session:
        for _ in range(10):
            frame = screen_capture.grab()
            print(f"Shape: {frame.frame_arr.shape}")
    

Known Issues

  • Currently, only Windows is supported. Support for other operating systems is on the TODO list, but its priority is not high.
  • Currently, only devices with an NVIDIA GPU are supported. Lifting this restriction is also on the TODO list, with a higher priority than multi-OS support.

  • When capturing certain windows with WGC (the Windows Graphics Capture API, which is activated when you specify a window handle), the following issues have been observed on some desktops (not all):

    • The maximum FPS cannot exceed the refresh rate (Hz) of the physical monitor.
    • When capturing Windows Terminal and Discord, the following behavior was reported. This is presumably also caused by the use of WGC.
      • When nothing changes in the window, the frame rate drops to 1-5 FPS.
      • When something changes in the window (e.g. mouse movement), the frame rate immediately recovers to 60+ FPS.
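
To check whether a capture is affected by the frame-rate drop described above, one option is to estimate FPS from frame arrival times inside the callback. The following is a minimal sketch and not part of the plugin: FpsMeter and simulate are hypothetical helpers, and the timestamps are synthetic so the example runs without a capture session.

```python
from collections import deque


class FpsMeter:
    """Estimate frames per second over a sliding window of frame timestamps."""

    def __init__(self, window: int = 30):
        # Keep only the most recent `window` timestamps (in seconds).
        self._stamps = deque(maxlen=window)

    def tick(self, timestamp_s: float) -> float:
        """Record one frame arrival and return the current FPS estimate."""
        self._stamps.append(timestamp_s)
        if len(self._stamps) < 2:
            return 0.0
        span = self._stamps[-1] - self._stamps[0]
        if span <= 0:
            return 0.0
        # N timestamps delimit N - 1 frame intervals.
        return (len(self._stamps) - 1) / span


def simulate(interval_s: float, frames: int = 10) -> float:
    """Feed evenly spaced synthetic timestamps and return the final estimate."""
    meter = FpsMeter()
    fps = 0.0
    for i in range(frames):
        fps = meter.tick(i * interval_s)
    return fps
```

In a real callback, one would presumably call meter.tick(time.perf_counter()) once per frame and compare the estimate while the window is idle against the estimate while it is changing.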

Auto-generated documentation

gst plugin 0.3.9.post1

High-performance GStreamer-based screen capture and recording plugin

Author: OWA Development Team

Listeners

Usage: To use listener components, import LISTENERS from owa.core and call the configure() method with a callback function:

from owa.core import LISTENERS

# Configure a listener component (replace 'component_name' with actual name)
listener = LISTENERS["gst/component_name"]()
listener.configure(callback=my_callback)  # pass any other arguments as keywords

# Use the listener in a context manager
with listener.session as active_listener:
    # The listener is now running and will call my_callback when events occur
    pass  # Your main code here

Note: The callback argument is required. The on_configure() method shown in the documentation is an internal method called by configure().

screen

Bases: GstPipelineRunner

High-performance GStreamer-based screen capture listener for real-time frame processing.

Captures screen content and delivers frames to a callback function. Can capture specific windows, monitors, or the entire screen.

Example:

from owa.core.registry import LISTENERS
import cv2
import numpy as np

# Define a callback to process frames
def process_frame(frame):
    # Display the frame
    cv2.imshow("Screen Capture", frame.frame_arr)
    cv2.waitKey(1)

# Create and configure the listener
screen = LISTENERS["gst/screen"]().configure(
    callback=process_frame,
    fps=30,
    show_cursor=True
)

# Run the screen capture
with screen.session:
    input("Press Enter to stop")

For performance metrics:

def process_with_metrics(frame, metrics):
    print(f"FPS: {metrics.fps:.2f}, Latency: {metrics.latency*1000:.2f} ms")
    cv2.imshow("Screen", frame.frame_arr)
    cv2.waitKey(1)

screen.configure(callback=process_with_metrics)

on_configure
on_configure(
    *,
    callback: Callable,
    show_cursor: bool = True,
    fps: float = 60,
    window_name: str | None = None,
    monitor_idx: int | None = None,
    additional_properties: dict | None = None,
) -> bool

Configure the GStreamer pipeline for screen capture.

Other Parameters:

  • callback (Callable): Function to call with each captured frame.
  • show_cursor (bool): Whether to show the cursor in the capture.
  • fps (float): Frames per second.
  • window_name (str | None): (Optional) specific window to capture.
  • monitor_idx (int | None): (Optional) specific monitor index.
  • additional_properties (dict | None): (Optional) additional arguments to pass to the pipeline.

Source code in projects/owa-env-gst/owa/env/gst/screen/listeners.py
def on_configure(
    self,
    *,
    callback: Callable,
    show_cursor: bool = True,
    fps: float = 60,
    window_name: str | None = None,
    monitor_idx: int | None = None,
    additional_properties: dict | None = None,
) -> bool:
    """
    Configure the GStreamer pipeline for screen capture.

    Keyword Arguments:
        callback: Function to call with each captured frame
        show_cursor (bool): Whether to show the cursor in the capture.
        fps (float): Frames per second.
        window_name (str | None): (Optional) specific window to capture.
        monitor_idx (int | None): (Optional) specific monitor index.
        additional_properties (dict | None): (Optional) additional arguments to pass to the pipeline.
    """
    # Construct the pipeline description
    pipeline_description = screen_capture_pipeline(
        show_cursor=show_cursor,
        fps=fps,
        window_name=window_name,
        monitor_idx=monitor_idx,
        additional_properties=additional_properties,
    )
    logger.debug(f"Constructed pipeline: {pipeline_description}")
    super().on_configure(pipeline_description)

    wrapped_callback = build_screen_callback(callback)
    self.register_appsink_callback(wrapped_callback)
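
The examples above pass callbacks that take either (frame) or (frame, metrics), but the source of build_screen_callback is not shown here. One plausible way to support both shapes is to inspect the callback's signature. The sketch below is an assumption for illustration only, not the plugin's implementation, and wrap_by_arity is a hypothetical name.

```python
import inspect
from typing import Any, Callable


def wrap_by_arity(callback: Callable[..., Any]) -> Callable[[Any, Any], Any]:
    """Return a two-argument wrapper that forwards only what `callback` accepts."""
    params = list(inspect.signature(callback).parameters.values())
    accepts_varargs = any(
        p.kind is inspect.Parameter.VAR_POSITIONAL for p in params
    )
    positional = [
        p
        for p in params
        if p.kind
        in (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )
    ]
    if accepts_varargs or len(positional) >= 2:
        # The callback can take both the frame and the metrics object.
        return lambda frame, metrics: callback(frame, metrics)
    # The callback only wants the frame; drop the metrics.
    return lambda frame, metrics: callback(frame)
```

With this approach the pipeline side always calls the wrapper with two arguments, and user code stays free to declare only the parameters it needs.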

omnimodal.appsink_recorder

Bases: GstPipelineRunner

High-performance screen recorder using GStreamer appsink for real-time processing.

This recorder captures screen content and saves it to a file while providing real-time frame notifications through a callback mechanism. It supports hardware acceleration and various output formats.

Examples:

Basic screen recording to file:

>>> def on_frame(screen_data):
...     print(f"Recording frame at {screen_data.utc_ns}")
>>>
>>> recorder = AppsinkRecorder()
>>> recorder.configure(
...     filesink_location="output.mkv",
...     callback=on_frame
... )
>>> recorder.start()

Recording with custom resolution:

>>> recorder.configure(
...     filesink_location="recording.mkv",
...     callback=my_callback,
...     width=1920,
...     height=1080
... )

on_configure

on_configure(
    filesink_location: str,
    *args: Any,
    callback: Callable,
    **kwargs: Any,
) -> None

Configure the appsink recorder with output location and callback.

Parameters:

  • filesink_location (str, required): Path where the recording will be saved.
  • *args (Any, default: ()): Additional positional arguments for pipeline configuration.
  • callback (Callable, required): Function to call for each recorded frame.
  • **kwargs (Any, default: {}): Additional keyword arguments for pipeline configuration.

Returns:

  • None: Configuration is applied to the recorder instance.

Source code in projects/owa-env-gst/owa/env/gst/omnimodal/appsink_recorder.py
def on_configure(self, filesink_location: str, *args: Any, callback: Callable, **kwargs: Any) -> None:
    """
    Configure the appsink recorder with output location and callback.

    Args:
        filesink_location: Path where the recording will be saved.
        *args: Additional positional arguments for pipeline configuration.
        callback: Function to call for each recorded frame.
        **kwargs: Additional keyword arguments for pipeline configuration.

    Returns:
        None: Configuration is applied to the recorder instance.
    """
    # if filesink_location does not exist, create it and warn the user
    if not Path(filesink_location).parent.exists():
        Path(filesink_location).parent.mkdir(parents=True, exist_ok=True)
        logger.warning(f"Output directory {filesink_location} does not exist. Creating it.")

    # convert to posix path. this is required for gstreamer executable.
    filesink_location = Path(filesink_location).as_posix()

    pipeline_description = appsink_recorder_pipeline(filesink_location, *args, **kwargs)
    logger.debug(f"Constructed pipeline: {pipeline_description}")
    super().on_configure(pipeline_description)

    identity = self.pipeline.get_by_name("ts")

    notified_shape = None

    def parse_shape_from_scale():
        """Parse the shape from the d3d11scale element."""
        scale = self.pipeline.get_by_name("d3d11scale0")
        # Get the source and sink capabilities
        sink_caps = scale.get_static_pad("sink").get_current_caps()
        src_caps = scale.get_static_pad("src").get_current_caps()
        if sink_caps and src_caps:
            sink_structure = sink_caps.get_structure(0)
            src_structure = src_caps.get_structure(0)
            return (sink_structure.get_value("width"), sink_structure.get_value("height")), (
                src_structure.get_value("width"),
                src_structure.get_value("height"),
            )
        logger.warning("Failed to get sink or source capabilities.")
        return None, None

    def buffer_probe_callback(pad: Gst.Pad, info: Gst.PadProbeInfo):
        """Callback function to handle buffer probe events."""

        nonlocal notified_shape
        buf = info.get_buffer()
        frame_time_ns = time.time_ns()

        clock = self.pipeline.get_clock()
        elapsed = clock.get_time() - self.pipeline.get_base_time()
        latency = elapsed - buf.pts

        # warn if latency is too high, e.g. > 100ms
        if latency > 100 * Gst.MSECOND:
            logger.warning(f"High latency: {latency / Gst.MSECOND:.2f}ms")

        original_shape, shape = parse_shape_from_scale()
        if notified_shape != (original_shape, shape):
            logger.success(f"Video's original shape: {original_shape}, rescaled shape: {shape}")
            notified_shape = (original_shape, shape)

        # Create ScreenCaptured with external video reference
        from owa.msgs.desktop.screen import MediaRef

        screen_captured = ScreenCaptured(
            utc_ns=frame_time_ns,
            source_shape=original_shape,
            shape=shape,
            media_ref=MediaRef(uri=filesink_location, pts_ns=buf.pts),
        )
        callback(screen_captured)
        return Gst.PadProbeReturn.OK

    identity.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, buffer_probe_callback)
    self.enable_fps_display()
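
The latency check in buffer_probe_callback is plain nanosecond arithmetic: the pipeline's running time (clock time minus base time) minus the buffer's presentation timestamp. The sketch below restates that calculation with ordinary integers so it can be followed without GStreamer. The function names and sample values are hypothetical; MSECOND_NS mirrors the value of Gst.MSECOND (one millisecond in nanoseconds).

```python
MSECOND_NS = 1_000_000  # one millisecond expressed in nanoseconds


def buffer_latency_ns(clock_time_ns: int, base_time_ns: int, pts_ns: int) -> int:
    """Latency = pipeline running time minus the buffer's presentation timestamp."""
    running_time_ns = clock_time_ns - base_time_ns
    return running_time_ns - pts_ns


def is_high_latency(latency_ns: int, threshold_ms: int = 100) -> bool:
    """Mirror the 100 ms warning threshold used in the probe callback."""
    return latency_ns > threshold_ms * MSECOND_NS


# Worked example with made-up values: the pipeline has been running for 250 ms
# and the buffer was stamped at 130 ms, so it is 120 ms late.
latency = buffer_latency_ns(
    clock_time_ns=5_250_000_000,
    base_time_ns=5_000_000_000,
    pts_ns=130_000_000,
)
print(f"{latency / MSECOND_NS:.2f} ms, high={is_high_latency(latency)}")
```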

Runnables

Usage: To use runnable components, import RUNNABLES from owa.core and call the configure() method (not on_configure()):

from owa.core import RUNNABLES

# Configure a runnable component (replace 'component_name' with actual name)
runnable = RUNNABLES["gst/component_name"]()
runnable.configure(your_arguments)

# Use the runnable in a context manager
with runnable.session as active_runnable:
    # The runnable is now running in the background
    pass  # Your main code here

Note: The on_configure() method shown in the documentation is an internal method called by configure().

screen_capture

Bases: ScreenListener

High-performance screen capture runnable using GStreamer pipeline for continuous frame grabbing.

Captures screen frames continuously and makes the latest frame available through a thread-safe interface.

Example:

from owa.core.registry import RUNNABLES

screen_capture = RUNNABLES["gst/screen_capture"]().configure(fps=60)

with screen_capture.session:
    for _ in range(10):
        frame = screen_capture.grab()
        print(f"Shape: {frame.frame_arr.shape}")

on_configure
on_configure(*args: Any, **kwargs: Any) -> ScreenCapture

Configure and start the screen listener.

Parameters:

  • *args (Any, default: ()): Additional positional arguments for screen capture configuration.
  • fps (float): Frames per second for capture.
  • window_name (str, optional): Window to capture. If None, captures entire screen.
  • monitor_idx (int, optional): Monitor index to capture.
  • **kwargs (Any, default: {}): Additional keyword arguments for screen capture configuration.

Returns:

  • ScreenCapture: Configured screen capture instance.

Source code in projects/owa-env-gst/owa/env/gst/screen/runnable.py
def on_configure(self, *args: Any, **kwargs: Any) -> "ScreenCapture":
    """
    Configure and start the screen listener.

    Args:
        *args: Additional positional arguments for screen capture configuration.
        fps (float): Frames per second for capture.
        window_name (str, optional): Window to capture. If None, captures entire screen.
        monitor_idx (int, optional): Monitor index to capture.
        **kwargs: Additional keyword arguments for screen capture configuration.

    Returns:
        ScreenCapture: Configured screen capture instance.
    """
    self.queue = deque(maxlen=1)  # Holds the most recent frame
    self._event = threading.Event()

    def on_frame(frame):
        self.queue.append(frame)
        self._event.set()

    super().on_configure(callback=on_frame, *args, **kwargs)
    return self

grab

grab() -> ScreenCaptured

Get the most recent frame (blocks until frame is available).

Returns:

  • ScreenCaptured: Latest captured frame with timestamp.

Raises:

  • TimeoutError: If no frame is received within 1 second.

Source code in projects/owa-env-gst/owa/env/gst/screen/runnable.py
def grab(self) -> ScreenCaptured:
    """
    Get the most recent frame (blocks until frame is available).

    Returns:
        ScreenCaptured: Latest captured frame with timestamp.

    Raises:
        TimeoutError: If no frame is received within 1 second.
    """
    if not self._event.wait(timeout=1.0):
        raise TimeoutError("Timeout waiting for frame")
    self._event.clear()
    return self.queue[0]
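
The on_configure and grab methods above combine a deque(maxlen=1) with a threading.Event so that a consumer always sees the newest frame and blocks until one arrives. The sketch below isolates that pattern in a standalone class so it can be tried without a capture session. LatestValue is a hypothetical name, and plain integers and strings stand in for frames.

```python
import threading
from collections import deque


class LatestValue:
    """Hold only the most recent value and let a consumer wait for it."""

    def __init__(self):
        self._queue = deque(maxlen=1)  # older values are discarded on append
        self._event = threading.Event()

    def put(self, value):
        """Called by the producer (e.g. a frame callback)."""
        self._queue.append(value)
        self._event.set()

    def grab(self, timeout: float = 1.0):
        """Block until a new value has arrived, then return the newest one."""
        if not self._event.wait(timeout=timeout):
            raise TimeoutError("Timeout waiting for value")
        self._event.clear()
        return self._queue[0]


holder = LatestValue()
for i in range(3):
    holder.put(i)  # 0 and 1 are overwritten before anyone reads them
newest = holder.grab()

# A producer running on another thread wakes the waiting consumer.
threading.Timer(0.05, holder.put, args=("late",)).start()
from_thread = holder.grab(timeout=1.0)
```

Because the event is cleared on every grab, a second call without a new value waits and eventually raises TimeoutError, which matches the behavior documented for grab above.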

omnimodal.subprocess_recorder

SubprocessRecorder(*args, **kwargs)

Bases: SubprocessRunner

High-performance screen and audio recorder using GStreamer subprocess.

This recorder runs GStreamer as a subprocess to capture screen content and audio, providing excellent performance and stability for long recordings. Supports various output formats and hardware acceleration.

Examples:

Basic screen recording with audio:

>>> recorder = SubprocessRecorder()
>>> recorder.configure(
...     filesink_location="recording.mkv",
...     record_audio=True,
...     record_video=True,
...     fps=30
... )
>>> recorder.start()
>>> # ... recording runs in background ...
>>> recorder.stop()

Video-only recording with custom settings:

>>> recorder.configure(
...     filesink_location="video_only.mp4",
...     record_audio=False,
...     record_video=True,
...     fps=60,
...     show_cursor=False
... )

Source code in projects/owa-core/owa/core/runnable.py
def __init__(self, *args, **kwargs):
    """
    Initialize a new RunnableThread. Whole arguments are passed to threading.Thread.

    To configure the runnable, write your own on_configure method instead.
    """
    super().__init__(*args, **kwargs)
    self._stop_event = threading.Event()

on_configure

on_configure(
    filesink_location: str,
    record_audio: bool = True,
    record_video: bool = True,
    record_timestamp: bool = True,
    enable_fpsdisplaysink: bool = True,
    show_cursor: bool = True,
    fps: float = 60,
    window_name: Optional[str] = None,
    monitor_idx: Optional[int] = None,
    additional_properties: Optional[dict] = None,
) -> None

Prepare the GStreamer pipeline command for subprocess recording.

Parameters:

  • filesink_location (str, required): Path where the recording will be saved.
  • record_audio (bool, default: True): Whether to include audio in the recording.
  • record_video (bool, default: True): Whether to include video in the recording.
  • record_timestamp (bool, default: True): Whether to include timestamp information.
  • enable_fpsdisplaysink (bool, default: True): Whether to enable FPS display during recording.
  • show_cursor (bool, default: True): Whether to show the cursor in the recording.
  • fps (float, default: 60): Frames per second for video recording.
  • window_name (Optional[str], default: None): Specific window to record (optional).
  • monitor_idx (Optional[int], default: None): Monitor index to record from (optional).
  • additional_properties (Optional[dict], default: None): Additional pipeline properties (optional).

Returns:

  • None: Configuration is stored internally for subprocess execution.

Source code in projects/owa-env-gst/owa/env/gst/omnimodal/subprocess_recorder.py
def on_configure(
    self,
    filesink_location: str,
    record_audio: bool = True,
    record_video: bool = True,
    record_timestamp: bool = True,
    enable_fpsdisplaysink: bool = True,
    show_cursor: bool = True,
    fps: float = 60,
    window_name: Optional[str] = None,
    monitor_idx: Optional[int] = None,
    additional_properties: Optional[dict] = None,
) -> None:
    """
    Prepare the GStreamer pipeline command for subprocess recording.

    Args:
        filesink_location: Path where the recording will be saved.
        record_audio: Whether to include audio in the recording.
        record_video: Whether to include video in the recording.
        record_timestamp: Whether to include timestamp information.
        enable_fpsdisplaysink: Whether to enable FPS display during recording.
        show_cursor: Whether to show the cursor in the recording.
        fps: Frames per second for video recording.
        window_name: Specific window to record (optional).
        monitor_idx: Monitor index to record from (optional).
        additional_properties: Additional pipeline properties (optional).

    Returns:
        None: Configuration is stored internally for subprocess execution.
    """

    # if filesink_location does not exist, create it and warn the user
    if not Path(filesink_location).parent.exists():
        Path(filesink_location).parent.mkdir(parents=True, exist_ok=True)
        logger.warning(f"Output directory {filesink_location} does not exist. Creating it.")

    # convert to posix path. this is required for gstreamer executable.
    filesink_location = Path(filesink_location).as_posix()

    pipeline_description = subprocess_recorder_pipeline(
        filesink_location=filesink_location,
        record_audio=record_audio,
        record_video=record_video,
        record_timestamp=record_timestamp,
        enable_fpsdisplaysink=enable_fpsdisplaysink,
        show_cursor=show_cursor,
        fps=fps,
        window_name=window_name,
        monitor_idx=monitor_idx,
        additional_properties=additional_properties,
    )

    super().on_configure(f"gst-launch-1.0.exe -e -v {pipeline_description}".split())
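
The last steps of on_configure normalize the output path to forward slashes and then split the command string on whitespace. The sketch below reproduces those two steps in isolation. It uses PureWindowsPath so the conversion behaves the same on any OS, and "videotestsrc ! fakesink" is only a placeholder pipeline, not the one produced by subprocess_recorder_pipeline. The helper names are hypothetical.

```python
from pathlib import PureWindowsPath


def to_gst_location(path: str) -> str:
    """Convert a Windows-style path to forward slashes, as as_posix() does."""
    return PureWindowsPath(path).as_posix()


def build_command(pipeline_description: str) -> list[str]:
    """Split the gst-launch command line on whitespace into an argument list."""
    return f"gst-launch-1.0.exe -e -v {pipeline_description}".split()


location = to_gst_location(r"C:\recordings\out.mkv")
command = build_command("videotestsrc ! fakesink")

# A location containing a space is divided into two arguments by split().
spaced = build_command("filesink location=C:/my videos/out.mkv")
```

In this sketch a path with spaces ends up spread across several list entries, so it may be worth keeping output paths free of whitespace when using a command built this way.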