monarch_hyperactor/code_sync/
auto_reload.rs1use std::sync::Arc;
10
11use anyhow::Result;
12use async_trait::async_trait;
13use hyperactor::Actor;
14use hyperactor::Context;
15use hyperactor::Handler;
16use hyperactor::RemoteSpawn;
17use hyperactor::reference;
18use hyperactor_config::Flattrs;
19use monarch_types::SerializablePyErr;
20use pyo3::prelude::*;
21use serde::Deserialize;
22use serde::Serialize;
23use typeuri::Named;
24
25use crate::runtime::monarch_with_gil_blocking;
26
27#[derive(Debug, Clone, Named, Serialize, Deserialize)]
29pub struct AutoReloadMessage {
30 pub result: reference::PortRef<Result<(), String>>,
31}
32wirevalue::register_type!(AutoReloadMessage);
33
34#[derive(Debug, Clone, Named, Serialize, Deserialize)]
36pub struct AutoReloadParams {}
37wirevalue::register_type!(AutoReloadParams);
38
39#[derive(Debug)]
41#[hyperactor::export(spawn = true, handlers = [AutoReloadMessage])]
42pub struct AutoReloadActor {
43 state: Result<(Arc<Py<PyAny>>, Py<PyAny>), SerializablePyErr>,
44}
45
46impl Actor for AutoReloadActor {}
47
48#[async_trait]
49impl RemoteSpawn for AutoReloadActor {
50 type Params = AutoReloadParams;
51
52 async fn new(Self::Params {}: Self::Params, _environment: Flattrs) -> Result<Self> {
53 AutoReloadActor::new().await
54 }
55}
56
57impl AutoReloadActor {
58 pub(crate) async fn new() -> Result<Self, anyhow::Error> {
59 Ok(Self {
60 state: tokio::task::spawn_blocking(move || {
61 monarch_with_gil_blocking(|py| {
62 Self::create_state(py).map_err(SerializablePyErr::from_fn(py))
63 })
64 })
65 .await?,
66 })
67 }
68
69 fn create_state(py: Python) -> PyResult<(Arc<Py<PyAny>>, Py<PyAny>)> {
70 let auto_reload_module = py.import("monarch._src.actor.code_sync.auto_reload")?;
72 let auto_reloader_class = auto_reload_module.getattr("AutoReloader")?;
73
74 let reloader = auto_reloader_class.call0()?;
75
76 let sys_audit_import_hook_class = auto_reload_module.getattr("SysAuditImportHook")?;
78 let import_callback = reloader.getattr("import_callback")?;
79 let hook_guard = sys_audit_import_hook_class.call_method1("install", (import_callback,))?;
80
81 Ok((Arc::new(reloader.into()), hook_guard.into()))
82 }
83
84 fn reload(py: Python, py_reloader: &Py<PyAny>) -> PyResult<()> {
85 let reloader = py_reloader.bind(py);
86 let changed_modules: Vec<String> = reloader.call_method0("reload_changes")?.extract()?;
87 if !changed_modules.is_empty() {
88 eprintln!("reloaded modules: {:?}", changed_modules);
89 }
90 Ok(())
91 }
92}
93
94#[async_trait]
95impl Handler<AutoReloadMessage> for AutoReloadActor {
96 async fn handle(
97 &mut self,
98 cx: &Context<Self>,
99 AutoReloadMessage { result }: AutoReloadMessage,
100 ) -> Result<()> {
101 let res = async {
103 let py_reloader: Arc<_> = self.state.as_ref().map_err(Clone::clone)?.0.clone();
104 tokio::task::spawn_blocking(move || {
105 monarch_with_gil_blocking(|py| {
106 Self::reload(py, py_reloader.as_ref()).map_err(SerializablePyErr::from_fn(py))
107 })
108 })
109 .await??;
110 anyhow::Ok(())
111 }
112 .await;
113 result.send(cx, res.map_err(|e| format!("{:#?}", e)))?;
114 Ok(())
115 }
116}