Service
- class forge.controller.service.service.Service(cfg, actor_def, actor_args, actor_kwargs)
Actor-less implementation of Service.
This is a temporary workaround that avoids nested actors, which are currently proving problematic.
- Parameters:
cfg – Service configuration including number of replicas, GPUs per replica, and health polling rate
actor_def – Actor class definition to instantiate on each replica
*actor_args – Positional arguments passed to actor constructor
**actor_kwargs – Keyword arguments passed to actor constructor
- Members:
call_all, start_session, get_metrics, get_metrics_summary, terminate_session, stop
- async call_all(function, *args, **kwargs)
Broadcasts a function call to all healthy replicas and returns results as a list.
- Return type:
list
- Parameters:
function – Name of the actor endpoint to call
*args – Positional arguments to pass to the endpoint
**kwargs – Keyword arguments to pass to the endpoint
- Returns:
List of results from all healthy replicas
- Raises:
RuntimeError – If no healthy replicas are available
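The broadcast behaviour described above can be pictured with a minimal sketch. Everything here (`FakeReplica`, `broadcast`, the `ping` endpoint) is a hypothetical stand-in written for illustration, not forge's implementation; it only assumes that unhealthy replicas are skipped and that an empty healthy set raises `RuntimeError`, as documented.

```python
import asyncio


class FakeReplica:
    """Hypothetical stand-in for a replica; not part of forge."""

    def __init__(self, idx, healthy=True):
        self.idx = idx
        self.healthy = healthy

    async def ping(self, payload):
        # Toy endpoint that reports which replica handled the call.
        return f"replica-{self.idx}:{payload}"


async def broadcast(replicas, function, *args, **kwargs):
    """Call the endpoint named `function` on every healthy replica."""
    healthy = [r for r in replicas if r.healthy]
    if not healthy:
        raise RuntimeError("No healthy replicas available")
    calls = [getattr(r, function)(*args, **kwargs) for r in healthy]
    # gather returns results in the same order as the awaitables passed in.
    return await asyncio.gather(*calls)


replicas = [FakeReplica(0), FakeReplica(1, healthy=False), FakeReplica(2)]
results = asyncio.run(broadcast(replicas, "ping", "hello"))
print(results)
```

In this sketch the unhealthy replica is silently excluded, so the result list can be shorter than the configured replica count.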
- get_metrics()
Get comprehensive service metrics for monitoring and analysis.
Returns detailed metrics including per-replica performance data, service-wide aggregations, and health status information.
- Returns:
Complete metrics object with replica and service data
- Return type:
ServiceMetrics
Example
>>> metrics = service.get_metrics()
>>> print(f"Request rate: {metrics.get_total_request_rate():.1f} req/s")
>>> print(f"Queue depth: {metrics.get_avg_queue_depth():.1f}")
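To make "service-wide aggregations" concrete, here is a rough sketch of how the two accessors used in the example might be derived from per-replica data. `ReplicaStats` and `MetricsSketch` are hypothetical names, and the aggregation rules (a sum for request rate, a mean for queue depth) are assumptions; the real `ServiceMetrics` may compute them differently.

```python
from dataclasses import dataclass, field


@dataclass
class ReplicaStats:
    """Hypothetical per-replica record; field names are assumptions."""
    request_rate: float  # requests per second
    queue_depth: int


@dataclass
class MetricsSketch:
    """Illustrative stand-in, not forge's ServiceMetrics."""
    replicas: dict = field(default_factory=dict)

    def get_total_request_rate(self) -> float:
        # Assumed aggregation: sum of per-replica rates.
        return sum(r.request_rate for r in self.replicas.values())

    def get_avg_queue_depth(self) -> float:
        # Assumed aggregation: mean over replicas, 0.0 when there are none.
        if not self.replicas:
            return 0.0
        total = sum(r.queue_depth for r in self.replicas.values())
        return total / len(self.replicas)


metrics = MetricsSketch({0: ReplicaStats(12.5, 3), 1: ReplicaStats(7.5, 1)})
print(f"Request rate: {metrics.get_total_request_rate():.1f} req/s")
print(f"Queue depth: {metrics.get_avg_queue_depth():.1f}")
```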
- get_metrics_summary()
Get a summary of key metrics for monitoring and debugging.
Provides a structured summary of service and replica metrics in a format suitable for monitoring dashboards, logging, or debugging purposes.
- Returns:
Structured metrics summary with service and per-replica data
- Return type:
dict
Example
>>> summary = service.get_metrics_summary()
>>> print(f"Healthy replicas: {summary['service']['healthy_replicas']}")
>>> for idx, metrics in summary['replicas'].items():
...     print(f"Replica {idx}: {metrics['request_rate']:.1f} req/s")
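Because the summary is a plain dict, it can be logged or filtered without any forge-specific types. The sketch below uses a made-up sample containing only the keys that appear in the example above (`service.healthy_replicas` and `replicas[idx].request_rate`); the real summary may carry more fields, and `idle_replicas` is a hypothetical helper.

```python
import json

# Made-up sample limited to the keys shown in the example; values are illustrative.
summary = {
    "service": {"healthy_replicas": 2},
    "replicas": {
        0: {"request_rate": 12.5},
        1: {"request_rate": 0.0},
    },
}


def idle_replicas(summary, threshold=0.1):
    """Return indices of replicas whose request rate is below `threshold`."""
    return [
        idx
        for idx, m in summary["replicas"].items()
        if m["request_rate"] < threshold
    ]


# One JSON line per poll is a common shape for log pipelines.
# Note that json.dumps converts the integer replica keys to strings.
line = json.dumps(summary, sort_keys=True)
print(line)
print("Idle replicas:", idle_replicas(summary))
```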
- async start_session()
Starts a new session for stateful request handling.
Sessions enable request affinity to specific replicas, maintaining state consistency for workloads that require it. Each session gets a unique ID and is automatically assigned to the least loaded replica.
- Returns:
Unique session identifier for use in subsequent requests
- Return type:
str
Example
>>> session_id = await service.start_session()
>>> result = await service.my_endpoint(session_id, arg1, arg2)
>>> await service.terminate_session(session_id)
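The assignment rule (unique ID, least loaded replica) can be sketched as follows. `SessionRouter` is a hypothetical class, and modelling "load" as the number of sessions pinned to a replica is an assumption; the actual service may weigh queue depth or other signals.

```python
import uuid


class SessionRouter:
    """Hypothetical router illustrating session affinity; not forge code."""

    def __init__(self, num_replicas):
        # Assumption: load is the count of sessions pinned to each replica.
        self.load = {idx: 0 for idx in range(num_replicas)}
        self.session_to_replica = {}

    def start_session(self):
        sess_id = str(uuid.uuid4())
        # Pick the replica with the fewest sessions; ties go to the lowest index.
        replica = min(self.load, key=lambda idx: (self.load[idx], idx))
        self.load[replica] += 1
        self.session_to_replica[sess_id] = replica
        return sess_id

    def terminate_session(self, sess_id):
        # Unknown IDs are ignored so that double termination is harmless.
        replica = self.session_to_replica.pop(sess_id, None)
        if replica is not None:
            self.load[replica] -= 1


router = SessionRouter(2)
a = router.start_session()
b = router.start_session()
c = router.start_session()
assignments = [router.session_to_replica[s] for s in (a, b, c)]
print(assignments)
```

Requests that carry the same session ID would then be routed to the replica recorded in `session_to_replica`, which is what keeps per-session state consistent.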
- async stop()
Stops the service and all managed replicas. This method should be called when the service is no longer needed.
- async terminate_session(sess_id)
Terminates an active session and cleans up associated resources.
Removes the session from active tracking, clears replica assignments, and updates service metrics. Sessions should be terminated when no longer needed to free up resources.
- Parameters:
sess_id – The unique session identifier to terminate
Example
>>> session_id = await service.start_session()
>>> # ... use session for requests ...
>>> await service.terminate_session(session_id)
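Since sessions hold resources until they are terminated, one way to guarantee cleanup is to wrap the start/terminate pair in an async context manager. The `session` helper and `FakeService` below are hypothetical and exist only to make the sketch runnable; the helper relies solely on the `start_session()` and `terminate_session(sess_id)` methods documented here.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeService:
    """Hypothetical stand-in exposing the two documented session methods."""

    def __init__(self):
        self.active = set()
        self._next = 0

    async def start_session(self):
        self._next += 1
        sess_id = f"sess-{self._next}"
        self.active.add(sess_id)
        return sess_id

    async def terminate_session(self, sess_id):
        self.active.discard(sess_id)


@asynccontextmanager
async def session(service):
    """Start a session and terminate it on exit, even if the body raises."""
    sess_id = await service.start_session()
    try:
        yield sess_id
    finally:
        await service.terminate_session(sess_id)


async def main():
    service = FakeService()
    seen = None
    try:
        async with session(service) as sess_id:
            seen = sess_id
            raise ValueError("simulated failure mid-session")
    except ValueError:
        pass
    return seen, service.active


seen, active = asyncio.run(main())
print(seen, active)
```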