Core API Reference¶
The openenv.core package provides the core abstractions for building and running environments. For an end-to-end tutorial on building environments with OpenEnv, see the building an environment guide.
Core runtime (core)¶
Environment server primitives¶
Message
¶
Bases: TypedDict
A message in a conversation.
Compatible with Huggingface chat template format.
ModelTokenizer
¶
Bases: Protocol
Protocol for tokenizers that support chat templates.
This protocol defines the interface that tokenizers must implement to work with chat-based environments. It's compatible with Huggingface transformers tokenizers.
apply_chat_template(conversation, tokenize=True, return_tensors=None, **kwargs)
¶
Apply a chat template to format and optionally tokenize a conversation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
conversation
|
list[Message]
|
List of message dictionaries with 'role' and 'content' |
required |
tokenize
|
bool
|
Whether to tokenize the output |
True
|
return_tensors
|
str | None
|
Format for returned tensors ('pt' for PyTorch) |
None
|
**kwargs
|
Any
|
Additional arguments |
{}
|
Returns:
| Type | Description |
|---|---|
Any
|
Formatted and optionally tokenized conversation |
decode(token_ids, skip_special_tokens=False, **kwargs)
¶
Decode token IDs back to text.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token_ids
|
Any
|
Token IDs to decode |
required |
skip_special_tokens
|
bool
|
Whether to skip special tokens in output |
False
|
**kwargs
|
Any
|
Additional arguments |
{}
|
Returns:
| Type | Description |
|---|---|
str
|
Decoded text string |
Transform
¶
Bases: ABC, Generic[ObsT]
Transform observations to add rewards, metrics, or other modifications.
Transforms follow the TorchRL pattern where they take an observation and return a (potentially modified) observation. This allows for flexible reward computation and observation augmentation.
__call__(observation)
abstractmethod
¶
Transform an observation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
observation
|
ObsT
|
The input observation |
required |
Returns:
| Type | Description |
|---|---|
ObsT
|
The transformed observation |
Environment
¶
Bases: ABC, Generic[ActT, ObsT, StateT]
Base class for all environment servers following Gym/Gymnasium API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
transform
|
Optional[Transform[ObsT]]
|
Optional transform to apply to observations |
None
|
Class Attributes
Set this to True in your Environment subclass if:
- The environment uses proper session isolation (e.g., unique working dirs)
- No shared mutable state exists between instances
- External resources (databases, APIs) can handle concurrent access
state
abstractmethod
property
¶
Get the current environment state.
reset(seed=None, episode_id=None, **kwargs)
abstractmethod
¶
Reset the environment and return initial observation.
reset_async(seed=None, episode_id=None, **kwargs)
async
¶
Async version of reset. Default implementation calls sync reset.
Override to provide true async implementation.
step(action, timeout_s=None, **kwargs)
abstractmethod
¶
Take a step in the environment.
step_async(action, timeout_s=None, **kwargs)
async
¶
Async version of step. Default implementation calls sync step.
Override to provide true async implementation.
get_metadata()
¶
Get metadata about this environment.
Override this method to provide custom metadata for the environment. Default implementation returns basic metadata derived from class name.
Returns:
| Type | Description |
|---|---|
EnvironmentMetadata
|
EnvironmentMetadata with environment information |
close()
¶
Clean up resources used by the environment.
Override this method to implement custom cleanup logic. Called when the environment is being destroyed or reset.
HTTP server utilities¶
HTTP server wrapper for Environment instances.
This module provides utilities to wrap any Environment subclass and expose it over HTTP and WebSocket endpoints that EnvClient can consume.
HTTPEnvServer
¶
HTTP server wrapper for Environment instances.
This class wraps an Environment and exposes its reset(), step(), and state methods as HTTP and WebSocket endpoints compatible with EnvClient.
The server expects: - Action deserialization: Converts JSON dict to Action subclass - Observation serialization: Converts Observation subclass to JSON dict
Example
from core.env_server import HTTPEnvServer from envs.coding_env.server import CodeExecutionEnvironment from envs.coding_env.models import CodeAction, CodeObservation
Pass environment class (factory pattern)¶
server = HTTPEnvServer( ... env=CodeExecutionEnvironment, ... action_cls=CodeAction, ... observation_cls=CodeObservation, ... max_concurrent_envs=4, ... )
Register routes with FastAPI¶
from fastapi import FastAPI app = FastAPI() server.register_routes(app)
active_sessions
property
¶
Return the number of active WebSocket sessions.
max_concurrent_envs
property
¶
Return the maximum number of concurrent environments.
is_concurrency_safe
property
¶
Return whether the environment is marked as concurrency safe.
concurrency_config
property
¶
Return the concurrency configuration.
__init__(env, action_cls, observation_cls, max_concurrent_envs=None, concurrency_config=None)
¶
Initialize HTTP server wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
env
|
Callable[[], Environment]
|
Environment factory (callable) that creates new instances. Will be called to create a new environment for each WebSocket session. |
required |
action_cls
|
Type[Action]
|
The Action subclass this environment expects |
required |
observation_cls
|
Type[Observation]
|
The Observation subclass this environment returns |
required |
max_concurrent_envs
|
Optional[int]
|
Maximum number of concurrent WebSocket sessions. Mutually exclusive with concurrency_config. |
None
|
concurrency_config
|
Optional[ConcurrencyConfig]
|
Optional ConcurrencyConfig for advanced concurrency settings. Mutually exclusive with max_concurrent_envs. |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If both max_concurrent_envs and concurrency_config are provided. |
ConcurrencyConfigurationError
|
If max_concurrent_envs > 1 for an environment that is not marked as SUPPORTS_CONCURRENT_SESSIONS. |
get_capacity_status()
¶
Get the current capacity status of the server.
Returns:
| Type | Description |
|---|---|
ServerCapacityStatus
|
ServerCapacityStatus with current session counts and availability. |
get_session_info(session_id)
¶
Get information about a specific session.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_id
|
str
|
The session ID to query |
required |
Returns:
| Type | Description |
|---|---|
Optional[SessionInfo]
|
SessionInfo if the session exists, None otherwise |
register_routes(app)
¶
Register HTTP routes on a FastAPI application.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
app
|
FastAPI
|
FastAPI application instance |
required |
create_app(env, action_cls, observation_cls, env_name=None, max_concurrent_envs=None, concurrency_config=None)
¶
Create a FastAPI application with or without web interface.
This function creates a FastAPI app with the web interface enabled by default, including README integration for better user experience.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
env
|
Callable[[], Environment]
|
Environment factory (callable) that creates new instances |
required |
action_cls
|
Type[Action]
|
The Action subclass this environment expects |
required |
observation_cls
|
Type[Observation]
|
The Observation subclass this environment returns |
required |
env_name
|
Optional[str]
|
Optional environment name for README loading |
None
|
max_concurrent_envs
|
Optional[int]
|
Maximum concurrent WebSocket sessions. Mutually exclusive with concurrency_config. |
None
|
concurrency_config
|
Optional[ConcurrencyConfig]
|
Optional ConcurrencyConfig for advanced concurrency settings. Mutually exclusive with max_concurrent_envs. |
None
|
Returns:
| Type | Description |
|---|---|
FastAPI
|
FastAPI application instance with or without web interface and README integration |
create_fastapi_app(env, action_cls, observation_cls, max_concurrent_envs=None, concurrency_config=None)
¶
Create a FastAPI application with comprehensive documentation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
env
|
Callable[[], Environment]
|
Environment factory (callable) that creates new instances |
required |
action_cls
|
Type[Action]
|
The Action subclass this environment expects |
required |
observation_cls
|
Type[Observation]
|
The Observation subclass this environment returns |
required |
max_concurrent_envs
|
Optional[int]
|
Maximum concurrent WebSocket sessions. Mutually exclusive with concurrency_config. |
None
|
concurrency_config
|
Optional[ConcurrencyConfig]
|
Optional ConcurrencyConfig for advanced concurrency settings. Mutually exclusive with max_concurrent_envs. |
None
|
Returns:
| Type | Description |
|---|---|
FastAPI
|
FastAPI application instance |
Web interface helpers¶
Web interface for OpenEnv environments.
This module provides a web-based interface for interacting with OpenEnv environments, including a two-pane layout for HumanAgent interaction and state observation.
ActionLog
¶
Bases: BaseModel
Log entry for an action taken.
EpisodeState
¶
Bases: BaseModel
Current episode state for the web interface.
WebInterfaceManager
¶
Manages the web interface for an environment.
connect_websocket(websocket)
async
¶
Connect a new WebSocket client.
disconnect_websocket(websocket)
async
¶
Disconnect a WebSocket client.
reset_environment()
async
¶
Reset the environment and update state.
step_environment(action_data)
async
¶
Execute a step in the environment and update state.
get_state()
¶
Get current environment state.
load_environment_metadata(env, env_name=None)
¶
Load environment metadata including README content.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
env
|
Environment
|
The environment instance |
required |
env_name
|
Optional[str]
|
Optional environment name for README file lookup |
None
|
Returns:
| Type | Description |
|---|---|
EnvironmentMetadata
|
EnvironmentMetadata with loaded information |
create_web_interface_app(env, action_cls, observation_cls, env_name=None)
¶
Create a FastAPI application with web interface for the given environment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
env
|
Environment
|
The Environment instance to serve |
required |
action_cls
|
Type[Action]
|
The Action subclass this environment expects |
required |
observation_cls
|
Type[Observation]
|
The Observation subclass this environment returns |
required |
env_name
|
Optional[str]
|
Optional environment name for README loading |
None
|
Returns:
| Type | Description |
|---|---|
FastAPI
|
FastAPI application instance with web interface |
get_web_interface_html(action_cls, metadata=None)
¶
Generate the HTML for the web interface.
Client contracts¶
Environment client for persistent sessions.
This module provides a WebSocket-based client that maintains a persistent connection to an environment server, enabling efficient multi-step interactions without the overhead of HTTP request/response cycles.
EnvClient
¶
Bases: ABC, Generic[ActT, ObsT, StateT]
Environment client for persistent sessions.
This client maintains a persistent WebSocket connection to an environment server, enabling efficient multi-step interactions. Each client instance corresponds to a dedicated environment session on the server.
Features: - Lower latency for sequential interactions - Session state is maintained server-side - Better suited for long-running episodes
Example
from envs.coding_env.client import CodingEnv
Connect to a server¶
with CodingEnv(base_url="ws://localhost:8000") as env: ... result = env.reset(seed=42) ... while not result.done: ... action = agent.predict(result.observation) ... result = env.step(action)
__init__(base_url, connect_timeout_s=10.0, message_timeout_s=60.0, provider=None)
¶
Initialize environment client.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
base_url
|
str
|
Base URL of the environment server (http:// or ws://). Will be converted to ws:// if http:// is provided. |
required |
connect_timeout_s
|
float
|
Timeout for establishing WebSocket connection |
10.0
|
message_timeout_s
|
float
|
Timeout for receiving responses to messages |
60.0
|
provider
|
Optional['ContainerProvider | RuntimeProvider']
|
Optional container/runtime provider for lifecycle management. Can be a ContainerProvider (Docker) or RuntimeProvider (UV). |
None
|
connect()
¶
Establish WebSocket connection to the server.
Returns:
| Type | Description |
|---|---|
'EnvClient'
|
self for method chaining |
Raises:
| Type | Description |
|---|---|
ConnectionError
|
If connection cannot be established |
disconnect()
¶
Close the WebSocket connection.
from_docker_image(image, provider=None, **kwargs)
classmethod
¶
Create an environment client by spinning up a Docker container.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image
|
str
|
Docker image name to run (e.g., "coding-env:latest") |
required |
provider
|
Optional['ContainerProvider']
|
Container provider to use (defaults to LocalDockerProvider) |
None
|
**kwargs
|
Any
|
Additional arguments to pass to provider.start_container() |
{}
|
Returns:
| Type | Description |
|---|---|
EnvClientT
|
Connected client instance |
from_hub(repo_id, *, use_docker=True, provider=None, **provider_kwargs)
classmethod
¶
Create a client from a Hugging Face Space.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
repo_id
|
str
|
Hugging Face space identifier |
required |
use_docker
|
bool
|
When |
True
|
provider
|
Optional['ContainerProvider | RuntimeProvider']
|
Optional provider instance to reuse. Must be a
:class: |
None
|
provider_kwargs
|
Any
|
Additional keyword arguments forwarded to
either the container provider's |
{}
|
Returns:
| Type | Description |
|---|---|
EnvClientT
|
Connected client instance |
Examples:
>>> # Pull and run from HF Docker registry
>>> env = MyEnv.from_hub("openenv/echo-env")
>>>
>>> # Run locally with UV (clones the space)
>>> env = MyEnv.from_hub("openenv/echo-env", use_docker=False)
>>>
>>> # Run from a local checkout
>>> env = MyEnv.from_hub(
... "openenv/echo-env",
... use_docker=False,
... project_path="/path/to/local/checkout"
... )
reset(**kwargs)
¶
Reset the environment with optional parameters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**kwargs
|
Any
|
Optional parameters passed to the environment's reset method. Common parameters include: - seed: Random seed for reproducibility - episode_id: Custom episode identifier |
{}
|
Returns:
| Type | Description |
|---|---|
StepResult[ObsT]
|
StepResult containing initial observation |
step(action, **kwargs)
¶
Execute an action in the environment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
action
|
ActT
|
The action to execute |
required |
**kwargs
|
Any
|
Optional parameters (currently ignored) |
{}
|
Returns:
| Type | Description |
|---|---|
StepResult[ObsT]
|
StepResult containing observation, reward, and done status |
state()
¶
Get the current environment state from the server.
Returns:
| Type | Description |
|---|---|
StateT
|
State object with environment state information |
close()
¶
Close the WebSocket connection and clean up resources.
If this client was created via from_docker_image() or from_hub(), this will also stop and remove the associated container/process.
__enter__()
¶
Enter context manager, ensuring connection is established.
__exit__(exc_type, exc_val, exc_tb)
¶
Exit context manager, closing connection.
Shared dataclasses¶
StepResult
dataclass
¶
Bases: Generic[ObsT]
Represents the result of one environment step.
Attributes:
| Name | Type | Description |
|---|---|---|
observation |
ObsT
|
The environment's observation after the action. |
reward |
Optional[float]
|
Scalar reward for this step (optional). |
done |
bool
|
Whether the episode is finished. |
Container providers¶
Container provider abstractions for running environment servers.
This module provides a pluggable architecture for different container providers (local Docker, Kubernetes, cloud providers, etc.) to be used with EnvClient.
ContainerProvider
¶
Bases: ABC
Abstract base class for container providers.
Providers implement this interface to support different container platforms: - LocalDockerProvider: Runs containers on local Docker daemon - KubernetesProvider: Runs containers in Kubernetes cluster - FargateProvider: Runs containers on AWS Fargate - CloudRunProvider: Runs containers on Google Cloud Run
The provider manages a single container lifecycle and provides the base URL for connecting to it.
Example
provider = LocalDockerProvider() base_url = provider.start_container("echo-env:latest") print(base_url) # http://localhost:8000
Use the environment via base_url¶
provider.stop_container()
start_container(image, port=None, env_vars=None, **kwargs)
abstractmethod
¶
Start a container from the specified image.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image
|
str
|
Container image name (e.g., "echo-env:latest") |
required |
port
|
Optional[int]
|
Port to expose (if None, provider chooses) |
None
|
env_vars
|
Optional[Dict[str, str]]
|
Environment variables to pass to container |
None
|
**kwargs
|
Any
|
Provider-specific options |
{}
|
Returns:
| Type | Description |
|---|---|
str
|
Base URL to connect to the container (e.g., "http://localhost:8000") |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If container fails to start |
stop_container()
abstractmethod
¶
Stop and remove the running container.
This cleans up the container that was started by start_container().
wait_for_ready(base_url, timeout_s=30.0)
abstractmethod
¶
Wait for the container to be ready to accept requests.
This typically polls the /health endpoint until it returns 200.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
base_url
|
str
|
Base URL of the container |
required |
timeout_s
|
float
|
Maximum time to wait |
30.0
|
Raises:
| Type | Description |
|---|---|
TimeoutError
|
If container doesn't become ready in time |
LocalDockerProvider
¶
Bases: ContainerProvider
Container provider for local Docker daemon.
This provider runs containers on the local machine using Docker. Useful for development and testing.
Example
provider = LocalDockerProvider() base_url = provider.start_container("echo-env:latest")
Container running on http://localhost:
¶ provider.stop_container()
__init__()
¶
Initialize the local Docker provider.
start_container(image, port=None, env_vars=None, **kwargs)
¶
Start a Docker container locally.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image
|
str
|
Docker image name |
required |
port
|
Optional[int]
|
Port to expose (if None, finds available port) |
None
|
env_vars
|
Optional[Dict[str, str]]
|
Environment variables for the container |
None
|
**kwargs
|
Any
|
Additional Docker run options |
{}
|
Returns:
| Type | Description |
|---|---|
str
|
Base URL to connect to the container |
stop_container()
¶
Stop and remove the Docker container.
wait_for_ready(base_url, timeout_s=30.0)
¶
Wait for container to be ready by polling /health endpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
base_url
|
str
|
Base URL of the container |
required |
timeout_s
|
float
|
Maximum time to wait |
30.0
|
Raises:
| Type | Description |
|---|---|
TimeoutError
|
If container doesn't become ready |
DockerSwarmProvider
¶
Bases: ContainerProvider
Container provider that uses Docker Swarm services for local concurrency.
This provider creates a replicated Swarm service backed by the local Docker engine. The built-in load-balancer fans requests across the replicas, allowing multiple container instances to run concurrently on the developer workstation (mirroring the workflow described in the Docker stack docs).
__init__(*, auto_init_swarm=True, overlay_network=None)
¶
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
auto_init_swarm
|
bool
|
Whether to call |
True
|
overlay_network
|
Optional[str]
|
Optional overlay network name for the service.
When provided, the network is created with
|
None
|
start_container(image, port=None, env_vars=None, **kwargs)
¶
Start (or scale) a Swarm service for the given image.
Supported kwargs
replicas (int): Number of container replicas (default: 2).
cpu_limit (float | str): CPU limit passed to --limit-cpu.
memory_limit (str): Memory limit passed to --limit-memory.
constraints (Sequence[str]): Placement constraints.
labels (Dict[str, str]): Service labels.
command (Sequence[str] | str): Override container command.
stop_container()
¶
Remove the Swarm service (and keep the Swarm manager running).
wait_for_ready(base_url, timeout_s=30.0)
¶
Wait for at least one replica to become healthy by polling /health.
Note: With Swarm's load balancer, requests round-robin across replicas, so this only verifies that at least one replica is responding. Some replicas may still be starting when this returns.
KubernetesProvider
¶
Bases: ContainerProvider
Container provider for Kubernetes clusters.
This provider creates pods in a Kubernetes cluster and exposes them via services or port-forwarding.
Example
provider = KubernetesProvider(namespace="envtorch-dev") base_url = provider.start_container("echo-env:latest")
Pod running in k8s, accessible via service or port-forward¶
provider.stop_container()
RuntimeProvider
¶
Bases: ABC
Abstract base class for runtime providers that are not container providers.
Providers implement this interface to support different runtime platforms:
- UVProvider: Runs environments via uv run
The provider manages a single runtime lifecycle and provides the base URL for connecting to it.
Example
provider = UVProvider(project_path="/path/to/env") base_url = provider.start() print(base_url) # http://localhost:8000 provider.stop()
start(port=None, env_vars=None, **kwargs)
abstractmethod
¶
Start a runtime from the specified image.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
image
|
Runtime image name |
required | |
port
|
Optional[int]
|
Port to expose (if None, provider chooses) |
None
|
env_vars
|
Optional[Dict[str, str]]
|
Environment variables for the runtime |
None
|
**kwargs
|
Any
|
Additional runtime options |
{}
|
stop()
abstractmethod
¶
Stop the runtime.
wait_for_ready(timeout_s=30.0)
abstractmethod
¶
Wait for the runtime to be ready to accept requests.
__enter__()
¶
Enter the runtime provider.
__exit__(exc_type, exc, tb)
¶
Exit the runtime provider.