Developing an Operator¶
This guide walks through creating a new operator type for MOSAIC.
Unlike Workers (which are standalone packages in 3rd_party/workers/),
Operators are lightweight: they only need an InteractiveRuntime inside
an existing worker subprocess plus a few GUI integration points.
Prerequisites¶
Python 3.10+
MOSAIC installed (pip install -e .)
An existing worker package (or create a new one following the Worker development guide)
Overview¶
Adding a new operator requires changes in three areas:
%%{init: {"flowchart": {"curve": "linear"}} }%%
graph LR
A["1. InteractiveRuntime<br/>(Worker subprocess)"] --> B["2. OperatorLauncher<br/>(Command dispatch)"]
B --> C["3. GUI Integration<br/>(Config widget)"]
style A fill:#ff7f50,stroke:#cc5500,color:#fff
style B fill:#50c878,stroke:#2e8b57,color:#fff
style C fill:#4a90d9,stroke:#2e5a87,color:#fff
Step 1: Create the InteractiveRuntime¶
The InteractiveRuntime is the core of your operator. It reads
JSON commands from stdin, interacts with the environment and your
decision-making logic, and writes JSON responses to stdout.
# 3rd_party/workers/my_worker/my_worker/runtime.py
import json
import sys
from typing import Any, Optional
class InteractiveRuntime:
    """Interactive runtime for GUI step-by-step control.

    Reads one JSON command per line from stdin and writes one JSON
    response per line to stdout.

    Protocol:
        Input (stdin):
            {"cmd": "reset", "seed": 42}
            {"cmd": "step"}
            {"cmd": "stop"}
        Output (stdout):
            {"type": "ready", "run_id": "...", "env_id": "...", "seed": 42}
            {"type": "step", "step_index": 1, "action": 2, "reward": 0.5, ...}
            {"type": "episode_end", "total_reward": 0.95, "episode_length": 15}
            {"type": "stopped"}
            {"type": "error", "message": "..."}

    ``step_index`` is the number of steps taken in the current episode,
    so the first step after a reset reports 1.
    """

    def __init__(self, config):
        """Store the worker config; the environment is created on first reset."""
        self._config = config
        self._env = None
        self._obs = None
        self._step_idx = 0
        self._episode_reward = 0.0

    def _emit(self, data: dict) -> None:
        """Emit a JSON response to stdout."""
        # flush=True: the reader consumes stdout line-by-line, so a buffered
        # response would arrive late or not at all.
        print(json.dumps(data), flush=True)

    def _handle_reset(self, seed: Optional[int] = None) -> None:
        """Reset the environment and prepare for stepping.

        The environment is created lazily on the first reset and reused
        afterwards. Emits a ``ready`` response.
        """
        import gymnasium as gym

        if self._env is None:
            self._env = gym.make(
                self._config.env_id,
                render_mode="rgb_array",
            )
        self._obs, _info = self._env.reset(seed=seed)
        self._step_idx = 0
        self._episode_reward = 0.0
        self._emit({
            "type": "ready",
            "run_id": self._config.run_id,
            "env_id": self._config.env_id,
            "seed": seed,
        })

    def _select_action(self, observation: Any) -> int:
        """Your decision-making logic goes here."""
        # Example: random action
        return self._env.action_space.sample()

    def _handle_step(self) -> None:
        """Execute one environment step and emit the ``step`` response.

        Emits an ``error`` response instead of crashing when no
        environment exists yet (``step`` sent before ``reset``). When the
        episode ends, also emits ``episode_end`` and auto-resets.
        """
        if self._env is None:
            self._emit({
                "type": "error",
                "message": "Environment not initialized: send a reset command before step",
            })
            return

        action = self._select_action(self._obs)
        obs_new, reward, terminated, truncated, _info = self._env.step(action)

        # Environments may return NumPy scalars, which json.dumps rejects.
        # Convert to builtin types once, before accumulating or emitting.
        reward = float(reward)
        terminated = bool(terminated)
        truncated = bool(truncated)

        self._episode_reward += reward
        self._step_idx += 1

        # Build step response
        step_data = {
            "type": "step",
            "step_index": self._step_idx,
            "action": int(action),
            "reward": reward,
            "terminated": terminated,
            "truncated": truncated,
            "episode_reward": self._episode_reward,
        }

        # Add render payload for GUI display. Rendering is best-effort:
        # a failed render must not prevent the step response from being sent.
        try:
            rgb_frame = self._env.render()
            if rgb_frame is not None:
                step_data["render_payload"] = {
                    "mode": "rgb",
                    "rgb": rgb_frame.tolist(),
                    "width": rgb_frame.shape[1],
                    "height": rgb_frame.shape[0],
                }
        except Exception:
            pass

        self._emit(step_data)

        # Check for episode end
        if terminated or truncated:
            self._emit({
                "type": "episode_end",
                "total_reward": self._episode_reward,
                "episode_length": self._step_idx,
                "terminated": terminated,
                "truncated": truncated,
            })
            # Auto-reset for next episode
            self._obs, _ = self._env.reset()
            self._step_idx = 0
            self._episode_reward = 0.0
        else:
            self._obs = obs_new

    def _close_env(self) -> None:
        """Close the environment, if one was created, and forget it."""
        if self._env is not None:
            env, self._env = self._env, None
            env.close()

    def run(self) -> None:
        """Main loop -- read commands from stdin, dispatch.

        Runs until a ``stop`` command or end of input. Malformed input and
        failures inside a command handler are reported as ``error``
        responses so the loop keeps serving commands. The environment is
        closed on exit.
        """
        try:
            for line in sys.stdin:
                line = line.strip()
                if not line:
                    continue
                try:
                    cmd = json.loads(line)
                except json.JSONDecodeError as e:
                    self._emit({"type": "error", "message": f"Invalid JSON: {e}"})
                    continue
                # Valid JSON that is not an object (e.g. a list or number)
                # has no "cmd" key to dispatch on.
                if not isinstance(cmd, dict):
                    self._emit({
                        "type": "error",
                        "message": "Command must be a JSON object",
                    })
                    continue

                cmd_type = cmd.get("cmd")
                if cmd_type == "stop":
                    self._emit({"type": "stopped"})
                    break

                try:
                    if cmd_type == "reset":
                        self._handle_reset(cmd.get("seed"))
                    elif cmd_type == "step":
                        self._handle_step()
                    else:
                        self._emit({
                            "type": "error",
                            "message": f"Unknown command: {cmd_type}",
                        })
                except Exception as exc:
                    # Top-level boundary: report the failure over the
                    # protocol instead of dying without a response.
                    self._emit({
                        "type": "error",
                        "message": f"Command {cmd_type!r} failed: {exc}",
                    })
        finally:
            self._close_env()
Important
Always use flush=True when printing responses. The GUI reads
stdout line-by-line – buffered output causes delayed or missing
responses.
Step 2: Add the --interactive CLI Flag¶
Add the interactive mode flag to your worker’s CLI entry point:
# 3rd_party/workers/my_worker/my_worker/cli.py
import argparse
def main(argv=None):
    """Parse the worker CLI arguments and run the selected runtime.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read the
            process arguments.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--run-id", required=True)
    cli.add_argument("--env-id", default="CartPole-v1")
    cli.add_argument("--render-mode", default=None)
    cli.add_argument(
        "--interactive",
        action="store_true",
        help="Run in interactive mode for GUI step-by-step control.",
    )
    options = cli.parse_args(argv)

    worker_config = MyWorkerConfig(
        run_id=options.run_id,
        env_id=options.env_id,
        render_mode=options.render_mode,
    )

    # Import lazily so only the runtime that is actually used gets loaded.
    if options.interactive:
        from my_worker.runtime import InteractiveRuntime as runtime_cls
    else:
        # Normal training/evaluation mode
        from my_worker.runtime import MyWorkerRuntime as runtime_cls

    runtime_cls(worker_config).run()
if __name__ == "__main__":
main()
Step 3: Register with OperatorLauncher¶
Add your operator type to the command dispatch in
gym_gui/services/operator_launcher.py:
def _build_my_operator_command(
self,
config: OperatorConfig,
run_id: str,
*,
interactive: bool = False,
) -> list[str]:
settings = config.settings or {}
cmd = [
self._python_executable,
"-m", "my_worker.cli",
"--run-id", run_id,
"--env-id", config.task,
"--render-mode", "rgb_array",
]
if interactive:
cmd.append("--interactive")
# Add operator-specific settings
if settings.get("my_param"):
cmd.extend(["--my-param", str(settings["my_param"])])
return cmd
Then add the dispatch case in launch_operator():
elif config.operator_type == "my_type":
cmd = self._build_my_operator_command(config, run_id, interactive=interactive)
Step 4: Add to Worker Catalog¶
Register your operator’s worker in the GUI catalog so it appears in the operator type dropdown:
# gym_gui/ui/worker_catalog/catalog.py
# Catalog entry for the operator's worker; registering it makes the worker
# appear in the GUI's operator type dropdown.
WorkerDefinition(
worker_id="my_worker",
display_name="My Custom Operator",
description="Description of what your operator does.",
supports_training=False,
supports_policy_load=True,
requires_live_telemetry=True,
supports_multi_agent=False,
)
Step 5: Add Constants (Optional)¶
For a polished integration, add constants for your operator:
# gym_gui/constants/constants_operator.py
# Operator type identifier -- presumably the same string that
# launch_operator() compares config.operator_type against ("my_type");
# confirm against the launcher.
OPERATOR_CATEGORY_MY_TYPE = "my_type"
DEFAULT_OPERATOR_ID_MY_TYPE = "my_type_default"
OPERATOR_DISPLAY_NAME_MY_TYPE = "My Custom Operator"
# Text shown for this operator in the operator dropdown.
OPERATOR_DESCRIPTION_MY_TYPE = (
"Description shown in the operator dropdown."
)
Testing¶
Test your operator’s interactive mode end-to-end:
# tests/test_my_operator_interactive.py
import json
import subprocess
def test_interactive_protocol():
    """Test the stdin/stdout JSON protocol (reset -> step -> stop)."""
    import sys

    # Use the interpreter running the tests; a bare "python" may resolve to
    # a different installation that does not have the worker package.
    proc = subprocess.Popen(
        [sys.executable, "-m", "my_worker.cli",
         "--run-id", "test-001",
         "--env-id", "CartPole-v1",
         "--interactive"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )

    def send(command: dict) -> dict:
        """Write one command line and return the next decoded response line."""
        proc.stdin.write(json.dumps(command) + "\n")
        proc.stdin.flush()
        return json.loads(proc.stdout.readline())

    try:
        # Send reset
        response = send({"cmd": "reset", "seed": 42})
        assert response["type"] == "ready"
        assert response["seed"] == 42

        # Send step
        response = send({"cmd": "step"})
        assert response["type"] == "step"
        assert "action" in response

        # Send stop
        response = send({"cmd": "stop"})
        assert response["type"] == "stopped"

        proc.wait(timeout=5)
    finally:
        # Never leave the worker running (or its pipes open) when an
        # assertion fails or the worker hangs.
        if proc.poll() is None:
            proc.kill()
            proc.wait()
        proc.stdin.close()
        proc.stdout.close()
For Qt integration testing, use pytest-qt:
def test_step_triggers_next_step(qtbot, execution_manager):
    """A received step response must trigger the next step for the same operator."""
    step_signal = execution_manager.step_operator
    with qtbot.waitSignal(step_signal, timeout=1000) as waiter:
        execution_manager.on_step_received("op1")

    # The signal fired within the timeout and carried the operator id.
    assert waiter.signal_triggered
    assert waiter.args == ["op1"]
Checklist¶
Use this checklist to verify your operator is complete:
InteractiveRuntime reads stdin commands and writes stdout responses
--interactive CLI flag dispatches to InteractiveRuntime
Responses include render_payload for GUI display
flush=True on all stdout writes
OperatorLauncher has a _build_*_command() method for your type
Worker appears in worker_catalog/catalog.py
Subprocess protocol test passes (reset/step/stop cycle)
Episodes auto-reset after termination
Adding an Environment Family¶
An environment family groups related environments under a single dropdown entry in the Operators tab. Adding a new family requires changes in two files:
gym_gui/ui/widgets/operator_config_widget.py – family registry and UI logic
gym_gui/ui/main_window.py – environment creation and preview rendering
This section walks through each step. The mosaic_multigrid /
ini_multigrid split is used as a running example.
Step 1: Add to ENV_FAMILIES¶
Open gym_gui/ui/widgets/operator_config_widget.py and add a new
key to the ENV_FAMILIES dict:
# Maps each family name to the tuple of gym-registered environment IDs
# offered in the Task dropdown when that family is selected.
ENV_FAMILIES: Dict[str, Tuple[str, ...]] = {
# ... existing families ...
"my_family": (
"MyEnv-TaskA-v0",
"MyEnv-TaskB-v0",
"MyEnv-TaskC-v0",
),
}
Each string is a gym-registered environment ID that appears in the Task dropdown when the family is selected.
If your environments are registered dynamically (like babyai or
meltingpot), use an empty tuple and populate it at runtime:
"my_family": (), # Loaded dynamically
Tip
Use the correct gym ID strings. If a package registers environments
via gym.register(id="MyEnv-TaskA-v0", ...), the ENV_FAMILIES
tuple must use "MyEnv-TaskA-v0" exactly.
Step 2: Update _auto_detect_agent_count¶
If your family is multi-agent, add detection logic so the GUI knows how many agents to display:
def _auto_detect_agent_count(env_family: str, env_id: str) -> int:
# ... existing checks ...
elif env_family == "my_family":
# Instantiate and query agent count
import gym
env = gym.make(env_id)
num_agents = getattr(env, 'num_agents', 1)
env.close()
return num_agents
For single-agent families, this function can return 0 (the
default) and no multi-agent panel will be shown.
Step 3: Update _get_execution_mode¶
Declare whether your family uses turn-based (AEC) or simultaneous (parallel) stepping:
def _get_execution_mode(env_family: str) -> str:
if env_family in ("pettingzoo", "pettingzoo_classic", "open_spiel"):
return "aec"
elif env_family in ("mosaic_multigrid", "ini_multigrid",
"meltingpot", "overcooked", "my_family"):
return "parallel"
return "aec"
Step 4: Update multi-agent guard tuples¶
Several functions use tuples to identify multi-agent or simultaneous-only families. Add your family name to each:
_is_multiagent_env_selected() – determines whether to show the player assignment panel:
return env_family in (
    "pettingzoo", "pettingzoo_classic", "open_spiel",
    "mosaic_multigrid", "ini_multigrid", "meltingpot",
    "overcooked", "my_family",  # <-- add here
)
simultaneous_only_envs in _update_multiagent_panel() – disables the AEC option for families that only support parallel mode:
simultaneous_only_envs = (
    "overcooked", "mosaic_multigrid", "ini_multigrid",
    "meltingpot", "my_family",  # <-- if simultaneous-only
)
Agent ID generation block – generates agent_0, agent_1, etc. for simultaneous families:
elif env_family in ("mosaic_multigrid", "ini_multigrid",
                    "meltingpot", "overcooked", "my_family"):
    agent_ids = [f"agent_{i}" for i in range(num_agents)]
_is_parallel_multiagent() in main_window.py – identifies parallel environments for the shared-environment execution path:
if first_config.env_name in (
    "mosaic_multigrid", "ini_multigrid", "meltingpot",
    "overcooked", "my_family",  # <-- add here
):
Step 5: Add environment creation¶
In main_window.py, add a branch to
_create_parallel_multiagent_env() so the runtime can instantiate
your environments:
elif env_name == "my_family":
import gymnasium
import my_package.envs # triggers gymnasium.register() calls
env = gymnasium.make(task)
env.render_mode = 'rgb_array'
return env
Important
Use gymnasium.make(task) with the registered environment ID.
Do not hardcode specific class imports – this ensures all
variants (current and future) are handled correctly. See
Operator-Specific Errors
for the preview-hang bug caused by hardcoded class imports.
Both mosaic_multigrid and ini_multigrid use the modern
Gymnasium API (import gymnasium), not the deprecated OpenAI
Gym (import gym).
Step 6: Add preview rendering¶
In the _on_initialize_operator() method of main_window.py,
add an elif branch so “Load Environment” shows a preview frame:
elif env_name == "my_family":
try:
import gymnasium
import my_package.envs # noqa: F401
env = gymnasium.make(task)
env.render_mode = 'rgb_array'
env.reset()
rgb_frame = env.render()
env.close()
except ImportError as import_err:
self._status_bar.showMessage(
f"my_package not installed - cannot preview: {import_err}",
5000
)
return
except Exception as e:
self._status_bar.showMessage(
f"Cannot preview my_family {task}: {e}",
5000
)
return
Step 7: Add settings panel (optional)¶
If your family needs family-specific UI controls (like the multigrid
observation mode / coordination strategy panel), create a container
widget in the _create_multiagent_controls() method and
show/hide it based on the selected family:
if env_family in ("mosaic_multigrid", "ini_multigrid"):
self._multigrid_settings_container.show()
elif env_family == "my_family":
self._my_family_settings_container.show()
else:
self._multigrid_settings_container.hide()
self._my_family_settings_container.hide()
Environment Family Checklist¶
Use this checklist when adding a new environment family:
[ ] ENV_FAMILIES dict has a key with all gym-registered IDs
[ ] _auto_detect_agent_count() handles the family (if multi-agent)
[ ] _get_execution_mode() returns "aec" or "parallel"
[ ] _is_multiagent_env_selected() includes the family
[ ] simultaneous_only_envs tuple includes it (if parallel-only)
[ ] Agent ID generation block includes it
[ ] _is_parallel_multiagent() in main_window.py includes it
[ ] _create_parallel_multiagent_env() has a creation branch
[ ] _on_initialize_operator() has a preview rendering branch
[ ] All gym IDs use gymnasium.make() – no hardcoded class imports
[ ] Preview error messages name the family clearly
[ ] gym_gui/tests/ has passing tests for the new family
Example: mosaic_multigrid / ini_multigrid Split¶
The original "multigrid" family mixed two distinct environment
sets. It was split into two independent families:
| Property | mosaic_multigrid | ini_multigrid |
|---|---|---|
| Source | | |
| Type | Competitive team sports | Cooperative exploration |
| view_size | 3 | 7 |
| Env count | 13 (Soccer, Collect, Basketball) | 13 (Empty, RedBlueDoors, LockedHallway, etc.) |
| ID prefix | | |
| API | Gymnasium (import gymnasium) | Gymnasium (import gymnasium) |
| Install | | |
| Role assignment | Yes (Soccer forward/defender) | No |
Key lesson: Always use gymnasium.make(task) for environment
creation rather than importing specific classes. The mosaic_multigrid package
has 13 environments across 4 variant tiers (Original, IndAgObs,
TeamObs, Basketball). Hardcoding class imports like
SoccerGame4HEnv10x15N2 will silently break when the user selects a
different variant (e.g., MosaicMultiGrid-Soccer-2vs2-IndAgObs-v0
uses SoccerGame4HIndAgObsEnv16x11N2).