monarch_hyperactor/
bootstrap.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use futures::future::try_join_all;
10use hyperactor::channel::ChannelAddr;
11use hyperactor_mesh::Bootstrap;
12use hyperactor_mesh::Name;
13use hyperactor_mesh::bootstrap::BootstrapCommand;
14use hyperactor_mesh::bootstrap::bootstrap;
15use hyperactor_mesh::host_mesh::HostMesh;
16use monarch_types::MapPyErr;
17use pyo3::Bound;
18use pyo3::PyAny;
19use pyo3::PyResult;
20use pyo3::Python;
21use pyo3::exceptions::PyException;
22use pyo3::exceptions::PyRuntimeError;
23use pyo3::pyfunction;
24use pyo3::types::PyAnyMethods;
25use pyo3::types::PyModule;
26use pyo3::types::PyModuleMethods;
27use pyo3::wrap_pyfunction;
28
29use crate::host_mesh::PyHostMesh;
30use crate::pytokio::PyPythonTask;
31use crate::runtime::monarch_with_gil;
32
33#[pyfunction]
34#[pyo3(signature = ())]
35pub fn bootstrap_main(py: Python) -> PyResult<Bound<PyAny>> {
36    // SAFETY: this is a correct use of this function.
37    let _ = unsafe {
38        fbinit::perform_init();
39    };
40
41    hyperactor::internal_macro_support::tracing::debug!("entering async bootstrap");
42    crate::runtime::future_into_py::<_, i32>(py, async move {
43        // SAFETY:
44        // - Only one of these is ever created.
45        // - This is the entry point of this program, so this will be dropped when
46        // no more FB C++ code is running.
47        let _destroy_guard = unsafe { fbinit::DestroyGuard::new() };
48        bootstrap()
49            .await
50            .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
51    })
52}
53
54#[pyfunction]
55pub fn run_worker_loop_forever(_py: Python<'_>, address: &str) -> PyResult<PyPythonTask> {
56    let addr = ChannelAddr::from_zmq_url(address)?;
57
58    // Check if we're running in a PAR/XAR build by looking for FB_XAR_INVOKED_NAME environment variable
59    let invoked_name = std::env::var("FB_XAR_INVOKED_NAME");
60
61    let mut env: std::collections::HashMap<String, String> = std::env::vars().collect();
62
63    let command = Some(if let Ok(invoked_name) = invoked_name {
64        // For PAR/XAR builds: use argv[0] from Python's sys.argv as the current executable
65        let current_exe = std::path::PathBuf::from(&invoked_name);
66
67        // For PAR/XAR builds: set PAR_MAIN_OVERRIDE and no additional args
68        env.insert(
69            "PAR_MAIN_OVERRIDE".to_string(),
70            "monarch._src.actor.bootstrap_main".to_string(),
71        );
72        BootstrapCommand {
73            program: current_exe,
74            arg0: Some(invoked_name),
75            args: vec![],
76            env,
77        }
78    } else {
79        // For regular Python builds: use argv[0] to preserve the original
80        // invocation path.  current_exe() resolves symlinks, which breaks
81        // virtual environments — the resolved path doesn't find pyvenv.cfg
82        // so site-packages aren't activated in subprocesses.
83        let current_exe = std::env::args()
84            .next()
85            .map(std::path::PathBuf::from)
86            .or_else(|| std::env::current_exe().ok())
87            .ok_or_else(|| {
88                pyo3::PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
89                    "Failed to determine current executable",
90                )
91            })?;
92        let current_exe_str = current_exe.to_string_lossy().to_string();
93        BootstrapCommand {
94            program: current_exe,
95            arg0: Some(current_exe_str),
96            args: vec![
97                "-m".to_string(),
98                "monarch._src.actor.bootstrap_main".to_string(),
99            ],
100            env,
101        }
102    });
103
104    let boot = Bootstrap::Host {
105        addr,
106        config: None,
107        command,
108        // This function is the entry point of the program, and no one else
109        // will terminate this process. So it needs to exit on its own.
110        exit_on_shutdown: true,
111    };
112
113    PyPythonTask::new(async {
114        // This should never return Ok because exit_on_shutdown is true.
115        let err = boot.bootstrap().await.unwrap_err();
116        Err(err).map_pyerr()?;
117        Ok(())
118    })
119}
120
121#[pyfunction]
122pub fn attach_to_workers<'py>(
123    instance: &crate::context::PyInstance,
124    workers: Vec<Bound<'py, PyPythonTask>>,
125    name: Option<&str>,
126) -> PyResult<PyPythonTask> {
127    let tasks = workers
128        .into_iter()
129        .map(|x| x.borrow_mut().take_task())
130        .collect::<PyResult<Vec<_>>>()?;
131
132    let name =
133        Name::new(name.unwrap_or("hosts")).map_err(|err| PyException::new_err(err.to_string()))?;
134    let instance = instance.clone();
135    PyPythonTask::new(async move {
136        let results = try_join_all(tasks).await?;
137
138        let addresses: Result<Vec<ChannelAddr>, anyhow::Error> = monarch_with_gil(|py| {
139            results
140                .into_iter()
141                .map(|result| {
142                    let url_str: String = result.bind(py).extract()?;
143                    ChannelAddr::from_zmq_url(&url_str)
144                })
145                .collect()
146        })
147        .await;
148        let addresses = addresses?;
149
150        let host_mesh = HostMesh::attach(&*instance, name, addresses)
151            .await
152            .map_err(|e| anyhow::anyhow!("attach failed: {}", e))?;
153        Ok(PyHostMesh::new_owned(host_mesh))
154    })
155}
156
157pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
158    let f = wrap_pyfunction!(bootstrap_main, hyperactor_mod)?;
159    f.setattr(
160        "__module__",
161        "monarch._rust_bindings.monarch_hyperactor.bootstrap",
162    )?;
163    hyperactor_mod.add_function(f)?;
164
165    let f = wrap_pyfunction!(run_worker_loop_forever, hyperactor_mod)?;
166    f.setattr(
167        "__module__",
168        "monarch._rust_bindings.monarch_hyperactor.bootstrap",
169    )?;
170    hyperactor_mod.add_function(f)?;
171
172    let f = wrap_pyfunction!(attach_to_workers, hyperactor_mod)?;
173    f.setattr(
174        "__module__",
175        "monarch._rust_bindings.monarch_hyperactor.bootstrap",
176    )?;
177    hyperactor_mod.add_function(f)?;
178
179    Ok(())
180}