monarch_hyperactor/
bootstrap.rs1use futures::future::try_join_all;
10use hyperactor::channel::ChannelAddr;
11use hyperactor::id::Label;
12use hyperactor_mesh::bootstrap::BootstrapCommand;
13use hyperactor_mesh::bootstrap::bootstrap;
14use hyperactor_mesh::bootstrap::halt;
15use hyperactor_mesh::bootstrap::host;
16use hyperactor_mesh::host_mesh::HostMesh;
17use hyperactor_mesh::mesh_id::HostMeshId;
18use monarch_types::MapPyErr;
19use pyo3::Bound;
20use pyo3::PyAny;
21use pyo3::PyResult;
22use pyo3::Python;
23use pyo3::exceptions::PyRuntimeError;
24use pyo3::pyfunction;
25use pyo3::types::PyAnyMethods;
26use pyo3::types::PyModule;
27use pyo3::types::PyModuleMethods;
28use pyo3::wrap_pyfunction;
29
30use crate::host_mesh::PyHostMesh;
31use crate::pytokio::PyPythonTask;
32use crate::runtime::monarch_with_gil;
33
34#[pyfunction]
35#[pyo3(signature = ())]
36pub fn bootstrap_main(py: Python) -> PyResult<Bound<PyAny>> {
37 unsafe {
39 fbinit::perform_init();
40 };
41
42 hyperactor::internal_macro_support::tracing::debug!("entering async bootstrap");
43 crate::runtime::future_into_py::<_, i32>(py, async move {
44 let _destroy_guard = unsafe { fbinit::DestroyGuard::new() };
49 bootstrap()
50 .await
51 .map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
52 })
53}
54
55#[pyfunction]
56pub fn run_worker_loop_forever(_py: Python<'_>, address: &str) -> PyResult<PyPythonTask> {
57 let (addr, listener) = ChannelAddr::from_zmq_url_with_listener(address)?;
58
59 let invoked_name = std::env::var("FB_XAR_INVOKED_NAME");
61
62 let mut env: std::collections::HashMap<String, String> = std::env::vars().collect();
63
64 let command = Some(if let Ok(invoked_name) = invoked_name {
65 let current_exe = std::path::PathBuf::from(&invoked_name);
67
68 env.insert(
70 "PAR_MAIN_OVERRIDE".to_string(),
71 "monarch._src.actor.bootstrap_main".to_string(),
72 );
73 BootstrapCommand {
74 program: current_exe,
75 arg0: Some(invoked_name),
76 args: vec![],
77 env,
78 }
79 } else {
80 let current_exe = std::env::args()
85 .next()
86 .map(std::path::PathBuf::from)
87 .or_else(|| std::env::current_exe().ok())
88 .ok_or_else(|| {
89 pyo3::PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
90 "Failed to determine current executable",
91 )
92 })?;
93 let current_exe_str = current_exe.to_string_lossy().to_string();
94 BootstrapCommand {
95 program: current_exe,
96 arg0: Some(current_exe_str),
97 args: vec![
98 "-m".to_string(),
99 "monarch._src.actor.bootstrap_main".to_string(),
100 ],
101 env,
102 }
103 });
104
105 PyPythonTask::new(async move {
106 let (_agent_handle, shutdown) = host(addr, command, None, true, listener)
107 .await
108 .map_pyerr()?;
109 shutdown.join().await;
110 halt::<()>().await;
111 Ok(())
112 })
113}
114
115#[pyfunction]
116pub fn attach_to_workers(
117 instance: &crate::context::PyInstance,
118 workers: Vec<Bound<'_, PyPythonTask>>,
119 name: Option<&str>,
120) -> PyResult<PyPythonTask> {
121 let tasks = workers
122 .into_iter()
123 .map(|x| x.borrow_mut().take_task())
124 .collect::<PyResult<Vec<_>>>()?;
125
126 let name = HostMeshId::instance(Label::strip(name.unwrap_or("hosts")));
131 let instance = instance.clone();
132 PyPythonTask::new(async move {
133 let results = try_join_all(tasks).await?;
134
135 let addresses: Result<Vec<ChannelAddr>, anyhow::Error> = monarch_with_gil(|py| {
136 results
137 .into_iter()
138 .map(|result| {
139 let url_str: String = result.bind(py).extract()?;
140 ChannelAddr::from_zmq_url(&url_str)
141 })
142 .collect()
143 })
144 .await;
145 let addresses = addresses?;
146
147 let host_mesh = HostMesh::attach(&*instance, name, addresses)
148 .await
149 .map_err(|e| anyhow::anyhow!("attach failed: {}", e))?;
150 Ok(PyHostMesh::new_owned(host_mesh))
151 })
152}
153
154pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
155 let f = wrap_pyfunction!(bootstrap_main, hyperactor_mod)?;
156 f.setattr(
157 "__module__",
158 "monarch._rust_bindings.monarch_hyperactor.bootstrap",
159 )?;
160 hyperactor_mod.add_function(f)?;
161
162 let f = wrap_pyfunction!(run_worker_loop_forever, hyperactor_mod)?;
163 f.setattr(
164 "__module__",
165 "monarch._rust_bindings.monarch_hyperactor.bootstrap",
166 )?;
167 hyperactor_mod.add_function(f)?;
168
169 let f = wrap_pyfunction!(attach_to_workers, hyperactor_mod)?;
170 f.setattr(
171 "__module__",
172 "monarch._rust_bindings.monarch_hyperactor.bootstrap",
173 )?;
174 hyperactor_mod.add_function(f)?;
175
176 Ok(())
177}