1use std::future::Future;
17use std::io::Error;
18use std::pin::Pin;
19
20use bytes::{Bytes, BytesMut};
21use dfir_lang::diagnostic::Diagnostics;
22use dfir_lang::graph::DfirGraph;
23use futures::{Sink, Stream};
24use proc_macro2::Span;
25use quote::quote;
26use serde::Serialize;
27use serde::de::DeserializeOwned;
28use stageleft::{QuotedWithContext, q};
29
30use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
31use crate::compile::builder::ExternalPortId;
32use crate::location::dynamic::LocationId;
33use crate::location::member_id::TaglessMemberId;
34use crate::location::{LocationKey, MembershipEvent, NetworkHint};
35
36pub enum EmbeddedDeploy {}
40
41#[derive(Clone)]
43pub struct EmbeddedNode {
44 pub fn_name: String,
46}
47
48impl Node for EmbeddedNode {
49 type Port = ();
50 type Meta = ();
51 type InstantiateEnv = ();
52
53 fn next_port(&self) -> Self::Port {}
54
55 fn update_meta(&self, _meta: &Self::Meta) {}
56
57 fn instantiate(
58 &self,
59 _env: &mut Self::InstantiateEnv,
60 _meta: &mut Self::Meta,
61 _graph: DfirGraph,
62 _extra_stmts: &[syn::Stmt],
63 _sidecars: &[syn::Expr],
64 ) {
65 }
67}
68
69impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
70 fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
71 panic!("EmbeddedDeploy does not support external ports");
72 }
73
74 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
75 fn as_bytes_bidi(
76 &self,
77 _external_port_id: ExternalPortId,
78 ) -> impl Future<
79 Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
80 > + 'a {
81 async { panic!("EmbeddedDeploy does not support external ports") }
82 }
83
84 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
85 fn as_bincode_bidi<InT, OutT>(
86 &self,
87 _external_port_id: ExternalPortId,
88 ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
89 where
90 InT: Serialize + 'static,
91 OutT: DeserializeOwned + 'static,
92 {
93 async { panic!("EmbeddedDeploy does not support external ports") }
94 }
95
96 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
97 fn as_bincode_sink<T>(
98 &self,
99 _external_port_id: ExternalPortId,
100 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
101 where
102 T: Serialize + 'static,
103 {
104 async { panic!("EmbeddedDeploy does not support external ports") }
105 }
106
107 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
108 fn as_bincode_source<T>(
109 &self,
110 _external_port_id: ExternalPortId,
111 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
112 where
113 T: DeserializeOwned + 'static,
114 {
115 async { panic!("EmbeddedDeploy does not support external ports") }
116 }
117}
118
119impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
120 fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
121 EmbeddedNode {
122 fn_name: self.into(),
123 }
124 }
125}
126
127impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
128 fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
129 EmbeddedNode {
130 fn_name: self.into(),
131 }
132 }
133}
134
135impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
136 fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
137 EmbeddedNode {
138 fn_name: self.into(),
139 }
140 }
141}
142
143impl<'a> Deploy<'a> for EmbeddedDeploy {
144 type Meta = ();
145 type InstantiateEnv = ();
146
147 type Process = EmbeddedNode;
148 type Cluster = EmbeddedNode;
149 type External = EmbeddedNode;
150
151 fn o2o_sink_source(
152 _p1: &Self::Process,
153 _p1_port: &(),
154 _p2: &Self::Process,
155 _p2_port: &(),
156 ) -> (syn::Expr, syn::Expr) {
157 panic!("EmbeddedDeploy does not support networking (o2o)")
158 }
159
160 fn o2o_connect(
161 _p1: &Self::Process,
162 _p1_port: &(),
163 _p2: &Self::Process,
164 _p2_port: &(),
165 ) -> Box<dyn FnOnce()> {
166 panic!("EmbeddedDeploy does not support networking (o2o)")
167 }
168
169 fn o2m_sink_source(
170 _p1: &Self::Process,
171 _p1_port: &(),
172 _c2: &Self::Cluster,
173 _c2_port: &(),
174 ) -> (syn::Expr, syn::Expr) {
175 panic!("EmbeddedDeploy does not support networking (o2m)")
176 }
177
178 fn o2m_connect(
179 _p1: &Self::Process,
180 _p1_port: &(),
181 _c2: &Self::Cluster,
182 _c2_port: &(),
183 ) -> Box<dyn FnOnce()> {
184 panic!("EmbeddedDeploy does not support networking (o2m)")
185 }
186
187 fn m2o_sink_source(
188 _c1: &Self::Cluster,
189 _c1_port: &(),
190 _p2: &Self::Process,
191 _p2_port: &(),
192 ) -> (syn::Expr, syn::Expr) {
193 panic!("EmbeddedDeploy does not support networking (m2o)")
194 }
195
196 fn m2o_connect(
197 _c1: &Self::Cluster,
198 _c1_port: &(),
199 _p2: &Self::Process,
200 _p2_port: &(),
201 ) -> Box<dyn FnOnce()> {
202 panic!("EmbeddedDeploy does not support networking (m2o)")
203 }
204
205 fn m2m_sink_source(
206 _c1: &Self::Cluster,
207 _c1_port: &(),
208 _c2: &Self::Cluster,
209 _c2_port: &(),
210 ) -> (syn::Expr, syn::Expr) {
211 panic!("EmbeddedDeploy does not support networking (m2m)")
212 }
213
214 fn m2m_connect(
215 _c1: &Self::Cluster,
216 _c1_port: &(),
217 _c2: &Self::Cluster,
218 _c2_port: &(),
219 ) -> Box<dyn FnOnce()> {
220 panic!("EmbeddedDeploy does not support networking (m2m)")
221 }
222
223 fn e2o_many_source(
224 _extra_stmts: &mut Vec<syn::Stmt>,
225 _p2: &Self::Process,
226 _p2_port: &(),
227 _codec_type: &syn::Type,
228 _shared_handle: String,
229 ) -> syn::Expr {
230 panic!("EmbeddedDeploy does not support networking (e2o)")
231 }
232
233 fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
234 panic!("EmbeddedDeploy does not support networking (e2o)")
235 }
236
237 fn e2o_source(
238 _extra_stmts: &mut Vec<syn::Stmt>,
239 _p1: &Self::External,
240 _p1_port: &(),
241 _p2: &Self::Process,
242 _p2_port: &(),
243 _codec_type: &syn::Type,
244 _shared_handle: String,
245 ) -> syn::Expr {
246 panic!("EmbeddedDeploy does not support networking (e2o)")
247 }
248
249 fn e2o_connect(
250 _p1: &Self::External,
251 _p1_port: &(),
252 _p2: &Self::Process,
253 _p2_port: &(),
254 _many: bool,
255 _server_hint: NetworkHint,
256 ) -> Box<dyn FnOnce()> {
257 panic!("EmbeddedDeploy does not support networking (e2o)")
258 }
259
260 fn o2e_sink(
261 _p1: &Self::Process,
262 _p1_port: &(),
263 _p2: &Self::External,
264 _p2_port: &(),
265 _shared_handle: String,
266 ) -> syn::Expr {
267 panic!("EmbeddedDeploy does not support networking (o2e)")
268 }
269
270 #[expect(
271 unreachable_code,
272 reason = "panic before q! which is only for return type"
273 )]
274 fn cluster_ids(
275 _of_cluster: LocationKey,
276 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
277 panic!("EmbeddedDeploy does not support cluster IDs");
278 q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
279 }
280
281 #[expect(
282 unreachable_code,
283 reason = "panic before q! which is only for return type"
284 )]
285 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
286 panic!("EmbeddedDeploy does not support cluster self ID");
287 q!(unreachable!(
288 "EmbeddedDeploy does not support cluster self ID"
289 ))
290 }
291
292 #[expect(
293 unreachable_code,
294 reason = "panic before q! which is only for return type"
295 )]
296 fn cluster_membership_stream(
297 _location_id: &LocationId,
298 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
299 {
300 panic!("EmbeddedDeploy does not support cluster membership streams");
301 q!(unreachable!(
302 "EmbeddedDeploy does not support cluster membership streams"
303 ))
304 }
305}
306
307impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
308 pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
339 let compiled = self.compile_internal();
340
341 let root = crate::staging_util::get_this_crate();
342 let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
343
344 let mut functions: Vec<syn::Item> = Vec::new();
345
346 let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
348 location_keys.sort();
349
350 for location_key in location_keys {
351 let graph = &compiled.all_dfir()[location_key];
352
353 let fn_name = self
355 .processes
356 .get(location_key)
357 .map(|n| &n.fn_name)
358 .or_else(|| self.clusters.get(location_key).map(|n| &n.fn_name))
359 .or_else(|| self.externals.get(location_key).map(|n| &n.fn_name))
360 .expect("location key not found in any node map");
361
362 let fn_ident = syn::Ident::new(fn_name, Span::call_site());
363
364 let mut diagnostics = Diagnostics::new();
365 let dfir_tokens = graph
366 .as_code("e! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
367 .expect("DFIR code generation failed with diagnostics.");
368
369 let func: syn::Item = syn::parse_quote! {
370 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
371 pub fn #fn_ident<'a>() -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
372 #dfir_tokens
373 }
374 };
375 functions.push(func);
376 }
377
378 syn::parse_quote! {
379 use #orig_crate_name::__staged::__deps::*;
380 use #root::prelude::*;
381 use #root::runtime_support::dfir_rs as __root_dfir_rs;
382 pub use #orig_crate_name::__staged;
383
384 #( #functions )*
385 }
386 }
387}