Skip to main content

monarch_hyperactor/
bootstrap.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9use futures::future::try_join_all;
10use hyperactor::channel::ChannelAddr;
11use hyperactor::id::Label;
12use hyperactor_mesh::bootstrap::BootstrapCommand;
13use hyperactor_mesh::bootstrap::bootstrap;
14use hyperactor_mesh::bootstrap::halt;
15use hyperactor_mesh::bootstrap::host;
16use hyperactor_mesh::host_mesh::HostMesh;
17use hyperactor_mesh::mesh_id::HostMeshId;
18use monarch_types::MapPyErr;
19use pyo3::Bound;
20use pyo3::PyAny;
21use pyo3::PyResult;
22use pyo3::Python;
23use pyo3::exceptions::PyRuntimeError;
24use pyo3::pyfunction;
25use pyo3::types::PyAnyMethods;
26use pyo3::types::PyModule;
27use pyo3::types::PyModuleMethods;
28use pyo3::wrap_pyfunction;
29
30use crate::host_mesh::PyHostMesh;
31use crate::pytokio::PyPythonTask;
32use crate::runtime::monarch_with_gil;
33
34#[pyfunction]
35#[pyo3(signature = ())]
36pub fn bootstrap_main(py: Python) -> PyResult<Bound<PyAny>> {
37    // SAFETY: this is a correct use of this function.
38    unsafe {
39        fbinit::perform_init();
40    };
41
42    hyperactor::internal_macro_support::tracing::debug!("entering async bootstrap");
43    crate::runtime::future_into_py::<_, i32>(py, async move {
44        // SAFETY:
45        // - Only one of these is ever created.
46        // - This is the entry point of this program, so this will be dropped when
47        // no more FB C++ code is running.
48        let _destroy_guard = unsafe { fbinit::DestroyGuard::new() };
49        bootstrap()
50            .await
51            .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
52    })
53}
54
55#[pyfunction]
56pub fn run_worker_loop_forever(_py: Python<'_>, address: &str) -> PyResult<PyPythonTask> {
57    let (addr, listener) = ChannelAddr::from_zmq_url_with_listener(address)?;
58
59    // Check if we're running in a PAR/XAR build by looking for FB_XAR_INVOKED_NAME environment variable
60    let invoked_name = std::env::var("FB_XAR_INVOKED_NAME");
61
62    let mut env: std::collections::HashMap<String, String> = std::env::vars().collect();
63
64    let command = Some(if let Ok(invoked_name) = invoked_name {
65        // For PAR/XAR builds: use argv[0] from Python's sys.argv as the current executable
66        let current_exe = std::path::PathBuf::from(&invoked_name);
67
68        // For PAR/XAR builds: set PAR_MAIN_OVERRIDE and no additional args
69        env.insert(
70            "PAR_MAIN_OVERRIDE".to_string(),
71            "monarch._src.actor.bootstrap_main".to_string(),
72        );
73        BootstrapCommand {
74            program: current_exe,
75            arg0: Some(invoked_name),
76            args: vec![],
77            env,
78        }
79    } else {
80        // For regular Python builds: use argv[0] to preserve the original
81        // invocation path.  current_exe() resolves symlinks, which breaks
82        // virtual environments — the resolved path doesn't find pyvenv.cfg
83        // so site-packages aren't activated in subprocesses.
84        let current_exe = std::env::args()
85            .next()
86            .map(std::path::PathBuf::from)
87            .or_else(|| std::env::current_exe().ok())
88            .ok_or_else(|| {
89                pyo3::PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
90                    "Failed to determine current executable",
91                )
92            })?;
93        let current_exe_str = current_exe.to_string_lossy().to_string();
94        BootstrapCommand {
95            program: current_exe,
96            arg0: Some(current_exe_str),
97            args: vec![
98                "-m".to_string(),
99                "monarch._src.actor.bootstrap_main".to_string(),
100            ],
101            env,
102        }
103    });
104
105    PyPythonTask::new(async move {
106        let (_agent_handle, shutdown) = host(addr, command, None, true, listener)
107            .await
108            .map_pyerr()?;
109        shutdown.join().await;
110        halt::<()>().await;
111        Ok(())
112    })
113}
114
115#[pyfunction]
116pub fn attach_to_workers(
117    instance: &crate::context::PyInstance,
118    workers: Vec<Bound<'_, PyPythonTask>>,
119    name: Option<&str>,
120) -> PyResult<PyPythonTask> {
121    let tasks = workers
122        .into_iter()
123        .map(|x| x.borrow_mut().take_task())
124        .collect::<PyResult<Vec<_>>>()?;
125
126    // `Label::strip` (vs. `Label::new`) sanitizes user-supplied names — lowercases,
127    // drops illegal characters, falls back to "nil" if empty. Callers pass names
128    // derived from experiment / job names that may contain uppercase or punctuation;
129    // rejecting them surfaces as an opaque PyException far from the input site.
130    let name = HostMeshId::instance(Label::strip(name.unwrap_or("hosts")));
131    let instance = instance.clone();
132    PyPythonTask::new(async move {
133        let results = try_join_all(tasks).await?;
134
135        let addresses: Result<Vec<ChannelAddr>, anyhow::Error> = monarch_with_gil(|py| {
136            results
137                .into_iter()
138                .map(|result| {
139                    let url_str: String = result.bind(py).extract()?;
140                    ChannelAddr::from_zmq_url(&url_str)
141                })
142                .collect()
143        })
144        .await;
145        let addresses = addresses?;
146
147        let host_mesh = HostMesh::attach(&*instance, name, addresses)
148            .await
149            .map_err(|e| anyhow::anyhow!("attach failed: {}", e))?;
150        Ok(PyHostMesh::new_owned(host_mesh))
151    })
152}
153
154pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
155    let f = wrap_pyfunction!(bootstrap_main, hyperactor_mod)?;
156    f.setattr(
157        "__module__",
158        "monarch._rust_bindings.monarch_hyperactor.bootstrap",
159    )?;
160    hyperactor_mod.add_function(f)?;
161
162    let f = wrap_pyfunction!(run_worker_loop_forever, hyperactor_mod)?;
163    f.setattr(
164        "__module__",
165        "monarch._rust_bindings.monarch_hyperactor.bootstrap",
166    )?;
167    hyperactor_mod.add_function(f)?;
168
169    let f = wrap_pyfunction!(attach_to_workers, hyperactor_mod)?;
170    f.setattr(
171        "__module__",
172        "monarch._rust_bindings.monarch_hyperactor.bootstrap",
173    )?;
174    hyperactor_mod.add_function(f)?;
175
176    Ok(())
177}