Core API#
The openenv.core package provides the core abstractions for building and running environments. For an end-to-end tutorial on building environments with OpenEnv, see the building an environment guide.
Server#
Environment server primitives#
- class openenv.core.env_server.interfaces.Environment(transform: Transform[ObsT] | None = None, rubric: Rubric | None = None)#
Bases: ABC, Generic[ActT, ObsT, StateT]

Base class for all environment servers following the Gym/Gymnasium API.
- Args:
transform: Optional transform to apply to observations
rubric: Optional rubric for reward computation. When provided, the rubric's output can be used to set the observation's reward in step().
- Class Attributes:
- SUPPORTS_CONCURRENT_SESSIONS: Whether this environment supports concurrent sessions.
When True, multiple WebSocket connections can each have their own environment instance (up to max_concurrent_envs). When False (default), the environment should only be used with a single session at a time.
Set this to True in your Environment subclass if:
- The environment uses proper session isolation (e.g., unique working dirs)
- No shared mutable state exists between instances
- External resources (databases, APIs) can handle concurrent access
- Attributes:
- rubric: Optional rubric for computing rewards. Environments can set this in __init__ and use it in step() to compute observation rewards. Training infrastructure can access it for introspection:

  for name, r in env.rubric.named_rubrics():
      print(f"{name}: {r.last_score}")

  See RFC 004 for rubric design: rfcs/004-rubrics.md
- SUPPORTS_CONCURRENT_SESSIONS: bool = False#
- close() None#
Clean up resources used by the environment.
Override this method to implement custom cleanup logic. Called when the environment is being destroyed or reset.
- get_metadata() EnvironmentMetadata#
Get metadata about this environment.
Override this method to provide custom metadata for the environment. Default implementation returns basic metadata derived from class name.
- Returns:
EnvironmentMetadata with environment information
- abstract reset(seed: int | None = None, episode_id: str | None = None, **kwargs: Any) ObsT#
Reset the environment and return initial observation.
- async reset_async(seed: int | None = None, episode_id: str | None = None, **kwargs: Any) ObsT#
Async version of reset. Default implementation calls sync reset.
Override to provide true async implementation.
- abstract property state: StateT#
Get the current environment state.
- abstract step(action: ActT, timeout_s: float | None = None, **kwargs: Any) ObsT#
Take a step in the environment.
- async step_async(action: ActT, timeout_s: float | None = None, **kwargs: Any) ObsT#
Async version of step. Default implementation calls sync step.
Override to provide true async implementation.
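The reset/step/state contract above can be illustrated with a self-contained sketch. The stand-in dataclasses below mirror the documented Observation/State fields, and CounterEnv is a hypothetical example, not part of the library; the real base classes are Pydantic models in openenv.core.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

# Stand-ins for the real Pydantic models in openenv.core.env_server.types.
@dataclass
class CounterObs:
    value: int
    done: bool = False
    reward: Optional[float] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class CounterState:
    episode_id: Optional[str] = None
    step_count: int = 0

class CounterEnv:
    """Hypothetical environment: counts steps, episode ends at max_steps."""

    # Safe to run concurrently: each instance owns all of its state.
    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self, max_steps: int = 3):
        self.max_steps = max_steps
        self._state = CounterState()

    def reset(self, seed: Optional[int] = None,
              episode_id: Optional[str] = None, **kwargs: Any) -> CounterObs:
        self._state = CounterState(episode_id=episode_id, step_count=0)
        return CounterObs(value=0)

    def step(self, action: Any, timeout_s: Optional[float] = None,
             **kwargs: Any) -> CounterObs:
        self._state.step_count += 1
        done = self._state.step_count >= self.max_steps
        return CounterObs(value=self._state.step_count, done=done,
                          reward=1.0 if done else 0.0)

    @property
    def state(self) -> CounterState:
        return self._state
```

A real subclass would inherit from openenv.core.env_server.interfaces.Environment and use the library's Action/Observation/State types; the shape of reset(), step(), and the state property is the same.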
- class openenv.core.env_server.interfaces.Message#
Bases: TypedDict

A message in a conversation.
Compatible with Huggingface chat template format.
- content: str#
- role: str#
- class openenv.core.env_server.interfaces.ModelTokenizer(*args, **kwargs)#
Bases: Protocol

Protocol for tokenizers that support chat templates.
This protocol defines the interface that tokenizers must implement to work with chat-based environments. It’s compatible with Huggingface transformers tokenizers.
- apply_chat_template(conversation: list[Message], tokenize: bool = True, return_tensors: str | None = None, **kwargs: Any) Any#
Apply a chat template to format and optionally tokenize a conversation.
- Args:
conversation: List of message dictionaries with 'role' and 'content'
tokenize: Whether to tokenize the output
return_tensors: Format for returned tensors ('pt' for PyTorch)
**kwargs: Additional arguments
- Returns:
Formatted and optionally tokenized conversation
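Because this is a Protocol, any object with a matching apply_chat_template method satisfies it. The toy tokenizer below is illustrative only (real usage would pass a Huggingface tokenizer); its formatting scheme is an assumption for the sketch.

```python
from typing import Any, List, Optional

class WhitespaceTokenizer:
    """Toy tokenizer matching the ModelTokenizer protocol's shape.

    Formats messages as 'role: content' lines; "tokens" are just words.
    """

    def apply_chat_template(self, conversation: List[dict],
                            tokenize: bool = True,
                            return_tensors: Optional[str] = None,
                            **kwargs: Any) -> Any:
        text = "\n".join(f"{m['role']}: {m['content']}" for m in conversation)
        return text.split() if tokenize else text

tok = WhitespaceTokenizer()
msgs = [{"role": "user", "content": "hello world"}]
print(tok.apply_chat_template(msgs, tokenize=False))  # user: hello world
```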
- class openenv.core.env_server.interfaces.Transform#
Bases: ABC, Generic[ObsT]

Transform observations to add rewards, metrics, or other modifications.
Transforms follow the TorchRL pattern where they take an observation and return a (potentially modified) observation. This allows for flexible reward computation and observation augmentation.
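A minimal sketch of the pattern, using a stand-in ABC and SimpleNamespace instead of the real Transform and Observation classes (SuccessReward and the metadata "success" flag are assumptions for illustration):

```python
from abc import ABC, abstractmethod
from types import SimpleNamespace

class Transform(ABC):
    """Stand-in for the abstract base described above."""
    @abstractmethod
    def __call__(self, observation):
        ...

class SuccessReward(Transform):
    """Sets the reward from a metadata flag, returning the (modified) observation."""
    def __call__(self, observation):
        observation.reward = 1.0 if observation.metadata.get("success") else 0.0
        return observation

obs = SimpleNamespace(reward=None, metadata={"success": True})
obs = SuccessReward()(obs)
print(obs.reward)  # 1.0
```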
Types#
- class openenv.core.env_server.types.Action(*, metadata: ~typing.Dict[str, ~typing.Any] = <factory>)#
Bases: BaseModel

Base class for all environment actions.
All action subclasses should inherit from this base class. Uses Pydantic for automatic validation and serialization.
- metadata: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class openenv.core.env_server.types.BaseMessage#
Bases: BaseModel

Base class for WebSocket messages with shared configuration.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class openenv.core.env_server.types.CodeExecResult(*, stdout: str, stderr: str, exit_code: int)#
Bases: BaseMessage

Result of code execution containing stdout, stderr, and exit code.
- exit_code: int#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- stderr: str#
- stdout: str#
- class openenv.core.env_server.types.ConcurrencyConfig(*, max_concurrent_envs: int = 1, session_timeout: float | None = None)#
Bases: BaseMessage

Configuration for concurrent environment sessions.
- max_concurrent_envs: int#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- session_timeout: float | None#
- class openenv.core.env_server.types.EnvironmentMetadata(*, name: str, description: str, readme_content: str | None = None, version: str | None = None, author: str | None = None, documentation_url: str | None = None)#
Bases: BaseMessage

Metadata about an environment for documentation and UI purposes.
- author: str | None#
- description: str#
- documentation_url: str | None#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- name: str#
- readme_content: str | None#
- version: str | None#
- class openenv.core.env_server.types.HealthResponse(*, status: HealthStatus = HealthStatus.HEALTHY)#
Bases: BaseMessage

Response model for health check endpoint.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- status: HealthStatus#
- class openenv.core.env_server.types.HealthStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
Bases: str, Enum

Server health status values.
- DEGRADED = 'degraded'#
- HEALTHY = 'healthy'#
- UNHEALTHY = 'unhealthy'#
- class openenv.core.env_server.types.Observation(*, done: bool = False, reward: bool | int | float | None = None, metadata: ~typing.Dict[str, ~typing.Any] = <factory>)#
Bases: BaseModel

Base class for all environment observations.
All observation subclasses should inherit from this base class. Uses Pydantic for automatic validation and serialization.
- done: bool#
- metadata: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- reward: bool | int | float | None#
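For illustration, the documented field set can be mirrored with a plain dataclass. EchoObservation is hypothetical; real subclasses inherit from the Pydantic Observation base, which also rejects unknown fields via extra='forbid'.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Union

@dataclass
class EchoObservation:
    """Illustrative observation: one custom field plus the documented base fields."""
    message: str
    done: bool = False
    reward: Optional[Union[bool, int, float]] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

obs = EchoObservation(message="hello", reward=1.0, done=True)
print(obs.reward, obs.done)  # 1.0 True
```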
- class openenv.core.env_server.types.ResetRequest(*, seed: int | None = None, episode_id: str | None = None, **extra_data: Any)#
Bases: BaseModel

Request model for environment reset.
- episode_id: str | None#
- model_config: ClassVar[ConfigDict] = {'extra': 'allow', 'json_schema_extra': {'examples': [{'episode_id': 'episode-001', 'seed': 42}, {}]}}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- seed: int | None#
- class openenv.core.env_server.types.ResetResponse(*, observation: Dict[str, Any], reward: float | None = None, done: bool = False)#
Bases: BaseModel

Response model for environment reset.
- done: bool#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- observation: Dict[str, Any]#
- reward: float | None#
- class openenv.core.env_server.types.SchemaResponse(*, action: Dict[str, Any], observation: Dict[str, Any], state: Dict[str, Any])#
Bases: BaseMessage

Response model for the combined schema endpoint.
- action: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- observation: Dict[str, Any]#
- state: Dict[str, Any]#
- class openenv.core.env_server.types.ServerCapacityStatus(*, active_sessions: int, max_sessions: int)#
Bases: BaseMessage

Status of server capacity for concurrent sessions.
- active_sessions: int#
- property available_slots: int#
Number of available session slots.
- check_capacity_bounds() ServerCapacityStatus#
- classmethod from_counts(active: int, max_sessions: int) ServerCapacityStatus#
Create status from active and max session counts.
- property is_at_capacity: bool#
Whether the server has reached maximum capacity.
- max_sessions: int#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
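The capacity arithmetic implied by available_slots and is_at_capacity can be sketched with a stand-in dataclass (CapacityStatus below is illustrative, not the library class, and the clamping to zero is an assumption):

```python
from dataclasses import dataclass

@dataclass
class CapacityStatus:
    """Stand-in mirroring ServerCapacityStatus semantics."""
    active_sessions: int
    max_sessions: int

    @property
    def available_slots(self) -> int:
        # Never report negative slots, even if counts drift.
        return max(self.max_sessions - self.active_sessions, 0)

    @property
    def is_at_capacity(self) -> bool:
        return self.active_sessions >= self.max_sessions

s = CapacityStatus(active_sessions=3, max_sessions=4)
print(s.available_slots, s.is_at_capacity)  # 1 False
```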
- class openenv.core.env_server.types.ServerMode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
Bases: str, Enum

Server operation mode.
- PRODUCTION = 'production'#
- SIMULATION = 'simulation'#
- class openenv.core.env_server.types.SessionInfo(*, session_id: str, created_at: float, last_activity_at: float, step_count: int = 0, environment_type: str)#
Bases: BaseMessage

Information about an active session.
- created_at: float#
- environment_type: str#
- last_activity_at: float#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- session_id: str#
- step_count: int#
- class openenv.core.env_server.types.State(*, episode_id: str | None = None, step_count: int = 0, **extra_data: Any)#
Bases: BaseModel

Base class for environment state.
Represents internal environment state, separate from observations.
- episode_id: str | None#
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'allow', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- step_count: int#
- class openenv.core.env_server.types.StepRequest(*, action: Dict[str, Any], timeout_s: float | None = None, request_id: str | None = None, **extra_data: Any)#
Bases: BaseModel

Request model for environment step.
- action: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'extra': 'allow', 'json_schema_extra': {'examples': [{'action': {'value': 1}, 'timeout_s': 30.0}, {'action': {'value': 1}, 'render': True, 'verbose': False}]}}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- request_id: str | None#
- timeout_s: float | None#
- class openenv.core.env_server.types.StepResponse(*, observation: Dict[str, Any], reward: float | None = None, done: bool = False)#
Bases: BaseModel

Response model for environment step.
- done: bool#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- observation: Dict[str, Any]#
- reward: float | None#
- class openenv.core.env_server.types.WSCloseMessage(*, type: Literal['close'] = 'close')#
Bases: BaseMessage

WebSocket message to close the session.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- type: Literal['close']#
- class openenv.core.env_server.types.WSErrorCode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
Bases: str, Enum

WebSocket error codes for structured error handling.
- CAPACITY_REACHED = 'CAPACITY_REACHED'#
- EXECUTION_ERROR = 'EXECUTION_ERROR'#
- FACTORY_ERROR = 'FACTORY_ERROR'#
- INVALID_JSON = 'INVALID_JSON'#
- SESSION_ERROR = 'SESSION_ERROR'#
- UNKNOWN_TYPE = 'UNKNOWN_TYPE'#
- VALIDATION_ERROR = 'VALIDATION_ERROR'#
- class openenv.core.env_server.types.WSErrorResponse(*, type: Literal['error'] = 'error', data: Dict[str, Any])#
Bases: BaseModel

WebSocket response for errors.
- data: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- type: Literal['error']#
- class openenv.core.env_server.types.WSObservationResponse(*, type: Literal['observation'] = 'observation', data: Dict[str, Any])#
Bases: BaseModel

WebSocket response containing an observation.
- data: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- type: Literal['observation']#
- class openenv.core.env_server.types.WSResetMessage(*, type: ~typing.Literal['reset'] = 'reset', data: ~typing.Dict[str, ~typing.Any] = <factory>)#
Bases: BaseMessage

WebSocket message to reset the environment.
- data: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- type: Literal['reset']#
- class openenv.core.env_server.types.WSStateMessage(*, type: Literal['state'] = 'state')#
Bases: BaseMessage

WebSocket message to request current state.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- type: Literal['state']#
- class openenv.core.env_server.types.WSStateResponse(*, type: Literal['state'] = 'state', data: Dict[str, Any])#
Bases: BaseModel

WebSocket response containing environment state.
- data: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- type: Literal['state']#
- class openenv.core.env_server.types.WSStepMessage(*, type: Literal['step'] = 'step', data: Dict[str, Any])#
Bases: BaseMessage

WebSocket message to execute a step.
- data: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- type: Literal['step']#
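Taken together, the WS* types above imply a JSON message flow discriminated on the type field. A hedged sketch of what a client might send over one session (the payload contents are assumptions based on the ResetRequest/StepRequest examples):

```python
import json

# Hypothetical client -> server frames for one session, using the
# documented "type" values.
outgoing = [
    {"type": "reset", "data": {"seed": 42, "episode_id": "episode-001"}},
    {"type": "step", "data": {"action": {"value": 1}, "timeout_s": 30.0}},
    {"type": "state"},
    {"type": "close"},
]
frames = [json.dumps(m) for m in outgoing]

# A server-side handler would dispatch on the "type" discriminator:
kinds = [json.loads(f)["type"] for f in frames]
print(kinds)  # ['reset', 'step', 'state', 'close']
```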
Exceptions#
Custom exceptions for environment server operations.
- exception openenv.core.env_server.exceptions.ConcurrencyConfigurationError(environment_name: str, max_concurrent_envs: int, message: str | None = None)#
Bases: OpenEnvError

Raised when an environment is misconfigured for concurrent sessions.
This error is raised during server startup when max_concurrent_envs > 1 is specified for an environment that is not marked as SUPPORTS_CONCURRENT_SESSIONS.
- exception openenv.core.env_server.exceptions.EnvironmentFactoryError(factory_name: str, message: str | None = None)#
Bases: OpenEnvError

Raised when the environment factory fails to create an instance.
- exception openenv.core.env_server.exceptions.OpenEnvError#
Bases: Exception

Base exception for all OpenEnv errors.
- exception openenv.core.env_server.exceptions.SessionCapacityError(active_sessions: int, max_sessions: int, message: str | None = None)#
Bases: OpenEnvError

Raised when the server cannot accept new sessions due to capacity limits.
This error is raised when a new WebSocket connection is attempted but the server has already reached max_concurrent_envs active sessions.
- exception openenv.core.env_server.exceptions.SessionCreationError(reason: str, message: str | None = None)#
Bases: OpenEnvError

Raised when a session cannot be created.
- exception openenv.core.env_server.exceptions.SessionNotFoundError(session_id: str, message: str | None = None)#
Bases: OpenEnvError

Raised when attempting to access a session that does not exist.
HTTP server utilities#
HTTP server wrapper for Environment instances.
This module provides utilities to wrap any Environment subclass and expose it over HTTP and WebSocket endpoints that EnvClient can consume.
- class openenv.core.env_server.http_server.HTTPEnvServer(env: Callable[[], Environment], action_cls: Type[Action], observation_cls: Type[Observation], max_concurrent_envs: int | None = None, concurrency_config: ConcurrencyConfig | None = None)#
Bases: object

HTTP server wrapper for Environment instances.
This class wraps an Environment and exposes its reset(), step(), and state methods as HTTP and WebSocket endpoints compatible with EnvClient.
The server handles:
- Action deserialization: converts a JSON dict to the Action subclass
- Observation serialization: converts the Observation subclass to a JSON dict
- Example:
>>> from core.env_server import HTTPEnvServer
>>> from envs.coding_env.server import CodeExecutionEnvironment
>>> from envs.coding_env.models import CodeAction, CodeObservation
>>>
>>> # Pass environment class (factory pattern)
>>> server = HTTPEnvServer(
...     env=CodeExecutionEnvironment,
...     action_cls=CodeAction,
...     observation_cls=CodeObservation,
...     max_concurrent_envs=4,
... )
>>>
>>> # Register routes with FastAPI
>>> from fastapi import FastAPI
>>> app = FastAPI()
>>> server.register_routes(app)
- property active_sessions: int#
Return the number of active WebSocket sessions.
- property concurrency_config: ConcurrencyConfig#
Return the concurrency configuration.
- get_capacity_status() ServerCapacityStatus#
Get the current capacity status of the server.
- Returns:
ServerCapacityStatus with current session counts and availability.
- get_session_info(session_id: str) SessionInfo | None#
Get information about a specific session.
- Args:
session_id: The session ID to query
- Returns:
SessionInfo if the session exists, None otherwise
- property is_concurrency_safe: bool#
Return whether the environment is marked as concurrency safe.
- property max_concurrent_envs: int#
Return the maximum number of concurrent environments.
- register_routes(app: FastAPI, mode: ServerMode | str = ServerMode.SIMULATION) None#
Register HTTP routes on a FastAPI application.
- Args:
app: FastAPI application instance
mode: Server mode - either SIMULATION or PRODUCTION (or string equivalents). In production mode, simulation control endpoints (/reset, /step, /state) are NOT registered; only safe endpoints (/health, /schema, /metadata, /ws) are available. Defaults to SIMULATION for backwards compatibility.
- Raises:
ValueError: If mode is not a valid ServerMode or string equivalent.
- openenv.core.env_server.http_server.create_app(env: Callable[[], Environment], action_cls: Type[Action], observation_cls: Type[Observation], env_name: str | None = None, max_concurrent_envs: int | None = None, concurrency_config: ConcurrencyConfig | None = None, gradio_builder: Callable[[...], Any] | None = None) FastAPI#
Create a FastAPI application with or without web interface.
This function creates a FastAPI app with the web interface enabled by default, including README integration for better user experience.
- Args:
env: Environment factory (callable) that creates new instances
action_cls: The Action subclass this environment expects
observation_cls: The Observation subclass this environment returns
env_name: Optional environment name for README loading
max_concurrent_envs: Maximum concurrent WebSocket sessions. Mutually exclusive with concurrency_config.
concurrency_config: Optional ConcurrencyConfig for advanced concurrency settings. Mutually exclusive with max_concurrent_envs.
gradio_builder: Optional callable to build a custom Gradio UI at /web. Signature: (web_manager, action_fields, metadata, is_chat_env, title, quick_start_md) -> gr.Blocks. When None, the default Gradio app is used. See docs/customizing-web-ui.md.
- Returns:
FastAPI application instance with or without web interface and README integration
- openenv.core.env_server.http_server.create_fastapi_app(env: Callable[[], Environment], action_cls: Type[Action], observation_cls: Type[Observation], max_concurrent_envs: int | None = None, concurrency_config: ConcurrencyConfig | None = None) FastAPI#
Create a FastAPI application with comprehensive documentation.
- Args:
env: Environment factory (callable) that creates new instances
action_cls: The Action subclass this environment expects
observation_cls: The Observation subclass this environment returns
max_concurrent_envs: Maximum concurrent WebSocket sessions. Mutually exclusive with concurrency_config.
concurrency_config: Optional ConcurrencyConfig for advanced concurrency settings. Mutually exclusive with max_concurrent_envs.
- Returns:
FastAPI application instance
Web interface helpers#
Web interface for OpenEnv environments.
When ENABLE_WEB_INTERFACE is set, the server exposes a Gradio UI at /web for reset, step, and state observation. Controlled by the CLI enable_interface option (e.g. openenv push --enable-interface) or the ENABLE_WEB_INTERFACE env var.
- class openenv.core.env_server.web_interface.ActionLog(*, timestamp: str, action: Dict[str, Any], observation: Dict[str, Any], reward: float | None = None, done: bool, step_count: int)#
Bases: BaseModel

Log entry for an action taken.
- action: Dict[str, Any]#
- done: bool#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- observation: Dict[str, Any]#
- reward: float | None#
- step_count: int#
- timestamp: str#
- class openenv.core.env_server.web_interface.EpisodeState(*, episode_id: str | None = None, step_count: int, current_observation: ~typing.Dict[str, ~typing.Any] | None = None, action_logs: ~typing.List[~openenv.core.env_server.web_interface.ActionLog] = <factory>, is_reset: bool = True)#
Bases: BaseModel

Current episode state for the web interface.
- current_observation: Dict[str, Any] | None#
- episode_id: str | None#
- is_reset: bool#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- step_count: int#
- class openenv.core.env_server.web_interface.WebInterfaceManager(env: Environment, action_cls: Type[Action], observation_cls: Type[Observation], metadata: EnvironmentMetadata | None = None)#
Bases: object

Manages the web interface for an environment.
- MAX_ACTION_LOGS = 1000#
- async connect_websocket(websocket: WebSocket)#
Connect a new WebSocket client.
- async disconnect_websocket(websocket: WebSocket)#
Disconnect a WebSocket client.
- get_state() Dict[str, Any]#
Get current environment state.
- async reset_environment() Dict[str, Any]#
Reset the environment and update state.
- async step_environment(action_data: Dict[str, Any]) Dict[str, Any]#
Execute a step in the environment and update state.
- openenv.core.env_server.web_interface.create_web_interface_app(env: Environment, action_cls: Type[Action], observation_cls: Type[Observation], env_name: str | None = None, max_concurrent_envs: int | None = None, concurrency_config: Any | None = None, gradio_builder: Callable[[...], Any] | None = None) FastAPI#
Create a FastAPI application with web interface for the given environment.
- Args:
env: The Environment instance to serve
action_cls: The Action subclass this environment expects
observation_cls: The Observation subclass this environment returns
env_name: Optional environment name for README loading
max_concurrent_envs: Maximum concurrent WebSocket sessions
concurrency_config: Optional ConcurrencyConfig for advanced concurrency settings
gradio_builder: Optional callable (web_manager, action_fields, metadata, is_chat_env, title, quick_start_md) -> gr.Blocks to use instead of the default Gradio UI. Lets envs replace or customize the /web interface.
- Returns:
FastAPI application instance with web interface
- openenv.core.env_server.web_interface.get_quick_start_markdown(metadata: EnvironmentMetadata | None, action_cls: Type[Action], observation_cls: Type[Observation]) str#
Build Quick Start markdown with class names replaced from current env (init-style suffixes).
Uses the same placeholder names as the init template so that __ENV_CLASS_NAME__Env, __ENV_CLASS_NAME__Action, __ENV_CLASS_NAME__Observation and __ENV_NAME__ are replaced with the actual class/package names.
- openenv.core.env_server.web_interface.load_environment_metadata(env: Environment, env_name: str | None = None) EnvironmentMetadata#
Load environment metadata including README content.
- Args:
- env: The environment instance, class, or factory function.
If a class: used as a factory, won’t call instance methods
If a function: used as a factory, won’t call instance methods
If an instance: may call get_metadata() if available
env_name: Optional environment name for README file lookup
- Returns:
EnvironmentMetadata with loaded information
Serialization#
Shared serialization and deserialization utilities for OpenEnv HTTP servers.
This module provides common utilities for converting between JSON dictionaries and Pydantic models (Action/Observation) to eliminate code duplication across HTTP server and web interface implementations.
- openenv.core.env_server.serialization.deserialize_action(action_data: Dict[str, Any], action_cls: Type[Action]) Action#
Convert JSON dict to Action instance using Pydantic validation.
This is a basic deserialization that works for most environments. For special cases (e.g., tensor fields, custom type conversions), use deserialize_action_with_preprocessing().
- Args:
action_data: Dictionary containing action data
action_cls: The Action subclass to instantiate
- Returns:
Action instance
- Raises:
ValidationError: If action_data is invalid for the action class
- Note:
This uses Pydantic’s model_validate() for automatic validation.
- openenv.core.env_server.serialization.deserialize_action_with_preprocessing(action_data: Dict[str, Any], action_cls: Type[Action]) Action#
Convert JSON dict to Action instance with preprocessing for special types.
This version handles common type conversions needed for web interfaces:
- Converting lists/strings to tensors for the 'tokens' field
- Converting string action_id to int
- Other custom preprocessing as needed
- Args:
action_data: Dictionary containing action data
action_cls: The Action subclass to instantiate
- Returns:
Action instance
- Raises:
ValidationError: If action_data is invalid for the action class
- openenv.core.env_server.serialization.serialize_observation(observation: Observation) Dict[str, Any]#
Convert Observation instance to JSON-compatible dict using Pydantic.
- Args:
observation: Observation instance
- Returns:
Dictionary compatible with EnvClient._parse_result()
The format matches what EnvClient expects:

{
    "observation": {...},  # Observation fields
    "reward": float | None,
    "done": bool,
}
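A sketch of that envelope as a plain function (serialize_observation_dict is hypothetical; the real serialize_observation works on Observation instances via Pydantic):

```python
from typing import Any, Dict, Optional

def serialize_observation_dict(fields: Dict[str, Any],
                               reward: Optional[float],
                               done: bool) -> Dict[str, Any]:
    """Builds the envelope shape documented above; not the library function."""
    return {"observation": fields, "reward": reward, "done": done}

payload = serialize_observation_dict({"stdout": "hi\n", "exit_code": 0},
                                     reward=None, done=False)
print(sorted(payload))  # ['done', 'observation', 'reward']
```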
Transforms#
Base transform implementations for composing environment-specific transforms.
Route configuration#
Route configuration utilities for declarative FastAPI route registration.
This module provides utilities to reduce boilerplate in route registration by using configuration objects instead of repeated function calls.
- class openenv.core.env_server.route_config.GetEndpointConfig(path: str, handler: Callable[[], BaseModel | dict], response_model: Type[BaseModel] | type[dict], tag: str, summary: str, description: str)#
Bases: object

Configuration for a simple GET endpoint.
- description: str#
- handler: Callable[[], BaseModel | dict]#
- path: str#
- response_model: Type[BaseModel] | type[dict]#
- summary: str#
- tag: str#
- openenv.core.env_server.route_config.register_get_endpoints(app: FastAPI, configs: List[GetEndpointConfig]) None#
Register multiple GET endpoints from configuration.
- Args:
app: FastAPI application instance
configs: List of GET endpoint configurations
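The declarative pattern can be sketched with a stand-in config class and a toy "app" dict in place of FastAPI's router (all names below mirror the documented fields but are illustrative, not the library code):

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class GetEndpointConfig:
    """Stand-in mirroring the documented fields."""
    path: str
    handler: Callable[[], dict]
    response_model: type
    tag: str
    summary: str
    description: str

def register_get_endpoints(app: Dict[str, Callable[[], dict]],
                           configs: List[GetEndpointConfig]) -> None:
    # Toy "app": maps path -> handler, standing in for FastAPI's router.
    for cfg in configs:
        app[cfg.path] = cfg.handler

app: Dict[str, Callable[[], dict]] = {}
register_get_endpoints(app, [
    GetEndpointConfig(path="/health", handler=lambda: {"status": "healthy"},
                      response_model=dict, tag="ops",
                      summary="Health check", description="Liveness probe"),
])
print(app["/health"]())  # {'status': 'healthy'}
```

The point of the pattern is that adding an endpoint becomes one more entry in a list rather than another decorated function.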
Clients#
Base client#
Environment client for persistent sessions.
This module provides a WebSocket-based client that maintains a persistent connection to an environment server, enabling efficient multi-step interactions without the overhead of HTTP request/response cycles.
The client is async by default. For synchronous usage, use the .sync() method to get a SyncEnvClient wrapper.
- Example (async):
>>> async with GenericEnvClient(base_url="ws://localhost:8000") as env:
...     result = await env.reset()
...     result = await env.step({"code": "print('hello')"})
- Example (sync wrapper):
>>> env = GenericEnvClient(base_url="ws://localhost:8000").sync()
>>> with env:
...     result = env.reset()
...     result = env.step({"code": "print('hello')"})
- class openenv.core.env_client.EnvClient(base_url: str, connect_timeout_s: float = 10.0, message_timeout_s: float = 60.0, max_message_size_mb: float = 100.0, provider: 'ContainerProvider | RuntimeProvider' | None = None, mode: str | None = None)#
Bases: ABC, Generic[ActT, ObsT, StateT]

Async environment client for persistent sessions.
This client maintains a persistent WebSocket connection to an environment server, enabling efficient multi-step interactions. Each client instance corresponds to a dedicated environment session on the server.
The client is async by default. For synchronous usage, use the .sync() method to get a SyncEnvClient wrapper.
Features:
- Lower latency for sequential interactions
- Session state is maintained server-side
- Better suited for long-running episodes
- Async by default for modern Python async/await patterns
- Example (async):
>>> from envs.coding_env.client import CodingEnv
>>>
>>> # Connect to a server using async context manager
>>> async with CodingEnv(base_url="ws://localhost:8000") as env:
...     result = await env.reset(seed=42)
...     while not result.done:
...         action = agent.predict(result.observation)
...         result = await env.step(action)
- Example (sync wrapper):
>>> env = CodingEnv(base_url="ws://localhost:8000").sync()
>>> with env:
...     result = env.reset(seed=42)
...     result = env.step(action)
- async close() None#
Close the WebSocket connection and clean up resources.
If this client was created via from_docker_image() or from_env(), this will also stop and remove the associated container/process.
- async connect() EnvClient#
Establish WebSocket connection to the server.
- Returns:
self for method chaining
- Raises:
ConnectionError: If connection cannot be established
- async disconnect() None#
Close the WebSocket connection.
- async classmethod from_docker_image(image: str, provider: 'ContainerProvider' | None = None, **kwargs: Any) EnvClientT#
Create an environment client by spinning up a Docker container.
- Args:
image: Docker image name to run (e.g., "coding-env:latest")
provider: Container provider to use (defaults to LocalDockerProvider)
**kwargs: Additional arguments to pass to provider.start_container()
- Returns:
Connected client instance
- async classmethod from_env(repo_id: str, *, use_docker: bool = True, provider: 'ContainerProvider | RuntimeProvider' | None = None, **provider_kwargs: Any) EnvClientT#
Create a client from a Hugging Face Space.
- Args:
repo_id: Hugging Face space identifier {org}/{space}.
use_docker: When True (default), pull from the HF registry and launch via LocalDockerProvider. When False, run the space locally with UVProvider.
provider: Optional provider instance to reuse. Must be a ContainerProvider when use_docker=True and a RuntimeProvider otherwise.
provider_kwargs: Additional keyword arguments forwarded to either the container provider's start_container() (docker) or to the UVProvider constructor/start (uv). When use_docker=False, the project_path argument can be used to override the default git URL (git+https://huggingface.co/spaces/{repo_id}).
- Returns:
Connected client instance
- Examples:
>>> # Pull and run from HF Docker registry
>>> env = await MyEnv.from_env("openenv/echo-env")
>>>
>>> # Run locally with UV (clones the space)
>>> env = await MyEnv.from_env("openenv/echo-env", use_docker=False)
>>>
>>> # Run from a local checkout
>>> env = await MyEnv.from_env(
...     "openenv/echo-env",
...     use_docker=False,
...     project_path="/path/to/local/checkout"
... )
- async reset(**kwargs: Any) StepResult[ObsT]#
Reset the environment with optional parameters.
- Args:
- **kwargs: Optional parameters passed to the environment’s reset method.
Common parameters include:
- seed: Random seed for reproducibility
- episode_id: Custom episode identifier
- Returns:
StepResult containing initial observation
- async state() StateT#
Get the current environment state from the server.
- Returns:
State object with environment state information
- async step(action: ActT, **kwargs: Any) StepResult[ObsT]#
Execute an action in the environment.
- Args:
action: The action to execute
**kwargs: Optional parameters (currently ignored)
- Returns:
StepResult containing observation, reward, and done status
- sync() SyncEnvClient#
Return a synchronous wrapper around this async client.
Use this method when you need synchronous access to the environment without async/await syntax. This is useful for:

- Integration with synchronous codebases
- Interactive/REPL usage
- Stopping async from "infecting" the call stack
- Returns:
SyncEnvClient wrapper that provides synchronous methods
- Example:
>>> # Create async client and get sync wrapper
>>> async_client = GenericEnvClient(base_url="http://localhost:8000")
>>> sync_client = async_client.sync()
>>>
>>> # Use synchronous API
>>> with sync_client:
...     result = sync_client.reset()
...     result = sync_client.step({"code": "print('hello')"})
Synchronous client#
Synchronous wrapper for async EnvClient.
This module provides a SyncEnvClient that wraps an async EnvClient, allowing synchronous usage while the underlying client uses async I/O.
- Example:
>>> from openenv.core import GenericEnvClient
>>>
>>> # Create async client and get sync wrapper
>>> async_client = GenericEnvClient(base_url="http://localhost:8000")
>>> sync_client = async_client.sync()
>>>
>>> # Use synchronous API
>>> with sync_client:
...     result = sync_client.reset()
...     result = sync_client.step({"code": "print('hello')"})
- class openenv.core.sync_client.SyncEnvClient(async_client: EnvClient[ActT, ObsT, StateT])#
Bases:
Generic[ActT, ObsT, StateT]

Synchronous wrapper around an async EnvClient.
This class provides a synchronous interface to an async EnvClient, making it easier to use in synchronous code or to stop async from “infecting” the entire call stack.
The wrapper executes async operations on a dedicated background event loop so connection state remains bound to a single loop.
- Cleanup note:
For guaranteed resource cleanup, use with SyncEnvClient(…) or call close() explicitly. __del__ is best-effort only and may not run reliably (for example, during interpreter shutdown).
- Example:
>>> # From an async client
>>> async_client = GenericEnvClient(base_url="http://localhost:8000")
>>> sync_client = async_client.sync()
>>>
>>> # Use synchronous context manager
>>> with sync_client:
...     result = sync_client.reset()
...     result = sync_client.step({"action": "test"})
- Attributes:
_async: The wrapped async EnvClient instance
- close() None#
Close the connection and clean up resources.
- connect() SyncEnvClient[ActT, ObsT, StateT]#
Establish connection to the server.
- Returns:
self for method chaining
- disconnect() None#
Close the connection.
- reset(**kwargs: Any) StepResult[ObsT]#
Reset the environment.
- Args:
**kwargs: Optional parameters passed to the environment’s reset method
- Returns:
StepResult containing initial observation
- state() StateT#
Get the current environment state.
- Returns:
State object with environment state information
- step(action: ActT, **kwargs: Any) StepResult[ObsT]#
Execute an action in the environment.
- Args:
action: The action to execute
**kwargs: Optional parameters
- Returns:
StepResult containing observation, reward, and done status
Generic client#
Generic environment client that works with raw dictionaries.
This module provides a GenericEnvClient that doesn’t require installing environment-specific packages. It’s useful for connecting to remote servers without running any untrusted code locally.
- class openenv.core.generic_client.GenericAction(**kwargs: Any)#
Bases:
Dict[str, Any]

A dictionary subclass for creating actions when using GenericEnvClient.
This provides a semantic wrapper around dictionaries to make code more readable when working with GenericEnvClient. It behaves exactly like a dict but signals intent that this is an action for an environment.
- Example:
>>> # Without GenericAction (works fine)
>>> env.step({"code": "print('hello')"})

>>> # With GenericAction (more explicit)
>>> action = GenericAction(code="print('hello')")
>>> env.step(action)

>>> # With multiple fields
>>> action = GenericAction(code="x = 1", timeout=30, metadata={"tag": "test"})
>>> env.step(action)
- Note:
GenericAction is just a dict with a constructor that accepts keyword arguments. It’s provided for symmetry with typed Action classes and to make code more readable.
- class openenv.core.generic_client.GenericEnvClient(base_url: str, connect_timeout_s: float = 10.0, message_timeout_s: float = 60.0, max_message_size_mb: float = 100.0, provider: 'ContainerProvider | RuntimeProvider' | None = None, mode: str | None = None)#
Bases:
EnvClient[Dict[str, Any], Dict[str, Any], Dict[str, Any]]

Environment client that works with raw dictionaries instead of typed classes.
This client doesn't require installing environment-specific packages, making it ideal for:

- Connecting to remote servers without installing their packages
- Quick prototyping and testing
- Environments where type safety isn't needed
- Security-conscious scenarios where you don't want to run remote code
The trade-off is that you lose type safety and IDE autocomplete for actions and observations. Instead of typed objects, you work with plain dictionaries.
- Example:
>>> # Direct connection to a running server (no installation needed)
>>> with GenericEnvClient(base_url="http://localhost:8000") as env:
...     result = env.reset()
...     result = env.step({"code": "print('hello')"})
...     print(result.observation)  # Dict[str, Any]
...     print(result.observation.get("output"))

>>> # From local Docker image
>>> env = GenericEnvClient.from_docker_image("coding-env:latest")
>>> result = env.reset()
>>> result = env.step({"code": "x = 1 + 2"})
>>> env.close()

>>> # From HuggingFace Hub (pulls Docker image, no pip install)
>>> env = GenericEnvClient.from_env("user/my-env", use_docker=True)
>>> result = env.reset()
>>> env.close()
- Note:
GenericEnvClient inherits from_docker_image() and from_env() from EnvClient, so you can use it with Docker containers and HuggingFace Spaces without any package installation.
LLM client#
LLM client abstraction for calling LLM endpoints.
Provides a generic RPC abstraction: point it at an endpoint/port, tell it the protocol, and it works. OpenAI-compatible API is the first implementation, covering OpenAI, vLLM, TGI, Ollama, HuggingFace Inference API, etc.
- Usage:
client = OpenAIClient("http://localhost", 8000, model="meta-llama/…")
response = await client.complete("What is 2+2?")
- class openenv.core.llm_client.LLMClient(endpoint: str, port: int)#
Bases:
ABC

Abstract base for LLM endpoint clients.
Subclass and implement complete() for your protocol.
- Args:
endpoint: The base URL of the LLM service (e.g. "http://localhost").
port: The port the service listens on.
- property base_url: str#
Construct base URL from endpoint and port.
- class openenv.core.llm_client.OpenAIClient(endpoint: str, port: int, model: str, api_key: str | None = None, system_prompt: str | None = None, temperature: float = 0.0, max_tokens: int = 256)#
Bases:
LLMClient

Client for OpenAI-compatible APIs.
Works with: OpenAI, vLLM, TGI, Ollama, HuggingFace Inference API, or any endpoint that speaks the OpenAI chat completions format.
- Args:
endpoint: The base URL (e.g. "http://localhost").
port: The port number.
model: Model name to pass to the API.
api_key: API key. Defaults to "not-needed" for local endpoints.
system_prompt: Optional system message prepended to every request.
temperature: Default sampling temperature.
max_tokens: Default max tokens in the response.
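The constructor arguments above map onto the OpenAI chat completions request body. A minimal sketch of that payload shape, built with plain dicts (the field mapping is an assumption based on the documented defaults; the actual HTTP call and complete() implementation are omitted):

```python
import json

def build_chat_payload(model, prompt, system_prompt=None,
                       temperature=0.0, max_tokens=256):
    """Assemble an OpenAI-style chat completions request body."""
    messages = []
    if system_prompt:
        # Optional system message prepended to every request
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": prompt})
    return {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }

payload = build_chat_payload("my-model", "What is 2+2?", system_prompt="Be terse.")
body = json.dumps(payload)  # what would be POSTed to /v1/chat/completions
```

Any endpoint that speaks this format (vLLM, TGI, Ollama, etc.) accepts the same body, which is why a single client class covers all of them.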
Shared dataclasses#
- class openenv.core.client_types.StepResult(observation: ObsT, reward: float | None = None, done: bool = False)#
Bases:
Generic[ObsT]

Represents the result of one environment step.
- Attributes:
observation: The environment's observation after the action.
reward: Scalar reward for this step (optional).
done: Whether the episode is finished.
- done: bool = False#
- observation: ObsT#
- reward: float | None = None#
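Taken together, these fields make StepResult a small generic dataclass. A self-contained stand-in with the same fields and defaults (for illustration only, not the library class):

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

ObsT = TypeVar("ObsT")

# Stand-in mirroring the documented fields and defaults of
# openenv.core.client_types.StepResult (illustrative only).
@dataclass
class StepResult(Generic[ObsT]):
    observation: ObsT
    reward: Optional[float] = None
    done: bool = False

# reset() typically yields a result with no reward and done=False ...
first = StepResult(observation={"output": ""})

# ... while a terminal step carries the reward and done=True.
final = StepResult(observation={"output": "3"}, reward=1.0, done=True)
```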
MCP (Model Context Protocol)#
MCP environment#
MCP Environment base class for OpenEnv.
This module provides the MCPEnvironment base class that integrates FastMCP servers with OpenEnv’s Gym-style Environment interface. It handles MCP tool discovery and invocation through the step() API, following RFC 003.
Key features:

- Automatic routing of ListToolsAction and CallToolAction to MCP server
- Reserved tool name validation (reset, step, state, close are protected)
- Timeout handling for tool calls
- Proper error categorization (tool not found, execution errors, timeouts)
- Mode-aware tool registration (production vs simulation)
- Code mode support via get_callables() and execute_code()
- Usage:
from fastmcp import FastMCP
from openenv.core.env_server.mcp_environment import MCPEnvironment

class MyMCPEnv(MCPEnvironment):
    def __init__(self):
        mcp = FastMCP("my-server")

        # Register mode-specific tools
        @self.tool(mode="production")
        def my_tool(arg: str) -> str:
            return f"Production: {arg}"

        @self.tool(mode="simulation")
        def my_tool(arg: str) -> str:
            return f"Simulation: {arg}"

        super().__init__(mcp)

    def reset(self, seed=None, episode_id=None, **kwargs):
        # Reset logic here
        ...

    def _step_impl(self, action):
        # Handle non-MCP actions
        ...

    @property
    def state(self):
        # Return current state
        ...
- class openenv.core.env_server.mcp_environment.MCPEnvironment(mcp_server: Any, transform: Any | None = None)#
Bases:
Environment

Base class for environments that expose tools via MCP (Model Context Protocol).
MCPEnvironment bridges FastMCP servers with OpenEnv’s Gym-style API, allowing agents to discover and invoke MCP tools through the standard step() interface.
The class automatically handles:

- ListToolsAction: Returns available tools from the MCP server
- CallToolAction: Invokes a specific tool with arguments
All other actions are delegated to the abstract _step_impl() method, which subclasses must implement.
- Args:
- mcp_server: A FastMCP server instance containing tool definitions.
The server’s tools will be validated against reserved names.
transform: Optional transform to apply to observations (inherited from Environment).
- Raises:
- ValueError: If any tool in the MCP server uses a reserved name
(reset, step, state, close).
- Example:
>>> from fastmcp import FastMCP
>>> mcp = FastMCP("calculator")
>>> @mcp.tool()
... def add(a: int, b: int) -> int:
...     return a + b
>>> env = MyMCPEnvironment(mcp)
>>> obs = env.step(ListToolsAction())
>>> obs.tools[0].name
'add'
- close() None#
Clean up resources used by the environment.
This method cleans up the MCP client and any other resources. Subclasses should call super().close() if they override this method.
- execute_code(code: str) Observation#
Execute Python code with tools available as callables.
This enables the CodeAct pattern where agents write Python code that calls tools directly as functions, avoiding JSON-RPC overhead.
- Args:
- code: Python code to execute. Tools are available as functions
in the execution namespace. Set a variable named ‘result’ to capture the return value.
- Returns:
Observation with result in metadata[“result”] or error in metadata[“error”].
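The contract described above can be illustrated with a self-contained sketch: tools are injected as callables into the execution namespace, and a variable named `result` captures the return value. (Illustrative only; the real execute_code() also categorizes errors and wraps the result in an Observation.)

```python
def run_code_with_tools(code, tools):
    """Sketch of the CodeAct execution contract: exec the agent's code
    with tools available as plain functions, then read `result`."""
    namespace = dict(tools)          # tools become callables in scope
    exec(code, namespace)            # agent code calls them directly
    return namespace.get("result")   # `result` captures the return value

tools = {"add": lambda a, b: a + b}  # hypothetical tool callable
out = run_code_with_tools("result = add(2, 3)", tools)
```

This is the benefit of code mode: the agent invokes tools as ordinary function calls instead of issuing one JSON-RPC round trip per call.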
- get_callables() Dict[str, Callable]#
Get callable functions for code mode.
Returns tool functions as direct Python callables, enabling code mode where agents write Python code that calls tools directly (no JSON-RPC overhead). Mode-specific tools are filtered by the current mode.
- Returns:
Dictionary mapping tool names to callables.
- step(action: Action, timeout_s: float | None = None, **kwargs: Any) Observation#
Execute an action in the environment.
This method routes MCP-specific actions (ListToolsAction, CallToolAction) to the appropriate handlers, while delegating all other actions to the subclass’s _step_impl() method.
- Args:
- action: The action to execute. Can be:
ListToolsAction: Returns available MCP tools
CallToolAction: Invokes a specific MCP tool
Any other Action: Delegated to _step_impl()
- timeout_s: Optional timeout in seconds for the action.
Defaults to MCP_TOOL_CALL_TIMEOUT (30s) for MCP actions.
**kwargs: Additional arguments passed to handlers.
- Returns:
- Observation appropriate to the action type:
ListToolsObservation for ListToolsAction
CallToolObservation for CallToolAction
Subclass-defined Observation for other actions
- property supports_code_mode: bool#
Check if this environment supports code mode (execute_code).
- tool(mode: str | None = None) Callable#
Decorator for registering mode-aware tools.
- Args:
- mode: Optional mode for the tool (“production” or “simulation”).
If None, tool is available in all modes.
- Returns:
A decorator function for registering tools.
- Raises:
ValueError: If mode is not None, “production”, or “simulation”.
- openenv.core.env_server.mcp_environment.get_server_tools(mcp_server: Any) Dict[str, Any]#
Get tools from a FastMCP server, compatible with both 2.x and 3.x.
- Returns:
Dictionary mapping tool names to tool objects.
MCP types#
MCP (Model Context Protocol) type definitions for OpenEnv.
This module defines strongly typed models for MCP tool discovery and invocation, following RFC 003. These types map MCP’s REST-like API (tools/list, tools/call) to Gym-style action types.
Key design decisions:

- Tool discovery (list_tools) does NOT require reset() first
- Reserved tool names (reset, step, state, close) are prohibited
- Both step() and WebSocket /mcp paths are supported
- class openenv.core.env_server.mcp_types.CallToolAction(*, metadata: ~typing.Dict[str, ~typing.Any] = <factory>, type: ~typing.Literal['call_tool'] = 'call_tool', tool_name: str, arguments: ~typing.Dict[str, ~typing.Any] = <factory>)#
Bases:
Action

Call a specific tool via MCP.
This action triggers MCP’s tools/call operation with the specified tool name and arguments.
- arguments: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- tool_name: str#
- type: Literal['call_tool']#
- class openenv.core.env_server.mcp_types.CallToolObservation(*, done: bool = False, reward: bool | int | float | None = None, metadata: ~typing.Dict[str, ~typing.Any] = <factory>, tool_name: str, result: ~typing.Any = None, error: ~openenv.core.env_server.mcp_types.ToolError | None = None)#
Bases:
Observation

Response from tool execution.
Contains the tool’s result or an error if the call failed. Tool-specific errors (from the tool itself) are included in the result. Transport/framework errors use the error field.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- result: Any#
- tool_name: str#
- class openenv.core.env_server.mcp_types.JsonRpcError(*, code: int, message: str, data: Any | None = None)#
Bases:
BaseModel

JSON-RPC 2.0 error object.
See: https://www.jsonrpc.org/specification#error_object
- code: int#
- data: Any | None#
- classmethod from_code(code: JsonRpcErrorCode, message: str | None = None, data: Any = None) JsonRpcError#
Create an error from a standard error code.
- message: str#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class openenv.core.env_server.mcp_types.JsonRpcErrorCode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
Bases:
int, Enum

Standard JSON-RPC 2.0 error codes.
See: https://www.jsonrpc.org/specification#error_object
- INTERNAL_ERROR = -32603#
- INVALID_PARAMS = -32602#
- INVALID_REQUEST = -32600#
- METHOD_NOT_FOUND = -32601#
- PARSE_ERROR = -32700#
- SERVER_ERROR = -32000#
- class openenv.core.env_server.mcp_types.JsonRpcRequest(*, jsonrpc: ~typing.Literal['2.0'], method: str, params: ~typing.Dict[str, ~typing.Any] = <factory>, id: str | int | None = None)#
Bases:
BaseModel

JSON-RPC 2.0 request object.
See: https://www.jsonrpc.org/specification#request_object
- id: str | int | None#
- jsonrpc: Literal['2.0']#
- method: str#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- params: Dict[str, Any]#
- class openenv.core.env_server.mcp_types.JsonRpcResponse(*, jsonrpc: Literal['2.0'] = '2.0', result: Any | None = None, error: JsonRpcError | None = None, id: str | int | None = None)#
Bases:
BaseModel

JSON-RPC 2.0 response object.
Per JSON-RPC 2.0 spec, a response has either ‘result’ or ‘error’, not both. This model excludes None values during serialization to comply with the spec.
See: https://www.jsonrpc.org/specification#response_object
- error: JsonRpcError | None#
- classmethod error_response(code: JsonRpcErrorCode, message: str | None = None, data: Any = None, request_id: str | int | None = None) JsonRpcResponse#
Create an error response from a standard error code.
- id: str | int | None#
- jsonrpc: Literal['2.0']#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_dump(**kwargs) Dict[str, Any]#
Serialize to dict, excluding result or error when None (JSON-RPC compliance).
- model_dump_json(**kwargs) str#
Serialize to JSON string, excluding result or error when None (JSON-RPC compliance).
- result: Any | None#
- classmethod success(result: Any, request_id: str | int | None = None) JsonRpcResponse#
Create a success response.
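These models correspond to the standard JSON-RPC 2.0 wire format. A sketch of the payloads as plain dicts (the tool name, arguments, and id values here are hypothetical):

```python
# A tools/call request and the two mutually exclusive response shapes.
# Per JSON-RPC 2.0, a response carries `result` OR `error`, never both --
# which is why JsonRpcResponse excludes None values when serializing.
request = {
    "jsonrpc": "2.0",
    "method": "tools/call",
    "params": {"name": "add", "arguments": {"a": 5, "b": 3}},
    "id": 1,
}
success = {"jsonrpc": "2.0", "result": 8, "id": 1}
error = {
    "jsonrpc": "2.0",
    "error": {"code": -32601, "message": "Method not found"},  # METHOD_NOT_FOUND
    "id": 1,
}
```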
- class openenv.core.env_server.mcp_types.ListToolsAction(*, metadata: ~typing.Dict[str, ~typing.Any] = <factory>, type: ~typing.Literal['list_tools'] = 'list_tools')#
Bases:
Action

Request list of available tools from the environment.
This action triggers MCP’s tools/list operation and returns all available tools with their schemas.
Note: Does NOT require reset() to be called first.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- type: Literal['list_tools']#
- class openenv.core.env_server.mcp_types.ListToolsObservation(*, done: bool = False, reward: bool | int | float | None = None, metadata: ~typing.Dict[str, ~typing.Any] = <factory>, tools: ~typing.List[~openenv.core.env_server.mcp_types.Tool])#
Bases:
Observation

Response containing available tools.
Returned when processing a ListToolsAction.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class openenv.core.env_server.mcp_types.McpMethod(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
Bases:
str, Enum

Supported MCP method names.
- TOOLS_CALL = 'tools/call'#
- TOOLS_LIST = 'tools/list'#
- class openenv.core.env_server.mcp_types.Tool(*, name: str, description: str, input_schema: Dict[str, Any])#
Bases:
BaseModel

Strongly typed MCP tool specification.
Follows the MCP ToolSpec format for tool discovery. See: https://modelcontextprotocol.io/specification/2025-06-18/server/tools
- description: str#
- input_schema: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- name: str#
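input_schema holds a JSON Schema describing the tool's arguments. A plausible schema for the `add` tool from the earlier example (hypothetical; real schemas are generated from the tool's Python signature):

```python
# Hypothetical input_schema for an `add(a: int, b: int)` tool,
# in the JSON Schema form MCP tool specs use.
add_schema = {
    "type": "object",
    "properties": {
        "a": {"type": "integer"},
        "b": {"type": "integer"},
    },
    "required": ["a", "b"],
}

# A client can pre-validate arguments against the schema's required keys:
arguments = {"a": 5, "b": 3}
missing = [k for k in add_schema["required"] if k not in arguments]
```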
- class openenv.core.env_server.mcp_types.ToolError(*, error_type: ToolErrorType, message: str)#
Bases:
BaseModel

Structured error for tool execution failures.
This is used for transport/framework errors, NOT for errors returned by the tool itself (those go in the result field).
- error_type: ToolErrorType#
- message: str#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class openenv.core.env_server.mcp_types.ToolErrorType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
Bases:
str, Enum

Types of errors that can occur during tool execution.
- EXECUTION_ERROR = 'execution_error'#
- INVALID_ARGS = 'invalid_args'#
- TIMEOUT = 'timeout'#
- TOOL_NOT_FOUND = 'tool_not_found'#
- TRANSPORT_ERROR = 'transport_error'#
- class openenv.core.env_server.mcp_types.WSMCPMessage(*, type: Literal['mcp'] = 'mcp', data: Dict[str, Any])#
Bases:
BaseMessage

WebSocket message for MCP JSON-RPC requests.
Allows direct MCP access via WebSocket for production inference, bypassing the step() API.
- data: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- type: Literal['mcp']#
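A sketch of the wire message this model describes: a typed envelope whose data field carries the JSON-RPC request (the id value is hypothetical):

```python
import json

# WebSocket envelope: type "mcp", with the JSON-RPC request in `data`
# (field names taken from the WSMCPMessage model above).
ws_message = {
    "type": "mcp",
    "data": {
        "jsonrpc": "2.0",
        "method": "tools/list",
        "params": {},
        "id": "req-1",
    },
}
wire = json.dumps(ws_message)   # what goes over the /mcp WebSocket
decoded = json.loads(wire)
```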
- class openenv.core.env_server.mcp_types.WSMCPResponse(*, type: str = 'mcp', data: Dict[str, Any])#
Bases:
BaseModel

WebSocket response for MCP JSON-RPC.
Contains the JSON-RPC response from the MCP server.
- data: Dict[str, Any]#
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- type: str#
MCP client#
MCP Client classes for tool-calling environments.
This module provides async client classes for interacting with MCP-enabled environments:

- MCPClientBase: Base class with shared tool discovery
- MCPToolClient: Client for tool-calling style (one tool per step)
These clients abstract away the MCP protocol details, providing a clean interface for listing and calling tools on remote environments. All clients are async by default.
Architecture Overview:
┌─────────────────────────────────────────────────────────┐
│ HTTPEnvServer │
├─────────────────────────────────────────────────────────┤
│ Simulation Mode (default): │
│ /ws → OpenEnv protocol (reset/step/state) │
│ /mcp → MCP JSON-RPC (tools/list, tools/call) │
│ /reset, /step, /state → HTTP endpoints │
├─────────────────────────────────────────────────────────┤
│ Production Mode (use_production_mode=True): │
│ /mcp → MCP JSON-RPC (tools/list, tools/call) │
│ Bypasses step() for direct tool access │
└─────────────────────────────────────────────────────────┘
Client Usage:
MCPToolClient (default) → /ws (step-based, with rewards)
MCPToolClient (production) → /mcp (direct tool access, no rewards)
- Example (async):
>>> from openenv.core.mcp_client import MCPToolClient
>>>
>>> async with MCPToolClient(base_url="http://localhost:8000") as env:
...     # Discover available tools
...     tools = await env.list_tools()
...     print([t.name for t in tools])
...
...     # Call a tool
...     result = await env.call_tool("echo_message", message="Hello!")
...     print(result)
- Example (sync wrapper):
>>> env = MCPToolClient(base_url="http://localhost:8000").sync()
>>> with env:
...     tools = env.list_tools()
...     result = env.call_tool("echo_message", message="Hello!")
- class openenv.core.mcp_client.MCPClientBase(base_url: str, connect_timeout_s: float = 10.0, message_timeout_s: float = 60.0, provider: Any | None = None, mode: str | None = None)#
Bases:
EnvClient[Any, Observation, State]

Base class for MCP clients with tool discovery.
This class provides the common list_tools() method for discovering available tools from an MCP-enabled environment. Subclasses implement specific interaction patterns (tool-calling or CodeAct).
- Attributes:
_tools_cache: Cached list of tools (populated on first list_tools() call)
- async list_tools(use_cache: bool = True) List[Tool]#
Discover available tools from the environment.
- Args:
- use_cache: If True, return cached tools if available.
Set to False to force a fresh request.
- Returns:
List of Tool objects with name, description, and input_schema.
- Example:
>>> tools = await env.list_tools()
>>> for tool in tools:
...     print(f"{tool.name}: {tool.description}")
- class openenv.core.mcp_client.MCPToolClient(base_url: str, connect_timeout_s: float = 10.0, message_timeout_s: float = 60.0, provider: Any | None = None, mode: str | None = None)#
Bases:
MCPClientBase

Async client for tool-calling style MCP interactions.
Each step invokes a single tool. Use this for traditional function-calling agent patterns where the agent decides which tool to call next.
This client provides convenience methods for tool discovery and invocation:

- list_tools(): Get all available tools with their schemas
- call_tool(name, **kwargs): Invoke a tool by name with arguments
- Example (async):
>>> async with MCPToolClient(base_url="http://localhost:8000") as env:
...     # Reset the environment
...     await env.reset()
...
...     # Discover available tools
...     tools = await env.list_tools()
...     print([t.name for t in tools])  # ['echo_message', 'echo_with_length']
...
...     # Call a tool directly
...     result = await env.call_tool("echo_message", message="Hello!")
...     print(result)  # "Hello!"
...
...     # Or use the full action interface
...     from openenv.core.env_server.mcp_types import CallToolAction
...     step_result = await env.step(CallToolAction(
...         tool_name="echo_with_length",
...         arguments={"message": "Test"}
...     ))
...     print(step_result.observation.result)
- Example (sync wrapper):
>>> env = MCPToolClient(base_url="http://localhost:8000").sync()
>>> with env:
...     tools = env.list_tools()
...     result = env.call_tool("echo_message", message="Hello!")
- async call_tool(name: str, **kwargs: Any) Any#
Call a tool by name.
This is a convenience method that creates a CallToolAction, executes it, and returns the result directly. For more control, use step() with a CallToolAction directly.
- Args:
name: Name of the tool to invoke (must match a tool from list_tools()).
**kwargs: Arguments to pass to the tool. Must match the tool's input_schema.
- Returns:
The tool’s result. The type depends on the tool being called.
- Raises:
RuntimeError: If the server returns an error response.
- Example:
>>> result = await env.call_tool("add", a=5, b=3)
>>> print(result)  # 8
>>>
>>> result = await env.call_tool("greet", name="Claude")
>>> print(result)  # "Hello, Claude!"
- async get_tool(name: str) Tool | None#
Get a specific tool by name.
- Args:
name: Name of the tool to find.
- Returns:
The Tool object if found, None otherwise.
- Example:
>>> tool = await env.get_tool("echo_message")
>>> if tool:
...     print(tool.description)
...     print(tool.input_schema)
- async has_tool(name: str) bool#
Check if a tool exists.
- Args:
name: Name of the tool to check.
- Returns:
True if the tool exists, False otherwise.
Rubrics#
Base Rubric class for reward computation.
Rubrics compute rewards from actions and observations. The API is modeled after PyTorch’s nn.Module: users implement forward(), and the framework handles child registration and hooks.
See RFC 004 for full design: rfcs/004-rubrics.md
- class openenv.core.rubrics.base.Rubric#
Bases:
ABC

Abstract base class for reward computation.
A Rubric computes a reward signal from an action and observation. Subclasses implement forward() to define the reward logic.
- Usage:
class MyRubric(Rubric):
    def forward(self, action, observation) -> float:
        return 1.0 if action.valid else 0.0

rubric = MyRubric()
reward = rubric(action, observation)
Child rubrics are auto-registered when assigned as attributes, enabling hierarchical composition and introspection.
- abstract forward(action: Any, observation: Any) float#
Compute the reward. Implement this in subclasses.
- Args:
action: The action taken by the agent.
observation: The resulting observation.
- Returns:
Reward value (typically 0.0 to 1.0).
- get_rubric(path: str) Rubric#
Access a nested rubric by dot-separated path.
- Args:
path: Dot-separated path (e.g., “code.syntax”).
- Returns:
The rubric at the specified path.
- Raises:
KeyError: If the path does not exist.
- last_score: float | None#
- load_state_dict(state: Dict[str, Any]) None#
Load rubric configuration from checkpoint.
- named_rubrics(prefix: str = '') Iterator[Tuple[str, Rubric]]#
Iterate over all descendant rubrics with dot-separated names.
- register_forward_hook(hook: Callable[[Rubric, Any, Any, float], None]) None#
Register a hook called after forward().
- Args:
hook: Callable with signature (rubric, action, observation, result).
- register_forward_pre_hook(hook: Callable[[Rubric, Any, Any], None]) None#
Register a hook called before forward().
- Args:
hook: Callable with signature (rubric, action, observation).
- reset() None#
Reset any internal state. Override in subclasses if needed.
- state_dict() Dict[str, Any]#
Serialize rubric configuration for checkpointing.
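The call pattern described above (implement forward(), invoke the rubric as a callable, hooks fire around forward()) can be sketched with a self-contained stand-in. This is illustrative only and omits child registration and state_dict handling:

```python
from typing import Any, Callable, List

class MiniRubric:
    """Minimal stand-in for the documented nn.Module-style pattern:
    users implement forward(); __call__ runs hooks around it.
    (Illustrative only -- not the real openenv Rubric.)"""

    def __init__(self):
        self._pre_hooks: List[Callable] = []
        self._post_hooks: List[Callable] = []
        self.last_score = None

    def register_forward_pre_hook(self, hook: Callable) -> None:
        self._pre_hooks.append(hook)

    def register_forward_hook(self, hook: Callable) -> None:
        self._post_hooks.append(hook)

    def forward(self, action: Any, observation: Any) -> float:
        raise NotImplementedError

    def __call__(self, action: Any, observation: Any) -> float:
        for h in self._pre_hooks:          # (rubric, action, observation)
            h(self, action, observation)
        score = self.forward(action, observation)
        self.last_score = score
        for h in self._post_hooks:         # (rubric, action, observation, result)
            h(self, action, observation, score)
        return score

class ExactMatch(MiniRubric):
    def forward(self, action, observation) -> float:
        return 1.0 if action == observation else 0.0

calls = []
rubric = ExactMatch()
rubric.register_forward_hook(lambda r, a, o, s: calls.append(s))
reward = rubric("yes", "yes")
```

Hooks like the lambda above are how training infrastructure can log per-rubric scores without modifying the rubric itself.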
Container rubrics for composing reward computations.
These containers provide common aggregation patterns for rubrics, similar to how PyTorch provides nn.Sequential alongside nn.Module.
See RFC 004 for full design: rfcs/004-rubrics.md
- class openenv.core.rubrics.containers.Gate(rubric: Rubric, threshold: float = 1.0)#
Bases:
Rubric

Threshold wrapper - returns 0 if child score is below threshold.
Useful for hard constraints like “must pass 50% of tests”.
- Usage:
rubric = Gate(PassesTests(), threshold=0.5)
# Returns PassesTests() score if >= 0.5, else 0.0
- forward(action: Any, observation: Any) float#
Return child score if >= threshold, else 0. Sync version.
- class openenv.core.rubrics.containers.RubricDict(rubrics: Dict[str, Rubric] = None)#
Bases:
Rubric

Container for named rubrics with keyed access.
Analogous to nn.ModuleDict. Enables keyed access for multi-task environments where different tasks require different rubrics.
- Usage:
class AtariRubric(Rubric):
    def __init__(self):
        super().__init__()
        self.games = RubricDict({
            "pong": PongRubric(),
            "breakout": BreakoutRubric(),
            "space_invaders": SpaceInvadersRubric(),
        })

    def forward(self, action, obs) -> float:
        return self.games[obs.game_id](action, obs)

# Access: env.rubric.games["pong"]
- forward(action: Any, observation: Any) float#
RubricDict does not define aggregation - override in parent.
- keys() Iterator[str]#
Iterate over keys.
- class openenv.core.rubrics.containers.RubricList(rubrics: List[Rubric] = None)#
Bases:
Rubric

Container for dynamic lists of rubrics.
Analogous to nn.ModuleList. Does not define aggregation - use within a parent rubric that implements custom logic.
- Usage:
    class MultiGameRubric(Rubric):
        def __init__(self, games: List[str]):
            super().__init__()
            self.games = RubricList([GameRubric(g) for g in games])

        def forward(self, action, obs) -> float:
            return self.games[obs.game_index](action, obs)
- forward(action: Any, observation: Any) float#
RubricList does not define aggregation - override in parent.
- class openenv.core.rubrics.containers.Sequential(*rubrics: Rubric)#
Bases:
Rubric
Run rubrics in order, fail-fast on zero.
Runs child rubrics in order. If any returns 0, stops immediately and returns 0. This implements hierarchical gating patterns where syntax checks run before execution checks.
- Usage:
    rubric = Sequential(
        Gate(Compiles()),
        Gate(PassesTests(), threshold=0.5),
        WeightedSum([PassesTests(), StyleRubric()], weights=[0.7, 0.3]),
    )
- forward(action: Any, observation: Any) float#
Run rubrics in order, return 0 if any returns 0. Sync version.
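The fail-fast loop can be sketched as follows. Note one assumption: the docstring does not state what Sequential returns when no child scores zero, so the sketch returns the last child's score.

```python
def sequential_score(score_fns, action, observation):
    # Fail-fast evaluation: run each child rubric in order and stop at
    # the first zero. The return value when no child is zero (the last
    # child's score, here) is an assumption, not documented behavior.
    result = 0.0
    for fn in score_fns:
        result = fn(action, observation)
        if result == 0.0:
            return 0.0
    return result
```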
- class openenv.core.rubrics.containers.WeightedSum(rubrics: List[Rubric], weights: List[float])#
Bases:
Rubric
Weighted combination of child rubrics.
Standard aggregation pattern for multi-criteria evaluation.
- Usage:
    rubric = WeightedSum(
        [PassesTests(), StyleRubric()],
        weights=[0.7, 0.3],
    )
- forward(action: Any, observation: Any) float#
Return weighted sum of child scores. Sync version.
- property weights: List[float]#
Get the weights (read-only copy).
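The aggregation itself is a plain dot product of scores and weights; a minimal sketch, independent of the Rubric class:

```python
def weighted_sum(scores, weights):
    # sum(w_i * s_i) over paired child scores and weights.
    if len(scores) != len(weights):
        raise ValueError("scores and weights must have equal length")
    return sum(w * s for w, s in zip(weights, scores))

weighted_sum([1.0, 0.5], [0.7, 0.3])  # 0.7*1.0 + 0.3*0.5 = 0.85
```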
Trajectory-based rubrics for delayed reward computation.
These rubrics accumulate trajectory data and compute rewards based on episode outcomes rather than individual steps. This supports scenarios where reward signals depend on future events:
Terminal games (chess, Go): Win/loss known only at game end
Plan execution: Plan quality depends on execution success
Multi-agent games: One player’s action quality depends on opponent response
See RFC 004 “Delayed Rewards” section for design rationale.
- class openenv.core.rubrics.trajectory.ExponentialDiscountingTrajectoryRubric(gamma: float = 0.99, intermediate_reward: float = 0.0)#
Bases:
TrajectoryRubric
TrajectoryRubric with exponential discounting for credit assignment.
Per-step reward: r_t = gamma^(T-1-t) * R_final
With gamma=0.99, later steps get higher reward (they’re “closer” to the outcome). With gamma=1.0, all steps get equal reward. With gamma=0.0, only the final step gets reward.
This is the standard temporal discounting used in reinforcement learning, applied retroactively once the episode outcome is known.
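The retroactive discounting formula can be sketched as a pure function, independent of the rubric class:

```python
def discounted_step_rewards(trajectory_len: int, final_reward: float,
                            gamma: float = 0.99) -> list[float]:
    # step_rewards[t] = gamma^(T-1-t) * R_final: the final step gets the
    # undiscounted outcome, earlier steps progressively less credit.
    T = trajectory_len
    return [gamma ** (T - 1 - t) * final_reward for t in range(T)]

discounted_step_rewards(3, 1.0, gamma=0.5)  # [0.25, 0.5, 1.0]
```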
- Usage:
    class ChessRubric(ExponentialDiscountingTrajectoryRubric):
        def score_trajectory(self, trajectory):
            _, final_obs = trajectory[-1]
            outcome = final_obs.metadata.get('winner')
            if outcome == 'agent':
                return 1.0
            elif outcome == 'opponent':
                return 0.0
            else:
                return 0.5  # Draw

    rubric = ChessRubric(gamma=0.99)
    reward = rubric(action, obs)  # 0.0 until done, then final score
    step_rewards = rubric.compute_step_rewards()  # Discounted per-step rewards
- compute_step_rewards() List[float]#
Apply exponential discounting from final reward.
- Returns:
List of discounted rewards. step_rewards[t] = gamma^(T-1-t) * R_final where T is the trajectory length and R_final is score_trajectory().
- gamma: float#
- load_state_dict(state: Dict[str, Any]) None#
Load configuration from checkpoint.
- state_dict() Dict[str, Any]#
Serialize configuration.
- class openenv.core.rubrics.trajectory.TrajectoryRubric(intermediate_reward: float = 0.0)#
Bases:
Rubric
Abstract base for rubrics that score based on full trajectories.
Subclasses implement:
- score_trajectory(): Compute final score from trajectory
- compute_step_rewards(): Define credit assignment strategy
The __call__ method accumulates steps and returns rewards according to the subclass’s implementation.
IMPORTANT: Trajectories are stored in CPU memory to avoid GPU pressure. Environments with GPU tensors in observations must move them to CPU before returning from step().
Known limitation: Very long episodes (thousands of steps) may consume significant CPU memory. For such cases, consider streaming rubrics.
- Usage:
    class WinLossRubric(TrajectoryRubric):
        def score_trajectory(self, trajectory):
            _, final_obs = trajectory[-1]
            return 1.0 if final_obs.metadata.get('won') else 0.0

        def compute_step_rewards(self):
            # Equal credit to all steps
            score = self.score_trajectory(self._trajectory)
            return [score] * len(self._trajectory)

    rubric = WinLossRubric()
    for action, obs in episode:
        reward = rubric(action, obs)  # 0.0 until done
    step_rewards = rubric.compute_step_rewards()  # Credit assignment
- abstract compute_step_rewards() List[float]#
Compute per-step rewards from the accumulated trajectory.
- Returns:
List of rewards, one per step. Length matches len(trajectory).
Define your credit assignment strategy here (e.g., discounting, assigning all credit to specific steps, etc.).
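As an illustration of an alternative strategy to the discounting and equal-credit examples above, all credit can go to the final step; a standalone sketch (not part of the library):

```python
def last_step_rewards(trajectory_len: int, final_score: float) -> list[float]:
    # Zero reward everywhere except the final step, which receives the
    # full trajectory score.
    rewards = [0.0] * trajectory_len
    if trajectory_len:
        rewards[-1] = final_score
    return rewards
```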
- forward(action: Any, observation: Any) float#
Accumulate step and return reward.
Returns intermediate_reward until done, then computes trajectory score.
- Args:
action: The action taken.
observation: The resulting observation. Must have a ‘done’ attribute.
- Returns:
intermediate_reward if not done, else score_trajectory() result.
- intermediate_reward: float#
- load_state_dict(state: Dict[str, Any]) None#
Load configuration from checkpoint.
- reset() None#
Clear accumulated trajectory. Call on env.reset().
- abstract score_trajectory(trajectory: List[Tuple[Any, Any]]) float#
Score the complete trajectory. Return 0.0-1.0.
Called when observation.done=True.
- Args:
trajectory: List of (action, observation) tuples.
- Returns:
Final trajectory score (typically 0.0 to 1.0).
- state_dict() Dict[str, Any]#
Serialize configuration (not trajectory data).
- property trajectory: List[Tuple[Any, Any]]#
Current trajectory (read-only copy).
LLM-as-a-judge rubric for reward computation.
Uses an LLM endpoint (via LLMClient) to evaluate agent actions/observations.
- Usage:
    client = OpenAIClient("http://localhost", 8000, model="meta-llama/…")
    judge = LLMJudge(
        prompt_template="Rate this code solution:\n{action}\n\nScore (0-1):",
        client=client,
    )
    score = await judge(action, observation)
See RFC 004 for full design: rfcs/004-rubrics.md
- class openenv.core.rubrics.llm_judge.LLMJudge(prompt_template: str, client: LLMClient, *, score_pattern: str | None = None, default_score: float = 0.0, normalize: bool = True)#
Bases:
Rubric
Rubric that uses an LLM to evaluate agent actions/observations.
The prompt template is formatted with {action} and {observation} placeholders. The LLM response is parsed for a numeric score.
- Args:
prompt_template: Template string with {action} and {observation} placeholders.
client: An LLMClient instance for making LLM calls.
score_pattern: Regex to extract the score from the LLM response. Defaults to matching the first decimal number.
default_score: Score returned when parsing fails.
normalize: If True, clamp extracted score to [0, 1].
- async forward(action: Any, observation: Any) float#
Evaluate by sending a prompt to the LLM and parsing the score.
- Args:
action: The action taken by the agent.
observation: The resulting observation.
- Returns:
Parsed score from the LLM response.
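The parsing and normalization described above can be sketched as follows; the regex shown is an assumption for "first decimal number", not necessarily the library's exact default pattern:

```python
import re

def parse_score(response: str, default: float = 0.0, normalize: bool = True) -> float:
    # Extract the first decimal number from the LLM response; fall back
    # to the default score when nothing parses.
    match = re.search(r"\d+(?:\.\d+)?", response)
    if match is None:
        return default
    score = float(match.group())
    # Clamp to [0, 1] when normalization is enabled.
    return min(max(score, 0.0), 1.0) if normalize else score

parse_score("Score: 0.8")  # 0.8
parse_score("no numeric score here")  # 0.0 (default)
```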
- load_state_dict(state: Dict[str, Any]) None#
Load rubric configuration from checkpoint.
- state_dict() Dict[str, Any]#
Serialize rubric configuration.
Tools#
Git Server Client for connecting to external Gitea instance.
This module provides a lightweight client for interacting with a shared Gitea service, optimized for task-based isolation where multiple environment instances share the same Gitea server but have isolated workspaces.
- class openenv.core.tools.git_server_client.GitServerClient(gitea_url: str, username: str, password: str, workspace_dir: str = '/workspace')#
Bases:
object
Client for connecting to an external Gitea server.
This client is optimized for task-based isolation where:
- Multiple tasks share the same Gitea instance
- Each task has its own isolated workspace
- Fast reset() via git operations (no server restart)
- Repos are pre-migrated to Gitea once
- Args:
gitea_url: URL of the Gitea server (e.g., “http://gitea:3000”)
username: Gitea username for authentication
password: Gitea password for authentication
workspace_dir: Local workspace directory for cloning repos
- Example:
>>> # Connect to shared Gitea (credentials from environment)
>>> import os
>>> client = GitServerClient(
...     gitea_url=os.getenv("GITEA_URL"),
...     username=os.getenv("GITEA_USERNAME"),
...     password=os.getenv("GITEA_PASSWORD"),
... )
>>> client.wait_for_ready()
>>> # Clone repo to workspace
>>> path = client.clone_to_workspace("my-repo", commit="abc123")
>>> # Fast reset to base state
>>> client.reset_workspace("my-repo", commit="abc123")
- clone_to_workspace(repo_name: str, target_dir: str | None = None, commit: str = 'main') str#
Clone a repository to the workspace at a specific commit.
This creates a fresh clone optimized for task isolation.
- Args:
repo_name: Name of repository to clone
target_dir: Target directory name (defaults to repo_name)
commit: Commit hash or branch to check out
- Returns:
Path to cloned repository
- Raises:
RuntimeError: If clone fails
- execute_git_command(command: str, working_dir: str = '') tuple[int, str, str]#
Execute a git command in the workspace.
- Args:
command: Git command to execute (without ‘git’ prefix)
working_dir: Working directory relative to workspace
- Returns:
Tuple of (exit_code, stdout, stderr)
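A small helper for consuming the returned tuple (a sketch; the helper name is illustrative, not part of the client):

```python
def git_ok(result: tuple[int, str, str]) -> str:
    # Unpack the (exit_code, stdout, stderr) tuple returned by
    # execute_git_command and raise on a nonzero exit code.
    exit_code, stdout, stderr = result
    if exit_code != 0:
        raise RuntimeError(f"git failed ({exit_code}): {stderr.strip()}")
    return stdout

# e.g. log = git_ok(client.execute_git_command("log -1 --oneline", "my-repo"))
head = git_ok((0, "abc123\n", ""))
```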
- get_current_commit(repo_name: str) str#
Get current commit hash of a workspace repository.
- Args:
repo_name: Name of repository in workspace
- Returns:
Commit hash
- list_repositories() list[dict[str, str]]#
List all repositories in Gitea.
- Returns:
List of repository information dictionaries
- reset_workspace(repo_name: str, commit: str = 'main') bool#
Fast reset of workspace to base state (optimized for task resets).
This is much faster than re-cloning. It:
1. Checks out the target commit
2. Resets to that commit (hard)
3. Cleans untracked files
- Args:
repo_name: Name of repository (directory in workspace)
commit: Commit hash or branch to reset to
- Returns:
True if reset successful
- Raises:
RuntimeError: If reset fails
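The three steps above correspond roughly to the following git invocations; this is a sketch of the equivalent commands (without the ‘git’ prefix, matching execute_git_command's convention), and the client's actual flags may differ:

```python
def reset_commands(commit: str = "main") -> list[str]:
    # The three-step fast reset, expressed as the git commands it
    # roughly corresponds to.
    return [
        f"checkout {commit}",      # 1. check out the target commit
        f"reset --hard {commit}",  # 2. hard-reset the working tree to it
        "clean -fd",               # 3. remove untracked files and dirs
    ]
```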
- wait_for_ready(timeout: int = 30) bool#
Wait for Gitea server to be ready.
- Args:
timeout: Maximum seconds to wait
- Returns:
True if server is ready, False otherwise
- workspace_exists(repo_name: str) bool#
Check if a repository exists in workspace.
- class openenv.core.tools.git_server_client.RepoInfo(name: str, url: str, commit: str, clone_url: str)#
Bases:
objectInformation about a repository.
- clone_url: str#
- commit: str#
- name: str#
- url: str#
Local Python Executor (enhanced).
This module provides a safer wrapper around smolagents.LocalPythonExecutor with improved exception handling and a few helpful tools registered with the executor to make debugging executed code easier.
Key improvements:
- Register a few helper utilities via send_tools so user code can use them for reporting (e.g. format_exc).
- More robust extraction of stdout/stderr/exit codes from the executor result object, tolerant to different versions of smolagents.
- Detailed stderr on unexpected exceptions including full traceback.
- Structured logging for operational visibility.
- class openenv.core.tools.local_python_executor.PyExecutor(additional_imports: list[str] | None = None)#
Bases:
object
Wrapper around smolagents LocalPythonExecutor.
The wrapper registers a few non-privileged helper tools to the LocalPythonExecutor that can be used by the executed code to format exceptions and to safely stringify results for improved error reporting.
- run(code: str) CodeExecResult#
Execute Python code and return a CodeExecResult.
This method is intentionally defensive: it attempts to extract meaningful stdout/stderr/exit_code information from a variety of possible return shapes that different versions of smolagents may provide.
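The defensive extraction can be sketched as below. The attribute names probed here (output, stdout, error, stderr, exit_code) are assumptions about the shapes different smolagents versions might return, not the wrapper's actual implementation:

```python
from types import SimpleNamespace

def extract_output(result) -> tuple[str, str, int]:
    # Tolerate different result shapes: try several plausible attribute
    # names, falling back to empty strings and an inferred exit code.
    stdout = getattr(result, "output", None) or getattr(result, "stdout", "") or ""
    stderr = getattr(result, "error", None) or getattr(result, "stderr", "") or ""
    exit_code = getattr(result, "exit_code", 0 if not stderr else 1)
    return str(stdout), str(stderr), int(exit_code)

out, err, code = extract_output(SimpleNamespace(stdout="hi\n", stderr="", exit_code=0))
```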
Container providers#
Container provider abstractions for running environment servers.
This module provides a pluggable architecture for different container providers (local Docker, Kubernetes, cloud providers, etc.) to be used with EnvClient.
- class openenv.core.containers.runtime.providers.ContainerProvider#
Bases:
ABC
Abstract base class for container providers.
Providers implement this interface to support different container platforms:
- LocalDockerProvider: Runs containers on local Docker daemon
- KubernetesProvider: Runs containers in Kubernetes cluster
- FargateProvider: Runs containers on AWS Fargate
- CloudRunProvider: Runs containers on Google Cloud Run
The provider manages a single container lifecycle and provides the base URL for connecting to it.
- Example:
>>> provider = LocalDockerProvider()
>>> base_url = provider.start_container("echo-env:latest")
>>> print(base_url)  # http://localhost:8000
>>> # Use the environment via base_url
>>> provider.stop_container()
- abstract start_container(image: str, port: int | None = None, env_vars: Dict[str, str] | None = None, **kwargs: Any) str#
Start a container from the specified image.
- Args:
image: Container image name (e.g., “echo-env:latest”)
port: Port to expose (if None, provider chooses)
env_vars: Environment variables to pass to container
**kwargs: Provider-specific options
- Returns:
Base URL to connect to the container (e.g., “http://localhost:8000”)
- Raises:
RuntimeError: If container fails to start
- abstract stop_container() None#
Stop and remove the running container.
This cleans up the container that was started by start_container().
- abstract wait_for_ready(base_url: str, timeout_s: float = 30.0) None#
Wait for the container to be ready to accept requests.
This typically polls the /health endpoint until it returns 200.
- Args:
base_url: Base URL of the container
timeout_s: Maximum time to wait
- Raises:
TimeoutError: If container doesn’t become ready in time
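The readiness loop can be sketched as below; the probe is injected for clarity, and in a real provider it would presumably issue an HTTP GET against {base_url}/health and return True on a 200:

```python
import time

def poll_until_ready(probe, timeout_s: float = 30.0, interval_s: float = 0.5) -> None:
    # Poll probe() until it returns truthy or the deadline passes.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            if probe():
                return
        except Exception:
            pass  # server not accepting connections yet
        time.sleep(interval_s)
    raise TimeoutError(f"not ready after {timeout_s}s")

# e.g. poll_until_ready(lambda: requests.get(f"{base_url}/health").ok)
```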
- class openenv.core.containers.runtime.providers.DockerSwarmProvider(*, auto_init_swarm: bool = True, overlay_network: str | None = None)#
Bases:
ContainerProvider
Container provider that uses Docker Swarm services for local concurrency.
This provider creates a replicated Swarm service backed by the local Docker engine. The built-in load-balancer fans requests across the replicas, allowing multiple container instances to run concurrently on the developer workstation (mirroring the workflow described in the Docker stack docs).
- start_container(image: str, port: int | None = None, env_vars: Dict[str, str] | None = None, **kwargs: Any) str#
Start (or scale) a Swarm service for the given image.
- Supported kwargs:
replicas (int): Number of container replicas (default: 2).
cpu_limit (float | str): CPU limit passed to --limit-cpu.
memory_limit (str): Memory limit passed to --limit-memory.
constraints (Sequence[str]): Placement constraints.
labels (Dict[str, str]): Service labels.
command (Sequence[str] | str): Override container command.
- stop_container() None#
Remove the Swarm service (and keep the Swarm manager running).
- wait_for_ready(base_url: str, timeout_s: float = 30.0) None#
Wait for at least one replica to become healthy by polling /health.
Note: With Swarm’s load balancer, requests round-robin across replicas, so this only verifies that at least one replica is responding. Some replicas may still be starting when this returns.
- class openenv.core.containers.runtime.providers.KubernetesProvider#
Bases:
ContainerProvider
Container provider for Kubernetes clusters.
This provider creates pods in a Kubernetes cluster and exposes them via services or port-forwarding.
- Example:
>>> provider = KubernetesProvider(namespace="envtorch-dev")
>>> base_url = provider.start_container("echo-env:latest")
>>> # Pod running in k8s, accessible via service or port-forward
>>> provider.stop_container()
- class openenv.core.containers.runtime.providers.LocalDockerProvider#
Bases:
ContainerProvider
Container provider for local Docker daemon.
This provider runs containers on the local machine using Docker. Useful for development and testing.
- Example:
>>> provider = LocalDockerProvider()
>>> base_url = provider.start_container("echo-env:latest")
>>> # Container running on http://localhost:<random-port>
>>> provider.stop_container()
- start_container(image: str, port: int | None = None, env_vars: Dict[str, str] | None = None, **kwargs: Any) str#
Start a Docker container locally.
- Args:
image: Docker image name
port: Port to expose (if None, finds available port)
env_vars: Environment variables for the container
**kwargs: Additional Docker run options
- Returns:
Base URL to connect to the container
- stop_container() None#
Stop and remove the Docker container.
- wait_for_ready(base_url: str, timeout_s: float = 30.0) None#
Wait for container to be ready by polling /health endpoint.
- Args:
base_url: Base URL of the container
timeout_s: Maximum time to wait
- Raises:
TimeoutError: If container doesn’t become ready
- class openenv.core.containers.runtime.providers.RuntimeProvider#
Bases:
ABC
Abstract base class for runtime providers that are not container providers.
Providers implement this interface to support different runtime platforms:
- UVProvider: Runs environments via uv run
The provider manages a single runtime lifecycle and provides the base URL for connecting to it.
- Example:
>>> provider = UVProvider(project_path="/path/to/env")
>>> base_url = provider.start()
>>> print(base_url)  # http://localhost:8000
>>> provider.stop()
- abstract start(port: int | None = None, env_vars: Dict[str, str] | None = None, **kwargs: Any) str#
Start the runtime.
- Args:
port: Port to expose (if None, provider chooses)
env_vars: Environment variables for the runtime
**kwargs: Additional runtime options
- abstract stop() None#
Stop the runtime.
- abstract wait_for_ready(timeout_s: float = 30.0) None#
Wait for the runtime to be ready to accept requests.
Providers for launching ASGI applications via uv run.
- class openenv.core.containers.runtime.uv_provider.UVProvider(*, project_path: str, app: str = 'server.app:app', host: str = '0.0.0.0', reload: bool = False, env_vars: Dict[str, str] | None = None, context_timeout_s: float = 60.0)#
Bases:
RuntimeProvider
RuntimeProvider implementation backed by uv run.
- Args:
project_path: Local path to a uv project (passed to uv run --project)
app: ASGI application path for uvicorn (defaults to server.app:app)
host: Host interface to bind to (defaults to 0.0.0.0)
reload: Whether to enable uvicorn’s reload mode
env_vars: Environment variables to pass through to the spawned process
context_timeout_s: How long to wait for the environment to become ready
- Example:
>>> provider = UVProvider(project_path="/path/to/env")
>>> base_url = provider.start()
>>> print(base_url)  # http://localhost:8000
>>> # Use the environment via base_url
>>> provider.stop()
- property base_url: str#
The base URL of the environment.
- Returns:
The base URL of the environment
- Raises:
RuntimeError: If the environment is not running
- start(port: int | None = None, env_vars: Dict[str, str] | None = None, workers: int = 1, **_: Dict[str, str]) str#
Start the environment via uv run.
- Args:
port: The port to bind the environment to
env_vars: Environment variables to pass to the environment
workers: The number of workers to use
- Returns:
The base URL of the environment
- Raises:
RuntimeError: If the environment is already running
- stop() None#
Stop the environment.
- Raises:
RuntimeError: If the environment is not running
- wait_for_ready(timeout_s: float = 60.0) None#
Wait for the environment to become ready.
- Args:
timeout_s: The timeout to wait for the environment to become ready
- Raises:
RuntimeError: If the environment is not running
TimeoutError: If the environment does not become ready within the timeout