Core API Reference

The openenv.core package provides the core abstractions for building and running environments. For an end-to-end tutorial on building environments with OpenEnv, see the building an environment guide.

Core runtime (core)

Environment server primitives

Message

Bases: TypedDict

A message in a conversation.

Compatible with the Hugging Face chat template format.
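As a rough illustration of the shape this implies, the sketch below builds a short conversation. It assumes Message carries only the 'role' and 'content' keys mentioned in the apply_chat_template parameter description; the TypedDict here is a local stand-in rather than the class exported by openenv.core, and build_conversation is a hypothetical helper.

```python
from typing import TypedDict


class Message(TypedDict):
    """Local stand-in for the documented Message type (assumed keys)."""

    role: str
    content: str


def build_conversation(system_prompt: str, user_text: str) -> list[Message]:
    """Build a two-message conversation in chat-template format."""
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_text},
    ]


conversation = build_conversation("You are a helpful agent.", "List the files.")
```

A list like this is what a chat-template tokenizer would typically receive as its conversation argument.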

ModelTokenizer

Bases: Protocol

Protocol for tokenizers that support chat templates.

This protocol defines the interface that tokenizers must implement to work with chat-based environments. It is compatible with Hugging Face transformers tokenizers.

apply_chat_template(conversation, tokenize=True, return_tensors=None, **kwargs)

Apply a chat template to format and optionally tokenize a conversation.

Parameters:

Name Type Description Default
conversation list[Message]

List of message dictionaries with 'role' and 'content'

required
tokenize bool

Whether to tokenize the output

True
return_tensors str | None

Format for returned tensors ('pt' for PyTorch)

None
**kwargs Any

Additional arguments

{}

Returns:

Type Description
Any

Formatted and optionally tokenized conversation

decode(token_ids, skip_special_tokens=False, **kwargs)

Decode token IDs back to text.

Parameters:

Name Type Description Default
token_ids Any

Token IDs to decode

required
skip_special_tokens bool

Whether to skip special tokens in output

False
**kwargs Any

Additional arguments

{}

Returns:

Type Description
str

Decoded text string
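To make the protocol concrete, here is a minimal sketch of a class that satisfies it structurally. The Protocol definition is a local stand-in that mirrors the two documented signatures, and ToyTokenizer is a hypothetical whitespace tokenizer invented for illustration; a real deployment would more likely pass a Hugging Face tokenizer.

```python
from typing import Any, Optional, Protocol, TypedDict, runtime_checkable


class Message(TypedDict):
    role: str
    content: str


@runtime_checkable
class ModelTokenizer(Protocol):
    """Local stand-in mirroring the two documented methods."""

    def apply_chat_template(self, conversation: list, tokenize: bool = True,
                            return_tensors: Optional[str] = None, **kwargs: Any) -> Any: ...

    def decode(self, token_ids: Any, skip_special_tokens: bool = False,
               **kwargs: Any) -> str: ...


class ToyTokenizer:
    """Hypothetical whitespace tokenizer; role markers act as special tokens."""

    def __init__(self) -> None:
        self._vocab: dict = {}
        self._inverse: dict = {}

    def _encode(self, text: str) -> list:
        ids = []
        for word in text.split():
            if word not in self._vocab:
                idx = len(self._vocab)
                self._vocab[word] = idx
                self._inverse[idx] = word
            ids.append(self._vocab[word])
        return ids

    def apply_chat_template(self, conversation, tokenize=True, return_tensors=None, **kwargs):
        # Render each message as "<role> content" and join them with spaces.
        text = " ".join(f"<{m['role']}> {m['content']}" for m in conversation)
        return self._encode(text) if tokenize else text

    def decode(self, token_ids, skip_special_tokens=False, **kwargs):
        words = [self._inverse[i] for i in token_ids]
        if skip_special_tokens:
            words = [w for w in words if not (w.startswith("<") and w.endswith(">"))]
        return " ".join(words)
```

Because the protocol is structural, ToyTokenizer does not need to inherit from anything; providing both methods is enough.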

Transform

Bases: ABC, Generic[ObsT]

Transform observations to add rewards, metrics, or other modifications.

Transforms follow the TorchRL pattern where they take an observation and return a (potentially modified) observation. This allows for flexible reward computation and observation augmentation.

__call__(observation) abstractmethod

Transform an observation.

Parameters:

Name Type Description Default
observation ObsT

The input observation

required

Returns:

Type Description
ObsT

The transformed observation
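The observation-in, observation-out pattern described for Transform can be sketched as follows. The base class is a local stand-in with the documented __call__ signature, while TextObservation and LengthPenalty are hypothetical names chosen for this example.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, replace
from typing import Generic, Optional, TypeVar

ObsT = TypeVar("ObsT")


class Transform(ABC, Generic[ObsT]):
    """Local stand-in for the documented base class."""

    @abstractmethod
    def __call__(self, observation: ObsT) -> ObsT: ...


@dataclass(frozen=True)
class TextObservation:
    """Hypothetical observation type with a reward slot."""

    text: str
    reward: Optional[float] = None


class LengthPenalty(Transform[TextObservation]):
    """Assign a reward that becomes more negative as the text grows."""

    def __init__(self, per_char: float = 0.01) -> None:
        self.per_char = per_char

    def __call__(self, observation: TextObservation) -> TextObservation:
        # Return a modified copy rather than mutating the input.
        return replace(observation, reward=-self.per_char * len(observation.text))
```

Returning a copy keeps the transform free of side effects, which makes it easier to chain several transforms; whether the real library expects copies or in-place edits is not stated in this reference.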

Environment

Bases: ABC, Generic[ActT, ObsT, StateT]

Base class for all environment servers, following the Gym/Gymnasium API.

Parameters:

Name Type Description Default
transform Optional[Transform[ObsT]]

Optional transform to apply to observations

None
Class Attributes

SUPPORTS_CONCURRENT_SESSIONS: Set this to True in your Environment subclass if:
- The environment uses proper session isolation (e.g., unique working dirs)
- No shared mutable state exists between instances
- External resources (databases, APIs) can handle concurrent access

state abstractmethod property

Get the current environment state.

reset(seed=None, episode_id=None, **kwargs) abstractmethod

Reset the environment and return the initial observation.

reset_async(seed=None, episode_id=None, **kwargs) async

Async version of reset. Default implementation calls sync reset.

Override to provide true async implementation.

step(action, timeout_s=None, **kwargs) abstractmethod

Take a step in the environment.

step_async(action, timeout_s=None, **kwargs) async

Async version of step. Default implementation calls sync step.

Override to provide true async implementation.

get_metadata()

Get metadata about this environment.

Override this method to provide custom metadata for the environment. Default implementation returns basic metadata derived from class name.

Returns:

Type Description
EnvironmentMetadata

EnvironmentMetadata with environment information

close()

Clean up resources used by the environment.

Override this method to implement custom cleanup logic. Called when the environment is being destroyed or reset.
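A minimal sketch of a subclass may help tie these methods together. The base class below is a local stand-in that reproduces only the documented method names and the "async defaults call the sync method" behaviour; the return types of reset and step are assumed to be observations, and CounterEnv and CounterObs are hypothetical.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Generic, Optional, TypeVar

ActT = TypeVar("ActT")
ObsT = TypeVar("ObsT")
StateT = TypeVar("StateT")


class Environment(ABC, Generic[ActT, ObsT, StateT]):
    """Local stand-in with the documented method names only."""

    @property
    @abstractmethod
    def state(self) -> StateT: ...

    @abstractmethod
    def reset(self, seed: Optional[int] = None, episode_id: Optional[str] = None,
              **kwargs: Any) -> ObsT: ...

    @abstractmethod
    def step(self, action: ActT, timeout_s: Optional[float] = None, **kwargs: Any) -> ObsT: ...

    async def reset_async(self, seed=None, episode_id=None, **kwargs):
        # Default async path simply delegates to the sync method.
        return self.reset(seed=seed, episode_id=episode_id, **kwargs)

    async def step_async(self, action, timeout_s=None, **kwargs):
        return self.step(action, timeout_s=timeout_s, **kwargs)


@dataclass
class CounterObs:
    value: int
    done: bool = False


class CounterEnv(Environment[int, CounterObs, dict]):
    """Hypothetical environment: add each action to a counter until it reaches 3."""

    def __init__(self) -> None:
        self._value = 0
        self._episode_id: Optional[str] = None

    @property
    def state(self) -> dict:
        return {"episode_id": self._episode_id, "value": self._value}

    def reset(self, seed=None, episode_id=None, **kwargs) -> CounterObs:
        self._value, self._episode_id = 0, episode_id
        return CounterObs(value=0)

    def step(self, action: int, timeout_s=None, **kwargs) -> CounterObs:
        self._value += action
        return CounterObs(value=self._value, done=self._value >= 3)
```

An environment with blocking I/O would override reset_async and step_async instead of relying on the delegating defaults.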

HTTP server utilities

HTTP server wrapper for Environment instances.

This module provides utilities to wrap any Environment subclass and expose it over HTTP and WebSocket endpoints that EnvClient can consume.

HTTPEnvServer

HTTP server wrapper for Environment instances.

This class wraps an Environment and exposes its reset(), step(), and state methods as HTTP and WebSocket endpoints compatible with EnvClient.

The server expects:
- Action deserialization: Converts JSON dict to Action subclass
- Observation serialization: Converts Observation subclass to JSON dict

Example

>>> from core.env_server import HTTPEnvServer
>>> from envs.coding_env.server import CodeExecutionEnvironment
>>> from envs.coding_env.models import CodeAction, CodeObservation
>>>
>>> # Pass environment class (factory pattern)
>>> server = HTTPEnvServer(
...     env=CodeExecutionEnvironment,
...     action_cls=CodeAction,
...     observation_cls=CodeObservation,
...     max_concurrent_envs=4,
... )
>>>
>>> # Register routes with FastAPI
>>> from fastapi import FastAPI
>>> app = FastAPI()
>>> server.register_routes(app)

active_sessions property

Return the number of active WebSocket sessions.

max_concurrent_envs property

Return the maximum number of concurrent environments.

is_concurrency_safe property

Return whether the environment is marked as concurrency safe.

concurrency_config property

Return the concurrency configuration.

__init__(env, action_cls, observation_cls, max_concurrent_envs=None, concurrency_config=None)

Initialize HTTP server wrapper.

Parameters:

Name Type Description Default
env Callable[[], Environment]

Environment factory (callable) that creates new instances. Will be called to create a new environment for each WebSocket session.

required
action_cls Type[Action]

The Action subclass this environment expects

required
observation_cls Type[Observation]

The Observation subclass this environment returns

required
max_concurrent_envs Optional[int]

Maximum number of concurrent WebSocket sessions. Mutually exclusive with concurrency_config.

None
concurrency_config Optional[ConcurrencyConfig]

Optional ConcurrencyConfig for advanced concurrency settings. Mutually exclusive with max_concurrent_envs.

None

Raises:

Type Description
ValueError

If both max_concurrent_envs and concurrency_config are provided.

ConcurrencyConfigurationError

If max_concurrent_envs > 1 for an environment that is not marked as SUPPORTS_CONCURRENT_SESSIONS.
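The two error conditions above can be read as a small validation routine. The sketch below is one plausible way to express them, not the library's actual code: resolve_concurrency is a hypothetical helper, the ConcurrencyConfig fields are assumed (only max_concurrent_envs is modelled), and falling back to a single session when nothing is configured is also an assumption.

```python
from dataclasses import dataclass
from typing import Optional


class ConcurrencyConfigurationError(Exception):
    """Local stand-in for the documented exception."""


@dataclass
class ConcurrencyConfig:
    """Hypothetical config; the real class's fields are not shown in this reference."""

    max_concurrent_envs: int = 1


def resolve_concurrency(
    supports_concurrent_sessions: bool,
    max_concurrent_envs: Optional[int] = None,
    concurrency_config: Optional[ConcurrencyConfig] = None,
) -> ConcurrencyConfig:
    """Validate the two mutually exclusive arguments and return one config."""
    if max_concurrent_envs is not None and concurrency_config is not None:
        raise ValueError("Pass max_concurrent_envs or concurrency_config, not both")
    if concurrency_config is None:
        # Assumption: default to a single session when nothing is configured.
        limit = 1 if max_concurrent_envs is None else max_concurrent_envs
        concurrency_config = ConcurrencyConfig(max_concurrent_envs=limit)
    if concurrency_config.max_concurrent_envs > 1 and not supports_concurrent_sessions:
        raise ConcurrencyConfigurationError(
            "Environment is not marked as SUPPORTS_CONCURRENT_SESSIONS"
        )
    return concurrency_config
```

Checking the flag at construction time means a misconfigured server fails before it accepts any sessions.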

get_capacity_status()

Get the current capacity status of the server.

Returns:

Type Description
ServerCapacityStatus

ServerCapacityStatus with current session counts and availability.

get_session_info(session_id)

Get information about a specific session.

Parameters:

Name Type Description Default
session_id str

The session ID to query

required

Returns:

Type Description
Optional[SessionInfo]

SessionInfo if the session exists, None otherwise

register_routes(app)

Register HTTP routes on a FastAPI application.

Parameters:

Name Type Description Default
app FastAPI

FastAPI application instance

required

create_app(env, action_cls, observation_cls, env_name=None, max_concurrent_envs=None, concurrency_config=None)

Create a FastAPI application with or without web interface.

This function creates a FastAPI app with the web interface enabled by default, including README integration for better user experience.

Parameters:

Name Type Description Default
env Callable[[], Environment]

Environment factory (callable) that creates new instances

required
action_cls Type[Action]

The Action subclass this environment expects

required
observation_cls Type[Observation]

The Observation subclass this environment returns

required
env_name Optional[str]

Optional environment name for README loading

None
max_concurrent_envs Optional[int]

Maximum concurrent WebSocket sessions. Mutually exclusive with concurrency_config.

None
concurrency_config Optional[ConcurrencyConfig]

Optional ConcurrencyConfig for advanced concurrency settings. Mutually exclusive with max_concurrent_envs.

None

Returns:

Type Description
FastAPI

FastAPI application instance with or without web interface and README integration

create_fastapi_app(env, action_cls, observation_cls, max_concurrent_envs=None, concurrency_config=None)

Create a FastAPI application with comprehensive documentation.

Parameters:

Name Type Description Default
env Callable[[], Environment]

Environment factory (callable) that creates new instances

required
action_cls Type[Action]

The Action subclass this environment expects

required
observation_cls Type[Observation]

The Observation subclass this environment returns

required
max_concurrent_envs Optional[int]

Maximum concurrent WebSocket sessions. Mutually exclusive with concurrency_config.

None
concurrency_config Optional[ConcurrencyConfig]

Optional ConcurrencyConfig for advanced concurrency settings. Mutually exclusive with max_concurrent_envs.

None

Returns:

Type Description
FastAPI

FastAPI application instance

Web interface helpers

Web interface for OpenEnv environments.

This module provides a web-based interface for interacting with OpenEnv environments, including a two-pane layout for HumanAgent interaction and state observation.

ActionLog

Bases: BaseModel

Log entry for an action taken.

EpisodeState

Bases: BaseModel

Current episode state for the web interface.

WebInterfaceManager

Manages the web interface for an environment.

connect_websocket(websocket) async

Connect a new WebSocket client.

disconnect_websocket(websocket) async

Disconnect a WebSocket client.

reset_environment() async

Reset the environment and update state.

step_environment(action_data) async

Execute a step in the environment and update state.

get_state()

Get current environment state.

load_environment_metadata(env, env_name=None)

Load environment metadata including README content.

Parameters:

Name Type Description Default
env Environment

The environment instance

required
env_name Optional[str]

Optional environment name for README file lookup

None

Returns:

Type Description
EnvironmentMetadata

EnvironmentMetadata with loaded information

create_web_interface_app(env, action_cls, observation_cls, env_name=None)

Create a FastAPI application with web interface for the given environment.

Parameters:

Name Type Description Default
env Environment

The Environment instance to serve

required
action_cls Type[Action]

The Action subclass this environment expects

required
observation_cls Type[Observation]

The Observation subclass this environment returns

required
env_name Optional[str]

Optional environment name for README loading

None

Returns:

Type Description
FastAPI

FastAPI application instance with web interface

get_web_interface_html(action_cls, metadata=None)

Generate the HTML for the web interface.

Client contracts

Environment client for persistent sessions.

This module provides a WebSocket-based client that maintains a persistent connection to an environment server, enabling efficient multi-step interactions without the overhead of HTTP request/response cycles.

EnvClient

Bases: ABC, Generic[ActT, ObsT, StateT]

Environment client for persistent sessions.

This client maintains a persistent WebSocket connection to an environment server, enabling efficient multi-step interactions. Each client instance corresponds to a dedicated environment session on the server.

Features:
- Lower latency for sequential interactions
- Session state is maintained server-side
- Better suited for long-running episodes

Example

>>> from envs.coding_env.client import CodingEnv
>>>
>>> # Connect to a server
>>> with CodingEnv(base_url="ws://localhost:8000") as env:
...     result = env.reset(seed=42)
...     while not result.done:
...         action = agent.predict(result.observation)
...         result = env.step(action)

__init__(base_url, connect_timeout_s=10.0, message_timeout_s=60.0, provider=None)

Initialize environment client.

Parameters:

Name Type Description Default
base_url str

Base URL of the environment server (http:// or ws://). Will be converted to ws:// if http:// is provided.

required
connect_timeout_s float

Timeout for establishing WebSocket connection

10.0
message_timeout_s float

Timeout for receiving responses to messages

60.0
provider Optional['ContainerProvider | RuntimeProvider']

Optional container/runtime provider for lifecycle management. Can be a ContainerProvider (Docker) or RuntimeProvider (UV).

None
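The base_url conversion mentioned above could look roughly like this. to_ws_url is a hypothetical helper, and mapping https:// to wss:// is an assumption beyond what the parameter description states.

```python
def to_ws_url(base_url: str) -> str:
    """Map http(s) schemes to ws(s); leave ws(s) URLs untouched."""
    if base_url.startswith("https://"):
        # Assumption: secure HTTP maps to secure WebSocket.
        return "wss://" + base_url[len("https://"):]
    if base_url.startswith("http://"):
        return "ws://" + base_url[len("http://"):]
    if base_url.startswith(("ws://", "wss://")):
        return base_url
    raise ValueError(f"Unsupported URL scheme: {base_url!r}")
```

With a helper of this kind, callers can pass the URL returned by a container provider (an http:// address) straight to the client.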

connect()

Establish WebSocket connection to the server.

Returns:

Type Description
'EnvClient'

self for method chaining

Raises:

Type Description
ConnectionError

If connection cannot be established

disconnect()

Close the WebSocket connection.

from_docker_image(image, provider=None, **kwargs) classmethod

Create an environment client by spinning up a Docker container.

Parameters:

Name Type Description Default
image str

Docker image name to run (e.g., "coding-env:latest")

required
provider Optional['ContainerProvider']

Container provider to use (defaults to LocalDockerProvider)

None
**kwargs Any

Additional arguments to pass to provider.start_container()

{}

Returns:

Type Description
EnvClientT

Connected client instance

from_hub(repo_id, *, use_docker=True, provider=None, **provider_kwargs) classmethod

Create a client from a Hugging Face Space.

Parameters:

Name Type Description Default
repo_id str

Hugging Face space identifier {org}/{space}.

required
use_docker bool

When True (default), pull from the HF registry and launch via LocalDockerProvider. When False, run the space locally with UVProvider.

True
provider Optional['ContainerProvider | RuntimeProvider']

Optional provider instance to reuse. Must be a ContainerProvider when use_docker=True and a RuntimeProvider otherwise.

None
provider_kwargs Any

Additional keyword arguments forwarded to either the container provider's start_container (docker) or to the UVProvider constructor/start (uv). When use_docker=False, the project_path argument can be used to override the default git URL (git+https://huggingface.co/spaces/{repo_id}).

{}

Returns:

Type Description
EnvClientT

Connected client instance

Examples:

>>> # Pull and run from HF Docker registry
>>> env = MyEnv.from_hub("openenv/echo-env")
>>>
>>> # Run locally with UV (clones the space)
>>> env = MyEnv.from_hub("openenv/echo-env", use_docker=False)
>>>
>>> # Run from a local checkout
>>> env = MyEnv.from_hub(
...     "openenv/echo-env",
...     use_docker=False,
...     project_path="/path/to/local/checkout"
... )
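For the use_docker=False path, the choice between the default git URL and a project_path override can be sketched as below. resolve_project_path is a hypothetical helper; only the URL pattern comes from the provider_kwargs description, and the repo_id format check is an added assumption.

```python
from typing import Optional


def resolve_project_path(repo_id: str, project_path: Optional[str] = None) -> str:
    """Return the explicit project_path if given, else the default Space git URL."""
    if project_path is not None:
        return project_path
    org, sep, space = repo_id.partition("/")
    if not (org and sep and space):
        # Assumption: reject identifiers that are not in {org}/{space} form.
        raise ValueError(f"Expected '{{org}}/{{space}}', got {repo_id!r}")
    return f"git+https://huggingface.co/spaces/{repo_id}"
```

This mirrors the third example above, where a local checkout replaces the default clone source.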

reset(**kwargs)

Reset the environment with optional parameters.

Parameters:

Name Type Description Default
**kwargs Any

Optional parameters passed to the environment's reset method. Common parameters include:
- seed: Random seed for reproducibility
- episode_id: Custom episode identifier

{}

Returns:

Type Description
StepResult[ObsT]

StepResult containing initial observation

step(action, **kwargs)

Execute an action in the environment.

Parameters:

Name Type Description Default
action ActT

The action to execute

required
**kwargs Any

Optional parameters (currently ignored)

{}

Returns:

Type Description
StepResult[ObsT]

StepResult containing observation, reward, and done status

state()

Get the current environment state from the server.

Returns:

Type Description
StateT

State object with environment state information

close()

Close the WebSocket connection and clean up resources.

If this client was created via from_docker_image() or from_hub(), this will also stop and remove the associated container/process.

__enter__()

Enter context manager, ensuring connection is established.

__exit__(exc_type, exc_val, exc_tb)

Exit context manager, closing connection.
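The connect-on-enter, close-on-exit behaviour described for the context manager methods can be illustrated without a real server. SessionClient below is a hypothetical stand-in that records calls instead of opening a WebSocket; it is not the EnvClient implementation.

```python
class SessionClient:
    """Hypothetical stand-in; it records calls instead of opening a WebSocket."""

    def __init__(self) -> None:
        self.connected = False
        self.events = []

    def connect(self) -> "SessionClient":
        if not self.connected:
            self.connected = True
            self.events.append("connect")
        return self  # returning self allows method chaining

    def close(self) -> None:
        if self.connected:
            self.connected = False
            self.events.append("close")

    def __enter__(self) -> "SessionClient":
        # Entering the block ensures the connection exists.
        return self.connect()

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Always release the session; returning None lets exceptions propagate.
        self.close()
```

Using the with statement guarantees cleanup even when the body raises, which matters when close() also stops a container or process.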

Shared dataclasses

StepResult dataclass

Bases: Generic[ObsT]

Represents the result of one environment step.

Attributes:

Name Type Description
observation ObsT

The environment's observation after the action.

reward Optional[float]

Scalar reward for this step (optional).

done bool

Whether the episode is finished.
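A short sketch of how these three attributes might be consumed: the dataclass is a local stand-in with the documented fields (the default values are assumptions), and total_reward is a hypothetical helper that treats a missing reward as zero.

```python
from dataclasses import dataclass
from typing import Generic, Iterable, Optional, TypeVar

ObsT = TypeVar("ObsT")


@dataclass
class StepResult(Generic[ObsT]):
    """Local stand-in with the three documented attributes."""

    observation: ObsT
    reward: Optional[float] = None  # default is an assumption
    done: bool = False  # default is an assumption


def total_reward(results: Iterable[StepResult]) -> float:
    """Sum rewards up to and including the first finished step."""
    total = 0.0
    for result in results:
        total += result.reward if result.reward is not None else 0.0
        if result.done:
            break
    return total
```

Because reward is optional, code that aggregates rewards needs an explicit policy for None, as shown here.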

Container providers

Container provider abstractions for running environment servers.

This module provides a pluggable architecture for different container providers (local Docker, Kubernetes, cloud providers, etc.) to be used with EnvClient.

ContainerProvider

Bases: ABC

Abstract base class for container providers.

Providers implement this interface to support different container platforms:
- LocalDockerProvider: Runs containers on local Docker daemon
- KubernetesProvider: Runs containers in Kubernetes cluster
- FargateProvider: Runs containers on AWS Fargate
- CloudRunProvider: Runs containers on Google Cloud Run

The provider manages a single container lifecycle and provides the base URL for connecting to it.

Example

>>> provider = LocalDockerProvider()
>>> base_url = provider.start_container("echo-env:latest")
>>> print(base_url)  # http://localhost:8000
>>> # Use the environment via base_url
>>> provider.stop_container()

start_container(image, port=None, env_vars=None, **kwargs) abstractmethod

Start a container from the specified image.

Parameters:

Name Type Description Default
image str

Container image name (e.g., "echo-env:latest")

required
port Optional[int]

Port to expose (if None, provider chooses)

None
env_vars Optional[Dict[str, str]]

Environment variables to pass to container

None
**kwargs Any

Provider-specific options

{}

Returns:

Type Description
str

Base URL to connect to the container (e.g., "http://localhost:8000")

Raises:

Type Description
RuntimeError

If container fails to start

stop_container() abstractmethod

Stop and remove the running container.

This cleans up the container that was started by start_container().

wait_for_ready(base_url, timeout_s=30.0) abstractmethod

Wait for the container to be ready to accept requests.

This typically polls the /health endpoint until it returns 200.

Parameters:

Name Type Description Default
base_url str

Base URL of the container

required
timeout_s float

Maximum time to wait

30.0

Raises:

Type Description
TimeoutError

If container doesn't become ready in time
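The polling behaviour described for wait_for_ready can be sketched with the standard library alone. This is an illustrative loop under stated assumptions, not the provider's actual code: the poll_interval_s and probe parameters are hypothetical additions that make the loop easy to exercise without a running container.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def http_health_ok(base_url: str) -> bool:
    """Return True when GET {base_url}/health answers with status 200."""
    try:
        with urllib.request.urlopen(f"{base_url}/health", timeout=2.0) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeouts, and non-2xx responses all mean "not ready".
        return False


def wait_for_ready(
    base_url: str,
    timeout_s: float = 30.0,
    poll_interval_s: float = 0.5,
    probe: Callable[[str], bool] = http_health_ok,
) -> None:
    """Poll until the probe succeeds or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if probe(base_url):
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{base_url} not ready after {timeout_s}s")
        time.sleep(poll_interval_s)
```

Using time.monotonic() for the deadline keeps the timeout correct even if the system clock is adjusted while waiting.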

LocalDockerProvider

Bases: ContainerProvider

Container provider for local Docker daemon.

This provider runs containers on the local machine using Docker. Useful for development and testing.

Example

>>> provider = LocalDockerProvider()
>>> base_url = provider.start_container("echo-env:latest")
>>> # Container running on http://localhost:
>>> provider.stop_container()

__init__()

Initialize the local Docker provider.

start_container(image, port=None, env_vars=None, **kwargs)

Start a Docker container locally.

Parameters:

Name Type Description Default
image str

Docker image name

required
port Optional[int]

Port to expose (if None, finds available port)

None
env_vars Optional[Dict[str, str]]

Environment variables for the container

None
**kwargs Any

Additional Docker run options

{}

Returns:

Type Description
str

Base URL to connect to the container

stop_container()

Stop and remove the Docker container.

wait_for_ready(base_url, timeout_s=30.0)

Wait for container to be ready by polling /health endpoint.

Parameters:

Name Type Description Default
base_url str

Base URL of the container

required
timeout_s float

Maximum time to wait

30.0

Raises:

Type Description
TimeoutError

If container doesn't become ready

DockerSwarmProvider

Bases: ContainerProvider

Container provider that uses Docker Swarm services for local concurrency.

This provider creates a replicated Swarm service backed by the local Docker engine. The built-in load-balancer fans requests across the replicas, allowing multiple container instances to run concurrently on the developer workstation (mirroring the workflow described in the Docker stack docs).

__init__(*, auto_init_swarm=True, overlay_network=None)

Parameters:

Name Type Description Default
auto_init_swarm bool

Whether to call docker swarm init when Swarm is not active. If False, the user must initialize Swarm manually.

True
overlay_network Optional[str]

Optional overlay network name for the service. When provided, the network is created with docker network create --driver overlay --attachable if it does not already exist.

None

start_container(image, port=None, env_vars=None, **kwargs)

Start (or scale) a Swarm service for the given image.

Supported kwargs

- replicas (int): Number of container replicas (default: 2).
- cpu_limit (float | str): CPU limit passed to --limit-cpu.
- memory_limit (str): Memory limit passed to --limit-memory.
- constraints (Sequence[str]): Placement constraints.
- labels (Dict[str, str]): Service labels.
- command (Sequence[str] | str): Override container command.
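To show how these kwargs could translate into a docker service create invocation, here is a hedged sketch that only builds the argument list. build_service_create_args, the service_name and container_port parameters, and the use of shlex.split for string commands are assumptions made for illustration; the CLI flags themselves are standard docker service create options.

```python
import shlex
from typing import Dict, List, Optional, Sequence, Union


def build_service_create_args(
    image: str,
    service_name: str,
    port: int,
    *,
    replicas: int = 2,
    cpu_limit: Optional[Union[float, str]] = None,
    memory_limit: Optional[str] = None,
    constraints: Sequence[str] = (),
    labels: Optional[Dict[str, str]] = None,
    env_vars: Optional[Dict[str, str]] = None,
    command: Optional[Union[Sequence[str], str]] = None,
    container_port: int = 8000,  # assumption: the server listens on 8000
) -> List[str]:
    """Build (but do not run) a `docker service create` argument list."""
    args = [
        "docker", "service", "create", "--detach",
        "--name", service_name,
        "--replicas", str(replicas),
        "--publish", f"{port}:{container_port}",
    ]
    if cpu_limit is not None:
        args += ["--limit-cpu", str(cpu_limit)]
    if memory_limit is not None:
        args += ["--limit-memory", memory_limit]
    for constraint in constraints:
        args += ["--constraint", constraint]
    for key, value in (labels or {}).items():
        args += ["--label", f"{key}={value}"]
    for key, value in (env_vars or {}).items():
        args += ["--env", f"{key}={value}"]
    args.append(image)
    if command:
        # A string command is split shell-style; a sequence is used as given.
        args += shlex.split(command) if isinstance(command, str) else list(command)
    return args
```

Building the list separately from running it makes the mapping easy to inspect; the result could be handed to subprocess.run when a Swarm manager is available.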

stop_container()

Remove the Swarm service (and keep the Swarm manager running).

wait_for_ready(base_url, timeout_s=30.0)

Wait for at least one replica to become healthy by polling /health.

Note: With Swarm's load balancer, requests round-robin across replicas, so this only verifies that at least one replica is responding. Some replicas may still be starting when this returns.

KubernetesProvider

Bases: ContainerProvider

Container provider for Kubernetes clusters.

This provider creates pods in a Kubernetes cluster and exposes them via services or port-forwarding.

Example

>>> provider = KubernetesProvider(namespace="envtorch-dev")
>>> base_url = provider.start_container("echo-env:latest")
>>> # Pod running in k8s, accessible via service or port-forward
>>> provider.stop_container()

RuntimeProvider

Bases: ABC

Abstract base class for runtime providers that are not container providers.

Providers implement this interface to support different runtime platforms:
- UVProvider: Runs environments via uv run

The provider manages a single runtime lifecycle and provides the base URL for connecting to it.

Example

>>> provider = UVProvider(project_path="/path/to/env")
>>> base_url = provider.start()
>>> print(base_url)  # http://localhost:8000
>>> provider.stop()

start(port=None, env_vars=None, **kwargs) abstractmethod

Start the runtime.

Parameters:

Name Type Description Default
port Optional[int]

Port to expose (if None, provider chooses)

None
env_vars Optional[Dict[str, str]]

Environment variables for the runtime

None
**kwargs Any

Additional runtime options

{}

stop() abstractmethod

Stop the runtime.

wait_for_ready(timeout_s=30.0) abstractmethod

Wait for the runtime to be ready to accept requests.

__enter__()

Enter the runtime provider.

__exit__(exc_type, exc, tb)

Exit the runtime provider.
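The start, wait, stop lifecycle can be sketched with a fake provider. The base class below is a local stand-in with the documented method names; having __exit__ call stop() is an assumption about what "exit the runtime provider" means, and FakeProvider is a hypothetical subclass that launches nothing.

```python
from abc import ABC, abstractmethod
from typing import Dict, Optional


class RuntimeProvider(ABC):
    """Local stand-in with the documented method names."""

    @abstractmethod
    def start(self, port: Optional[int] = None,
              env_vars: Optional[Dict[str, str]] = None, **kwargs) -> str: ...

    @abstractmethod
    def stop(self) -> None: ...

    @abstractmethod
    def wait_for_ready(self, timeout_s: float = 30.0) -> None: ...

    def __enter__(self) -> "RuntimeProvider":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Assumption: leaving the block always stops the runtime.
        self.stop()


class FakeProvider(RuntimeProvider):
    """Hypothetical provider that tracks state without launching a process."""

    def __init__(self) -> None:
        self.running = False

    def start(self, port=None, env_vars=None, **kwargs) -> str:
        self.running = True
        return f"http://localhost:{port or 8000}"

    def stop(self) -> None:
        self.running = False

    def wait_for_ready(self, timeout_s: float = 30.0) -> None:
        if not self.running:
            raise TimeoutError("runtime was never started")


with FakeProvider() as provider:
    url = provider.start(port=9000)
    provider.wait_for_ready(timeout_s=1.0)
```

After the with block ends, the fake runtime is stopped, which is the cleanup guarantee a context manager is meant to provide.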