monarch_hyperactor/
bootstrap.rs1use futures::future::try_join_all;
10use hyperactor::channel::ChannelAddr;
11use hyperactor_mesh::Bootstrap;
12use hyperactor_mesh::Name;
13use hyperactor_mesh::bootstrap::BootstrapCommand;
14use hyperactor_mesh::bootstrap::bootstrap;
15use hyperactor_mesh::host_mesh::HostMesh;
16use monarch_types::MapPyErr;
17use pyo3::Bound;
18use pyo3::PyAny;
19use pyo3::PyResult;
20use pyo3::Python;
21use pyo3::exceptions::PyException;
22use pyo3::exceptions::PyRuntimeError;
23use pyo3::pyfunction;
24use pyo3::types::PyAnyMethods;
25use pyo3::types::PyModule;
26use pyo3::types::PyModuleMethods;
27use pyo3::wrap_pyfunction;
28
29use crate::host_mesh::PyHostMesh;
30use crate::pytokio::PyPythonTask;
31use crate::runtime::monarch_with_gil;
32
33#[pyfunction]
34#[pyo3(signature = ())]
35pub fn bootstrap_main(py: Python) -> PyResult<Bound<PyAny>> {
36 let _ = unsafe {
38 fbinit::perform_init();
39 };
40
41 hyperactor::internal_macro_support::tracing::debug!("entering async bootstrap");
42 crate::runtime::future_into_py::<_, i32>(py, async move {
43 let _destroy_guard = unsafe { fbinit::DestroyGuard::new() };
48 bootstrap()
49 .await
50 .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
51 })
52}
53
54#[pyfunction]
55pub fn run_worker_loop_forever(_py: Python<'_>, address: &str) -> PyResult<PyPythonTask> {
56 let addr = ChannelAddr::from_zmq_url(address)?;
57
58 let invoked_name = std::env::var("FB_XAR_INVOKED_NAME");
60
61 let mut env: std::collections::HashMap<String, String> = std::env::vars().collect();
62
63 let command = Some(if let Ok(invoked_name) = invoked_name {
64 let current_exe = std::path::PathBuf::from(&invoked_name);
66
67 env.insert(
69 "PAR_MAIN_OVERRIDE".to_string(),
70 "monarch._src.actor.bootstrap_main".to_string(),
71 );
72 BootstrapCommand {
73 program: current_exe,
74 arg0: Some(invoked_name),
75 args: vec![],
76 env,
77 }
78 } else {
79 let current_exe = std::env::args()
84 .next()
85 .map(std::path::PathBuf::from)
86 .or_else(|| std::env::current_exe().ok())
87 .ok_or_else(|| {
88 pyo3::PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
89 "Failed to determine current executable",
90 )
91 })?;
92 let current_exe_str = current_exe.to_string_lossy().to_string();
93 BootstrapCommand {
94 program: current_exe,
95 arg0: Some(current_exe_str),
96 args: vec![
97 "-m".to_string(),
98 "monarch._src.actor.bootstrap_main".to_string(),
99 ],
100 env,
101 }
102 });
103
104 let boot = Bootstrap::Host {
105 addr,
106 config: None,
107 command,
108 exit_on_shutdown: true,
111 };
112
113 PyPythonTask::new(async {
114 let err = boot.bootstrap().await.unwrap_err();
116 Err(err).map_pyerr()?;
117 Ok(())
118 })
119}
120
121#[pyfunction]
122pub fn attach_to_workers<'py>(
123 instance: &crate::context::PyInstance,
124 workers: Vec<Bound<'py, PyPythonTask>>,
125 name: Option<&str>,
126) -> PyResult<PyPythonTask> {
127 let tasks = workers
128 .into_iter()
129 .map(|x| x.borrow_mut().take_task())
130 .collect::<PyResult<Vec<_>>>()?;
131
132 let name =
133 Name::new(name.unwrap_or("hosts")).map_err(|err| PyException::new_err(err.to_string()))?;
134 let instance = instance.clone();
135 PyPythonTask::new(async move {
136 let results = try_join_all(tasks).await?;
137
138 let addresses: Result<Vec<ChannelAddr>, anyhow::Error> = monarch_with_gil(|py| {
139 results
140 .into_iter()
141 .map(|result| {
142 let url_str: String = result.bind(py).extract()?;
143 ChannelAddr::from_zmq_url(&url_str)
144 })
145 .collect()
146 })
147 .await;
148 let addresses = addresses?;
149
150 let host_mesh = HostMesh::attach(&*instance, name, addresses)
151 .await
152 .map_err(|e| anyhow::anyhow!("attach failed: {}", e))?;
153 Ok(PyHostMesh::new_owned(host_mesh))
154 })
155}
156
157pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
158 let f = wrap_pyfunction!(bootstrap_main, hyperactor_mod)?;
159 f.setattr(
160 "__module__",
161 "monarch._rust_bindings.monarch_hyperactor.bootstrap",
162 )?;
163 hyperactor_mod.add_function(f)?;
164
165 let f = wrap_pyfunction!(run_worker_loop_forever, hyperactor_mod)?;
166 f.setattr(
167 "__module__",
168 "monarch._rust_bindings.monarch_hyperactor.bootstrap",
169 )?;
170 hyperactor_mod.add_function(f)?;
171
172 let f = wrap_pyfunction!(attach_to_workers, hyperactor_mod)?;
173 f.setattr(
174 "__module__",
175 "monarch._rust_bindings.monarch_hyperactor.bootstrap",
176 )?;
177 hyperactor_mod.add_function(f)?;
178
179 Ok(())
180}