monarch#
These functions make up monarch’s distributed tensor computation API. See Distributed Tensors in Monarch for an overview.
- class monarch.Tensor(fake, mesh, stream)[source]#
Bases: Referenceable, BaseTensor
A distributed tensor for distributed computation across device meshes.
Tensor represents a distributed tensor that spans multiple devices in a device mesh. It provides the same interface as PyTorch tensors but enables distributed operations and communication patterns.
- mesh: DeviceMesh#
- property dropped#
- to_mesh(mesh, stream=None)[source]#
Move data between one device mesh and another. Sizes of named dimensions must match. If mesh has dimensions that self.mesh does not, it will broadcast to those dimensions.
- broadcast:
t.slice_mesh(batch=0).to_mesh(t.mesh)
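A minimal sketch of these two patterns, assuming a Tensor t whose mesh has a named 'batch' dimension and a second mesh target_mesh with matching named dimension sizes; only to_mesh and slice_mesh from this page are used:
from monarch import Tensor

def move(t: Tensor, target_mesh) -> Tensor:
    # Move t onto target_mesh. Named dimension sizes must match; dims that
    # exist only on target_mesh are broadcast to.
    return t.to_mesh(target_mesh)

def broadcast_batch0(t: Tensor) -> Tensor:
    # The broadcast pattern above: slice out batch=0, then send that slice
    # back to every member of t's original mesh.
    return t.slice_mesh(batch=0).to_mesh(t.mesh)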
- reduce(dims, reduction='sum', scatter=False, mesh=None, _inplace=False, out=None)[source]#
Perform a reduction operation along dims and move the data to mesh. If mesh=None, then mesh=self.mesh. The 'stack' reduction (gather) will concatenate the values along dims, producing a local result tensor with an additional outer dimension of len(dims). If scatter=True, the local result tensor will be evenly split across dims. A consolidated sketch of the common patterns follows this entry.
- allreduce:
t.reduce(dims='gpu', reduction='sum')
First reduces dim 'gpu', creating a local tensor with the 'gpu' dimension; then, because output_mesh=input_mesh and it still has dim 'gpu', we broadcast the reduced result tensor to all members of gpu.
- reducescatter:
t.reduce(dims='gpu', reduction='sum', scatter=True)
Same as above, except that scatter=True introduces a new 'gpu' dimension that is the result of splitting the local tensor across 'gpu'.
- allgather:
t.reduce(dims='gpu', reduction='stack')
First reduces dim 'gpu', creating a bigger local tensor; then, because output_mesh=input_mesh and it still has dim 'gpu', broadcasts the concatenated result tensor to all members of gpu.
- alltoall:
t.reduce(dims='gpu', reduction='stack', scatter=True)
First reduces dim 'gpu', creating a bigger local tensor, then introduces a new 'gpu' dimension that is the result of splitting this (bigger) tensor across 'gpu'. The result has the same dimensions as the original tensor, but with each rank sending to all other ranks.
- gather (to dim 0):
t.reduce(dims='gpu', reduction='stack', mesh=device_mesh(gpu=0))
First gathers dim 'gpu' and then places it on the first rank. t.mesh.gpu[0] doesn't have a 'gpu' dimension, but this is ok because we eliminated the 'gpu' dim via reduction.
- reduce:
t.reduce(dims='gpu', reduction='sum', mesh=device_mesh(gpu=0))
First reduces dim 'gpu' and then places it on the first rank. t.mesh.gpu[0] doesn't have a 'gpu' dimension, but this is ok because we eliminated the 'gpu' dim via reduction.
- Parameters:
dims (Dims | str) – The dimensions along which to perform the reduction.
reduction (_valid_reduce) – The type of reduction to perform. Defaults to “sum”.
scatter (bool) – If True, the local result tensor will be evenly split across dimensions. Defaults to False.
mesh (Optional["DeviceMesh"], optional) – The target mesh to move the data to. If None, uses self.mesh. Defaults to None.
_inplace (bool) – If True, performs the operation in-place. Defaults to False. Note that not all reduction operations support in-place execution.
out (Optional["Tensor"]) – The output tensor to store the result. If None, a new tensor will be created on the stream where the reduce operation executes. Defaults to None.
- Returns:
The result of the reduction operation.
- Return type:
Tensor
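A hedged sketch of the collective patterns above as small helpers. The helper names are ours, the 'gpu' dimension is assumed to exist on t.mesh, and only the reduce keyword arguments documented above are used:
from monarch import Tensor

def allreduce_sum(t: Tensor) -> Tensor:
    # Output mesh equals input mesh and still has 'gpu', so the reduced
    # result is broadcast back to every member of 'gpu'.
    return t.reduce(dims='gpu', reduction='sum')

def reduce_scatter(t: Tensor) -> Tensor:
    # As above, but the local result is split evenly back across 'gpu'.
    return t.reduce(dims='gpu', reduction='sum', scatter=True)

def allgather(t: Tensor) -> Tensor:
    # 'stack' concatenates along a new outer dimension instead of summing.
    return t.reduce(dims='gpu', reduction='stack')

def alltoall(t: Tensor) -> Tensor:
    # Stack, then re-split across 'gpu': every rank sends to every rank.
    return t.reduce(dims='gpu', reduction='stack', scatter=True)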
- class monarch.Stream(name, _default=False)[source]#
Bases: object
- borrow(t, mutable=False)[source]#
borrowed_tensor, borrow = self.borrow(t)
Borrows tensor 't' for use on this stream. The memory of t will stay alive until borrow.drop() is called, which will free t and any of its aliases on stream self, and will cause t.stream to wait on self at that point so that the memory of t can be reused.
If mutable, then self can write to the storage of t, but t.stream cannot read or write t until the borrow is returned (it becomes free and a wait_for has been issued).
If not mutable both self and t.stream can read from t’s storage but neither can write to it.
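A hedged sketch of the borrow lifecycle. The stream name is illustrative and the work done on the borrowed tensor is elided; only the Stream constructor, borrow, and borrow.drop() described here are used:
from monarch import Stream, Tensor

def read_on_side_stream(t: Tensor) -> None:
    comm = Stream('comm')              # second stream; the name is illustrative

    # Immutable borrow: both streams may read t, neither may write it.
    borrowed, borrow = comm.borrow(t)

    # ... use `borrowed` on the `comm` stream here ...

    # Return the borrow: frees the borrowed alias and makes t.stream wait on
    # `comm` so that t's memory can safely be reused.
    borrow.drop()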
- monarch.remote(function: Callable[P, R], *, propagate: Propagator = None) Remote[P, R] [source]#
- monarch.remote(function: str, *, propagate: Literal['mocked', 'cached', 'inspect'] | None = None) Remote
- monarch.remote(function: str, *, propagate: Callable[P, R]) Remote[P, R]
- monarch.remote(*, propagate: Propagator = None) RemoteIfy
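A brief, hedged sketch of the call patterns these overloads admit. The import path and propagation choices below are illustrative, and what a Remote does when called is described elsewhere, not on this page:
from monarch import remote

def double(x):
    return x + x

# Overload 1: wrap a callable directly.
double_remote = remote(double)

# Overload 2: name a worker-side function by import path (hypothetical path)
# and pick one of the built-in propagation strategies.
my_op = remote('mypackage.ops.my_op', propagate='inspect')

# Overload 4: a keyword-only call returns a RemoteIfy, usable as a decorator.
@remote()
def scale(x, factor):
    return x * factor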
- monarch.no_mesh()#
- monarch.slice_mesh(tensors, **kwargs)[source]#
Performs the slice_mesh operation for each tensor in tensors.
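A hedged sketch, assuming two monarch Tensors w and b and a mesh dimension named 'host'; the keyword arguments are forwarded as the per-tensor slice_mesh coordinates:
from monarch import slice_mesh

params = {'weight': w, 'bias': b}          # monarch.Tensors, assumed to exist
# Slice every tensor in the pytree down to the host=0 sub-mesh.
params_host0 = slice_mesh(params, host=0)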
- monarch.reduce(tensors, dims, reduction='sum', scatter=False, mesh=None, _inplace=False)[source]#
Performs the tensor reduction operation for each tensor in tensors.
- Parameters:
tensors (pytree["Tensor"]) – The pytree of input tensors to reduce.
dims (Dims | str) – The dimensions along which to perform the reduction.
reduction (_valid_reduce) – The type of reduction to perform. Defaults to “sum”.
scatter (bool) – If True, the local result tensor will be evenly split across dimensions. Defaults to False.
mesh (Optional["DeviceMesh"], optional) – The target mesh to move the data to. If None, uses self.mesh. Defaults to None.
_inplace (bool) – If True, performs the operation in-place. Defaults to False. Note that not all reduction operations support in-place execution.
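A hedged sketch of reducing a whole pytree at once, assuming gradients gw and gb exist as monarch Tensors on a mesh with a 'gpu' dimension:
from monarch import reduce

grads = {'weight': gw, 'bias': gb}         # monarch.Tensors, assumed to exist
# All-reduce every gradient in the pytree over the 'gpu' dimension.
grads = reduce(grads, dims='gpu', reduction='sum')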
- monarch.fetch_shard(obj, shard=None, **kwargs)[source]#
Retrieve the shard at coordinates of the current device mesh of each tensor in obj. All tensors in obj will be fetched to the CPU device.
- Parameters:
obj – a pytree containing the tensors to fetch
shard – a dictionary from mesh dimension name to the coordinate of the shard. If None, fetches from coordinate 0 for all dimensions (useful after all_reduce/all_gather).
**kwargs – additional keyword arguments are added as entries to the shard dictionary
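A hedged sketch, assuming a mesh with 'host' and 'gpu' dimensions and a pytree losses of monarch Tensors; whether the return value is the CPU tensor itself or a handle that must be resolved is not specified in this section:
from monarch import fetch_shard

# Fetch the shard at host=0, gpu=0 of every tensor in `losses` to the CPU.
local = fetch_shard(losses, shard={'host': 0, 'gpu': 0})

# Keyword arguments are added to the shard dictionary, so this is equivalent:
local = fetch_shard(losses, host=0, gpu=0)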