Source code for gym_gui.services.operator_script_execution_manager

"""Script Execution Manager for automated baseline operator experiments.

This module provides the OperatorScriptExecutionManager class that orchestrates
automated execution of baseline experiments defined in Python scripts.

Key responsibilities:
- Episode state machine (launch → reset → step → episode_end → next_episode)
- Seed progression across episodes
- Operator subprocess lifecycle management
- Progress tracking and reporting

Architecture:
    ScriptExperimentWidget (UI)
        → OperatorScriptExecutionManager (Logic)
        → MainWindow (Communication)
        → Operator Subprocesses

Separation from Manual Mode:
    Manual Mode uses OperatorsTab with shared seed execution.
    Script Mode uses this manager with predefined seed sequences.
    They operate completely independently without shared state.
"""

from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional

from PyQt6 import QtCore
from PyQt6.QtCore import QTimer, pyqtSignal

from gym_gui.services.operator import OperatorConfig

_LOGGER = logging.getLogger(__name__)


[docs] class OperatorScriptExecutionManager(QtCore.QObject): """Manages automated execution of baseline operator experiments from scripts. This class implements an event-driven state machine for running multiple episodes with different seeds automatically. It handles: - Launching operators with initial seeds - Stepping operators until episode completion - Advancing to next episode with new seed - Tracking progress and emitting updates Signals: launch_operator: Request to launch operator subprocess reset_operator: Request to reset operator with new seed step_operator: Request to step operator stop_operator: Request to stop operator progress_updated: Progress notification for UI experiment_completed: Experiment finished notification """ # Operator control signals (sent to MainWindow) launch_operator = pyqtSignal(str, object, int) # operator_id, config, seed reset_operator = pyqtSignal(str, int) # operator_id, seed step_operator = pyqtSignal(str) # operator_id stop_operator = pyqtSignal(str) # operator_id # Progress signals (sent to ScriptExperimentWidget) progress_updated = pyqtSignal(int, int, int) # episode_num, total_episodes, seed experiment_completed = pyqtSignal(int) # num_episodes def __init__(self, parent: Optional[QtCore.QObject] = None): """Initialize script execution manager. Args: parent: Optional parent QObject for Qt ownership. """ super().__init__(parent) # Execution state self._is_running: bool = False self._operator_configs: List[OperatorConfig] = [] self._seeds: List[int] = [] self._current_episode_idx: int = 0 self._waiting_for_response: bool = False # Step pacing: delay between steps to allow Qt paint events to render each frame. # Without this, steps fire faster than the UI can repaint, causing visual jitter. self._step_delay_ms: int = 50 # default 50ms between steps # Environment mode: "procedural" = different seed per episode (ProcGen-style), # "fixed" = same seed every episode (isolate agent variance). self._environment_mode: str = "procedural" # Track which operators are running self._operator_states: Dict[str, str] = {} # operator_id -> "stopped", "running"
[docs] def start_experiment( self, operator_configs: List[OperatorConfig], execution_config: Dict[str, Any] ) -> None: """Start automated experiment execution. Args: operator_configs: List of operator configurations from script. execution_config: Execution settings (num_episodes, seeds). """ if self._is_running: _LOGGER.warning("Experiment already running, ignoring start request") return # Extract execution parameters num_episodes = execution_config.get("num_episodes", 10) seeds = list(execution_config.get("seeds", range(1000, 1000 + num_episodes))) self._step_delay_ms = execution_config.get("step_delay_ms", 50) self._environment_mode = execution_config.get("environment_mode", "procedural") # Extend seeds if user requested more episodes than script defined if len(seeds) < num_episodes: last_seed = seeds[-1] if seeds else 1000 seeds.extend(range(last_seed + 1, last_seed + 1 + (num_episodes - len(seeds)))) # Initialize state self._is_running = True self._operator_configs = operator_configs self._seeds = seeds[:num_episodes] self._current_episode_idx = 0 self._waiting_for_response = False # Initialize operator states self._operator_states = { cfg.operator_id: "stopped" for cfg in operator_configs } _LOGGER.info( f"Starting script experiment: {len(operator_configs)} operators, " f"{len(self._seeds)} episodes, seeds {self._seeds[0]}-{self._seeds[-1]}, " f"environment_mode={self._environment_mode}" ) # Check if operators are already running (from previous experiment) operators_already_running = all( self._operator_states.get(cfg.operator_id) == "running" for cfg in operator_configs ) # Emit progress for first episode first_seed = self._seeds[0] self.progress_updated.emit(1, len(self._seeds), first_seed) if operators_already_running: # Operators exist - reset with first seed _LOGGER.info("Operators already running, resetting with first seed") self._waiting_for_response = True for config in operator_configs: self.reset_operator.emit(config.operator_id, first_seed) else: # Launch new operators with first seed _LOGGER.info(f"Launching {len(operator_configs)} operators with seed {first_seed}") self._waiting_for_response = True for config in operator_configs: self.launch_operator.emit(config.operator_id, config, first_seed)
[docs] def stop_experiment(self) -> None: """Stop running experiment.""" if not self._is_running: return _LOGGER.info("Stopping script experiment") self._is_running = False self._waiting_for_response = False # Stop all running operators for operator_id in list(self._operator_states.keys()): if self._operator_states.get(operator_id) == "running": self.stop_operator.emit(operator_id) self._operator_states[operator_id] = "stopped"
[docs] def on_ready_received(self, operator_id: str) -> None: """Handle ready response from operator (after reset/launch). This triggers automatic stepping. Args: operator_id: ID of the operator that is ready. """ # Mark operator as running self._operator_states[operator_id] = "running" _LOGGER.info( f"on_ready_received: operator_id={operator_id}, " f"is_running={self._is_running}, waiting={self._waiting_for_response}" ) if not self._is_running or not self._waiting_for_response: _LOGGER.info("Not in automatic execution mode, ignoring ready") return # Start automatic stepping for this operator _LOGGER.info(f"Starting automatic stepping for {operator_id}") self.step_operator.emit(operator_id)
[docs] def on_step_received(self, operator_id: str) -> None: """Handle step response from operator. Triggers the next step after a pacing delay, giving Qt's event loop time to process paint events so the render view updates each frame. Args: operator_id: ID of the operator that completed a step. """ _LOGGER.debug( f"on_step_received: operator_id={operator_id}, " f"is_running={self._is_running}, waiting={self._waiting_for_response}" ) if not self._is_running or not self._waiting_for_response: return # Pace the step loop: delay before sending next step so Qt can paint # the current frame. Without this, paint events pile up and the render # view shows visual jumps (e.g., step 26 → 37). def _emit_next_step(): if self._is_running and self._waiting_for_response: self.step_operator.emit(operator_id) QTimer.singleShot(self._step_delay_ms, _emit_next_step)
[docs] def on_episode_ended( self, operator_id: str, terminated: bool, truncated: bool ) -> None: """Handle episode end from operator. This advances to the next episode or completes the experiment. Args: operator_id: ID of the operator that finished an episode. terminated: Whether episode terminated naturally. truncated: Whether episode was truncated. """ _LOGGER.info( f"on_episode_ended: operator_id={operator_id}, " f"terminated={terminated}, truncated={truncated}, " f"current_episode={self._current_episode_idx + 1}" ) if not self._is_running: _LOGGER.info("Not running, ignoring episode end") return # Move to next episode self._current_episode_idx += 1 self._waiting_for_response = False # Check if all episodes complete if self._current_episode_idx >= len(self._seeds): _LOGGER.info(f"All {len(self._seeds)} episodes completed") self._is_running = False self.experiment_completed.emit(len(self._seeds)) return # Start next episode self._start_next_episode()
def _start_next_episode(self) -> None: """Start the next episode with the next seed.""" if not self._is_running: return if self._current_episode_idx >= len(self._seeds): # Should not happen, but safety check _LOGGER.warning("Attempted to start episode beyond seed list") return # Fixed mode: reuse seeds[0] every episode (identical layout for credit assignment) # Procedural mode: advance seed per episode (generalization across layouts) if self._environment_mode == "fixed": seed = self._seeds[0] else: seed = self._seeds[self._current_episode_idx] episode_num = self._current_episode_idx + 1 _LOGGER.info(f"Starting episode {episode_num}/{len(self._seeds)} with seed {seed}") # Emit progress update self.progress_updated.emit(episode_num, len(self._seeds), seed) # Send reset to all operators self._waiting_for_response = True for config in self._operator_configs: self.reset_operator.emit(config.operator_id, seed) @property def is_running(self) -> bool: """Check if experiment is currently running.""" return self._is_running @property def current_episode(self) -> int: """Get current episode number (1-indexed).""" return self._current_episode_idx + 1 if self._is_running else 0 @property def total_episodes(self) -> int: """Get total number of episodes.""" return len(self._seeds) @property def step_delay_ms(self) -> int: """Get step pacing delay in milliseconds.""" return self._step_delay_ms @step_delay_ms.setter def step_delay_ms(self, value: int) -> None: """Set step pacing delay (can be adjusted during execution).""" self._step_delay_ms = max(0, value)