monarch_hyperactor/
bootstrap.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
8
9use futures::future::try_join_all;
10use hyperactor::channel::ChannelAddr;
11use hyperactor_mesh::Bootstrap;
12use hyperactor_mesh::Name;
13use hyperactor_mesh::bootstrap::BootstrapCommand;
14use hyperactor_mesh::bootstrap::bootstrap;
15use hyperactor_mesh::host_mesh::HostMesh;
16use monarch_types::MapPyErr;
17use pyo3::Bound;
18use pyo3::PyAny;
19use pyo3::PyResult;
20use pyo3::Python;
21use pyo3::exceptions::PyException;
22use pyo3::exceptions::PyRuntimeError;
23use pyo3::pyfunction;
24use pyo3::types::PyAnyMethods;
25use pyo3::types::PyModule;
26use pyo3::types::PyModuleMethods;
27use pyo3::wrap_pyfunction;
28
29use crate::host_mesh::PyHostMesh;
30use crate::pytokio::PyPythonTask;
31use crate::runtime::monarch_with_gil;
32
33#[pyfunction]
34#[pyo3(signature = ())]
35pub fn bootstrap_main(py: Python) -> PyResult<Bound<PyAny>> {
36    // SAFETY: this is a correct use of this function.
37    let _ = unsafe {
38        fbinit::perform_init();
39    };
40
41    hyperactor::internal_macro_support::tracing::debug!("entering async bootstrap");
42    crate::runtime::future_into_py::<_, i32>(py, async move {
43        // SAFETY:
44        // - Only one of these is ever created.
45        // - This is the entry point of this program, so this will be dropped when
46        // no more FB C++ code is running.
47        let _destroy_guard = unsafe { fbinit::DestroyGuard::new() };
48        bootstrap()
49            .await
50            .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
51    })
52}
53
54#[pyfunction]
55pub fn run_worker_loop_forever(_py: Python<'_>, address: &str) -> PyResult<PyPythonTask> {
56    let addr = ChannelAddr::from_zmq_url(address)?;
57
58    // Check if we're running in a PAR/XAR build by looking for FB_XAR_INVOKED_NAME environment variable
59    let invoked_name = std::env::var("FB_XAR_INVOKED_NAME");
60
61    let mut env: std::collections::HashMap<String, String> = std::env::vars().collect();
62
63    let command = Some(if let Ok(invoked_name) = invoked_name {
64        // For PAR/XAR builds: use argv[0] from Python's sys.argv as the current executable
65        let current_exe = std::path::PathBuf::from(&invoked_name);
66
67        // For PAR/XAR builds: set PAR_MAIN_OVERRIDE and no additional args
68        env.insert(
69            "PAR_MAIN_OVERRIDE".to_string(),
70            "monarch._src.actor.bootstrap_main".to_string(),
71        );
72        BootstrapCommand {
73            program: current_exe,
74            arg0: Some(invoked_name),
75            args: vec![],
76            env,
77        }
78    } else {
79        // For regular Python builds: use current_exe() and -m arguments
80        let current_exe = std::env::current_exe().map_err(|e| {
81            pyo3::PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
82                "Failed to get current executable: {}",
83                e
84            ))
85        })?;
86        let current_exe_str = current_exe.to_string_lossy().to_string();
87        BootstrapCommand {
88            program: current_exe,
89            arg0: Some(current_exe_str),
90            args: vec![
91                "-m".to_string(),
92                "monarch._src.actor.bootstrap_main".to_string(),
93            ],
94            env,
95        }
96    });
97
98    let boot = Bootstrap::Host {
99        addr,
100        config: None,
101        command,
102        // This function is the entry point of the program, and no one else
103        // will terminate this process. So it needs to exit on its own.
104        exit_on_shutdown: true,
105    };
106
107    PyPythonTask::new(async {
108        // This should never return Ok because exit_on_shutdown is true.
109        let err = boot.bootstrap().await.unwrap_err();
110        Err(err).map_pyerr()?;
111        Ok(())
112    })
113}
114
115#[pyfunction]
116pub fn attach_to_workers<'py>(
117    instance: &crate::context::PyInstance,
118    workers: Vec<Bound<'py, PyPythonTask>>,
119    name: Option<&str>,
120) -> PyResult<PyPythonTask> {
121    let tasks = workers
122        .into_iter()
123        .map(|x| x.borrow_mut().take_task())
124        .collect::<PyResult<Vec<_>>>()?;
125
126    let name =
127        Name::new(name.unwrap_or("hosts")).map_err(|err| PyException::new_err(err.to_string()))?;
128    let instance = instance.clone();
129    PyPythonTask::new(async move {
130        let results = try_join_all(tasks).await?;
131
132        let addresses: Result<Vec<ChannelAddr>, anyhow::Error> = monarch_with_gil(|py| {
133            results
134                .into_iter()
135                .map(|result| {
136                    let url_str: String = result.bind(py).extract()?;
137                    ChannelAddr::from_zmq_url(&url_str)
138                })
139                .collect()
140        })
141        .await;
142        let addresses = addresses?;
143
144        let host_mesh = HostMesh::attach(&*instance, name, addresses)
145            .await
146            .map_err(|e| anyhow::anyhow!("attach failed: {}", e))?;
147        Ok(PyHostMesh::new_owned(host_mesh))
148    })
149}
150
151pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
152    let f = wrap_pyfunction!(bootstrap_main, hyperactor_mod)?;
153    f.setattr(
154        "__module__",
155        "monarch._rust_bindings.monarch_hyperactor.bootstrap",
156    )?;
157    hyperactor_mod.add_function(f)?;
158
159    let f = wrap_pyfunction!(run_worker_loop_forever, hyperactor_mod)?;
160    f.setattr(
161        "__module__",
162        "monarch._rust_bindings.monarch_hyperactor.bootstrap",
163    )?;
164    hyperactor_mod.add_function(f)?;
165
166    let f = wrap_pyfunction!(attach_to_workers, hyperactor_mod)?;
167    f.setattr(
168        "__module__",
169        "monarch._rust_bindings.monarch_hyperactor.bootstrap",
170    )?;
171    hyperactor_mod.add_function(f)?;
172
173    Ok(())
174}