Service

class forge.controller.service.service.Service(cfg, actor_def, actor_args, actor_kwargs)

Actor-less implementation of Service.

This is a temporary workaround that avoids nested actors, which are currently proving problematic.

Parameters:
  • cfg – Service configuration including number of replicas, GPUs per replica, and health polling rate

  • actor_def – Actor class definition to instantiate on each replica

  • *actor_args – Positional arguments passed to actor constructor

  • **actor_kwargs – Keyword arguments passed to actor constructor

Members:

call_all, start_session, get_metrics, get_metrics_summary, terminate_session, stop

async call_all(function, *args, **kwargs)

Broadcasts a function call to all healthy replicas and returns results as a list.

Return type:

list

Parameters:
  • function – Name of the actor endpoint to call

  • *args – Positional arguments to pass to the endpoint

  • **kwargs – Keyword arguments to pass to the endpoint

Returns:

List of results from all healthy replicas

Raises:

RuntimeError – If no healthy replicas are available
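
The contract described above (call every healthy replica, return a list, raise RuntimeError when none is healthy) can be sketched with a stand-in. `FakeReplica` and `FakeService` are hypothetical classes written for this illustration and are not part of forge. Running the calls concurrently with `asyncio.gather` is an assumption here, since this page does not say how the real Service dispatches them.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeReplica:
    """Hypothetical stand-in for a replica; not a forge class."""

    idx: int
    healthy: bool = True

    async def call(self, function: str, *args, **kwargs):
        # Pretend to invoke the named endpoint and echo back what was received.
        await asyncio.sleep(0)
        return (self.idx, function, args, kwargs)


class FakeService:
    """Hypothetical stand-in that mimics the documented call_all contract."""

    def __init__(self, replicas):
        self._replicas = list(replicas)

    async def call_all(self, function: str, *args, **kwargs) -> list:
        healthy = [r for r in self._replicas if r.healthy]
        if not healthy:
            raise RuntimeError("No healthy replicas available")
        # Assumption: fan out concurrently. gather returns results in input order.
        return await asyncio.gather(
            *(r.call(function, *args, **kwargs) for r in healthy)
        )
```

With three stand-in replicas of which one is unhealthy, `await FakeService(replicas).call_all("ping")` yields two results, one per healthy replica.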

get_metrics()

Get comprehensive service metrics for monitoring and analysis.

Returns detailed metrics including per-replica performance data, service-wide aggregations, and health status information.

Return type:

ServiceMetrics

Returns:

Complete metrics object with replica and service data

Example

>>> metrics = service.get_metrics()
>>> print(f"Request rate: {metrics.get_total_request_rate():.1f} req/s")
>>> print(f"Queue depth: {metrics.get_avg_queue_depth():.1f}")

get_metrics_summary()

Get a summary of key metrics for monitoring and debugging.

Provides a structured summary of service and replica metrics in a format suitable for monitoring dashboards, logging, or debugging purposes.

Return type:

dict

Returns:

Structured metrics summary with service and per-replica data

Example

>>> summary = service.get_metrics_summary()
>>> print(f"Healthy replicas: {summary['service']['healthy_replicas']}")
>>> for idx, metrics in summary['replicas'].items():
...     print(f"Replica {idx}: {metrics['request_rate']:.1f} req/s")
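
As a hedged illustration of consuming the summary in logging code, the helper below renders a dictionary as one line per entry. It relies only on the keys used in the example above (`service`, `healthy_replicas`, `replicas`, `request_rate`). `format_summary` is a hypothetical helper and the sample dictionary is made-up data; the real summary may carry additional fields.

```python
def format_summary(summary: dict) -> list[str]:
    """Render a metrics summary as log lines (hypothetical helper, not a forge API)."""
    lines = [f"healthy replicas: {summary['service']['healthy_replicas']}"]
    # Sort by replica index so the output order is stable across calls.
    for idx, metrics in sorted(summary["replicas"].items()):
        lines.append(f"replica {idx}: {metrics['request_rate']:.1f} req/s")
    return lines


# Made-up sample shaped like the example above; not real service output.
sample = {
    "service": {"healthy_replicas": 2},
    "replicas": {0: {"request_rate": 12.34}, 1: {"request_rate": 0.5}},
}
for line in format_summary(sample):
    print(line)
```
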

async start_session()

Starts a new session for stateful request handling.

Sessions enable request affinity to specific replicas, maintaining state consistency for workloads that require it. Each session gets a unique ID and is automatically assigned to the least loaded replica.

Return type:

str

Returns:

Unique session identifier for use in subsequent requests

Example

>>> session_id = await service.start_session()
>>> result = await service.my_endpoint(session_id, arg1, arg2)
>>> await service.terminate_session(session_id)
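
The least-loaded assignment and replica affinity described above can be sketched as follows. `SessionRouter` is a hypothetical class for illustration only. Measuring load as the number of active sessions is an assumption; this page does not state which load signal the real Service uses.

```python
import uuid


class SessionRouter:
    """Hypothetical router that pins each session to the least loaded replica."""

    def __init__(self, num_replicas: int):
        # Assumption: load is the count of active sessions per replica.
        self._load = {idx: 0 for idx in range(num_replicas)}
        self._assignment = {}  # session id -> replica index

    def start_session(self) -> str:
        session_id = str(uuid.uuid4())
        # Pick the replica with the fewest sessions; ties go to the lowest index.
        replica = min(self._load, key=lambda idx: (self._load[idx], idx))
        self._load[replica] += 1
        self._assignment[session_id] = replica
        return session_id

    def replica_for(self, session_id: str) -> int:
        # Every request in a session is routed to the same replica.
        return self._assignment[session_id]

    def terminate_session(self, session_id: str) -> None:
        # Drop the assignment and release the replica's capacity.
        replica = self._assignment.pop(session_id)
        self._load[replica] -= 1
```

In this sketch, terminating a session releases capacity on its replica, so later sessions can be assigned there again.
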

async stop()

Stops the service and all managed replicas. This method should be called when the service is no longer needed.

async terminate_session(sess_id)

Terminates an active session and cleans up associated resources.

Removes the session from active tracking, clears replica assignments, and updates service metrics. Sessions should be terminated when no longer needed to free up resources.

Parameters:

sess_id – The unique session identifier to terminate

Example

>>> session_id = await service.start_session()
>>> # ... use session for requests ...
>>> await service.terminate_session(session_id)
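
One way to make sure a session is always terminated, even when a request raises, is to wrap the start and terminate calls in an async context manager. This is a sketch, not something forge provides: `session` is a hypothetical helper, and `StubService` is a stand-in that exposes only the two documented session methods so the example runs on its own.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def session(service):
    """Hypothetical helper: start a session and always terminate it on exit."""
    session_id = await service.start_session()
    try:
        yield session_id
    finally:
        # Runs on normal exit and when the body raises.
        await service.terminate_session(session_id)


class StubService:
    """Stand-in with the two documented session methods; not the real Service."""

    def __init__(self):
        self.active = set()
        self._next = 0

    async def start_session(self) -> str:
        self._next += 1
        sess_id = f"session-{self._next}"
        self.active.add(sess_id)
        return sess_id

    async def terminate_session(self, sess_id: str) -> None:
        self.active.discard(sess_id)


async def main() -> set:
    service = StubService()
    async with session(service) as session_id:
        print(f"using {session_id}")
    return service.active


asyncio.run(main())
```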