monarch_hyperactor/
bootstrap.rs1use futures::future::try_join_all;
10use hyperactor::channel::ChannelAddr;
11use hyperactor_mesh::Bootstrap;
12use hyperactor_mesh::Name;
13use hyperactor_mesh::bootstrap::BootstrapCommand;
14use hyperactor_mesh::bootstrap::bootstrap;
15use hyperactor_mesh::host_mesh::HostMesh;
16use monarch_types::MapPyErr;
17use pyo3::Bound;
18use pyo3::PyAny;
19use pyo3::PyResult;
20use pyo3::Python;
21use pyo3::exceptions::PyException;
22use pyo3::exceptions::PyRuntimeError;
23use pyo3::pyfunction;
24use pyo3::types::PyAnyMethods;
25use pyo3::types::PyModule;
26use pyo3::types::PyModuleMethods;
27use pyo3::wrap_pyfunction;
28
29use crate::host_mesh::PyHostMesh;
30use crate::pytokio::PyPythonTask;
31use crate::runtime::monarch_with_gil;
32
33#[pyfunction]
34#[pyo3(signature = ())]
35pub fn bootstrap_main(py: Python) -> PyResult<Bound<PyAny>> {
36 let _ = unsafe {
38 fbinit::perform_init();
39 };
40
41 hyperactor::internal_macro_support::tracing::debug!("entering async bootstrap");
42 crate::runtime::future_into_py::<_, i32>(py, async move {
43 let _destroy_guard = unsafe { fbinit::DestroyGuard::new() };
48 bootstrap()
49 .await
50 .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
51 })
52}
53
54#[pyfunction]
55pub fn run_worker_loop_forever(_py: Python<'_>, address: &str) -> PyResult<PyPythonTask> {
56 let addr = ChannelAddr::from_zmq_url(address)?;
57
58 let invoked_name = std::env::var("FB_XAR_INVOKED_NAME");
60
61 let mut env: std::collections::HashMap<String, String> = std::env::vars().collect();
62
63 let command = Some(if let Ok(invoked_name) = invoked_name {
64 let current_exe = std::path::PathBuf::from(&invoked_name);
66
67 env.insert(
69 "PAR_MAIN_OVERRIDE".to_string(),
70 "monarch._src.actor.bootstrap_main".to_string(),
71 );
72 BootstrapCommand {
73 program: current_exe,
74 arg0: Some(invoked_name),
75 args: vec![],
76 env,
77 }
78 } else {
79 let current_exe = std::env::current_exe().map_err(|e| {
81 pyo3::PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
82 "Failed to get current executable: {}",
83 e
84 ))
85 })?;
86 let current_exe_str = current_exe.to_string_lossy().to_string();
87 BootstrapCommand {
88 program: current_exe,
89 arg0: Some(current_exe_str),
90 args: vec![
91 "-m".to_string(),
92 "monarch._src.actor.bootstrap_main".to_string(),
93 ],
94 env,
95 }
96 });
97
98 let boot = Bootstrap::Host {
99 addr,
100 config: None,
101 command,
102 exit_on_shutdown: true,
105 };
106
107 PyPythonTask::new(async {
108 let err = boot.bootstrap().await.unwrap_err();
110 Err(err).map_pyerr()?;
111 Ok(())
112 })
113}
114
115#[pyfunction]
116pub fn attach_to_workers<'py>(
117 instance: &crate::context::PyInstance,
118 workers: Vec<Bound<'py, PyPythonTask>>,
119 name: Option<&str>,
120) -> PyResult<PyPythonTask> {
121 let tasks = workers
122 .into_iter()
123 .map(|x| x.borrow_mut().take_task())
124 .collect::<PyResult<Vec<_>>>()?;
125
126 let name =
127 Name::new(name.unwrap_or("hosts")).map_err(|err| PyException::new_err(err.to_string()))?;
128 let instance = instance.clone();
129 PyPythonTask::new(async move {
130 let results = try_join_all(tasks).await?;
131
132 let addresses: Result<Vec<ChannelAddr>, anyhow::Error> = monarch_with_gil(|py| {
133 results
134 .into_iter()
135 .map(|result| {
136 let url_str: String = result.bind(py).extract()?;
137 ChannelAddr::from_zmq_url(&url_str)
138 })
139 .collect()
140 })
141 .await;
142 let addresses = addresses?;
143
144 let host_mesh = HostMesh::attach(&*instance, name, addresses)
145 .await
146 .map_err(|e| anyhow::anyhow!("attach failed: {}", e))?;
147 Ok(PyHostMesh::new_owned(host_mesh))
148 })
149}
150
151pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
152 let f = wrap_pyfunction!(bootstrap_main, hyperactor_mod)?;
153 f.setattr(
154 "__module__",
155 "monarch._rust_bindings.monarch_hyperactor.bootstrap",
156 )?;
157 hyperactor_mod.add_function(f)?;
158
159 let f = wrap_pyfunction!(run_worker_loop_forever, hyperactor_mod)?;
160 f.setattr(
161 "__module__",
162 "monarch._rust_bindings.monarch_hyperactor.bootstrap",
163 )?;
164 hyperactor_mod.add_function(f)?;
165
166 let f = wrap_pyfunction!(attach_to_workers, hyperactor_mod)?;
167 f.setattr(
168 "__module__",
169 "monarch._rust_bindings.monarch_hyperactor.bootstrap",
170 )?;
171 hyperactor_mod.add_function(f)?;
172
173 Ok(())
174}