Backend: Worker Package¶
This guide walks through creating the backend package for a new
MOSAIC worker under 3rd_party/workers/. By the end you will have a
fully functional worker that the Daemon can spawn and that emits
standardized JSONL telemetry.
Prerequisites¶
Python 3.10+
MOSAIC installed (pip install -e .)
Your RL library or agent framework installed
Overview¶
Every worker consists of five components:
graph LR
A["1. Config<br/>config.py"] --> B["2. Runtime<br/>runtime.py"]
B --> C["3. Telemetry<br/>telemetry.py"]
B --> D["4. Analytics<br/>analytics.py"]
E["5. Entry Point<br/>pyproject.toml"] -.->|"discovery"| A
style A fill:#4a90d9,stroke:#2e5a87,color:#fff
style B fill:#ff7f50,stroke:#cc5500,color:#fff
style C fill:#50c878,stroke:#2e8b57,color:#fff
style D fill:#50c878,stroke:#2e8b57,color:#fff
style E fill:#9370db,stroke:#6a0dad,color:#fff
Step 1: Create the Package¶
mkdir -p 3rd_party/workers/my_worker/my_worker
touch 3rd_party/workers/my_worker/my_worker/__init__.py
Step 2: Configuration (config.py)¶
Implement the WorkerConfig protocol: a dataclass with run_id,
seed, to_dict(), and from_dict():
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict
@dataclass
class MyWorkerConfig:
    """Configuration for the example worker.

    Holds the two protocol-required fields (``run_id`` and ``seed``) plus the
    worker-specific training settings, and converts to and from plain
    dictionaries via :meth:`to_dict` and :meth:`from_dict`.
    """

    # Protocol-required fields
    run_id: str
    seed: int | None = None

    # Worker-specific fields
    env_id: str = "CartPole-v1"
    algorithm: str = "dqn"
    total_steps: int = 100_000
    learning_rate: float = 1e-4

    def __post_init__(self) -> None:
        """Validate the configuration after dataclass initialisation.

        Raises:
            ValueError: If ``run_id`` is empty or otherwise falsy.
        """
        if not self.run_id:
            raise ValueError("run_id is required")

    def to_dict(self) -> Dict[str, Any]:
        """Return all dataclass fields as a ``{field_name: value}`` mapping.

        The keys are taken from the dataclass definition rather than listed
        by hand, so a field added to this class (or to a dataclass subclass)
        is serialized automatically and stays in sync with :meth:`from_dict`.
        """
        return {name: getattr(self, name) for name in self.__dataclass_fields__}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MyWorkerConfig":
        """Build a config from ``data``, ignoring keys that are not fields.

        Args:
            data: Mapping of field names to values; unknown keys are dropped.

        Returns:
            A new instance of ``cls``.

        Raises:
            TypeError: If a field without a default (``run_id``) is missing.
            ValueError: If ``run_id`` is present but empty.
        """
        known = cls.__dataclass_fields__
        return cls(**{key: value for key, value in data.items() if key in known})
Step 3: Runtime (runtime.py)¶
Manage the worker lifecycle: emit run_started, run the training
loop, and emit run_completed or run_failed:
import json
import sys
import time
from typing import Any, Dict
class MyWorkerRuntime:
    """Drive one training run and emit JSONL telemetry on stdout.

    Lifecycle events (``run_started``, ``heartbeat``, ``run_completed``,
    ``run_failed``) and per-step records are written one JSON object per
    line and flushed immediately.
    """

    # Minimum number of seconds between two heartbeat lifecycle events.
    HEARTBEAT_INTERVAL_S = 60.0

    # The annotation is quoted so this module does not need MyWorkerConfig
    # in scope at class-definition time.
    def __init__(self, config: "MyWorkerConfig"):
        self.config = config

    def run(self) -> Dict[str, Any]:
        """Execute the run and return its summary.

        Returns:
            ``{"status": "completed", ...}`` merged with the training result.

        Raises:
            Exception: Whatever the training loop raised, re-raised after a
                ``run_failed`` event has been emitted.
        """
        self._emit_lifecycle("run_started", {
            "env_id": self.config.env_id,
            "algorithm": self.config.algorithm,
        })
        try:
            result = self._train()
            summary = {"status": "completed", **result}
            self._emit_lifecycle("run_completed", summary)
            return summary
        except Exception as exc:
            # Report the failure on the telemetry stream, then let the
            # caller see the original exception.
            self._emit_lifecycle("run_failed", {"error": str(exc)})
            raise

    def _train(self) -> Dict[str, Any]:
        """Your training logic goes here."""
        last_heartbeat = time.monotonic()
        for step in range(self.config.total_steps):
            # ... train one step ...

            # Emit step telemetry every N steps
            if step % 100 == 0:
                self._emit_step(step, reward=1.0)

            # Emit a heartbeat every HEARTBEAT_INTERVAL_S seconds of wall
            # time. A step-count trigger would fire at an arbitrary rate
            # depending on how fast steps run; the monotonic clock is used
            # so system clock adjustments cannot skew the interval.
            now = time.monotonic()
            if now - last_heartbeat >= self.HEARTBEAT_INTERVAL_S:
                self._emit_lifecycle("heartbeat", {"step": step})
                last_heartbeat = now
        return {"total_steps": self.config.total_steps}

    def _emit_step(self, step: int, reward: float) -> None:
        """Write one ``step`` telemetry record to stdout."""
        event = {
            "event_type": "step",
            "run_id": self.config.run_id,
            "step_index": step,
            "reward": reward,
        }
        print(json.dumps(event), file=sys.stdout, flush=True)

    def _emit_lifecycle(self, event_name: str, payload: Dict[str, Any]) -> None:
        """Write one lifecycle record (with a wall-clock timestamp) to stdout."""
        event = {
            "event": event_name,
            "run_id": self.config.run_id,
            "timestamp": time.time(),
            "payload": payload,
        }
        print(json.dumps(event), file=sys.stdout, flush=True)
Important
Always use flush=True when printing telemetry. The Telemetry
Proxy reads stdout line by line; buffered output causes
delayed or missing telemetry.
Step 4: Worker Metadata (__init__.py)¶
Register the worker’s identity and capabilities for automatic discovery:
def get_worker_metadata() -> tuple:
    """Return the ``(metadata, capabilities)`` pair describing this worker.

    The gym_gui types are imported inside the function, so gym_gui is only
    needed when this hook is actually called.
    """
    from gym_gui.core.worker import WorkerCapabilities, WorkerMetadata

    return (
        WorkerMetadata(
            name="My Worker",
            version="1.0.0",
            description="My RL worker for MOSAIC",
            author="Your Name",
            homepage="https://github.com/...",
            upstream_library="mylib",
            upstream_version="1.0.0",
            license="MIT",
        ),
        WorkerCapabilities(
            worker_type="myworker",
            supported_paradigms=("sequential",),
            env_families=("gymnasium",),
            action_spaces=("discrete", "continuous"),
            observation_spaces=("vector", "image"),
            max_agents=1,
            supports_checkpointing=True,
            requires_gpu=False,
        ),
    )
Step 5: Entry Point (pyproject.toml)¶
Register the worker so MOSAIC discovers it automatically:
[project]
name = "my-worker"
version = "1.0.0"
requires-python = ">=3.10"
[project.entry-points."mosaic.workers"]
myworker = "my_worker:get_worker_metadata"
[build-system]
requires = ["setuptools>=64"]
build-backend = "setuptools.build_meta"
Then install in development mode:
cd 3rd_party/workers/my_worker
pip install -e .
Step 6: CLI Entry Point (cli.py)¶
Create the command-line interface that the Daemon invokes:
import argparse
import json
from my_worker.config import MyWorkerConfig
from my_worker.runtime import MyWorkerRuntime
def main(argv=None):
    """Parse CLI arguments, load the JSON config, and run the worker.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", required=True)
    parser.add_argument("--grpc", action="store_true")
    parser.add_argument("--grpc-target", default="127.0.0.1:50055")
    args = parser.parse_args(argv)

    # Load config from JSON file. The encoding is given explicitly so the
    # read does not depend on the platform's locale default.
    with open(args.config, encoding="utf-8") as f:
        config = MyWorkerConfig.from_dict(json.load(f))

    # Run the worker
    runtime = MyWorkerRuntime(config)
    runtime.run()


if __name__ == "__main__":
    main()
Step 7: Add Requirements¶
Create requirements/myworker.txt in the MOSAIC root:
# My Worker dependencies
mylib>=1.0.0
-e 3rd_party/workers/my_worker
And add to MOSAIC’s pyproject.toml optional dependencies:
[project.optional-dependencies]
myworker = ["-r requirements/myworker.txt"]
Step 8: FastLane Integration (Optional)¶
FastLane is a high-performance shared-memory telemetry path for real-time
rendering. When enabled, frames bypass the gRPC/SQLite slow lane and go
directly to the GUI via /dev/shm shared memory segments.
Important
Always call unlink() after close() when closing FastLane
writers. Failure to unlink shared memory segments will leave orphaned
resource_tracker processes that consume CPU and memory even after
the GUI closes.
Architecture¶
graph LR
subgraph "Worker Process"
ENV["Gymnasium Env"]
W["FastLaneWrapper"]
WR["FastLaneWriter"]
end
subgraph "Shared Memory (/dev/shm)"
SHM["FastLane Buffer<br/>psm_*"]
end
subgraph "GUI Process"
RD["FastLaneReader"]
CON["FastLaneConsumer<br/>16ms poll"]
TAB["FastLaneTab"]
end
ENV -->|"render()"| W
W -->|"publish()"| WR
WR -->|"mmap write"| SHM
SHM -->|"mmap read"| RD
RD -->|"QImage"| CON
CON -->|"frame_ready"| TAB
style SHM fill:#ffd700,stroke:#b8860b,color:#000
Implementation¶
Create a fastlane.py module in your worker package:
from __future__ import annotations
import os
from dataclasses import dataclass
from multiprocessing import shared_memory
from typing import Any, Optional
import gymnasium as gym
import numpy as np
try:
    from gym_gui.fastlane import FastLaneWriter, FastLaneConfig, FastLaneMetrics
    from gym_gui.fastlane.buffer import create_fastlane_name
    from gym_gui.telemetry.semconv import TelemetryEnv, VideoModes
except ImportError:
    # gym_gui is optional: every name imported above gets a ``None``
    # placeholder so the rest of the module can still be imported and
    # FastLane simply stays disabled.
    FastLaneWriter = None  # type: ignore
    FastLaneConfig = None  # type: ignore
    FastLaneMetrics = None  # type: ignore
    create_fastlane_name = None  # type: ignore
    TelemetryEnv = None  # type: ignore
    VideoModes = None  # type: ignore


# NOTE: this worker-level settings dataclass rebinds the module-level name
# ``FastLaneConfig``. Code that needs gym_gui's own ``FastLaneConfig`` must
# import it under an alias.
@dataclass(frozen=True)
class FastLaneConfig:
    """FastLane settings resolved from the process environment."""

    enabled: bool
    slot: int
    run_id: str
    video_mode: str
    grid_limit: int


def _resolve_config() -> FastLaneConfig:
    """Build the FastLane settings from environment variables.

    Returns a disabled configuration when gym_gui could not be imported,
    because the environment-variable names live in ``TelemetryEnv``.

    Raises:
        ValueError: If the slot or grid-limit variable is set to a value
            that is not an integer.
    """
    run_id = os.getenv("MY_WORKER_RUN_ID", "my-worker-run")
    if TelemetryEnv is None or VideoModes is None:
        # slot and grid_limit mirror the defaults used below; video_mode is
        # left empty since it is only meaningful when FastLane is enabled.
        return FastLaneConfig(
            enabled=False, slot=0, run_id=run_id, video_mode="", grid_limit=4
        )

    enabled = os.getenv(TelemetryEnv.FASTLANE_ONLY, "0") == "1"
    slot = int(os.getenv(TelemetryEnv.FASTLANE_SLOT, "0"))
    video_mode = os.getenv(TelemetryEnv.FASTLANE_VIDEO_MODE, VideoModes.SINGLE)
    grid_limit = int(os.getenv(TelemetryEnv.FASTLANE_GRID_LIMIT, "4"))
    return FastLaneConfig(enabled, slot, run_id, video_mode, grid_limit)


# Resolved once at import time; later environment changes are not picked up.
_CONFIG = _resolve_config()


def is_fastlane_enabled() -> bool:
    """Return whether FastLane was enabled when this module was imported."""
    return _CONFIG.enabled
class FastLaneTelemetryWrapper(gym.Wrapper):
    """Gym wrapper that publishes frames to FastLane shared memory."""

    def __init__(self, env: gym.Env, config: FastLaneConfig) -> None:
        """Wrap ``env``; the writer is created lazily on the first frame.

        Args:
            env: Environment to wrap.
            config: Worker-level FastLane settings (``enabled``, ``run_id``).
        """
        super().__init__(env)
        self._config = config
        self._writer: Optional[FastLaneWriter] = None

    def step(self, action: Any):
        """Step the wrapped env and publish a frame when FastLane is enabled."""
        obs, reward, terminated, truncated, info = self.env.step(action)
        if self._config.enabled:
            self._publish_frame(reward)
        return obs, reward, terminated, truncated, info

    def _publish_frame(self, reward: float) -> None:
        """Render the env and publish the frame plus metrics to the writer.

        Does nothing when gym_gui is unavailable, when ``render()`` returns
        ``None``, when the frame is not a height x width x channels array,
        or when no writer could be created.
        """
        if FastLaneWriter is None:
            return
        frame = self.env.render()
        if frame is None:
            return
        arr = np.ascontiguousarray(frame.astype(np.uint8))
        if arr.ndim != 3:
            # The unpacking below needs exactly three axes; skip the frame
            # rather than let telemetry crash the training loop.
            return
        height, width, channels = arr.shape

        writer = self._writer
        if writer is None:
            # The module-level name ``FastLaneConfig`` is the worker's own
            # settings dataclass (enabled/slot/run_id/...), so gym_gui's
            # buffer configuration is imported here under an alias.
            # Reaching this line means gym_gui.fastlane imported
            # successfully (FastLaneWriter is not None).
            from gym_gui.fastlane import FastLaneConfig as BufferConfig

            buffer_config = BufferConfig(
                width=width, height=height, channels=channels,
                pixel_format="RGB" if channels == 3 else "RGBA",
            )
            writer = self._create_writer(buffer_config)
            if writer is None:
                return
            self._writer = writer

        metrics = FastLaneMetrics(
            last_reward=float(reward),
            rolling_return=0.0,
            step_rate_hz=0.0,
        )
        writer.publish(arr.tobytes(), metrics=metrics)

    def _create_writer(self, config: Any) -> Optional[FastLaneWriter]:
        """Create a writer for this run, or attach to an existing segment.

        Returns ``None`` if the segment already exists and attaching to it
        fails for any reason.
        """
        try:
            return FastLaneWriter.create(self._config.run_id, config)
        except FileExistsError:
            try:
                name = create_fastlane_name(self._config.run_id)
                shm = shared_memory.SharedMemory(name=name, create=False)
                return FastLaneWriter(shm, config)
            except Exception:
                # Best effort: frames are skipped when no writer exists.
                return None

    def close(self) -> None:
        """Release the FastLane writer, then close the wrapped env."""
        self._close_writer()
        return self.env.close()

    def _close_writer(self) -> None:
        """Close FastLane writer and unlink shared memory."""
        writer, self._writer = self._writer, None
        if writer is None:
            return

        import logging

        log = logging.getLogger(__name__)
        # close() and unlink() are attempted independently so a failure in
        # close() can no longer skip the unlink and leave the segment behind.
        try:
            writer.close()
        except Exception:
            log.warning("FastLane writer close() failed", exc_info=True)
        try:
            writer.unlink()  # CRITICAL: Remove shared memory
        except FileNotFoundError:
            pass  # Already unlinked
        except Exception:
            log.warning("FastLane writer unlink() failed", exc_info=True)
def maybe_wrap_env(env: gym.Env) -> gym.Env:
    """Return ``env`` wrapped for FastLane publishing, or unchanged if disabled."""
    return FastLaneTelemetryWrapper(env, _CONFIG) if _CONFIG.enabled else env
Cleanup Requirements¶
Warning
The _close_writer() method MUST call both close() and
unlink(). The sequence is:
1. writer.close() - releases the memory mapping
2. writer.unlink() - removes the shared memory segment from /dev/shm
Without unlink(), the shared memory segment persists and the
multiprocessing.resource_tracker process keeps running to track it.
This leads to:
- Accumulating resource_tracker processes consuming CPU
- Memory leaks in /dev/shm
- New worker spawns creating additional trackers
Environment Variables¶
The dispatcher sets these environment variables when FastLane is enabled:
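| Variable | Description |
|---|---|
| TelemetryEnv.FASTLANE_ONLY | Set to 1 to enable FastLane (default: 0) |
| TelemetryEnv.FASTLANE_SLOT | Vectorized env index to stream (default: 0) |
| TelemetryEnv.FASTLANE_VIDEO_MODE | Video mode (default: VideoModes.SINGLE) |
| TelemetryEnv.FASTLANE_GRID_LIMIT | Max env slots for grid mode (default: 4) |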
Debugging Orphaned Processes¶
If you see resource_tracker processes after closing the GUI:
# List orphaned processes
ps aux | grep resource_tracker
# Kill all resource_tracker processes
pkill -f "resource_tracker"
# Clean up orphaned shared memory
rm -f /dev/shm/psm_*
Testing¶
Verify your worker passes the MOSAIC standardization tests:
# tests/test_my_worker_standardization.py
def test_config_protocol():
    """Config implements WorkerConfig protocol."""
    from my_worker.config import MyWorkerConfig

    original = MyWorkerConfig(run_id="test-001", seed=42)
    assert original.run_id == "test-001"

    # Round-trip through the dict representation.
    round_tripped = MyWorkerConfig.from_dict(original.to_dict())
    assert round_tripped.run_id == original.run_id
def test_metadata():
    """Worker provides valid metadata."""
    from my_worker import get_worker_metadata

    info, caps = get_worker_metadata()
    # Both identifying fields must be non-empty.
    assert info.name
    assert caps.worker_type
def test_lifecycle_events(capsys):
    """Worker emits required lifecycle events."""
    from my_worker.config import MyWorkerConfig
    from my_worker.runtime import MyWorkerRuntime
    import json

    config = MyWorkerConfig(run_id="test-001", total_steps=10)
    runtime = MyWorkerRuntime(config)
    runtime.run()

    output = capsys.readouterr().out
    # splitlines() splits on real newlines; splitting on the two-character
    # string "\\n" would leave the whole JSONL output as a single item and
    # make json.loads fail.
    lines = [json.loads(line) for line in output.splitlines() if line.strip()]
    events = [record.get("event") for record in lines if "event" in record]
    assert "run_started" in events
    assert "run_completed" in events