Skip to main content

hydro_lang/compile/
embedded.rs

1//! "Embedded" deployment backend for Hydro.
2//!
3//! Instead of compiling each location into a standalone binary, this backend generates
4//! a Rust source file containing one function per location. Each function returns a
5//! `dfir_rs::scheduled::graph::Dfir` that can be manually driven by the caller.
6//!
7//! This is useful when you want full control over where and how the projected DFIR
8//! code runs (e.g. embedding it into an existing application).
9//!
10//! # Limitations
11//!
12//! Networking is **not** supported. All `Deploy` networking trait methods will panic
13//! if called. Only pure local computations (with data embedded in the Hydro program)
14//! are supported.
15
16use std::future::Future;
17use std::io::Error;
18use std::pin::Pin;
19
20use bytes::{Bytes, BytesMut};
21use dfir_lang::diagnostic::Diagnostics;
22use dfir_lang::graph::DfirGraph;
23use futures::{Sink, Stream};
24use proc_macro2::Span;
25use quote::quote;
26use serde::Serialize;
27use serde::de::DeserializeOwned;
28use stageleft::{QuotedWithContext, q};
29
30use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
31use crate::compile::builder::ExternalPortId;
32use crate::location::dynamic::LocationId;
33use crate::location::member_id::TaglessMemberId;
34use crate::location::{LocationKey, MembershipEvent, NetworkHint};
35
/// Uninhabited marker type that selects the embedded deployment backend.
///
/// It is never instantiated; it only appears as a type parameter (for example in
/// `ProcessSpec<'_, EmbeddedDeploy>`). Every networking hook of this backend panics,
/// so only pure local computation is supported.
pub enum EmbeddedDeploy {}
40
/// A trivial node type for embedded deployment.
///
/// The node carries nothing but the user-provided name of the function that will be
/// generated for its location. `Debug` is derived alongside `Clone` so that nodes can be
/// logged and used in assertions.
#[derive(Clone, Debug)]
pub struct EmbeddedNode {
    /// The function name to use in the generated code for this location.
    pub fn_name: String,
}
47
48impl Node for EmbeddedNode {
49    type Port = ();
50    type Meta = ();
51    type InstantiateEnv = ();
52
53    fn next_port(&self) -> Self::Port {}
54
55    fn update_meta(&self, _meta: &Self::Meta) {}
56
57    fn instantiate(
58        &self,
59        _env: &mut Self::InstantiateEnv,
60        _meta: &mut Self::Meta,
61        _graph: DfirGraph,
62        _extra_stmts: &[syn::Stmt],
63        _sidecars: &[syn::Expr],
64    ) {
65        // No-op: embedded mode doesn't instantiate nodes at deploy time.
66    }
67}
68
69impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
70    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
71        panic!("EmbeddedDeploy does not support external ports");
72    }
73
74    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
75    fn as_bytes_bidi(
76        &self,
77        _external_port_id: ExternalPortId,
78    ) -> impl Future<
79        Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
80    > + 'a {
81        async { panic!("EmbeddedDeploy does not support external ports") }
82    }
83
84    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
85    fn as_bincode_bidi<InT, OutT>(
86        &self,
87        _external_port_id: ExternalPortId,
88    ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
89    where
90        InT: Serialize + 'static,
91        OutT: DeserializeOwned + 'static,
92    {
93        async { panic!("EmbeddedDeploy does not support external ports") }
94    }
95
96    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
97    fn as_bincode_sink<T>(
98        &self,
99        _external_port_id: ExternalPortId,
100    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
101    where
102        T: Serialize + 'static,
103    {
104        async { panic!("EmbeddedDeploy does not support external ports") }
105    }
106
107    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
108    fn as_bincode_source<T>(
109        &self,
110        _external_port_id: ExternalPortId,
111    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
112    where
113        T: DeserializeOwned + 'static,
114    {
115        async { panic!("EmbeddedDeploy does not support external ports") }
116    }
117}
118
119impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
120    fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
121        EmbeddedNode {
122            fn_name: self.into(),
123        }
124    }
125}
126
127impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
128    fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
129        EmbeddedNode {
130            fn_name: self.into(),
131        }
132    }
133}
134
135impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
136    fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
137        EmbeddedNode {
138            fn_name: self.into(),
139        }
140    }
141}
142
143impl<'a> Deploy<'a> for EmbeddedDeploy {
144    type Meta = ();
145    type InstantiateEnv = ();
146
147    type Process = EmbeddedNode;
148    type Cluster = EmbeddedNode;
149    type External = EmbeddedNode;
150
151    fn o2o_sink_source(
152        _p1: &Self::Process,
153        _p1_port: &(),
154        _p2: &Self::Process,
155        _p2_port: &(),
156    ) -> (syn::Expr, syn::Expr) {
157        panic!("EmbeddedDeploy does not support networking (o2o)")
158    }
159
160    fn o2o_connect(
161        _p1: &Self::Process,
162        _p1_port: &(),
163        _p2: &Self::Process,
164        _p2_port: &(),
165    ) -> Box<dyn FnOnce()> {
166        panic!("EmbeddedDeploy does not support networking (o2o)")
167    }
168
169    fn o2m_sink_source(
170        _p1: &Self::Process,
171        _p1_port: &(),
172        _c2: &Self::Cluster,
173        _c2_port: &(),
174    ) -> (syn::Expr, syn::Expr) {
175        panic!("EmbeddedDeploy does not support networking (o2m)")
176    }
177
178    fn o2m_connect(
179        _p1: &Self::Process,
180        _p1_port: &(),
181        _c2: &Self::Cluster,
182        _c2_port: &(),
183    ) -> Box<dyn FnOnce()> {
184        panic!("EmbeddedDeploy does not support networking (o2m)")
185    }
186
187    fn m2o_sink_source(
188        _c1: &Self::Cluster,
189        _c1_port: &(),
190        _p2: &Self::Process,
191        _p2_port: &(),
192    ) -> (syn::Expr, syn::Expr) {
193        panic!("EmbeddedDeploy does not support networking (m2o)")
194    }
195
196    fn m2o_connect(
197        _c1: &Self::Cluster,
198        _c1_port: &(),
199        _p2: &Self::Process,
200        _p2_port: &(),
201    ) -> Box<dyn FnOnce()> {
202        panic!("EmbeddedDeploy does not support networking (m2o)")
203    }
204
205    fn m2m_sink_source(
206        _c1: &Self::Cluster,
207        _c1_port: &(),
208        _c2: &Self::Cluster,
209        _c2_port: &(),
210    ) -> (syn::Expr, syn::Expr) {
211        panic!("EmbeddedDeploy does not support networking (m2m)")
212    }
213
214    fn m2m_connect(
215        _c1: &Self::Cluster,
216        _c1_port: &(),
217        _c2: &Self::Cluster,
218        _c2_port: &(),
219    ) -> Box<dyn FnOnce()> {
220        panic!("EmbeddedDeploy does not support networking (m2m)")
221    }
222
223    fn e2o_many_source(
224        _extra_stmts: &mut Vec<syn::Stmt>,
225        _p2: &Self::Process,
226        _p2_port: &(),
227        _codec_type: &syn::Type,
228        _shared_handle: String,
229    ) -> syn::Expr {
230        panic!("EmbeddedDeploy does not support networking (e2o)")
231    }
232
233    fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
234        panic!("EmbeddedDeploy does not support networking (e2o)")
235    }
236
237    fn e2o_source(
238        _extra_stmts: &mut Vec<syn::Stmt>,
239        _p1: &Self::External,
240        _p1_port: &(),
241        _p2: &Self::Process,
242        _p2_port: &(),
243        _codec_type: &syn::Type,
244        _shared_handle: String,
245    ) -> syn::Expr {
246        panic!("EmbeddedDeploy does not support networking (e2o)")
247    }
248
249    fn e2o_connect(
250        _p1: &Self::External,
251        _p1_port: &(),
252        _p2: &Self::Process,
253        _p2_port: &(),
254        _many: bool,
255        _server_hint: NetworkHint,
256    ) -> Box<dyn FnOnce()> {
257        panic!("EmbeddedDeploy does not support networking (e2o)")
258    }
259
260    fn o2e_sink(
261        _p1: &Self::Process,
262        _p1_port: &(),
263        _p2: &Self::External,
264        _p2_port: &(),
265        _shared_handle: String,
266    ) -> syn::Expr {
267        panic!("EmbeddedDeploy does not support networking (o2e)")
268    }
269
270    #[expect(
271        unreachable_code,
272        reason = "panic before q! which is only for return type"
273    )]
274    fn cluster_ids(
275        _of_cluster: LocationKey,
276    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
277        panic!("EmbeddedDeploy does not support cluster IDs");
278        q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
279    }
280
281    #[expect(
282        unreachable_code,
283        reason = "panic before q! which is only for return type"
284    )]
285    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
286        panic!("EmbeddedDeploy does not support cluster self ID");
287        q!(unreachable!(
288            "EmbeddedDeploy does not support cluster self ID"
289        ))
290    }
291
292    #[expect(
293        unreachable_code,
294        reason = "panic before q! which is only for return type"
295    )]
296    fn cluster_membership_stream(
297        _location_id: &LocationId,
298    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
299    {
300        panic!("EmbeddedDeploy does not support cluster membership streams");
301        q!(unreachable!(
302            "EmbeddedDeploy does not support cluster membership streams"
303        ))
304    }
305}
306
307impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
308    /// Generates a `syn::File` containing one function per location in the flow.
309    ///
310    /// Each generated function has the signature:
311    /// ```ignore
312    /// pub fn <fn_name>() -> dfir_rs::scheduled::graph::Dfir<'static>
313    /// ```
314    /// where `fn_name` is the `String` passed to `with_process` / `with_cluster`.
315    ///
316    /// The returned `Dfir` can be manually executed by the caller.
317    ///
318    /// # Arguments
319    ///
320    /// * `crate_name` — the name of the crate containing the Hydro program (used for stageleft
321    ///   re-exports). Hyphens will be replaced with underscores.
322    ///
323    /// # Usage
324    ///
325    /// Typically called from a `build.rs` in a wrapper crate:
326    /// ```ignore
327    /// // build.rs
328    /// let deploy = flow.with_process(&process, "my_fn".to_string());
329    /// let code = deploy.generate_embedded("my_hydro_crate");
330    /// let out_dir = std::env::var("OUT_DIR").unwrap();
331    /// std::fs::write(format!("{out_dir}/embedded.rs"), prettyplease::unparse(&code)).unwrap();
332    /// ```
333    ///
334    /// Then in `lib.rs`:
335    /// ```ignore
336    /// include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
337    /// ```
338    pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
339        let compiled = self.compile_internal();
340
341        let root = crate::staging_util::get_this_crate();
342        let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
343
344        let mut functions: Vec<syn::Item> = Vec::new();
345
346        // Sort location keys for deterministic output.
347        let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
348        location_keys.sort();
349
350        for location_key in location_keys {
351            let graph = &compiled.all_dfir()[location_key];
352
353            // Get the user-provided function name from the node.
354            let fn_name = self
355                .processes
356                .get(location_key)
357                .map(|n| &n.fn_name)
358                .or_else(|| self.clusters.get(location_key).map(|n| &n.fn_name))
359                .or_else(|| self.externals.get(location_key).map(|n| &n.fn_name))
360                .expect("location key not found in any node map");
361
362            let fn_ident = syn::Ident::new(fn_name, Span::call_site());
363
364            let mut diagnostics = Diagnostics::new();
365            let dfir_tokens = graph
366                .as_code(&quote! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
367                .expect("DFIR code generation failed with diagnostics.");
368
369            let func: syn::Item = syn::parse_quote! {
370                #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
371                pub fn #fn_ident<'a>() -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
372                    #dfir_tokens
373                }
374            };
375            functions.push(func);
376        }
377
378        syn::parse_quote! {
379            use #orig_crate_name::__staged::__deps::*;
380            use #root::prelude::*;
381            use #root::runtime_support::dfir_rs as __root_dfir_rs;
382            pub use #orig_crate_name::__staged;
383
384            #( #functions )*
385        }
386    }
387}