Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
/// A closure expression bundled with any singleton references it captures.
///
/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
/// alongside the closure's expression. This allows per-closure tracking of singleton
/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
///
/// Hashing, `Debug` and `Display` for this type only look at `expr`; the captured
/// references are carried along for traversal and code generation.
pub struct ClosureExpr {
    /// The closure's expression.
    pub(crate) expr: DebugExpr,
    /// Each entry is `(HydroNode::Reference, is_mut: bool)`.
    /// The index in the Vec determines the ident name via [`handoff_ref_ident`].
    /// The `access_counter` was assigned at staging time in code order.
    ///
    /// Any other `HydroNode` variant in this list makes `Clone` and `emit_tokens` panic.
    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
}
51
52impl Clone for ClosureExpr {
53    fn clone(&self) -> Self {
54        Self {
55            expr: self.expr.clone(),
56            singleton_refs: self
57                .singleton_refs
58                .iter()
59                .map(|(node, is_mut)| {
60                    let HydroNode::Reference {
61                        inner,
62                        kind,
63                        access_counter,
64                        metadata,
65                    } = node
66                    else {
67                        panic!("singleton_refs should only contain HydroNode::Reference");
68                    };
69                    (
70                        HydroNode::Reference {
71                            inner: SharedNode(Rc::clone(&inner.0)),
72                            kind: *kind,
73                            access_counter: access_counter.freeze(),
74                            metadata: metadata.clone(),
75                        },
76                        *is_mut,
77                    )
78                })
79                .collect(),
80        }
81    }
82}
83
84impl Hash for ClosureExpr {
85    fn hash<H: Hasher>(&self, state: &mut H) {
86        self.expr.hash(state);
87        // singleton_refs are structural children (like HydroIrMetadata), not
88        // identity-defining. Two closures with the same expr but different
89        // captured refs are the same closure text — the refs only affect codegen.
90    }
91}
92
93impl serde::Serialize for ClosureExpr {
94    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95        use serde::ser::SerializeStruct;
96        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97        s.serialize_field("expr", &self.expr)?;
98        s.serialize_field(
99            "singleton_refs",
100            &SerializableSingletonRefs(&self.singleton_refs),
101        )?;
102        s.end()
103    }
104}
105
106struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110        use serde::ser::SerializeSeq;
111        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112        for (node, is_mut) in self.0.iter() {
113            seq.serialize_element(&(node, is_mut))?;
114        }
115        seq.end()
116    }
117}
118
119impl Debug for ClosureExpr {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        Debug::fmt(&self.expr, f)
122    }
123}
124
125impl Display for ClosureExpr {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        Display::fmt(&self.expr, f)
128    }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132    fn from(expr: syn::Expr) -> Self {
133        Self {
134            expr: DebugExpr(Box::new(expr)),
135            singleton_refs: Vec::new(),
136        }
137    }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141    fn from(expr: DebugExpr) -> Self {
142        Self {
143            expr,
144            singleton_refs: Vec::new(),
145        }
146    }
147}
148
149impl ClosureExpr {
150    pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151        Self {
152            expr,
153            singleton_refs,
154        }
155    }
156
157    pub fn has_mut_ref(&self) -> bool {
158        self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159    }
160
161    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162        Self {
163            expr: self.expr.clone(),
164            singleton_refs: self
165                .singleton_refs
166                .iter()
167                .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168                .collect(),
169        }
170    }
171
172    pub fn transform_children(
173        &mut self,
174        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175        seen_tees: &mut SeenSharedNodes,
176    ) {
177        for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178            transform(ref_node, seen_tees);
179        }
180    }
181
182    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
183    /// replacing local singleton ref idents with `#{N} dfir_ident` or `#{N} mut dfir_ident` references.
184    #[cfg(feature = "build")]
185    pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186        if self.singleton_refs.is_empty() {
187            self.expr.0.to_token_stream()
188        } else {
189            assert!(
190                ident_stack.len() >= self.singleton_refs.len(),
191                "ident_stack has {} entries but expected at least {} for singleton_refs",
192                ident_stack.len(),
193                self.singleton_refs.len()
194            );
195            let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197            let mut let_bindings = Vec::new();
198            for ((i, (ref_node, is_mut)), ref_ident) in
199                self.singleton_refs.iter().enumerate().zip(ref_idents)
200            {
201                let HydroNode::Reference { access_counter, .. } = ref_node else {
202                    panic!("ClosureExpression expected references to `HydroNode::Reference`");
203                };
204                let group = access_counter.frozen_group();
205                // TODO(mingwei): proper spanning?
206                let local_ident = handoff_ref_ident(i);
207                let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208                let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209                let mut_token = is_mut.then(|| quote!(mut));
210                let binding = quote! {
211                    let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212                };
213                let_bindings.push(binding);
214            }
215
216            let expr = &self.expr.0;
217            quote! {
218                {
219                    #( #let_bindings )*
220                    #expr
221                }
222            }
223        }
224    }
225}
226
/// Wrapper that displays only the tokens of a parsed expr.
///
/// Boxes the `syn::Expr` (the original note cites ~240 bytes; not re-measured here)
/// so that the wrapper itself stays pointer-sized.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235        serializer.serialize_str(&self.to_string())
236    }
237}
238
239impl From<syn::Expr> for DebugExpr {
240    fn from(expr: syn::Expr) -> Self {
241        Self(Box::new(expr))
242    }
243}
244
245impl Deref for DebugExpr {
246    type Target = syn::Expr;
247
248    fn deref(&self) -> &Self::Target {
249        &self.0
250    }
251}
252
253impl ToTokens for DebugExpr {
254    fn to_tokens(&self, tokens: &mut TokenStream) {
255        self.0.to_tokens(tokens);
256    }
257}
258
259impl Debug for DebugExpr {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        write!(f, "{}", self.0.to_token_stream())
262    }
263}
264
265impl Display for DebugExpr {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        let original = self.0.as_ref().clone();
268        let simplified = simplify_q_macro(original);
269
270        // For now, just use quote formatting without trying to parse as a statement
271        // This avoids the syn::parse_quote! issues entirely
272        write!(f, "q!({})", quote::quote!(#simplified))
273    }
274}
275
276/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
277fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278    if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279        // Look for calls to stageleft::runtime_support::fn*
280        && is_stageleft_runtime_support_call(&path_expr.path)
281        && let syn::Expr::Block(b) = &call.args[0]
282        && b.block.stmts.len() == 3
283        && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284    // skip the first two, which are imports
285    {
286        let mut e = e.clone();
287        while let syn::Expr::Block(ref mut block) = e
288            && block.block.stmts.len() == 1
289            && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290        {
291            e = inner_e;
292        }
293
294        e
295    } else {
296        expr
297    }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301    // Check if this is a call to stageleft::runtime_support::fn*
302    if let Some(last_segment) = path.segments.last() {
303        let fn_name = last_segment.ident.to_string();
304        path.segments.len() > 2
305            && path.segments[0].ident == "stageleft"
306            && path.segments[1].ident == "runtime_support"
307            && fn_name.contains("_type_hint")
308    } else {
309        false
310    }
311}
312
/// Debug displays the type's tokens.
///
/// Boxes `syn::Type` which is ~320 bytes.
///
/// Serialization writes the same token text as a string.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320    fn from(t: syn::Type) -> Self {
321        Self(Box::new(t))
322    }
323}
324
325impl Deref for DebugType {
326    type Target = syn::Type;
327
328    fn deref(&self) -> &Self::Target {
329        &self.0
330    }
331}
332
333impl ToTokens for DebugType {
334    fn to_tokens(&self, tokens: &mut TokenStream) {
335        self.0.to_tokens(tokens);
336    }
337}
338
339impl Debug for DebugType {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        write!(f, "{}", self.0.to_token_stream())
342    }
343}
344
345impl serde::Serialize for DebugType {
346    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348    }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352    backtrace: &Backtrace,
353    serializer: S,
354) -> Result<S::Ok, S::Error> {
355    match backtrace.format_span() {
356        Some(span) => serializer.serialize_some(&span),
357        None => serializer.serialize_none(),
358    }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362    ident: &syn::Ident,
363    serializer: S,
364) -> Result<S::Ok, S::Error> {
365    serializer.serialize_str(&ident.to_string())
366}
367
/// Instantiation state attached to a network-related IR node.
///
/// The trait impls in this file treat the two variants differently: `Debug` prints a
/// fixed placeholder, `Hash` contributes nothing, and both `Clone` and `Serialize`
/// panic on [`Self::Finalized`].
pub enum DebugInstantiate {
    /// No instantiation data has been attached yet.
    Building,
    /// Instantiation data is present; boxed so the enum itself stays small.
    Finalized(Box<DebugInstantiateFinalized>),
}
372
373impl serde::Serialize for DebugInstantiate {
374    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375        match self {
376            DebugInstantiate::Building => {
377                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378            }
379            DebugInstantiate::Finalized(_) => {
380                panic!(
381                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382                )
383            }
384        }
385    }
386}
387
/// Payload of [`DebugInstantiate::Finalized`].
///
/// NOTE(review): the field meanings below are inferred from their names and types;
/// confirm against the code that constructs and consumes this struct.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side (presumably spliced into generated code).
    sink: syn::Expr,
    /// Expression for the receiving side (presumably spliced into generated code).
    source: syn::Expr,
    /// Optional one-shot callback; `FnOnce` means it can be invoked at most once.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402    fn from(f: DebugInstantiateFinalized) -> Self {
403        Self::Finalized(Box::new(f))
404    }
405}
406
407impl Debug for DebugInstantiate {
408    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409        write!(f, "<network instantiate>")
410    }
411}
412
impl Hash for DebugInstantiate {
    /// Feeds nothing into the hasher, so this value never affects the hash of
    /// whatever contains it.
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // Do nothing
    }
}
418
419impl Clone for DebugInstantiate {
420    fn clone(&self) -> Self {
421        match self {
422            DebugInstantiate::Building => DebugInstantiate::Building,
423            DebugInstantiate::Finalized(_) => {
424                panic!("DebugInstantiate::Finalized should not be cloned")
425            }
426        }
427    }
428}
429
/// Tracks the instantiation state of a `ClusterMembers` source.
///
/// During `compile_network`, the first `ClusterMembers` node for a given
/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
/// receives the expression returned by `Deploy::cluster_membership_stream`.
/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
/// during code-gen they simply reference the tee output of the first node
/// instead of creating a redundant `source_stream`.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet instantiated.
    Uninit,
    /// The primary instance: holds the stream expression and will emit
    /// `source_stream(expr) -> tee()` during code-gen.
    Stream(DebugExpr),
    /// A secondary instance that references the tee output of the primary.
    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
    /// can derive the deterministic tee ident without extra state.
    Tee(LocationId, LocationId),
}
450
/// A source in a Hydro graph, where data enters the graph.
///
/// NOTE(review): the per-variant notes below are inferred from the variant names and
/// payload types; confirm against the code-gen that consumes them.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression (presumably one that evaluates to a stream).
    Stream(DebugExpr),
    /// Data arriving from an external network; carries no payload.
    ExternalNetwork(),
    /// Source described by an expression (presumably one that evaluates to an iterable).
    Iter(DebugExpr),
    /// Payload-free source; semantics not visible here.
    Spin(),
    /// Membership of a cluster, together with its instantiation state
    /// (see [`ClusterMembersState`]).
    ClusterMembers(LocationId, ClusterMembersState),
    /// Source identified by an identifier, serialized as a string.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Singleton counterpart of [`Self::Embedded`], also identified by an identifier.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
462
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
///
/// Throughout, `in_ident` names the upstream DFIR variable and `out_ident` the
/// variable the method is expected to define.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Connects `in_ident` (at `in_location`) to `out_ident` (at `out_location`) for a
    /// batching step.
    ///
    /// The production implementation in this file inserts `persist::<'static>()` when
    /// the input is a bounded singleton, optional, or keyed singleton, and a plain
    /// assignment otherwise.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Connects `in_ident` (at `in_location`) to `out_ident` (at `out_location`) when
    /// a collection is yielded out of a tick. The production implementation is a
    /// plain assignment.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Connects `in_ident` to `out_ident` where an atomic section begins. The
    /// production implementation is a plain assignment.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident` where an atomic section ends. The
    /// production implementation is a plain assignment.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Connects `in_ident` to `out_ident` at a point where non-determinism is
    /// observed, with the input and output collection kinds supplied. The production
    /// implementation is a plain assignment.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Merges `first_ident` and `second_ident` into `out_ident`. The production
    /// implementation feeds them into inputs `[0]` and `[1]` of a `union()` operator
    /// and forwards `operator_tag` to the graph builder.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network edge: a sender at `from` that drains
    /// `input_ident` into `sink`, and a receiver at `to` that defines `out_ident`
    /// from `source`. When given, `serialize` / `deserialize` are applied as `map`
    /// stages on the respective side (per the production implementation).
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Defines `out_ident` at location `on` from an external `source_expr`, optionally
    /// followed by a `deserialize` map stage.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Drains `input_ident` at location `on` into an external `sink_expr`, optionally
    /// preceded by a `serialize` map stage.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
    /// Returns the new input ident to use for the fold if a hook was emitted.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Inserts necessary code to validate a manual assertion that at this point the
    /// input live collection is consistent. In production, this is a no-op, but in simulation
    /// this will (not yet implemented) inject assertions that validate consistency.
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );

    /// Observes non-determinism introduced by a mut closure operating on a non-strict
    /// (unordered / at-least-once) input. In production this is identity; in simulation
    /// it delegates to `observe_nondet` with the strict output kind.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        op_meta: &HydroIrOpMetadata,
    );
}
603
/// Code-gen backed by one `FlatGraphBuilder` per location key.
///
/// Most hooks here are identity pipelines (`out = in`). Only batching of bounded
/// singleton-like collections, ordered merges, and network / external endpoints emit
/// anything more.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// Always `false` for this builder.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the graph builder stored under the key of `location`'s root, inserting
    /// a default one on first use.
    ///
    /// # Panics
    /// Panics with "location was removed" if the map yields no entry for that key.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        let slot = self.entry(root_key).expect("location was removed");
        slot.or_default()
    }

    /// Emits `out = in -> persist::<'static>()` for bounded singleton / optional /
    /// keyed-singleton inputs (asserting the input location is top-level), and a
    /// plain `out = in` for everything else.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let graph = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if !needs_persist {
            graph.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Identity pipeline on the builder of the input location's root.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Identity pipeline on the builder of the input location's root.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Identity pipeline on the builder of the input location's root.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Identity pipeline; `trusted`, the kinds and the op metadata are ignored here.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Declares `out` as a `union()` and wires the two inputs into ports 0 and 1.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits the sending half on `from` and the receiving half on `to`.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        let outbound = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                outbound.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                outbound.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        let inbound = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                inbound.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                inbound.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits `out = source_stream(source_expr)`, with a `map(deserialize)` stage
    /// appended when a deserializer is given.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let inbound = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                inbound.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                inbound.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits `input -> dest_sink(sink_expr)`, with a `map(serialize)` stage inserted
    /// when a serializer is given.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{}", tag_id);
        let outbound = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                outbound.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                outbound.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }

    /// This builder never emits a fold hook.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }

    /// Identity pipeline; no validation code is generated by this builder.
    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Identity pipeline; the input kind and op metadata are ignored here.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }
}
905
/// Either a set of DFIR builders to emit code into, or a pair of callbacks to run
/// instead.
///
/// NOTE(review): how the two variants are dispatched is not visible in this part of
/// the file; confirm against the code-gen traversal that takes this type.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
{
    /// Emit into the given builder (see [`DfirBuilder`]).
    Builders(&'a mut dyn DfirBuilder),
    /// Callbacks taking a root (`L`) or a node (`N`), each with the statement-id counter.
    Callback(L, N),
}
915
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
///
/// Every variant carries its upstream `input` node and the operator's metadata.
/// NOTE(review): the per-variant notes are inferred from field names and types;
/// confirm against the code-gen for each variant.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Applies the closure `f` to the input.
    ForEach {
        f: ClosureExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to a port on an external location, optionally through
    /// `serialize_fn`; `instantiate_fn` tracks whether the connection has been
    /// instantiated.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the sink described by the `sink` expression.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Output identified by `ident`, which is serialized as a string.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// A root with no payload beyond its input and metadata.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
957
958impl HydroRoot {
    /// Instantiates every networking-related element reachable from this root.
    ///
    /// The tree is walked bottom-up (via [`HydroRoot::transform_bottom_up`], with
    /// well-formedness checking disabled) and the following elements are handled:
    ///
    /// - `HydroRoot::SendExternal`: allocates ports, builds the sink expression and
    ///   stores a finalized `instantiate_fn` (only with the `tokio` feature; without
    ///   it this panics).
    /// - `HydroRoot::EmbeddedOutput`: registers the output with the deployment `D`.
    /// - `HydroNode::Network`: delegates to `instantiate_network` and stores the
    ///   finalized `instantiate_fn`.
    /// - `HydroNode::ExternalInput`: allocates ports, registers the external port,
    ///   builds the source expression and stores the finalized `instantiate_fn`.
    /// - `HydroNode::Source` with `HydroSource::Embedded` / `EmbeddedSingleton`:
    ///   registers the input with the deployment `D`.
    /// - `HydroNode::Source` with `HydroSource::ClusterMembers`: the first occurrence
    ///   of an `(at, target)` pair becomes a `Stream`, later ones become a `Tee`.
    ///
    /// `processes`, `clusters` and `externals` map location keys to the deployment
    /// nodes; `extra_stmts` collects additional per-location statements produced by
    /// the deployment; `seen_cluster_members` records which `(at, target)` pairs
    /// already have a membership stream.
    ///
    /// # Panics
    ///
    /// Panics if a referenced process, cluster or external is missing from the
    /// corresponding map, if an element has already been finalized, if an endpoint
    /// is located on something other than a process or cluster, or if an embedded
    /// input/output has an unexpected collection kind.
    #[cfg(feature = "build")]
    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
    pub fn compile_network<'a, D>(
        &mut self,
        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
        seen_tees: &mut SeenSharedNodes,
        seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
        processes: &SparseSecondaryMap<LocationKey, D::Process>,
        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
        externals: &SparseSecondaryMap<LocationKey, D::External>,
        env: &mut D::InstantiateEnv,
    ) where
        D: Deploy<'a>,
    {
        // The root closure and the node closure both need mutable access to
        // `extra_stmts` and `env`, so the mutable references are shared through
        // `RefCell`s and borrowed only for the duration of each call.
        let refcell_extra_stmts = RefCell::new(extra_stmts);
        let refcell_env = RefCell::new(env);
        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
        self.transform_bottom_up(
            // Root handler: external outputs and embedded outputs.
            &mut |l| {
                if let HydroRoot::SendExternal {
                    #[cfg(feature = "tokio")]
                    input,
                    #[cfg(feature = "tokio")]
                    to_external_key,
                    #[cfg(feature = "tokio")]
                    to_port_id,
                    #[cfg(feature = "tokio")]
                    to_many,
                    #[cfg(feature = "tokio")]
                    unpaired,
                    #[cfg(feature = "tokio")]
                    instantiate_fn,
                    ..
                } = l
                {
                    #[cfg(feature = "tokio")]
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let to_node = externals
                                .get(*to_external_key)
                                .unwrap_or_else(|| {
                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
                                })
                                .clone();

                            // The sender is wherever the input pipeline lives.
                            match input.metadata().location_id.root() {
                                &LocationId::Process(process_key) => {
                                    if *to_many {
                                        // Many-connection sink: no ports are allocated here and
                                        // there is nothing to connect later.
                                        (
                                            (
                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
                                                parse_quote!(DUMMY),
                                            ),
                                            Box::new(|| {}) as Box<dyn FnOnce()>,
                                        )
                                    } else {
                                        let from_node = processes
                                            .get(process_key)
                                            .unwrap_or_else(|| {
                                                panic!("A process used in the graph was not instantiated: {}", process_key)
                                            })
                                            .clone();

                                        let sink_port = from_node.next_port();
                                        let source_port = to_node.next_port();

                                        if *unpaired {
                                            use stageleft::quote_type;
                                            use tokio_util::codec::LengthDelimitedCodec;

                                            to_node.register(*to_port_id, source_port.clone());

                                            // The returned source expression is discarded; the call is
                                            // made for its effect on the process's extra statements.
                                            let _ = D::e2o_source(
                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
                                                &to_node, &source_port,
                                                &from_node, &sink_port,
                                                &quote_type::<LengthDelimitedCodec>(),
                                                format!("{}_{}", *to_external_key, *to_port_id)
                                            );
                                        }

                                        (
                                            (
                                                D::o2e_sink(
                                                    &from_node,
                                                    &sink_port,
                                                    &to_node,
                                                    &source_port,
                                                    format!("{}_{}", *to_external_key, *to_port_id)
                                                ),
                                                parse_quote!(DUMMY),
                                            ),
                                            if *unpaired {
                                                D::e2o_connect(
                                                    &to_node,
                                                    &source_port,
                                                    &from_node,
                                                    &sink_port,
                                                    *to_many,
                                                    NetworkHint::Auto,
                                                )
                                            } else {
                                                Box::new(|| {}) as Box<dyn FnOnce()>
                                            },
                                        )
                                    }
                                }
                                LocationId::Cluster(cluster_key) => {
                                    let from_node = clusters
                                        .get(*cluster_key)
                                        .unwrap_or_else(|| {
                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
                                        })
                                        .clone();

                                    let sink_port = from_node.next_port();
                                    let source_port = to_node.next_port();

                                    if *unpaired {
                                        to_node.register(*to_port_id, source_port.clone());
                                    }

                                    (
                                        (
                                            D::m2e_sink(
                                                &from_node,
                                                &sink_port,
                                                &to_node,
                                                &source_port,
                                                format!("{}_{}", *to_external_key, *to_port_id)
                                            ),
                                            parse_quote!(DUMMY),
                                        ),
                                        Box::new(|| {}) as Box<dyn FnOnce()>,
                                    )
                                }
                                // Only processes and clusters can send to an external.
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    #[cfg(not(feature = "tokio"))]
                    {
                        panic!("Cannot instantiate external inputs without tokio");
                    };

                    // Record the result; `source` is only the `DUMMY` placeholder for
                    // external outputs.
                    #[cfg(feature = "tokio")]
                    {
                        *instantiate_fn = DebugInstantiateFinalized {
                            sink: sink_expr,
                            source: source_expr,
                            connect_fn: Some(connect_fn),
                        }
                        .into();
                    };
                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
                    let element_type = match &input.metadata().collection_kind {
                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
                        _ => panic!("Embedded output must have Stream collection kind"),
                    };
                    let location_key = match input.metadata().location_id.root() {
                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
                        _ => panic!("Embedded output must be on a process or cluster"),
                    };
                    D::register_embedded_output(
                        &mut refcell_env.borrow_mut(),
                        location_key,
                        ident,
                        &element_type,
                    );
                }
            },
            // Node handler: network hops, external inputs, embedded inputs and
            // cluster membership sources.
            &mut |n| {
                if let HydroNode::Network {
                    name,
                    networking_info,
                    input,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    // The hop goes from the input's root location to this node's root location.
                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => instantiate_network::<D>(
                            &mut refcell_env.borrow_mut(),
                            input.metadata().location_id.root(),
                            metadata.location_id.root(),
                            processes,
                            clusters,
                            name.as_deref(),
                            networking_info,
                        ),

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                } else if let HydroNode::ExternalInput {
                    from_external_key,
                    from_port_id,
                    from_many,
                    codec_type,
                    port_hint,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let from_node = externals
                                .get(*from_external_key)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "A external used in the graph was not instantiated: {}",
                                        from_external_key,
                                    )
                                })
                                .clone();

                            // The receiver is wherever this node itself lives.
                            match metadata.location_id.root() {
                                &LocationId::Process(process_key) => {
                                    let to_node = processes
                                        .get(process_key)
                                        .unwrap_or_else(|| {
                                            panic!("A process used in the graph was not instantiated: {}", process_key)
                                        })
                                        .clone();

                                    let sink_port = from_node.next_port();
                                    let source_port = to_node.next_port();

                                    from_node.register(*from_port_id, sink_port.clone());

                                    // For external inputs the sink side is the `DUMMY` placeholder.
                                    (
                                        (
                                            parse_quote!(DUMMY),
                                            if *from_many {
                                                D::e2o_many_source(
                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_key, *from_port_id)
                                                )
                                            } else {
                                                D::e2o_source(
                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
                                                    &from_node, &sink_port,
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_key, *from_port_id)
                                                )
                                            },
                                        ),
                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
                                    )
                                }
                                LocationId::Cluster(cluster_key) => {
                                    let to_node = clusters
                                        .get(*cluster_key)
                                        .unwrap_or_else(|| {
                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
                                        })
                                        .clone();

                                    let sink_port = from_node.next_port();
                                    let source_port = to_node.next_port();

                                    from_node.register(*from_port_id, sink_port.clone());

                                    (
                                        (
                                            parse_quote!(DUMMY),
                                            D::e2m_source(
                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
                                                &from_node, &sink_port,
                                                &to_node, &source_port,
                                                codec_type.0.as_ref(),
                                                format!("{}_{}", *from_external_key, *from_port_id)
                                            ),
                                        ),
                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
                                    )
                                }
                                // Only processes and clusters can receive from an external.
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
                    let element_type = match &metadata.collection_kind {
                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
                        _ => panic!("Embedded source must have Stream collection kind"),
                    };
                    let location_key = match metadata.location_id.root() {
                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
                        _ => panic!("Embedded source must be on a process or cluster"),
                    };
                    D::register_embedded_stream_input(
                        &mut refcell_env.borrow_mut(),
                        location_key,
                        ident,
                        &element_type,
                    );
                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
                    let element_type = match &metadata.collection_kind {
                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
                    };
                    let location_key = match metadata.location_id.root() {
                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
                    };
                    D::register_embedded_singleton_input(
                        &mut refcell_env.borrow_mut(),
                        location_key,
                        ident,
                        &element_type,
                    );
                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
                    match state {
                        ClusterMembersState::Uninit => {
                            let at_location = metadata.location_id.root().clone();
                            // Deduplicate by (observing location, observed cluster key).
                            let key = (at_location.clone(), location_id.key());
                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
                                // First occurrence: call cluster_membership_stream and mark as Stream.
                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
                                    &(),
                                );
                                *state = ClusterMembersState::Stream(expr.into());
                            } else {
                                // Already instantiated for this (at, target) pair: just tee.
                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
                            }
                        }
                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
                            panic!("cluster members already finalized");
                        }
                    }
                }
            },
            seen_tees,
            false,
        );
    }
1320
1321    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1322        self.transform_bottom_up(
1323            &mut |l| {
1324                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1325                    match instantiate_fn {
1326                        DebugInstantiate::Building => panic!("network not built"),
1327
1328                        DebugInstantiate::Finalized(finalized) => {
1329                            (finalized.connect_fn.take().unwrap())();
1330                        }
1331                    }
1332                }
1333            },
1334            &mut |n| {
1335                if let HydroNode::Network { instantiate_fn, .. }
1336                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1337                {
1338                    match instantiate_fn {
1339                        DebugInstantiate::Building => panic!("network not built"),
1340
1341                        DebugInstantiate::Finalized(finalized) => {
1342                            (finalized.connect_fn.take().unwrap())();
1343                        }
1344                    }
1345                }
1346            },
1347            seen_tees,
1348            false,
1349        );
1350    }
1351
1352    pub fn transform_bottom_up(
1353        &mut self,
1354        transform_root: &mut impl FnMut(&mut HydroRoot),
1355        transform_node: &mut impl FnMut(&mut HydroNode),
1356        seen_tees: &mut SeenSharedNodes,
1357        check_well_formed: bool,
1358    ) {
1359        self.transform_children(
1360            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1361            seen_tees,
1362        );
1363
1364        transform_root(self);
1365    }
1366
1367    pub fn transform_children(
1368        &mut self,
1369        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1370        seen_tees: &mut SeenSharedNodes,
1371    ) {
1372        match self {
1373            HydroRoot::ForEach { f, input, .. } => {
1374                f.transform_children(&mut transform, seen_tees);
1375                transform(input, seen_tees);
1376            }
1377            HydroRoot::SendExternal { input, .. }
1378            | HydroRoot::DestSink { input, .. }
1379            | HydroRoot::CycleSink { input, .. }
1380            | HydroRoot::EmbeddedOutput { input, .. }
1381            | HydroRoot::Null { input, .. } => {
1382                transform(input, seen_tees);
1383            }
1384        }
1385    }
1386
1387    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1388        match self {
1389            HydroRoot::ForEach {
1390                f,
1391                input,
1392                op_metadata,
1393            } => HydroRoot::ForEach {
1394                f: f.deep_clone(seen_tees),
1395                input: Box::new(input.deep_clone(seen_tees)),
1396                op_metadata: op_metadata.clone(),
1397            },
1398            HydroRoot::SendExternal {
1399                to_external_key,
1400                to_port_id,
1401                to_many,
1402                unpaired,
1403                serialize_fn,
1404                instantiate_fn,
1405                input,
1406                op_metadata,
1407            } => HydroRoot::SendExternal {
1408                to_external_key: *to_external_key,
1409                to_port_id: *to_port_id,
1410                to_many: *to_many,
1411                unpaired: *unpaired,
1412                serialize_fn: serialize_fn.clone(),
1413                instantiate_fn: instantiate_fn.clone(),
1414                input: Box::new(input.deep_clone(seen_tees)),
1415                op_metadata: op_metadata.clone(),
1416            },
1417            HydroRoot::DestSink {
1418                sink,
1419                input,
1420                op_metadata,
1421            } => HydroRoot::DestSink {
1422                sink: sink.clone(),
1423                input: Box::new(input.deep_clone(seen_tees)),
1424                op_metadata: op_metadata.clone(),
1425            },
1426            HydroRoot::CycleSink {
1427                cycle_id,
1428                input,
1429                op_metadata,
1430            } => HydroRoot::CycleSink {
1431                cycle_id: *cycle_id,
1432                input: Box::new(input.deep_clone(seen_tees)),
1433                op_metadata: op_metadata.clone(),
1434            },
1435            HydroRoot::EmbeddedOutput {
1436                ident,
1437                input,
1438                op_metadata,
1439            } => HydroRoot::EmbeddedOutput {
1440                ident: ident.clone(),
1441                input: Box::new(input.deep_clone(seen_tees)),
1442                op_metadata: op_metadata.clone(),
1443            },
1444            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1445                input: Box::new(input.deep_clone(seen_tees)),
1446                op_metadata: op_metadata.clone(),
1447            },
1448        }
1449    }
1450
1451    #[cfg(feature = "build")]
1452    pub fn emit(
1453        &mut self,
1454        graph_builders: &mut dyn DfirBuilder,
1455        seen_tees: &mut SeenSharedNodes,
1456        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1457        next_stmt_id: &mut crate::Counter<StmtId>,
1458        fold_hooked_idents: &mut HashSet<String>,
1459    ) {
1460        self.emit_core(
1461            &mut BuildersOrCallback::<
1462                fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1463                fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1464            >::Builders(graph_builders),
1465            seen_tees,
1466            built_tees,
1467            next_stmt_id,
1468            fold_hooked_idents,
1469        );
1470    }
1471
1472    #[cfg(feature = "build")]
1473    pub fn emit_core(
1474        &mut self,
1475        builders_or_callback: &mut BuildersOrCallback<
1476            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1477            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1478        >,
1479        seen_tees: &mut SeenSharedNodes,
1480        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1481        next_stmt_id: &mut crate::Counter<StmtId>,
1482        fold_hooked_idents: &mut HashSet<String>,
1483    ) {
1484        match self {
1485            HydroRoot::ForEach { f, input, .. } => {
1486                let input_ident = input.emit_core(
1487                    builders_or_callback,
1488                    seen_tees,
1489                    built_tees,
1490                    next_stmt_id,
1491                    fold_hooked_idents,
1492                );
1493
1494                let stmt_id = next_stmt_id.get_and_increment();
1495
1496                match builders_or_callback {
1497                    BuildersOrCallback::Builders(graph_builders) => {
1498                        let mut ident_stack: Vec<syn::Ident> = Vec::new();
1499
1500                        // Look up each captured ref's ident from built_tees
1501                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
1502                            let HydroNode::Reference { inner, .. } = ref_node else {
1503                                panic!("singleton_refs should only contain HydroNode::Reference");
1504                            };
1505                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1506                            let idents = built_tees.get(&ptr).expect(
1507                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
1508                            );
1509                            ident_stack.push(idents[0].clone());
1510                        }
1511
1512                        let f_tokens = f.emit_tokens(&mut ident_stack);
1513
1514                        graph_builders
1515                            .get_dfir_mut(&input.metadata().location_id)
1516                            .add_dfir(
1517                                parse_quote! {
1518                                    #input_ident -> for_each(#f_tokens);
1519                                },
1520                                None,
1521                                Some(&stmt_id.to_string()),
1522                            );
1523                    }
1524                    BuildersOrCallback::Callback(leaf_callback, _) => {
1525                        leaf_callback(self, next_stmt_id);
1526                    }
1527                }
1528            }
1529
1530            HydroRoot::SendExternal {
1531                serialize_fn,
1532                instantiate_fn,
1533                input,
1534                ..
1535            } => {
1536                let input_ident = input.emit_core(
1537                    builders_or_callback,
1538                    seen_tees,
1539                    built_tees,
1540                    next_stmt_id,
1541                    fold_hooked_idents,
1542                );
1543
1544                let stmt_id = next_stmt_id.get_and_increment();
1545
1546                match builders_or_callback {
1547                    BuildersOrCallback::Builders(graph_builders) => {
1548                        let (sink_expr, _) = match instantiate_fn {
1549                            DebugInstantiate::Building => (
1550                                syn::parse_quote!(DUMMY_SINK),
1551                                syn::parse_quote!(DUMMY_SOURCE),
1552                            ),
1553
1554                            DebugInstantiate::Finalized(finalized) => {
1555                                (finalized.sink.clone(), finalized.source.clone())
1556                            }
1557                        };
1558
1559                        graph_builders.create_external_output(
1560                            &input.metadata().location_id,
1561                            sink_expr,
1562                            &input_ident,
1563                            serialize_fn.as_ref(),
1564                            stmt_id,
1565                        );
1566                    }
1567                    BuildersOrCallback::Callback(leaf_callback, _) => {
1568                        leaf_callback(self, next_stmt_id);
1569                    }
1570                }
1571            }
1572
1573            HydroRoot::DestSink { sink, input, .. } => {
1574                let input_ident = input.emit_core(
1575                    builders_or_callback,
1576                    seen_tees,
1577                    built_tees,
1578                    next_stmt_id,
1579                    fold_hooked_idents,
1580                );
1581
1582                let stmt_id = next_stmt_id.get_and_increment();
1583
1584                match builders_or_callback {
1585                    BuildersOrCallback::Builders(graph_builders) => {
1586                        graph_builders
1587                            .get_dfir_mut(&input.metadata().location_id)
1588                            .add_dfir(
1589                                parse_quote! {
1590                                    #input_ident -> dest_sink(#sink);
1591                                },
1592                                None,
1593                                Some(&stmt_id.to_string()),
1594                            );
1595                    }
1596                    BuildersOrCallback::Callback(leaf_callback, _) => {
1597                        leaf_callback(self, next_stmt_id);
1598                    }
1599                }
1600            }
1601
1602            HydroRoot::CycleSink {
1603                cycle_id, input, ..
1604            } => {
1605                let input_ident = input.emit_core(
1606                    builders_or_callback,
1607                    seen_tees,
1608                    built_tees,
1609                    next_stmt_id,
1610                    fold_hooked_idents,
1611                );
1612
1613                match builders_or_callback {
1614                    BuildersOrCallback::Builders(graph_builders) => {
1615                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1616                            CollectionKind::KeyedSingleton {
1617                                key_type,
1618                                value_type,
1619                                ..
1620                            }
1621                            | CollectionKind::KeyedStream {
1622                                key_type,
1623                                value_type,
1624                                ..
1625                            } => {
1626                                parse_quote!((#key_type, #value_type))
1627                            }
1628                            CollectionKind::Stream { element_type, .. }
1629                            | CollectionKind::Singleton { element_type, .. }
1630                            | CollectionKind::Optional { element_type, .. } => {
1631                                parse_quote!(#element_type)
1632                            }
1633                        };
1634
1635                        let cycle_id_ident = cycle_id.as_ident();
1636                        graph_builders
1637                            .get_dfir_mut(&input.metadata().location_id)
1638                            .add_dfir(
1639                                parse_quote! {
1640                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1641                                },
1642                                None,
1643                                None,
1644                            );
1645                    }
1646                    // No ID, no callback
1647                    BuildersOrCallback::Callback(_, _) => {}
1648                }
1649            }
1650
1651            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1652                let input_ident = input.emit_core(
1653                    builders_or_callback,
1654                    seen_tees,
1655                    built_tees,
1656                    next_stmt_id,
1657                    fold_hooked_idents,
1658                );
1659
1660                let stmt_id = next_stmt_id.get_and_increment();
1661
1662                match builders_or_callback {
1663                    BuildersOrCallback::Builders(graph_builders) => {
1664                        graph_builders
1665                            .get_dfir_mut(&input.metadata().location_id)
1666                            .add_dfir(
1667                                parse_quote! {
1668                                    #input_ident -> for_each(&mut #ident);
1669                                },
1670                                None,
1671                                Some(&stmt_id.to_string()),
1672                            );
1673                    }
1674                    BuildersOrCallback::Callback(leaf_callback, _) => {
1675                        leaf_callback(self, next_stmt_id);
1676                    }
1677                }
1678            }
1679
1680            HydroRoot::Null { input, .. } => {
1681                let input_ident = input.emit_core(
1682                    builders_or_callback,
1683                    seen_tees,
1684                    built_tees,
1685                    next_stmt_id,
1686                    fold_hooked_idents,
1687                );
1688
1689                let stmt_id = next_stmt_id.get_and_increment();
1690
1691                match builders_or_callback {
1692                    BuildersOrCallback::Builders(graph_builders) => {
1693                        graph_builders
1694                            .get_dfir_mut(&input.metadata().location_id)
1695                            .add_dfir(
1696                                parse_quote! {
1697                                    #input_ident -> for_each(|_| {});
1698                                },
1699                                None,
1700                                Some(&stmt_id.to_string()),
1701                            );
1702                    }
1703                    BuildersOrCallback::Callback(leaf_callback, _) => {
1704                        leaf_callback(self, next_stmt_id);
1705                    }
1706                }
1707            }
1708        }
1709    }
1710
1711    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1712        match self {
1713            HydroRoot::ForEach { op_metadata, .. }
1714            | HydroRoot::SendExternal { op_metadata, .. }
1715            | HydroRoot::DestSink { op_metadata, .. }
1716            | HydroRoot::CycleSink { op_metadata, .. }
1717            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1718            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1719        }
1720    }
1721
1722    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1723        match self {
1724            HydroRoot::ForEach { op_metadata, .. }
1725            | HydroRoot::SendExternal { op_metadata, .. }
1726            | HydroRoot::DestSink { op_metadata, .. }
1727            | HydroRoot::CycleSink { op_metadata, .. }
1728            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1729            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1730        }
1731    }
1732
1733    pub fn input(&self) -> &HydroNode {
1734        match self {
1735            HydroRoot::ForEach { input, .. }
1736            | HydroRoot::SendExternal { input, .. }
1737            | HydroRoot::DestSink { input, .. }
1738            | HydroRoot::CycleSink { input, .. }
1739            | HydroRoot::EmbeddedOutput { input, .. }
1740            | HydroRoot::Null { input, .. } => input,
1741        }
1742    }
1743
1744    pub fn input_metadata(&self) -> &HydroIrMetadata {
1745        self.input().metadata()
1746    }
1747
1748    pub fn print_root(&self) -> String {
1749        match self {
1750            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1751            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1752            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1753            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1754            HydroRoot::EmbeddedOutput { ident, .. } => {
1755                format!("EmbeddedOutput({})", ident)
1756            }
1757            HydroRoot::Null { .. } => "Null".to_owned(),
1758        }
1759    }
1760
1761    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1762        match self {
1763            HydroRoot::ForEach { f, .. } => {
1764                transform(&mut f.expr);
1765            }
1766            HydroRoot::DestSink { sink, .. } => {
1767                transform(sink);
1768            }
1769            HydroRoot::SendExternal { .. }
1770            | HydroRoot::CycleSink { .. }
1771            | HydroRoot::EmbeddedOutput { .. }
1772            | HydroRoot::Null { .. } => {}
1773        }
1774    }
1775}
1776
/// Returns the clock id of the tick that `loc` belongs to, if any.
///
/// `Atomic` wrappers are peeled off one at a time until a `Tick` is found;
/// any other location kind yields `None`.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => {
                return Some(*id);
            }
            LocationId::Atomic(inner) => {
                current = &**inner;
            }
            _ => {
                return None;
            }
        }
    }
}
1785
/// Rewrites every tick id nested inside `loc` to the representative that
/// `uf_find` reports for it in the union-find map `uf`.
///
/// `Tick` ids are replaced and the walk continues into the wrapped location;
/// `Atomic` is only descended into; `Process` and `Cluster` end the walk.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let nested = match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => return,
        LocationId::Atomic(inner) => inner,
        LocationId::Tick(id, inner) => {
            let representative = uf_find(uf, *id);
            *id = representative;
            inner
        }
    };
    remap_location(nested, uf);
}
1799
1800#[cfg(feature = "build")]
1801fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1802    let p = *parent.get(&x).unwrap_or(&x);
1803    if p == x {
1804        return x;
1805    }
1806    let root = uf_find(parent, p);
1807    parent.insert(x, root);
1808    root
1809}
1810
/// Merges the sets containing `a` and `b` in the union-find map `parent`.
///
/// Both roots are looked up with `uf_find`; when they differ, the root of
/// `a` is re-parented under the root of `b`.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        // Already in the same set.
        return;
    }
    parent.insert(root_a, root_b);
}
1819
/// Unifies tick IDs across the IR and rewrites node locations to use one
/// representative ID per unified group.
///
/// Pass 1 walks the IR bottom-up and builds a union-find over `ClockId`s:
/// - `Batch` / `YieldConcat`: the tick of `inner` is unioned with the tick of
///   the node itself;
/// - `Chain` / `ChainFirst` / `MergeOrdered`: the ticks of `first` and of
///   `second` are each unioned with the tick of the node itself.
///
/// A union only happens when both locations resolve to a tick via `tick_of`.
///
/// Pass 2 walks the IR again and remaps the `location_id` in every node's
/// metadata through the union-find.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    /// Unions the tick of `upstream` with the tick of `node` when both
    /// locations are inside a tick.
    fn link(uf: &mut HashMap<ClockId, ClockId>, upstream: &LocationId, node: &LocationId) {
        if let (Some(upstream_tick), Some(node_tick)) = (tick_of(upstream), tick_of(node)) {
            uf_union(uf, upstream_tick, node_tick);
        }
    }

    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect unifications.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                link(&mut uf, &inner.metadata().location_id, &metadata.location_id);
            }
            HydroNode::Chain {
                first,
                second,
                metadata,
            }
            | HydroNode::ChainFirst {
                first,
                second,
                metadata,
            }
            | HydroNode::MergeOrdered {
                first,
                second,
                metadata,
            } => {
                link(&mut uf, &first.metadata().location_id, &metadata.location_id);
                link(&mut uf, &second.metadata().location_id, &metadata.location_id);
            }
            _ => {}
        },
        false,
    );

    // Pass 2: rewrite all LocationIds.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1883
/// Calls `HydroRoot::emit` on every root in `ir` and returns the resulting
/// per-location graph builders.
///
/// The builder map, both tee caches, the statement-id counter and the set of
/// fold-hooked idents are created once here and shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    builders
}
1902
/// Runs `HydroRoot::emit_core` over every root in callback mode.
///
/// `transform_root` and `transform_node` are wrapped in
/// `BuildersOrCallback::Callback`; each receives the element being visited
/// together with the shared statement-id counter. The tee caches, counter and
/// fold-hook set are fresh for each call and shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
) {
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1924
1925pub fn transform_bottom_up(
1926    ir: &mut [HydroRoot],
1927    transform_root: &mut impl FnMut(&mut HydroRoot),
1928    transform_node: &mut impl FnMut(&mut HydroNode),
1929    check_well_formed: bool,
1930) {
1931    let mut seen_tees = HashMap::new();
1932    ir.iter_mut().for_each(|leaf| {
1933        leaf.transform_bottom_up(
1934            transform_root,
1935            transform_node,
1936            &mut seen_tees,
1937            check_well_formed,
1938        );
1939    });
1940}
1941
1942pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1943    let mut seen_tees = HashMap::new();
1944    ir.iter()
1945        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1946        .collect()
1947}
1948
1949type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1950thread_local! {
1951    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1952    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1953    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1954    /// on subsequent encounters, preventing infinite loops.
1955    static SERIALIZED_SHARED: PrintedTees
1956        = const { RefCell::new(None) };
1957}
1958
1959pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1960    PRINTED_TEES.with(|printed_tees| {
1961        let mut printed_tees_mut = printed_tees.borrow_mut();
1962        *printed_tees_mut = Some((0, HashMap::new()));
1963        drop(printed_tees_mut);
1964
1965        let ret = f();
1966
1967        let mut printed_tees_mut = printed_tees.borrow_mut();
1968        *printed_tees_mut = None;
1969
1970        ret
1971    })
1972}
1973
1974/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1975/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1976/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1977/// back-reference.  The tracking state is restored when `f` returns or panics.
1978pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1979    let _guard = SerializedSharedGuard::enter();
1980    f()
1981}
1982
1983/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
1984/// making `serialize_dedup_shared` re-entrant and panic-safe.
1985struct SerializedSharedGuard {
1986    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1987}
1988
1989impl SerializedSharedGuard {
1990    fn enter() -> Self {
1991        let previous = SERIALIZED_SHARED.with(|cell| {
1992            let mut guard = cell.borrow_mut();
1993            guard.replace((0, HashMap::new()))
1994        });
1995        Self { previous }
1996    }
1997}
1998
1999impl Drop for SerializedSharedGuard {
2000    fn drop(&mut self) {
2001        SERIALIZED_SHARED.with(|cell| {
2002            *cell.borrow_mut() = self.previous.take();
2003        });
2004    }
2005}
2006
2007pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2008
2009impl serde::Serialize for SharedNode {
2010    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
2011    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
2012    /// same subtree every time and, if the graph ever contains a cycle, loop
2013    /// forever.
2014    ///
2015    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
2016    /// integer id.  The first time we see a pointer we assign it the next id and
2017    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
2018    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
2019    /// recursion.  Requires an active `serialize_dedup_shared` scope.
2020    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2021        SERIALIZED_SHARED.with(|cell| {
2022            let mut guard = cell.borrow_mut();
2023            // (next_id, pointer → assigned_id)
2024            let state = guard.as_mut().ok_or_else(|| {
2025                serde::ser::Error::custom(
2026                    "SharedNode serialization requires an active serialize_dedup_shared scope",
2027                )
2028            })?;
2029            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2030
2031            if let Some(&id) = state.1.get(&ptr) {
2032                drop(guard);
2033                use serde::ser::SerializeMap;
2034                let mut map = serializer.serialize_map(Some(1))?;
2035                map.serialize_entry("$shared_ref", &id)?;
2036                map.end()
2037            } else {
2038                let id = state.0;
2039                state.0 += 1;
2040                state.1.insert(ptr, id);
2041                drop(guard);
2042
2043                use serde::ser::SerializeMap;
2044                let mut map = serializer.serialize_map(Some(2))?;
2045                map.serialize_entry("$shared", &id)?;
2046                map.serialize_entry("node", &*self.0.borrow())?;
2047                map.end()
2048            }
2049        })
2050    }
2051}
2052
2053impl SharedNode {
2054    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2055        Rc::as_ptr(&self.0)
2056    }
2057}
2058
2059impl Debug for SharedNode {
2060    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2061        PRINTED_TEES.with(|printed_tees| {
2062            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2063            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2064
2065            if let Some(printed_tees_mut) = printed_tees_mut {
2066                if let Some(existing) = printed_tees_mut
2067                    .1
2068                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2069                {
2070                    write!(f, "<shared {}>", existing)
2071                } else {
2072                    let next_id = printed_tees_mut.0;
2073                    printed_tees_mut.0 += 1;
2074                    printed_tees_mut
2075                        .1
2076                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2077                    drop(printed_tees_mut_borrow);
2078                    write!(f, "<shared {}>: ", next_id)?;
2079                    Debug::fmt(&self.0.borrow(), f)
2080                }
2081            } else {
2082                drop(printed_tees_mut_borrow);
2083                write!(f, "<shared>: ")?;
2084                Debug::fmt(&self.0.borrow(), f)
2085            }
2086        })
2087    }
2088}
2089
2090impl Hash for SharedNode {
2091    fn hash<H: Hasher>(&self, state: &mut H) {
2092        self.0.borrow_mut().hash(state);
2093    }
2094}
2095
/// A counter for tracking singleton access groups on a `HydroNode::Reference`.
///
/// Immutable accesses share the current group. A mutable access bumps the
/// counter before and after itself, so it ends up alone in its own group.
#[derive(Debug)]
pub enum AccessCounter {
    /// Live counter that can still hand out groups.
    Counting(Cell<u32>),
    /// Fixed group number; no further counting is possible.
    Frozen(u32),
}

impl AccessCounter {
    /// Creates a live counter starting at group `0`.
    pub fn new() -> Self {
        Self::Counting(Cell::new(0))
    }

    /// Assigns the access group for the next access and returns it frozen.
    ///
    /// An immutable access (`is_mut == false`) returns the current count and
    /// leaves it unchanged. A mutable access returns `count + 1` and advances
    /// the counter to `count + 2`, so no other access shares its group.
    ///
    /// # Panics
    ///
    /// Panics when called on [`AccessCounter::Frozen`].
    pub fn next_group(&self, is_mut: bool) -> Self {
        match self {
            Self::Frozen(_) => panic!("Cannot count on `AccessCounter::Frozen`"),
            Self::Counting(count) if is_mut => {
                let group = count.get() + 1;
                count.set(group + 1);
                Self::Frozen(group)
            }
            Self::Counting(count) => Self::Frozen(count.get()),
        }
    }

    /// Creates a frozen counter to prevent further counting.
    ///
    /// The frozen value is the current count (or the already-frozen value).
    pub fn freeze(&self) -> Self {
        let value = match self {
            Self::Frozen(count) => *count,
            Self::Counting(count) => count.get(),
        };
        Self::Frozen(value)
    }

    /// Returns the group number of a frozen counter.
    ///
    /// # Panics
    ///
    /// Panics when called on [`AccessCounter::Counting`].
    pub fn frozen_group(&self) -> u32 {
        match self {
            Self::Frozen(count) => *count,
            Self::Counting(_) => panic!("`AccessCounter` not frozen"),
        }
    }
}

impl Default for AccessCounter {
    /// Same as [`AccessCounter::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Hash for AccessCounter {
    /// Writes nothing to the hasher.
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // Access counter does not participate in hashing — it is runtime bookkeeping.
    }
}
2155
2156impl serde::Serialize for AccessCounter {
2157    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2158        let count = match self {
2159            AccessCounter::Counting(count) => count.get(),
2160            AccessCounter::Frozen(count) => *count,
2161        };
2162        count.serialize(serializer)
2163    }
2164}
2165
2166#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2167pub enum BoundKind {
2168    Unbounded,
2169    Bounded,
2170}
2171
2172#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2173pub enum StreamOrder {
2174    NoOrder,
2175    TotalOrder,
2176}
2177
2178#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2179pub enum StreamRetry {
2180    AtLeastOnce,
2181    ExactlyOnce,
2182}
2183
2184#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2185pub enum KeyedSingletonBoundKind {
2186    Unbounded,
2187    MonotonicKeys,
2188    MonotonicValue,
2189    BoundedValue,
2190    Bounded,
2191}
2192
2193#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2194pub enum SingletonBoundKind {
2195    Unbounded,
2196    Monotonic,
2197    Bounded,
2198}
2199
2200#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2201pub enum CollectionKind {
2202    Stream {
2203        bound: BoundKind,
2204        order: StreamOrder,
2205        retry: StreamRetry,
2206        element_type: DebugType,
2207    },
2208    Singleton {
2209        bound: SingletonBoundKind,
2210        element_type: DebugType,
2211    },
2212    Optional {
2213        bound: BoundKind,
2214        element_type: DebugType,
2215    },
2216    KeyedStream {
2217        bound: BoundKind,
2218        value_order: StreamOrder,
2219        value_retry: StreamRetry,
2220        key_type: DebugType,
2221        value_type: DebugType,
2222    },
2223    KeyedSingleton {
2224        bound: KeyedSingletonBoundKind,
2225        key_type: DebugType,
2226        value_type: DebugType,
2227    },
2228}
2229
2230impl CollectionKind {
2231    pub fn is_bounded(&self) -> bool {
2232        matches!(
2233            self,
2234            CollectionKind::Stream {
2235                bound: BoundKind::Bounded,
2236                ..
2237            } | CollectionKind::Singleton {
2238                bound: SingletonBoundKind::Bounded,
2239                ..
2240            } | CollectionKind::Optional {
2241                bound: BoundKind::Bounded,
2242                ..
2243            } | CollectionKind::KeyedStream {
2244                bound: BoundKind::Bounded,
2245                ..
2246            } | CollectionKind::KeyedSingleton {
2247                bound: KeyedSingletonBoundKind::Bounded,
2248                ..
2249            }
2250        )
2251    }
2252
2253    /// Returns whether this collection kind is already "strict" (TotalOrder + ExactlyOnce),
2254    /// meaning no non-determinism needs to be observed for mut closures.
2255    pub fn is_strict(&self) -> bool {
2256        match self {
2257            CollectionKind::Stream { order, retry, .. } => {
2258                *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2259            }
2260            CollectionKind::KeyedStream {
2261                value_order,
2262                value_retry,
2263                ..
2264            } => {
2265                *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2266            }
2267            // Singletons/Optionals/KeyedSingletons do not have observable
2268            // non-determinism other than snapshots / batching
2269            CollectionKind::Singleton { .. }
2270            | CollectionKind::Optional { .. }
2271            | CollectionKind::KeyedSingleton { .. } => true,
2272        }
2273    }
2274
2275    /// Creates a "strict" version of this kind with TotalOrder and ExactlyOnce.
2276    pub fn strict_kind(&self) -> CollectionKind {
2277        match self {
2278            CollectionKind::Stream {
2279                bound,
2280                element_type,
2281                ..
2282            } => CollectionKind::Stream {
2283                bound: bound.clone(),
2284                order: StreamOrder::TotalOrder,
2285                retry: StreamRetry::ExactlyOnce,
2286                element_type: element_type.clone(),
2287            },
2288            CollectionKind::KeyedStream {
2289                bound,
2290                key_type,
2291                value_type,
2292                ..
2293            } => CollectionKind::KeyedStream {
2294                bound: bound.clone(),
2295                value_order: StreamOrder::TotalOrder,
2296                value_retry: StreamRetry::ExactlyOnce,
2297                key_type: key_type.clone(),
2298                value_type: value_type.clone(),
2299            },
2300            other => other.clone(),
2301        }
2302    }
2303}
2304
2305#[derive(Clone, serde::Serialize)]
2306pub struct HydroIrMetadata {
2307    pub location_id: LocationId,
2308    pub collection_kind: CollectionKind,
2309    pub consistency: Option<ClusterConsistency>,
2310    pub cardinality: Option<usize>,
2311    pub tag: Option<String>,
2312    pub op: HydroIrOpMetadata,
2313}
2314
2315// HydroIrMetadata shouldn't be used to hash or compare
2316impl Hash for HydroIrMetadata {
2317    fn hash<H: Hasher>(&self, _: &mut H) {}
2318}
2319
2320impl PartialEq for HydroIrMetadata {
2321    fn eq(&self, _: &Self) -> bool {
2322        true
2323    }
2324}
2325
2326impl Eq for HydroIrMetadata {}
2327
2328impl Debug for HydroIrMetadata {
2329    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2330        f.debug_struct("HydroIrMetadata")
2331            .field("location_id", &self.location_id)
2332            .field("collection_kind", &self.collection_kind)
2333            .finish()
2334    }
2335}
2336
2337/// Metadata that is specific to the operator itself, rather than its outputs.
2338/// This is available on _both_ inner nodes and roots.
2339#[derive(Clone, serde::Serialize)]
2340pub struct HydroIrOpMetadata {
2341    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2342    pub backtrace: Backtrace,
2343    pub cpu_usage: Option<f64>,
2344    pub network_recv_cpu_usage: Option<f64>,
2345    pub id: Option<usize>,
2346}
2347
2348impl HydroIrOpMetadata {
2349    #[expect(
2350        clippy::new_without_default,
2351        reason = "explicit calls to new ensure correct backtrace bounds"
2352    )]
2353    pub fn new() -> HydroIrOpMetadata {
2354        Self::new_with_skip(1)
2355    }
2356
2357    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2358        HydroIrOpMetadata {
2359            backtrace: Backtrace::get_backtrace(2 + skip_count),
2360            cpu_usage: None,
2361            network_recv_cpu_usage: None,
2362            id: None,
2363        }
2364    }
2365}
2366
2367impl Debug for HydroIrOpMetadata {
2368    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2369        f.debug_struct("HydroIrOpMetadata").finish()
2370    }
2371}
2372
2373impl Hash for HydroIrOpMetadata {
2374    fn hash<H: Hasher>(&self, _: &mut H) {}
2375}
2376
2377/// An intermediate node in a Hydro graph, which consumes data
2378/// from upstream nodes and emits data to downstream nodes.
2379#[derive(Debug, Hash, serde::Serialize)]
2380pub enum HydroNode {
2381    Placeholder,
2382
2383    /// Manually "casts" between two different collection kinds.
2384    ///
2385    /// Using this IR node requires special care, since it bypasses many of Hydro's core
2386    /// correctness checks. In particular, the user must ensure that every possible
2387    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
2388    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
2389    /// collection. This ensures that the simulator does not miss any possible outputs.
2390    Cast {
2391        inner: Box<HydroNode>,
2392        metadata: HydroIrMetadata,
2393    },
2394
2395    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
2396    /// interpretation of the input stream.
2397    ///
2398    /// In production, this simply passes through the input, but in simulation, this operator
2399    /// explicitly selects a randomized interpretation.
2400    ObserveNonDet {
2401        inner: Box<HydroNode>,
2402        trusted: bool, // if true, we do not need to simulate non-determinism
2403        metadata: HydroIrMetadata,
2404    },
2405
2406    Source {
2407        source: HydroSource,
2408        metadata: HydroIrMetadata,
2409    },
2410
2411    SingletonSource {
2412        value: DebugExpr,
2413        first_tick_only: bool,
2414        metadata: HydroIrMetadata,
2415    },
2416
2417    CycleSource {
2418        cycle_id: CycleId,
2419        metadata: HydroIrMetadata,
2420    },
2421
2422    Tee {
2423        inner: SharedNode,
2424        metadata: HydroIrMetadata,
2425    },
2426
2427    /// A reference materialization point. Wraps a SharedNode so that:
2428    /// - The pipe output delivers data to one consumer
2429    /// - `#var` references can borrow the value from the slot
2430    ///
2431    /// In DFIR codegen, emits `ident = inner_ident -> singleton()` or `-> optional()` or
2432    /// `-> handoff()` depending on `kind`.
2433    ///
2434    /// Uses the same `built_tees` dedup pattern as `Tee`.
2435    Reference {
2436        inner: SharedNode,
2437        kind: crate::handoff_ref::HandoffRefKind,
2438        access_counter: AccessCounter,
2439        metadata: HydroIrMetadata,
2440    },
2441
2442    Partition {
2443        inner: SharedNode,
2444        f: ClosureExpr,
2445        is_true: bool,
2446        metadata: HydroIrMetadata,
2447    },
2448
2449    BeginAtomic {
2450        inner: Box<HydroNode>,
2451        metadata: HydroIrMetadata,
2452    },
2453
2454    EndAtomic {
2455        inner: Box<HydroNode>,
2456        metadata: HydroIrMetadata,
2457    },
2458
2459    Batch {
2460        inner: Box<HydroNode>,
2461        metadata: HydroIrMetadata,
2462    },
2463
2464    YieldConcat {
2465        inner: Box<HydroNode>,
2466        metadata: HydroIrMetadata,
2467    },
2468
2469    Chain {
2470        first: Box<HydroNode>,
2471        second: Box<HydroNode>,
2472        metadata: HydroIrMetadata,
2473    },
2474
2475    MergeOrdered {
2476        first: Box<HydroNode>,
2477        second: Box<HydroNode>,
2478        metadata: HydroIrMetadata,
2479    },
2480
2481    ChainFirst {
2482        first: Box<HydroNode>,
2483        second: Box<HydroNode>,
2484        metadata: HydroIrMetadata,
2485    },
2486
2487    CrossProduct {
2488        left: Box<HydroNode>,
2489        right: Box<HydroNode>,
2490        metadata: HydroIrMetadata,
2491    },
2492
2493    CrossSingleton {
2494        left: Box<HydroNode>,
2495        right: Box<HydroNode>,
2496        metadata: HydroIrMetadata,
2497    },
2498
2499    Join {
2500        left: Box<HydroNode>,
2501        right: Box<HydroNode>,
2502        metadata: HydroIrMetadata,
2503    },
2504
2505    /// Asymmetric join where the right (build) side is bounded.
2506    /// The build side is accumulated (stratum-delayed) into a hash table,
2507    /// then the left (probe) side streams through preserving its ordering.
2508    JoinHalf {
2509        left: Box<HydroNode>,
2510        right: Box<HydroNode>,
2511        metadata: HydroIrMetadata,
2512    },
2513
2514    Difference {
2515        pos: Box<HydroNode>,
2516        neg: Box<HydroNode>,
2517        metadata: HydroIrMetadata,
2518    },
2519
2520    AntiJoin {
2521        pos: Box<HydroNode>,
2522        neg: Box<HydroNode>,
2523        metadata: HydroIrMetadata,
2524    },
2525
2526    ResolveFutures {
2527        input: Box<HydroNode>,
2528        metadata: HydroIrMetadata,
2529    },
2530    ResolveFuturesBlocking {
2531        input: Box<HydroNode>,
2532        metadata: HydroIrMetadata,
2533    },
2534    ResolveFuturesOrdered {
2535        input: Box<HydroNode>,
2536        metadata: HydroIrMetadata,
2537    },
2538
2539    Map {
2540        f: ClosureExpr,
2541        input: Box<HydroNode>,
2542        metadata: HydroIrMetadata,
2543    },
2544    FlatMap {
2545        f: ClosureExpr,
2546        input: Box<HydroNode>,
2547        metadata: HydroIrMetadata,
2548    },
2549    FlatMapStreamBlocking {
2550        f: ClosureExpr,
2551        input: Box<HydroNode>,
2552        metadata: HydroIrMetadata,
2553    },
2554    Filter {
2555        f: ClosureExpr,
2556        input: Box<HydroNode>,
2557        metadata: HydroIrMetadata,
2558    },
2559    FilterMap {
2560        f: ClosureExpr,
2561        input: Box<HydroNode>,
2562        metadata: HydroIrMetadata,
2563    },
2564
2565    DeferTick {
2566        input: Box<HydroNode>,
2567        metadata: HydroIrMetadata,
2568    },
2569    Enumerate {
2570        input: Box<HydroNode>,
2571        metadata: HydroIrMetadata,
2572    },
2573    Inspect {
2574        f: ClosureExpr,
2575        input: Box<HydroNode>,
2576        metadata: HydroIrMetadata,
2577    },
2578
2579    Unique {
2580        input: Box<HydroNode>,
2581        metadata: HydroIrMetadata,
2582    },
2583
2584    Sort {
2585        input: Box<HydroNode>,
2586        metadata: HydroIrMetadata,
2587    },
2588    Fold {
2589        init: ClosureExpr,
2590        acc: ClosureExpr,
2591        input: Box<HydroNode>,
2592        metadata: HydroIrMetadata,
2593    },
2594
2595    Scan {
2596        init: ClosureExpr,
2597        acc: ClosureExpr,
2598        input: Box<HydroNode>,
2599        metadata: HydroIrMetadata,
2600    },
2601    ScanAsyncBlocking {
2602        init: ClosureExpr,
2603        acc: ClosureExpr,
2604        input: Box<HydroNode>,
2605        metadata: HydroIrMetadata,
2606    },
2607    FoldKeyed {
2608        init: ClosureExpr,
2609        acc: ClosureExpr,
2610        input: Box<HydroNode>,
2611        metadata: HydroIrMetadata,
2612    },
2613
2614    Reduce {
2615        f: ClosureExpr,
2616        input: Box<HydroNode>,
2617        metadata: HydroIrMetadata,
2618    },
2619    ReduceKeyed {
2620        f: ClosureExpr,
2621        input: Box<HydroNode>,
2622        metadata: HydroIrMetadata,
2623    },
2624    ReduceKeyedWatermark {
2625        f: ClosureExpr,
2626        input: Box<HydroNode>,
2627        watermark: Box<HydroNode>,
2628        metadata: HydroIrMetadata,
2629    },
2630
2631    Network {
2632        name: Option<String>,
2633        networking_info: crate::networking::NetworkingInfo,
2634        serialize_fn: Option<DebugExpr>,
2635        instantiate_fn: DebugInstantiate,
2636        deserialize_fn: Option<DebugExpr>,
2637        input: Box<HydroNode>,
2638        metadata: HydroIrMetadata,
2639    },
2640
2641    ExternalInput {
2642        from_external_key: LocationKey,
2643        from_port_id: ExternalPortId,
2644        from_many: bool,
2645        codec_type: DebugType,
2646        #[serde(skip)]
2647        port_hint: NetworkHint,
2648        instantiate_fn: DebugInstantiate,
2649        deserialize_fn: Option<DebugExpr>,
2650        metadata: HydroIrMetadata,
2651    },
2652
2653    Counter {
2654        tag: String,
2655        duration: DebugExpr,
2656        prefix: String,
2657        input: Box<HydroNode>,
2658        metadata: HydroIrMetadata,
2659    },
2660
2661    AssertIsConsistent {
2662        inner: Box<HydroNode>,
2663        trusted: bool,
2664        metadata: HydroIrMetadata,
2665    },
2666
2667    UnboundSingleton {
2668        inner: Box<HydroNode>,
2669        metadata: HydroIrMetadata,
2670    },
2671}
2672
/// Memo table threaded through IR walks such as `transform_children` and `deep_clone`.
///
/// Keys are the addresses of the `RefCell`s behind shared nodes as they were *before*
/// the walk; values are the cells that replace them in the result. A shared node that
/// is reachable through several parents is therefore processed once, and every parent
/// ends up pointing at the same replacement cell.
2673pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Associates a shared node, identified by the address of its `RefCell`, with a
/// [`LocationId`].
///
/// NOTE(review): the code that fills and reads this map is outside the reviewed
/// section; presumably it records the location assigned to each shared node — confirm.
2674pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2675
/// Puts an `observe_for_mut` step in front of a closure when one is required.
///
/// A step is required only when `f` holds a mutable singleton reference
/// (`f.has_mut_ref()`) and the incoming collection kind `in_kind` is not strict.
/// When it is required:
///
/// - a statement id is taken from `next_stmt_id` and the ident `stream_<id>` is
///   derived from it;
/// - in builder mode, `observe_for_mut` is emitted from `in_ident` into that ident;
/// - in callback mode nothing is emitted, but the statement id is consumed all the same.
///
/// Returns the ident the caller should read from afterwards: the freshly created
/// ident when a step was required, otherwise `in_ident` unchanged (and no statement
/// id is consumed).
#[cfg(feature = "build")]
fn maybe_observe_for_mut(
    f: &ClosureExpr,
    in_ident: syn::Ident,
    in_location: &LocationId,
    in_kind: &CollectionKind,
    op_meta: &HydroIrOpMetadata,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
        impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
    >,
    next_stmt_id: &mut crate::Counter<StmtId>,
) -> syn::Ident {
    // Nothing to observe: hand the input ident straight back.
    if !f.has_mut_ref() || in_kind.is_strict() {
        return in_ident;
    }

    // The id is reserved before looking at the mode, so both modes consume it.
    let stmt_id = next_stmt_id.get_and_increment();
    let out_ident = syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    if let BuildersOrCallback::Builders(graph_builders) = builders_or_callback {
        graph_builders.observe_for_mut(in_location, in_ident, in_kind, &out_ident, op_meta);
    }

    out_ident
}
2704
2705impl HydroNode {
2706    pub fn transform_bottom_up(
2707        &mut self,
2708        transform: &mut impl FnMut(&mut HydroNode),
2709        seen_tees: &mut SeenSharedNodes,
2710        check_well_formed: bool,
2711    ) {
2712        self.transform_children(
2713            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2714            seen_tees,
2715        );
2716
2717        transform(self);
2718
2719        let self_location = self.metadata().location_id.root();
2720
2721        if check_well_formed {
2722            match &*self {
2723                HydroNode::Network { .. } => {}
2724                _ => {
2725                    self.input_metadata().iter().for_each(|i| {
2726                        if i.location_id.root() != self_location {
2727                            panic!(
2728                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2729                                i,
2730                                i.location_id.root(),
2731                                self,
2732                                self_location
2733                            )
2734                        }
2735                    });
2736                }
2737            }
2738        }
2739    }
2740
2741    #[inline(always)]
2742    pub fn transform_children(
2743        &mut self,
2744        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2745        seen_tees: &mut SeenSharedNodes,
2746    ) {
2747        match self {
2748            HydroNode::Placeholder => {
2749                panic!();
2750            }
2751
2752            HydroNode::Source { .. }
2753            | HydroNode::SingletonSource { .. }
2754            | HydroNode::CycleSource { .. }
2755            | HydroNode::ExternalInput { .. } => {}
2756
2757            HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2758                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2759                    *inner = SharedNode(transformed.clone());
2760                } else {
2761                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2762                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2763                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2764                    transform(&mut orig, seen_tees);
2765                    *transformed_cell.borrow_mut() = orig;
2766                    *inner = SharedNode(transformed_cell);
2767                }
2768            }
2769
2770            HydroNode::Partition { inner, f, .. } => {
2771                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2772                    *inner = SharedNode(transformed.clone());
2773                } else {
2774                    f.transform_children(&mut transform, seen_tees);
2775                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2776                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2777                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2778                    transform(&mut orig, seen_tees);
2779                    *transformed_cell.borrow_mut() = orig;
2780                    *inner = SharedNode(transformed_cell);
2781                }
2782            }
2783
2784            HydroNode::Cast { inner, .. }
2785            | HydroNode::ObserveNonDet { inner, .. }
2786            | HydroNode::BeginAtomic { inner, .. }
2787            | HydroNode::EndAtomic { inner, .. }
2788            | HydroNode::Batch { inner, .. }
2789            | HydroNode::YieldConcat { inner, .. }
2790            | HydroNode::UnboundSingleton { inner, .. }
2791            | HydroNode::AssertIsConsistent { inner, .. } => {
2792                transform(inner.as_mut(), seen_tees);
2793            }
2794
2795            HydroNode::Chain { first, second, .. } => {
2796                transform(first.as_mut(), seen_tees);
2797                transform(second.as_mut(), seen_tees);
2798            }
2799
2800            HydroNode::MergeOrdered { first, second, .. } => {
2801                transform(first.as_mut(), seen_tees);
2802                transform(second.as_mut(), seen_tees);
2803            }
2804
2805            HydroNode::ChainFirst { first, second, .. } => {
2806                transform(first.as_mut(), seen_tees);
2807                transform(second.as_mut(), seen_tees);
2808            }
2809
2810            HydroNode::CrossSingleton { left, right, .. }
2811            | HydroNode::CrossProduct { left, right, .. }
2812            | HydroNode::Join { left, right, .. }
2813            | HydroNode::JoinHalf { left, right, .. } => {
2814                transform(left.as_mut(), seen_tees);
2815                transform(right.as_mut(), seen_tees);
2816            }
2817
2818            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2819                transform(pos.as_mut(), seen_tees);
2820                transform(neg.as_mut(), seen_tees);
2821            }
2822
2823            HydroNode::Map { f, input, .. } => {
2824                f.transform_children(&mut transform, seen_tees);
2825                transform(input.as_mut(), seen_tees);
2826            }
2827            HydroNode::FlatMap { f, input, .. }
2828            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2829            | HydroNode::Filter { f, input, .. }
2830            | HydroNode::FilterMap { f, input, .. }
2831            | HydroNode::Inspect { f, input, .. }
2832            | HydroNode::Reduce { f, input, .. }
2833            | HydroNode::ReduceKeyed { f, input, .. } => {
2834                f.transform_children(&mut transform, seen_tees);
2835                transform(input.as_mut(), seen_tees);
2836            }
2837            HydroNode::ReduceKeyedWatermark {
2838                f,
2839                input,
2840                watermark,
2841                ..
2842            } => {
2843                f.transform_children(&mut transform, seen_tees);
2844                transform(input.as_mut(), seen_tees);
2845                transform(watermark.as_mut(), seen_tees);
2846            }
2847            HydroNode::Fold {
2848                init, acc, input, ..
2849            }
2850            | HydroNode::Scan {
2851                init, acc, input, ..
2852            }
2853            | HydroNode::ScanAsyncBlocking {
2854                init, acc, input, ..
2855            }
2856            | HydroNode::FoldKeyed {
2857                init, acc, input, ..
2858            } => {
2859                init.transform_children(&mut transform, seen_tees);
2860                acc.transform_children(&mut transform, seen_tees);
2861                transform(input.as_mut(), seen_tees);
2862            }
2863            HydroNode::ResolveFutures { input, .. }
2864            | HydroNode::ResolveFuturesBlocking { input, .. }
2865            | HydroNode::ResolveFuturesOrdered { input, .. }
2866            | HydroNode::Sort { input, .. }
2867            | HydroNode::DeferTick { input, .. }
2868            | HydroNode::Enumerate { input, .. }
2869            | HydroNode::Unique { input, .. }
2870            | HydroNode::Network { input, .. }
2871            | HydroNode::Counter { input, .. } => {
2872                transform(input.as_mut(), seen_tees);
2873            }
2874        }
2875    }
2876
2877    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2878        match self {
2879            HydroNode::Placeholder => HydroNode::Placeholder,
2880            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2881                inner: Box::new(inner.deep_clone(seen_tees)),
2882                metadata: metadata.clone(),
2883            },
2884            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2885                inner: Box::new(inner.deep_clone(seen_tees)),
2886                metadata: metadata.clone(),
2887            },
2888            HydroNode::ObserveNonDet {
2889                inner,
2890                trusted,
2891                metadata,
2892            } => HydroNode::ObserveNonDet {
2893                inner: Box::new(inner.deep_clone(seen_tees)),
2894                trusted: *trusted,
2895                metadata: metadata.clone(),
2896            },
2897            HydroNode::AssertIsConsistent {
2898                inner,
2899                trusted,
2900                metadata,
2901            } => HydroNode::AssertIsConsistent {
2902                inner: Box::new(inner.deep_clone(seen_tees)),
2903                trusted: *trusted,
2904                metadata: metadata.clone(),
2905            },
2906            HydroNode::Source { source, metadata } => HydroNode::Source {
2907                source: source.clone(),
2908                metadata: metadata.clone(),
2909            },
2910            HydroNode::SingletonSource {
2911                value,
2912                first_tick_only,
2913                metadata,
2914            } => HydroNode::SingletonSource {
2915                value: value.clone(),
2916                first_tick_only: *first_tick_only,
2917                metadata: metadata.clone(),
2918            },
2919            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2920                cycle_id: *cycle_id,
2921                metadata: metadata.clone(),
2922            },
2923            HydroNode::Tee { inner, metadata }
2924            | HydroNode::Reference {
2925                inner, metadata, ..
2926            } => {
2927                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2928                    SharedNode(transformed.clone())
2929                } else {
2930                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2931                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2932                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2933                    *new_rc.borrow_mut() = cloned;
2934                    SharedNode(new_rc)
2935                };
2936                if let HydroNode::Reference {
2937                    kind,
2938                    access_counter,
2939                    ..
2940                } = self
2941                {
2942                    HydroNode::Reference {
2943                        inner: cloned_inner,
2944                        kind: *kind,
2945                        access_counter: access_counter.freeze(),
2946                        metadata: metadata.clone(),
2947                    }
2948                } else {
2949                    HydroNode::Tee {
2950                        inner: cloned_inner,
2951                        metadata: metadata.clone(),
2952                    }
2953                }
2954            }
2955            HydroNode::Partition {
2956                inner,
2957                f,
2958                is_true,
2959                metadata,
2960            } => {
2961                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2962                    HydroNode::Partition {
2963                        inner: SharedNode(transformed.clone()),
2964                        f: f.deep_clone(seen_tees),
2965                        is_true: *is_true,
2966                        metadata: metadata.clone(),
2967                    }
2968                } else {
2969                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2970                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2971                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2972                    *new_rc.borrow_mut() = cloned;
2973                    HydroNode::Partition {
2974                        inner: SharedNode(new_rc),
2975                        f: f.deep_clone(seen_tees),
2976                        is_true: *is_true,
2977                        metadata: metadata.clone(),
2978                    }
2979                }
2980            }
2981            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2982                inner: Box::new(inner.deep_clone(seen_tees)),
2983                metadata: metadata.clone(),
2984            },
2985            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2986                inner: Box::new(inner.deep_clone(seen_tees)),
2987                metadata: metadata.clone(),
2988            },
2989            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2990                inner: Box::new(inner.deep_clone(seen_tees)),
2991                metadata: metadata.clone(),
2992            },
2993            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2994                inner: Box::new(inner.deep_clone(seen_tees)),
2995                metadata: metadata.clone(),
2996            },
2997            HydroNode::Chain {
2998                first,
2999                second,
3000                metadata,
3001            } => HydroNode::Chain {
3002                first: Box::new(first.deep_clone(seen_tees)),
3003                second: Box::new(second.deep_clone(seen_tees)),
3004                metadata: metadata.clone(),
3005            },
3006            HydroNode::MergeOrdered {
3007                first,
3008                second,
3009                metadata,
3010            } => HydroNode::MergeOrdered {
3011                first: Box::new(first.deep_clone(seen_tees)),
3012                second: Box::new(second.deep_clone(seen_tees)),
3013                metadata: metadata.clone(),
3014            },
3015            HydroNode::ChainFirst {
3016                first,
3017                second,
3018                metadata,
3019            } => HydroNode::ChainFirst {
3020                first: Box::new(first.deep_clone(seen_tees)),
3021                second: Box::new(second.deep_clone(seen_tees)),
3022                metadata: metadata.clone(),
3023            },
3024            HydroNode::CrossProduct {
3025                left,
3026                right,
3027                metadata,
3028            } => HydroNode::CrossProduct {
3029                left: Box::new(left.deep_clone(seen_tees)),
3030                right: Box::new(right.deep_clone(seen_tees)),
3031                metadata: metadata.clone(),
3032            },
3033            HydroNode::CrossSingleton {
3034                left,
3035                right,
3036                metadata,
3037            } => HydroNode::CrossSingleton {
3038                left: Box::new(left.deep_clone(seen_tees)),
3039                right: Box::new(right.deep_clone(seen_tees)),
3040                metadata: metadata.clone(),
3041            },
3042            HydroNode::Join {
3043                left,
3044                right,
3045                metadata,
3046            } => HydroNode::Join {
3047                left: Box::new(left.deep_clone(seen_tees)),
3048                right: Box::new(right.deep_clone(seen_tees)),
3049                metadata: metadata.clone(),
3050            },
3051            HydroNode::JoinHalf {
3052                left,
3053                right,
3054                metadata,
3055            } => HydroNode::JoinHalf {
3056                left: Box::new(left.deep_clone(seen_tees)),
3057                right: Box::new(right.deep_clone(seen_tees)),
3058                metadata: metadata.clone(),
3059            },
3060            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3061                pos: Box::new(pos.deep_clone(seen_tees)),
3062                neg: Box::new(neg.deep_clone(seen_tees)),
3063                metadata: metadata.clone(),
3064            },
3065            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3066                pos: Box::new(pos.deep_clone(seen_tees)),
3067                neg: Box::new(neg.deep_clone(seen_tees)),
3068                metadata: metadata.clone(),
3069            },
3070            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3071                input: Box::new(input.deep_clone(seen_tees)),
3072                metadata: metadata.clone(),
3073            },
3074            HydroNode::ResolveFuturesBlocking { input, metadata } => {
3075                HydroNode::ResolveFuturesBlocking {
3076                    input: Box::new(input.deep_clone(seen_tees)),
3077                    metadata: metadata.clone(),
3078                }
3079            }
3080            HydroNode::ResolveFuturesOrdered { input, metadata } => {
3081                HydroNode::ResolveFuturesOrdered {
3082                    input: Box::new(input.deep_clone(seen_tees)),
3083                    metadata: metadata.clone(),
3084                }
3085            }
3086            HydroNode::Map { f, input, metadata } => HydroNode::Map {
3087                f: f.deep_clone(seen_tees),
3088                input: Box::new(input.deep_clone(seen_tees)),
3089                metadata: metadata.clone(),
3090            },
3091            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3092                f: f.deep_clone(seen_tees),
3093                input: Box::new(input.deep_clone(seen_tees)),
3094                metadata: metadata.clone(),
3095            },
3096            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3097                HydroNode::FlatMapStreamBlocking {
3098                    f: f.deep_clone(seen_tees),
3099                    input: Box::new(input.deep_clone(seen_tees)),
3100                    metadata: metadata.clone(),
3101                }
3102            }
3103            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3104                f: f.deep_clone(seen_tees),
3105                input: Box::new(input.deep_clone(seen_tees)),
3106                metadata: metadata.clone(),
3107            },
3108            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3109                f: f.deep_clone(seen_tees),
3110                input: Box::new(input.deep_clone(seen_tees)),
3111                metadata: metadata.clone(),
3112            },
3113            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3114                input: Box::new(input.deep_clone(seen_tees)),
3115                metadata: metadata.clone(),
3116            },
3117            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3118                input: Box::new(input.deep_clone(seen_tees)),
3119                metadata: metadata.clone(),
3120            },
3121            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3122                f: f.deep_clone(seen_tees),
3123                input: Box::new(input.deep_clone(seen_tees)),
3124                metadata: metadata.clone(),
3125            },
3126            HydroNode::Unique { input, metadata } => HydroNode::Unique {
3127                input: Box::new(input.deep_clone(seen_tees)),
3128                metadata: metadata.clone(),
3129            },
3130            HydroNode::Sort { input, metadata } => HydroNode::Sort {
3131                input: Box::new(input.deep_clone(seen_tees)),
3132                metadata: metadata.clone(),
3133            },
3134            HydroNode::Fold {
3135                init,
3136                acc,
3137                input,
3138                metadata,
3139            } => HydroNode::Fold {
3140                init: init.deep_clone(seen_tees),
3141                acc: acc.deep_clone(seen_tees),
3142                input: Box::new(input.deep_clone(seen_tees)),
3143                metadata: metadata.clone(),
3144            },
3145            HydroNode::Scan {
3146                init,
3147                acc,
3148                input,
3149                metadata,
3150            } => HydroNode::Scan {
3151                init: init.deep_clone(seen_tees),
3152                acc: acc.deep_clone(seen_tees),
3153                input: Box::new(input.deep_clone(seen_tees)),
3154                metadata: metadata.clone(),
3155            },
3156            HydroNode::ScanAsyncBlocking {
3157                init,
3158                acc,
3159                input,
3160                metadata,
3161            } => HydroNode::ScanAsyncBlocking {
3162                init: init.deep_clone(seen_tees),
3163                acc: acc.deep_clone(seen_tees),
3164                input: Box::new(input.deep_clone(seen_tees)),
3165                metadata: metadata.clone(),
3166            },
3167            HydroNode::FoldKeyed {
3168                init,
3169                acc,
3170                input,
3171                metadata,
3172            } => HydroNode::FoldKeyed {
3173                init: init.deep_clone(seen_tees),
3174                acc: acc.deep_clone(seen_tees),
3175                input: Box::new(input.deep_clone(seen_tees)),
3176                metadata: metadata.clone(),
3177            },
3178            HydroNode::ReduceKeyedWatermark {
3179                f,
3180                input,
3181                watermark,
3182                metadata,
3183            } => HydroNode::ReduceKeyedWatermark {
3184                f: f.deep_clone(seen_tees),
3185                input: Box::new(input.deep_clone(seen_tees)),
3186                watermark: Box::new(watermark.deep_clone(seen_tees)),
3187                metadata: metadata.clone(),
3188            },
3189            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3190                f: f.deep_clone(seen_tees),
3191                input: Box::new(input.deep_clone(seen_tees)),
3192                metadata: metadata.clone(),
3193            },
3194            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3195                f: f.deep_clone(seen_tees),
3196                input: Box::new(input.deep_clone(seen_tees)),
3197                metadata: metadata.clone(),
3198            },
3199            HydroNode::Network {
3200                name,
3201                networking_info,
3202                serialize_fn,
3203                instantiate_fn,
3204                deserialize_fn,
3205                input,
3206                metadata,
3207            } => HydroNode::Network {
3208                name: name.clone(),
3209                networking_info: networking_info.clone(),
3210                serialize_fn: serialize_fn.clone(),
3211                instantiate_fn: instantiate_fn.clone(),
3212                deserialize_fn: deserialize_fn.clone(),
3213                input: Box::new(input.deep_clone(seen_tees)),
3214                metadata: metadata.clone(),
3215            },
3216            HydroNode::ExternalInput {
3217                from_external_key,
3218                from_port_id,
3219                from_many,
3220                codec_type,
3221                port_hint,
3222                instantiate_fn,
3223                deserialize_fn,
3224                metadata,
3225            } => HydroNode::ExternalInput {
3226                from_external_key: *from_external_key,
3227                from_port_id: *from_port_id,
3228                from_many: *from_many,
3229                codec_type: codec_type.clone(),
3230                port_hint: *port_hint,
3231                instantiate_fn: instantiate_fn.clone(),
3232                deserialize_fn: deserialize_fn.clone(),
3233                metadata: metadata.clone(),
3234            },
3235            HydroNode::Counter {
3236                tag,
3237                duration,
3238                prefix,
3239                input,
3240                metadata,
3241            } => HydroNode::Counter {
3242                tag: tag.clone(),
3243                duration: duration.clone(),
3244                prefix: prefix.clone(),
3245                input: Box::new(input.deep_clone(seen_tees)),
3246                metadata: metadata.clone(),
3247            },
3248        }
3249    }
3250
3251    #[cfg(feature = "build")]
3252    pub fn emit_core(
3253        &mut self,
3254        builders_or_callback: &mut BuildersOrCallback<
3255            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3256            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3257        >,
3258        seen_tees: &mut SeenSharedNodes,
3259        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3260        next_stmt_id: &mut crate::Counter<StmtId>,
3261        fold_hooked_idents: &mut HashSet<String>,
3262    ) -> syn::Ident {
3263        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3264
3265        self.transform_bottom_up(
3266            &mut |node: &mut HydroNode| {
3267                let out_location = node.metadata().location_id.clone();
3268                match node {
3269                    HydroNode::Placeholder => {
3270                        panic!()
3271                    }
3272
3273                    HydroNode::Cast { .. } => {
3274                        // Cast passes through the input ident unchanged
3275                        // The input ident is already on the stack from processing the child
3276                        let _ = next_stmt_id.get_and_increment();
3277                        match builders_or_callback {
3278                            BuildersOrCallback::Builders(_) => {}
3279                            BuildersOrCallback::Callback(_, node_callback) => {
3280                                node_callback(node, next_stmt_id);
3281                            }
3282                        }
3283                        // input_ident stays on stack as output
3284                    }
3285
3286                    HydroNode::UnboundSingleton { .. } => {
3287                        let inner_ident = ident_stack.pop().unwrap();
3288
3289                        let stmt_id = next_stmt_id.get_and_increment();
3290                        let out_ident =
3291                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3292
3293                        match builders_or_callback {
3294                            BuildersOrCallback::Builders(graph_builders) => {
3295                                if graph_builders.singleton_intermediates() {
3296                                    let builder = graph_builders.get_dfir_mut(&out_location);
3297                                    builder.add_dfir(
3298                                        parse_quote! {
3299                                            #out_ident = #inner_ident;
3300                                        },
3301                                        None,
3302                                        None,
3303                                    );
3304                                } else {
3305                                    let builder = graph_builders.get_dfir_mut(&out_location);
3306                                    builder.add_dfir(
3307                                        parse_quote! {
3308                                            #out_ident = #inner_ident -> persist::<'static>();
3309                                        },
3310                                        None,
3311                                        None,
3312                                    );
3313                                }
3314                            }
3315                            BuildersOrCallback::Callback(_, node_callback) => {
3316                                node_callback(node, next_stmt_id);
3317                            }
3318                        }
3319
3320                        ident_stack.push(out_ident);
3321                    }
3322
3323                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3324                        let inner_ident = ident_stack.pop().unwrap();
3325
3326                        let stmt_id = next_stmt_id.get_and_increment();
3327                        let out_ident =
3328                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3329
3330                        match builders_or_callback {
3331                            BuildersOrCallback::Builders(graph_builders) => {
3332                                graph_builders.assert_is_consistent(
3333                                    *trusted,
3334                                    &inner.metadata().location_id,
3335                                    inner_ident,
3336                                    &out_ident,
3337                                );
3338                            }
3339                            BuildersOrCallback::Callback(_, node_callback) => {
3340                                node_callback(node, next_stmt_id);
3341                            }
3342                        }
3343
3344                        ident_stack.push(out_ident);
3345                    }
3346
3347                    HydroNode::ObserveNonDet {
3348                        inner,
3349                        trusted,
3350                        metadata,
3351                        ..
3352                    } => {
3353                        let inner_ident = ident_stack.pop().unwrap();
3354
3355                        let stmt_id = next_stmt_id.get_and_increment();
3356                        let observe_ident =
3357                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3358
3359                        match builders_or_callback {
3360                            BuildersOrCallback::Builders(graph_builders) => {
3361                                graph_builders.observe_nondet(
3362                                    *trusted,
3363                                    &inner.metadata().location_id,
3364                                    inner_ident,
3365                                    &inner.metadata().collection_kind,
3366                                    &observe_ident,
3367                                    &metadata.collection_kind,
3368                                    &metadata.op,
3369                                );
3370                            }
3371                            BuildersOrCallback::Callback(_, node_callback) => {
3372                                node_callback(node, next_stmt_id);
3373                            }
3374                        }
3375
3376                        ident_stack.push(observe_ident);
3377                    }
3378
3379                    HydroNode::Batch {
3380                        inner, metadata, ..
3381                    } => {
3382                        let inner_ident = ident_stack.pop().unwrap();
3383
3384                        let stmt_id = next_stmt_id.get_and_increment();
3385                        let batch_ident =
3386                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3387
3388                        match builders_or_callback {
3389                            BuildersOrCallback::Builders(graph_builders) => {
3390                                graph_builders.batch(
3391                                    inner_ident,
3392                                    &inner.metadata().location_id,
3393                                    &inner.metadata().collection_kind,
3394                                    &batch_ident,
3395                                    &out_location,
3396                                    &metadata.op,
3397                                    fold_hooked_idents,
3398                                );
3399                            }
3400                            BuildersOrCallback::Callback(_, node_callback) => {
3401                                node_callback(node, next_stmt_id);
3402                            }
3403                        }
3404
3405                        ident_stack.push(batch_ident);
3406                    }
3407
3408                    HydroNode::YieldConcat { inner, .. } => {
3409                        let inner_ident = ident_stack.pop().unwrap();
3410
3411                        let stmt_id = next_stmt_id.get_and_increment();
3412                        let yield_ident =
3413                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3414
3415                        match builders_or_callback {
3416                            BuildersOrCallback::Builders(graph_builders) => {
3417                                graph_builders.yield_from_tick(
3418                                    inner_ident,
3419                                    &inner.metadata().location_id,
3420                                    &inner.metadata().collection_kind,
3421                                    &yield_ident,
3422                                    &out_location,
3423                                );
3424                            }
3425                            BuildersOrCallback::Callback(_, node_callback) => {
3426                                node_callback(node, next_stmt_id);
3427                            }
3428                        }
3429
3430                        ident_stack.push(yield_ident);
3431                    }
3432
3433                    HydroNode::BeginAtomic { inner, metadata } => {
3434                        let inner_ident = ident_stack.pop().unwrap();
3435
3436                        let stmt_id = next_stmt_id.get_and_increment();
3437                        let begin_ident =
3438                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3439
3440                        match builders_or_callback {
3441                            BuildersOrCallback::Builders(graph_builders) => {
3442                                graph_builders.begin_atomic(
3443                                    inner_ident,
3444                                    &inner.metadata().location_id,
3445                                    &inner.metadata().collection_kind,
3446                                    &begin_ident,
3447                                    &out_location,
3448                                    &metadata.op,
3449                                );
3450                            }
3451                            BuildersOrCallback::Callback(_, node_callback) => {
3452                                node_callback(node, next_stmt_id);
3453                            }
3454                        }
3455
3456                        ident_stack.push(begin_ident);
3457                    }
3458
3459                    HydroNode::EndAtomic { inner, .. } => {
3460                        let inner_ident = ident_stack.pop().unwrap();
3461
3462                        let stmt_id = next_stmt_id.get_and_increment();
3463                        let end_ident =
3464                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3465
3466                        match builders_or_callback {
3467                            BuildersOrCallback::Builders(graph_builders) => {
3468                                graph_builders.end_atomic(
3469                                    inner_ident,
3470                                    &inner.metadata().location_id,
3471                                    &inner.metadata().collection_kind,
3472                                    &end_ident,
3473                                );
3474                            }
3475                            BuildersOrCallback::Callback(_, node_callback) => {
3476                                node_callback(node, next_stmt_id);
3477                            }
3478                        }
3479
3480                        ident_stack.push(end_ident);
3481                    }
3482
3483                    HydroNode::Source {
3484                        source, metadata, ..
3485                    } => {
3486                        if let HydroSource::ExternalNetwork() = source {
3487                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3488                        } else {
3489                            let stmt_id = next_stmt_id.get_and_increment();
3490                            let source_ident =
3491                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3492
3493                            let source_stmt = match source {
3494                                HydroSource::Stream(expr) => {
3495                                    debug_assert!(metadata.location_id.is_top_level());
3496                                    parse_quote! {
3497                                        #source_ident = source_stream(#expr);
3498                                    }
3499                                }
3500
3501                                HydroSource::ExternalNetwork() => {
3502                                    unreachable!()
3503                                }
3504
3505                                HydroSource::Iter(expr) => {
3506                                    if metadata.location_id.is_top_level() {
3507                                        parse_quote! {
3508                                            #source_ident = source_iter(#expr);
3509                                        }
3510                                    } else {
3511                                        // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
3512                                        parse_quote! {
3513                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3514                                        }
3515                                    }
3516                                }
3517
3518                                HydroSource::Spin() => {
3519                                    debug_assert!(metadata.location_id.is_top_level());
3520                                    parse_quote! {
3521                                        #source_ident = spin();
3522                                    }
3523                                }
3524
3525                                HydroSource::ClusterMembers(target_loc, state) => {
3526                                    debug_assert!(metadata.location_id.is_top_level());
3527
3528                                    let members_tee_ident = syn::Ident::new(
3529                                        &format!(
3530                                            "__cluster_members_tee_{}_{}",
3531                                            metadata.location_id.root().key(),
3532                                            target_loc.key(),
3533                                        ),
3534                                        Span::call_site(),
3535                                    );
3536
3537                                    match state {
3538                                        ClusterMembersState::Stream(d) => {
3539                                            parse_quote! {
3540                                                #members_tee_ident = source_stream(#d) -> tee();
3541                                                #source_ident = #members_tee_ident;
3542                                            }
3543                                        },
3544                                        ClusterMembersState::Uninit => syn::parse_quote! {
3545                                            #source_ident = source_stream(DUMMY);
3546                                        },
3547                                        ClusterMembersState::Tee(..) => parse_quote! {
3548                                            #source_ident = #members_tee_ident;
3549                                        },
3550                                    }
3551                                }
3552
3553                                HydroSource::Embedded(ident) => {
3554                                    parse_quote! {
3555                                        #source_ident = source_stream(#ident);
3556                                    }
3557                                }
3558
3559                                HydroSource::EmbeddedSingleton(ident) => {
3560                                    parse_quote! {
3561                                        #source_ident = source_iter([#ident]);
3562                                    }
3563                                }
3564                            };
3565
3566                            match builders_or_callback {
3567                                BuildersOrCallback::Builders(graph_builders) => {
3568                                    let builder = graph_builders.get_dfir_mut(&out_location);
3569                                    builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3570                                }
3571                                BuildersOrCallback::Callback(_, node_callback) => {
3572                                    node_callback(node, next_stmt_id);
3573                                }
3574                            }
3575
3576                            ident_stack.push(source_ident);
3577                        }
3578                    }
3579
3580                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3581                        let stmt_id = next_stmt_id.get_and_increment();
3582                        let source_ident =
3583                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3584
3585                        match builders_or_callback {
3586                            BuildersOrCallback::Builders(graph_builders) => {
3587                                let builder = graph_builders.get_dfir_mut(&out_location);
3588
3589                                if *first_tick_only {
3590                                    assert!(
3591                                        !metadata.location_id.is_top_level(),
3592                                        "first_tick_only SingletonSource must be inside a tick"
3593                                    );
3594                                }
3595
3596                                if *first_tick_only
3597                                    || (metadata.location_id.is_top_level()
3598                                        && metadata.collection_kind.is_bounded())
3599                                {
3600                                    builder.add_dfir(
3601                                        parse_quote! {
3602                                            #source_ident = source_iter([#value]);
3603                                        },
3604                                        None,
3605                                        Some(&stmt_id.to_string()),
3606                                    );
3607                                } else {
3608                                    builder.add_dfir(
3609                                        parse_quote! {
3610                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3611                                        },
3612                                        None,
3613                                        Some(&stmt_id.to_string()),
3614                                    );
3615                                }
3616                            }
3617                            BuildersOrCallback::Callback(_, node_callback) => {
3618                                node_callback(node, next_stmt_id);
3619                            }
3620                        }
3621
3622                        ident_stack.push(source_ident);
3623                    }
3624
3625                    HydroNode::CycleSource { cycle_id, .. } => {
3626                        let ident = cycle_id.as_ident();
3627
3628                        // consume a stmt id even though we did not emit anything so that we can instrument this
3629                        let _ = next_stmt_id.get_and_increment();
3630
3631                        match builders_or_callback {
3632                            BuildersOrCallback::Builders(_) => {}
3633                            BuildersOrCallback::Callback(_, node_callback) => {
3634                                node_callback(node, next_stmt_id);
3635                            }
3636                        }
3637
3638                        ident_stack.push(ident);
3639                    }
3640
3641                    HydroNode::Tee { inner, .. } => {
3642                        // we consume a stmt id regardless of if we emit the tee() operator,
3643                        // so that during rewrites we touch all recipients of the tee()
3644                        let stmt_id = next_stmt_id.get_and_increment();
3645
3646                        let ret_ident = if let Some(built_idents) =
3647                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3648                        {
3649                            match builders_or_callback {
3650                                BuildersOrCallback::Builders(_) => {}
3651                                BuildersOrCallback::Callback(_, node_callback) => {
3652                                    node_callback(node, next_stmt_id);
3653                                }
3654                            }
3655
3656                            built_idents[0].clone()
3657                        } else {
3658                            // The inner node was already processed by transform_bottom_up,
3659                            // so its ident is on the stack
3660                            let inner_ident = ident_stack.pop().unwrap();
3661
3662                            let tee_ident =
3663                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3664
3665                            built_tees.insert(
3666                                inner.0.as_ref() as *const RefCell<HydroNode>,
3667                                vec![tee_ident.clone()],
3668                            );
3669
3670                            match builders_or_callback {
3671                                BuildersOrCallback::Builders(graph_builders) => {
3672                                    // NOTE: With `forward_ref`, the fold codegen may not have
3673                                    // run yet when we reach this tee, so `fold_hooked_idents`
3674                                    // might not contain the inner ident. In that case we won't
3675                                    // propagate the "hooked" status to the tee and the
3676                                    // downstream singleton batch will use the normal
3677                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3678                                    // This is not a soundness issue: the fallback hook still
3679                                    // produces correct behavior, just with a redundant decision
3680                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3681                                    // fix ordering so forward_ref folds are always processed
3682                                    // before their downstream tees.
3683                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3684                                        fold_hooked_idents.insert(tee_ident.to_string());
3685                                    }
3686                                    let builder = graph_builders.get_dfir_mut(&out_location);
3687                                    builder.add_dfir(
3688                                        parse_quote! {
3689                                            #tee_ident = #inner_ident -> tee();
3690                                        },
3691                                        None,
3692                                        Some(&stmt_id.to_string()),
3693                                    );
3694                                }
3695                                BuildersOrCallback::Callback(_, node_callback) => {
3696                                    node_callback(node, next_stmt_id);
3697                                }
3698                            }
3699
3700                            tee_ident
3701                        };
3702
3703                        ident_stack.push(ret_ident);
3704                    }
3705
3706                    HydroNode::Reference { inner, kind, .. } => {
3707                        // we consume a stmt id regardless of if we emit the operator,
3708                        // so that during rewrites we touch all recipients
3709                        let stmt_id = next_stmt_id.get_and_increment();
3710
3711                        let ret_ident = if let Some(built_idents) =
3712                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3713                        {
3714                            built_idents[0].clone()
3715                        } else {
3716                            let inner_ident = ident_stack.pop().unwrap();
3717
3718                            let ref_ident =
3719                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3720
3721                            built_tees.insert(
3722                                inner.0.as_ref() as *const RefCell<HydroNode>,
3723                                vec![ref_ident.clone()],
3724                            );
3725
3726                            match builders_or_callback {
3727                                BuildersOrCallback::Builders(graph_builders) => {
3728                                    let builder = graph_builders.get_dfir_mut(&out_location);
3729                                    let op_ident = syn::Ident::new(
3730                                        match kind {
3731                                            crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3732                                            crate::handoff_ref::HandoffRefKind::Optional => "optional",
3733                                            crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3734                                        },
3735                                        Span::call_site(),
3736                                    );
3737                                    builder.add_dfir(
3738                                        parse_quote! {
3739                                            #ref_ident = #inner_ident -> #op_ident();
3740                                        },
3741                                        None,
3742                                        Some(&stmt_id.to_string()),
3743                                    );
3744                                }
3745                                BuildersOrCallback::Callback(_, node_callback) => {
3746                                    node_callback(node, next_stmt_id);
3747                                }
3748                            }
3749
3750                            ref_ident
3751                        };
3752
3753                        ident_stack.push(ret_ident);
3754                    }
3755
3756                    HydroNode::Partition {
3757                        inner, f, is_true, metadata,
3758                    } => {
3759                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3760                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3761                        let stmt_id = next_stmt_id.get_and_increment();
3762
3763                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3764                            match builders_or_callback {
3765                                BuildersOrCallback::Builders(_) => {}
3766                                BuildersOrCallback::Callback(_, node_callback) => {
3767                                    node_callback(node, next_stmt_id);
3768                                }
3769                            }
3770
3771                            let idx = if is_true { 0 } else { 1 };
3772                            built_idents[idx].clone()
3773                        } else {
3774                            // The inner node was already processed by transform_bottom_up,
3775                            // so its ident is on the stack
3776                            let inner_ident = ident_stack.pop().unwrap();
3777                            let f_tokens = f.emit_tokens(&mut ident_stack);
3778
3779                            let inner_ident = {
3780                                let inner_borrow = inner.0.borrow();
3781                                maybe_observe_for_mut(
3782                                    f, inner_ident,
3783                                    &inner_borrow.metadata().location_id,
3784                                    &inner_borrow.metadata().collection_kind,
3785                                    &metadata.op,
3786                                    builders_or_callback, next_stmt_id,
3787                                )
3788                            };
3789
3790                            let partition_ident = syn::Ident::new(
3791                                &format!("stream_{}_partition", stmt_id),
3792                                Span::call_site(),
3793                            );
3794                            let true_ident = syn::Ident::new(
3795                                &format!("stream_{}_true", stmt_id),
3796                                Span::call_site(),
3797                            );
3798                            let false_ident = syn::Ident::new(
3799                                &format!("stream_{}_false", stmt_id),
3800                                Span::call_site(),
3801                            );
3802
3803                            built_tees.insert(
3804                                ptr,
3805                                vec![true_ident.clone(), false_ident.clone()],
3806                            );
3807
3808                            let stmt_id = next_stmt_id.get_and_increment();
3809                            match builders_or_callback {
3810                                BuildersOrCallback::Builders(graph_builders) => {
3811                                    let builder = graph_builders.get_dfir_mut(&out_location);
3812                                    builder.add_dfir(
3813                                        parse_quote! {
3814                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3815                                            #true_ident = #partition_ident[0];
3816                                            #false_ident = #partition_ident[1];
3817                                        },
3818                                        None,
3819                                        Some(&stmt_id.to_string()),
3820                                    );
3821                                }
3822                                BuildersOrCallback::Callback(_, node_callback) => {
3823                                    node_callback(node, next_stmt_id);
3824                                }
3825                            }
3826
3827                            if is_true { true_ident } else { false_ident }
3828                        };
3829
3830                        ident_stack.push(ret_ident);
3831                    }
3832
3833                    HydroNode::Chain { .. } => {
3834                        // Children are processed left-to-right, so second is on top
3835                        let second_ident = ident_stack.pop().unwrap();
3836                        let first_ident = ident_stack.pop().unwrap();
3837
3838                        let stmt_id = next_stmt_id.get_and_increment();
3839                        let chain_ident =
3840                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3841
3842                        match builders_or_callback {
3843                            BuildersOrCallback::Builders(graph_builders) => {
3844                                let builder = graph_builders.get_dfir_mut(&out_location);
3845                                builder.add_dfir(
3846                                    parse_quote! {
3847                                        #chain_ident = chain();
3848                                        #first_ident -> [0]#chain_ident;
3849                                        #second_ident -> [1]#chain_ident;
3850                                    },
3851                                    None,
3852                                    Some(&stmt_id.to_string()),
3853                                );
3854                            }
3855                            BuildersOrCallback::Callback(_, node_callback) => {
3856                                node_callback(node, next_stmt_id);
3857                            }
3858                        }
3859
3860                        ident_stack.push(chain_ident);
3861                    }
3862
3863                    HydroNode::MergeOrdered { first, metadata, .. } => {
3864                        let second_ident = ident_stack.pop().unwrap();
3865                        let first_ident = ident_stack.pop().unwrap();
3866
3867                        let stmt_id = next_stmt_id.get_and_increment();
3868                        let merge_ident =
3869                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3870
3871                        match builders_or_callback {
3872                            BuildersOrCallback::Builders(graph_builders) => {
3873                                graph_builders.merge_ordered(
3874                                    &first.metadata().location_id,
3875                                    first_ident,
3876                                    second_ident,
3877                                    &merge_ident,
3878                                    &first.metadata().collection_kind,
3879                                    &metadata.op,
3880                                    Some(&stmt_id.to_string()),
3881                                );
3882                            }
3883                            BuildersOrCallback::Callback(_, node_callback) => {
3884                                node_callback(node, next_stmt_id);
3885                            }
3886                        }
3887
3888                        ident_stack.push(merge_ident);
3889                    }
3890
3891                    HydroNode::ChainFirst { .. } => {
3892                        let second_ident = ident_stack.pop().unwrap();
3893                        let first_ident = ident_stack.pop().unwrap();
3894
3895                        let stmt_id = next_stmt_id.get_and_increment();
3896                        let chain_ident =
3897                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3898
3899                        match builders_or_callback {
3900                            BuildersOrCallback::Builders(graph_builders) => {
3901                                let builder = graph_builders.get_dfir_mut(&out_location);
3902                                builder.add_dfir(
3903                                    parse_quote! {
3904                                        #chain_ident = chain_first_n(1);
3905                                        #first_ident -> [0]#chain_ident;
3906                                        #second_ident -> [1]#chain_ident;
3907                                    },
3908                                    None,
3909                                    Some(&stmt_id.to_string()),
3910                                );
3911                            }
3912                            BuildersOrCallback::Callback(_, node_callback) => {
3913                                node_callback(node, next_stmt_id);
3914                            }
3915                        }
3916
3917                        ident_stack.push(chain_ident);
3918                    }
3919
3920                    HydroNode::CrossSingleton { right, .. } => {
3921                        let right_ident = ident_stack.pop().unwrap();
3922                        let left_ident = ident_stack.pop().unwrap();
3923
3924                        let stmt_id = next_stmt_id.get_and_increment();
3925                        let cross_ident =
3926                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3927
3928                        match builders_or_callback {
3929                            BuildersOrCallback::Builders(graph_builders) => {
3930                                let builder = graph_builders.get_dfir_mut(&out_location);
3931
3932                                if right.metadata().location_id.is_top_level()
3933                                    && right.metadata().collection_kind.is_bounded()
3934                                {
3935                                    builder.add_dfir(
3936                                        parse_quote! {
3937                                            #cross_ident = cross_singleton::<'static>();
3938                                            #left_ident -> [input]#cross_ident;
3939                                            #right_ident -> [single]#cross_ident;
3940                                        },
3941                                        None,
3942                                        Some(&stmt_id.to_string()),
3943                                    );
3944                                } else {
3945                                    builder.add_dfir(
3946                                        parse_quote! {
3947                                            #cross_ident = cross_singleton();
3948                                            #left_ident -> [input]#cross_ident;
3949                                            #right_ident -> [single]#cross_ident;
3950                                        },
3951                                        None,
3952                                        Some(&stmt_id.to_string()),
3953                                    );
3954                                }
3955                            }
3956                            BuildersOrCallback::Callback(_, node_callback) => {
3957                                node_callback(node, next_stmt_id);
3958                            }
3959                        }
3960
3961                        ident_stack.push(cross_ident);
3962                    }
3963
3964                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3965                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3966                            parse_quote!(cross_join_multiset)
3967                        } else {
3968                            parse_quote!(join_multiset)
3969                        };
3970
3971                        let (HydroNode::CrossProduct { left, right, .. }
3972                        | HydroNode::Join { left, right, .. }) = node
3973                        else {
3974                            unreachable!()
3975                        };
3976
3977                        let is_top_level = left.metadata().location_id.is_top_level()
3978                            && right.metadata().location_id.is_top_level();
3979                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3980                            quote!('static)
3981                        } else {
3982                            quote!('tick)
3983                        };
3984
3985                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3986                            quote!('static)
3987                        } else {
3988                            quote!('tick)
3989                        };
3990
3991                        let right_ident = ident_stack.pop().unwrap();
3992                        let left_ident = ident_stack.pop().unwrap();
3993
3994                        let stmt_id = next_stmt_id.get_and_increment();
3995                        let stream_ident =
3996                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3997
3998                        match builders_or_callback {
3999                            BuildersOrCallback::Builders(graph_builders) => {
4000                                let builder = graph_builders.get_dfir_mut(&out_location);
4001                                builder.add_dfir(
4002                                    if is_top_level {
4003                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
4004                                        // a multiset_delta() to negate the replay behavior
4005                                        parse_quote! {
4006                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
4007                                            #left_ident -> [0]#stream_ident;
4008                                            #right_ident -> [1]#stream_ident;
4009                                        }
4010                                    } else {
4011                                        parse_quote! {
4012                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
4013                                            #left_ident -> [0]#stream_ident;
4014                                            #right_ident -> [1]#stream_ident;
4015                                        }
4016                                    }
4017                                    ,
4018                                    None,
4019                                    Some(&stmt_id.to_string()),
4020                                );
4021                            }
4022                            BuildersOrCallback::Callback(_, node_callback) => {
4023                                node_callback(node, next_stmt_id);
4024                            }
4025                        }
4026
4027                        ident_stack.push(stream_ident);
4028                    }
4029
4030                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
4031                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
4032                            parse_quote!(difference)
4033                        } else {
4034                            parse_quote!(anti_join)
4035                        };
4036
4037                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
4038                            node
4039                        else {
4040                            unreachable!()
4041                        };
4042
4043                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
4044                            quote!('static)
4045                        } else {
4046                            quote!('tick)
4047                        };
4048
4049                        let neg_ident = ident_stack.pop().unwrap();
4050                        let pos_ident = ident_stack.pop().unwrap();
4051
4052                        let stmt_id = next_stmt_id.get_and_increment();
4053                        let stream_ident =
4054                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4055
4056                        match builders_or_callback {
4057                            BuildersOrCallback::Builders(graph_builders) => {
4058                                let builder = graph_builders.get_dfir_mut(&out_location);
4059                                builder.add_dfir(
4060                                    parse_quote! {
4061                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
4062                                        #pos_ident -> [pos]#stream_ident;
4063                                        #neg_ident -> [neg]#stream_ident;
4064                                    },
4065                                    None,
4066                                    Some(&stmt_id.to_string()),
4067                                );
4068                            }
4069                            BuildersOrCallback::Callback(_, node_callback) => {
4070                                node_callback(node, next_stmt_id);
4071                            }
4072                        }
4073
4074                        ident_stack.push(stream_ident);
4075                    }
4076
4077                    HydroNode::JoinHalf { .. } => {
4078                        let HydroNode::JoinHalf { right, .. } = node else {
4079                            unreachable!()
4080                        };
4081
4082                        assert!(
4083                            right.metadata().collection_kind.is_bounded(),
4084                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
4085                            right.metadata().collection_kind
4086                        );
4087
4088                        let build_lifetime = if right.metadata().location_id.is_top_level() {
4089                            quote!('static)
4090                        } else {
4091                            quote!('tick)
4092                        };
4093
4094                        let build_ident = ident_stack.pop().unwrap();
4095                        let probe_ident = ident_stack.pop().unwrap();
4096
4097                        let stmt_id = next_stmt_id.get_and_increment();
4098                        let stream_ident =
4099                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4100
4101                        match builders_or_callback {
4102                            BuildersOrCallback::Builders(graph_builders) => {
4103                                let builder = graph_builders.get_dfir_mut(&out_location);
4104                                builder.add_dfir(
4105                                    parse_quote! {
4106                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
4107                                        #probe_ident -> [probe]#stream_ident;
4108                                        #build_ident -> [build]#stream_ident;
4109                                    },
4110                                    None,
4111                                    Some(&stmt_id.to_string()),
4112                                );
4113                            }
4114                            BuildersOrCallback::Callback(_, node_callback) => {
4115                                node_callback(node, next_stmt_id);
4116                            }
4117                        }
4118
4119                        ident_stack.push(stream_ident);
4120                    }
4121
4122                    HydroNode::ResolveFutures { .. } => {
4123                        let input_ident = ident_stack.pop().unwrap();
4124
4125                        let stmt_id = next_stmt_id.get_and_increment();
4126                        let futures_ident =
4127                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4128
4129                        match builders_or_callback {
4130                            BuildersOrCallback::Builders(graph_builders) => {
4131                                let builder = graph_builders.get_dfir_mut(&out_location);
4132                                builder.add_dfir(
4133                                    parse_quote! {
4134                                        #futures_ident = #input_ident -> resolve_futures();
4135                                    },
4136                                    None,
4137                                    Some(&stmt_id.to_string()),
4138                                );
4139                            }
4140                            BuildersOrCallback::Callback(_, node_callback) => {
4141                                node_callback(node, next_stmt_id);
4142                            }
4143                        }
4144
4145                        ident_stack.push(futures_ident);
4146                    }
4147
4148                    HydroNode::ResolveFuturesBlocking { .. } => {
4149                        let input_ident = ident_stack.pop().unwrap();
4150
4151                        let stmt_id = next_stmt_id.get_and_increment();
4152                        let futures_ident =
4153                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4154
4155                        match builders_or_callback {
4156                            BuildersOrCallback::Builders(graph_builders) => {
4157                                let builder = graph_builders.get_dfir_mut(&out_location);
4158                                builder.add_dfir(
4159                                    parse_quote! {
4160                                        #futures_ident = #input_ident -> resolve_futures_blocking();
4161                                    },
4162                                    None,
4163                                    Some(&stmt_id.to_string()),
4164                                );
4165                            }
4166                            BuildersOrCallback::Callback(_, node_callback) => {
4167                                node_callback(node, next_stmt_id);
4168                            }
4169                        }
4170
4171                        ident_stack.push(futures_ident);
4172                    }
4173
4174                    HydroNode::ResolveFuturesOrdered { .. } => {
4175                        let input_ident = ident_stack.pop().unwrap();
4176
4177                        let stmt_id = next_stmt_id.get_and_increment();
4178                        let futures_ident =
4179                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4180
4181                        match builders_or_callback {
4182                            BuildersOrCallback::Builders(graph_builders) => {
4183                                let builder = graph_builders.get_dfir_mut(&out_location);
4184                                builder.add_dfir(
4185                                    parse_quote! {
4186                                        #futures_ident = #input_ident -> resolve_futures_ordered();
4187                                    },
4188                                    None,
4189                                    Some(&stmt_id.to_string()),
4190                                );
4191                            }
4192                            BuildersOrCallback::Callback(_, node_callback) => {
4193                                node_callback(node, next_stmt_id);
4194                            }
4195                        }
4196
4197                        ident_stack.push(futures_ident);
4198                    }
4199
4200                    HydroNode::Map {
4201                        f,
4202                        input,
4203                        metadata,
4204                    } => {
4205                        // Pop input ident (pushed last by transform_children).
4206                        let input_ident = ident_stack.pop().unwrap();
4207                        let f_tokens = f.emit_tokens(&mut ident_stack);
4208
4209                        let input_ident = maybe_observe_for_mut(
4210                            f,
4211                            input_ident,
4212                            &input.metadata().location_id,
4213                            &input.metadata().collection_kind,
4214                            &metadata.op,
4215                            builders_or_callback,
4216                            next_stmt_id,
4217                        );
4218
4219                        let stmt_id = next_stmt_id.get_and_increment();
4220                        let map_ident =
4221                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4222
4223                        match builders_or_callback {
4224                            BuildersOrCallback::Builders(graph_builders) => {
4225                                let builder = graph_builders.get_dfir_mut(&out_location);
4226                                builder.add_dfir(
4227                                    parse_quote! {
4228                                        #map_ident = #input_ident -> map(#f_tokens);
4229                                    },
4230                                    None,
4231                                    Some(&stmt_id.to_string()),
4232                                );
4233                            }
4234                            BuildersOrCallback::Callback(_, node_callback) => {
4235                                node_callback(node, next_stmt_id);
4236                            }
4237                        }
4238
4239                        ident_stack.push(map_ident);
4240                    }
4241
4242                    HydroNode::FlatMap { f, input, metadata } => {
4243                        let input_ident = ident_stack.pop().unwrap();
4244                        let f_tokens = f.emit_tokens(&mut ident_stack);
4245
4246                        let input_ident = maybe_observe_for_mut(
4247                            f, input_ident,
4248                            &input.metadata().location_id,
4249                            &input.metadata().collection_kind,
4250                            &metadata.op,
4251                            builders_or_callback, next_stmt_id,
4252                        );
4253
4254                        let stmt_id = next_stmt_id.get_and_increment();
4255                        let flat_map_ident =
4256                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4257
4258                        match builders_or_callback {
4259                            BuildersOrCallback::Builders(graph_builders) => {
4260                                let builder = graph_builders.get_dfir_mut(&out_location);
4261                                builder.add_dfir(
4262                                    parse_quote! {
4263                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4264                                    },
4265                                    None,
4266                                    Some(&stmt_id.to_string()),
4267                                );
4268                            }
4269                            BuildersOrCallback::Callback(_, node_callback) => {
4270                                node_callback(node, next_stmt_id);
4271                            }
4272                        }
4273
4274                        ident_stack.push(flat_map_ident);
4275                    }
4276
4277                    HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
4278                        let input_ident = ident_stack.pop().unwrap();
4279                        let f_tokens = f.emit_tokens(&mut ident_stack);
4280
4281                        let input_ident = maybe_observe_for_mut(
4282                            f, input_ident,
4283                            &input.metadata().location_id,
4284                            &input.metadata().collection_kind,
4285                            &metadata.op,
4286                            builders_or_callback, next_stmt_id,
4287                        );
4288
4289                        let stmt_id = next_stmt_id.get_and_increment();
4290                        let flat_map_stream_blocking_ident =
4291                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4292
4293                        match builders_or_callback {
4294                            BuildersOrCallback::Builders(graph_builders) => {
4295                                let builder = graph_builders.get_dfir_mut(&out_location);
4296                                builder.add_dfir(
4297                                    parse_quote! {
4298                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4299                                    },
4300                                    None,
4301                                    Some(&stmt_id.to_string()),
4302                                );
4303                            }
4304                            BuildersOrCallback::Callback(_, node_callback) => {
4305                                node_callback(node, next_stmt_id);
4306                            }
4307                        }
4308
4309                        ident_stack.push(flat_map_stream_blocking_ident);
4310                    }
4311
4312                    HydroNode::Filter { f, input, metadata } => {
4313                        let input_ident = ident_stack.pop().unwrap();
4314                        let f_tokens = f.emit_tokens(&mut ident_stack);
4315
4316                        let input_ident = maybe_observe_for_mut(
4317                            f, input_ident,
4318                            &input.metadata().location_id,
4319                            &input.metadata().collection_kind,
4320                            &metadata.op,
4321                            builders_or_callback, next_stmt_id,
4322                        );
4323
4324                        let stmt_id = next_stmt_id.get_and_increment();
4325                        let filter_ident =
4326                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4327
4328                        match builders_or_callback {
4329                            BuildersOrCallback::Builders(graph_builders) => {
4330                                let builder = graph_builders.get_dfir_mut(&out_location);
4331                                builder.add_dfir(
4332                                    parse_quote! {
4333                                        #filter_ident = #input_ident -> filter(#f_tokens);
4334                                    },
4335                                    None,
4336                                    Some(&stmt_id.to_string()),
4337                                );
4338                            }
4339                            BuildersOrCallback::Callback(_, node_callback) => {
4340                                node_callback(node, next_stmt_id);
4341                            }
4342                        }
4343
4344                        ident_stack.push(filter_ident);
4345                    }
4346
4347                    HydroNode::FilterMap { f, input, metadata } => {
4348                        let input_ident = ident_stack.pop().unwrap();
4349                        let f_tokens = f.emit_tokens(&mut ident_stack);
4350
4351                        let input_ident = maybe_observe_for_mut(
4352                            f, input_ident,
4353                            &input.metadata().location_id,
4354                            &input.metadata().collection_kind,
4355                            &metadata.op,
4356                            builders_or_callback, next_stmt_id,
4357                        );
4358
4359                        let stmt_id = next_stmt_id.get_and_increment();
4360                        let filter_map_ident =
4361                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4362
4363                        match builders_or_callback {
4364                            BuildersOrCallback::Builders(graph_builders) => {
4365                                let builder = graph_builders.get_dfir_mut(&out_location);
4366                                builder.add_dfir(
4367                                    parse_quote! {
4368                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4369                                    },
4370                                    None,
4371                                    Some(&stmt_id.to_string()),
4372                                );
4373                            }
4374                            BuildersOrCallback::Callback(_, node_callback) => {
4375                                node_callback(node, next_stmt_id);
4376                            }
4377                        }
4378
4379                        ident_stack.push(filter_map_ident);
4380                    }
4381
4382                    HydroNode::Sort { .. } => {
4383                        let input_ident = ident_stack.pop().unwrap();
4384
4385                        let stmt_id = next_stmt_id.get_and_increment();
4386                        let sort_ident =
4387                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4388
4389                        match builders_or_callback {
4390                            BuildersOrCallback::Builders(graph_builders) => {
4391                                let builder = graph_builders.get_dfir_mut(&out_location);
4392                                builder.add_dfir(
4393                                    parse_quote! {
4394                                        #sort_ident = #input_ident -> sort();
4395                                    },
4396                                    None,
4397                                    Some(&stmt_id.to_string()),
4398                                );
4399                            }
4400                            BuildersOrCallback::Callback(_, node_callback) => {
4401                                node_callback(node, next_stmt_id);
4402                            }
4403                        }
4404
4405                        ident_stack.push(sort_ident);
4406                    }
4407
4408                    HydroNode::DeferTick { .. } => {
4409                        let input_ident = ident_stack.pop().unwrap();
4410
4411                        let stmt_id = next_stmt_id.get_and_increment();
4412                        let defer_tick_ident =
4413                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4414
4415                        match builders_or_callback {
4416                            BuildersOrCallback::Builders(graph_builders) => {
4417                                let builder = graph_builders.get_dfir_mut(&out_location);
4418                                builder.add_dfir(
4419                                    parse_quote! {
4420                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4421                                    },
4422                                    None,
4423                                    Some(&stmt_id.to_string()),
4424                                );
4425                            }
4426                            BuildersOrCallback::Callback(_, node_callback) => {
4427                                node_callback(node, next_stmt_id);
4428                            }
4429                        }
4430
4431                        ident_stack.push(defer_tick_ident);
4432                    }
4433
4434                    HydroNode::Enumerate { input, .. } => {
4435                        let input_ident = ident_stack.pop().unwrap();
4436
4437                        let stmt_id = next_stmt_id.get_and_increment();
4438                        let enumerate_ident =
4439                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4440
4441                        match builders_or_callback {
4442                            BuildersOrCallback::Builders(graph_builders) => {
4443                                let builder = graph_builders.get_dfir_mut(&out_location);
4444                                let lifetime = if input.metadata().location_id.is_top_level() {
4445                                    quote!('static)
4446                                } else {
4447                                    quote!('tick)
4448                                };
4449                                builder.add_dfir(
4450                                    parse_quote! {
4451                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4452                                    },
4453                                    None,
4454                                    Some(&stmt_id.to_string()),
4455                                );
4456                            }
4457                            BuildersOrCallback::Callback(_, node_callback) => {
4458                                node_callback(node, next_stmt_id);
4459                            }
4460                        }
4461
4462                        ident_stack.push(enumerate_ident);
4463                    }
4464
4465                    HydroNode::Inspect { f, input, metadata } => {
                            // Lowers an `Inspect` node to a DFIR `inspect(..)` statement.
                            // Consumes the input's ident from the top of `ident_stack` and
                            // pushes the ident of the newly emitted stream when done.
4466                        let input_ident = ident_stack.pop().unwrap();
                            // `emit_tokens` is handed `ident_stack`.
                            // NOTE(review): presumably it consumes idents for singleton references
                            // captured by the closure — confirm against `emit_tokens`.
4467                        let f_tokens = f.emit_tokens(&mut ident_stack);
4468
                            // The ident returned here replaces `input_ident` as the statement's input.
                            // The helper also receives `next_stmt_id`, so it may allocate statement ids
                            // before this node's own id is taken below; keep this call ahead of
                            // `get_and_increment()`.
4469                        let input_ident = maybe_observe_for_mut(
4470                            f, input_ident,
4471                            &input.metadata().location_id,
4472                            &input.metadata().collection_kind,
4473                            &metadata.op,
4474                            builders_or_callback, next_stmt_id,
4475                        );
4476
                            // The output stream is named `stream_<stmt_id>`.
4477                        let stmt_id = next_stmt_id.get_and_increment();
4478                        let inspect_ident =
4479                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4480
4481                        match builders_or_callback {
4482                            BuildersOrCallback::Builders(graph_builders) => {
                                    // Emit `<out> = <input> -> inspect(<f>);` into the builder for
                                    // `out_location`, tagged with the statement id as a string.
4483                                let builder = graph_builders.get_dfir_mut(&out_location);
4484                                builder.add_dfir(
4485                                    parse_quote! {
4486                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4487                                    },
4488                                    None,
4489                                    Some(&stmt_id.to_string()),
4490                                );
4491                            }
4492                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: no DFIR is emitted here; the node is handed to
                                    // the callback together with the id counter.
4493                                node_callback(node, next_stmt_id);
4494                            }
4495                        }
4496
4497                        ident_stack.push(inspect_ident);
4498                    }
4499
4500                    HydroNode::Unique { input, .. } => {
                            // Lowers a `Unique` node to a DFIR `unique::<'lifetime>()` statement.
                            // Consumes the input's ident from `ident_stack` and pushes the output ident.
4501                        let input_ident = ident_stack.pop().unwrap();
4502
                            // The output stream is named `stream_<stmt_id>`.
4503                        let stmt_id = next_stmt_id.get_and_increment();
4504                        let unique_ident =
4505                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4506
4507                        match builders_or_callback {
4508                            BuildersOrCallback::Builders(graph_builders) => {
4509                                let builder = graph_builders.get_dfir_mut(&out_location);
                                    // Persistence argument of the operator: `'static` when the input
                                    // lives at a top-level location, `'tick` otherwise.
4510                                let lifetime = if input.metadata().location_id.is_top_level() {
4511                                    quote!('static)
4512                                } else {
4513                                    quote!('tick)
4514                                };
4515
4516                                builder.add_dfir(
4517                                    parse_quote! {
4518                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4519                                    },
4520                                    None,
4521                                    Some(&stmt_id.to_string()),
4522                                );
4523                            }
4524                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: nothing is emitted; only the callback runs.
4525                                node_callback(node, next_stmt_id);
4526                            }
4527                        }
4528
4529                        ident_stack.push(unique_ident);
4530                    }
4531
4532                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
                            // Shared lowering for the four stateful aggregation nodes. All of them
                            // carry `input`, `init` and `acc`; they differ in the DFIR operator used
                            // and, for `Fold` / `FoldKeyed`, in whether intermediate states are emitted.
                            //
                            // Operator selection (based on the *input's* metadata):
                            //   Fold              -> `fold_no_replay` if the input is top-level and bounded,
                            //                        otherwise `fold`
                            //   Scan              -> `scan`
                            //   ScanAsyncBlocking -> `scan_async_blocking`
                            //   FoldKeyed         -> `fold_keyed`; a top-level bounded input hits `todo!`
4533                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4534                            if input.metadata().location_id.is_top_level()
4535                                && input.metadata().collection_kind.is_bounded()
4536                            {
4537                                parse_quote!(fold_no_replay)
4538                            } else {
4539                                parse_quote!(fold)
4540                            }
4541                        } else if matches!(node, HydroNode::Scan { .. }) {
4542                            parse_quote!(scan)
4543                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4544                            parse_quote!(scan_async_blocking)
4545                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4546                            if input.metadata().location_id.is_top_level()
4547                                && input.metadata().collection_kind.is_bounded()
4548                            {
4549                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4550                            } else {
4551                                parse_quote!(fold_keyed)
4552                            }
4553                        } else {
4554                            unreachable!()
4555                        };
4556
                            // Re-destructure to get at `input` regardless of which variant matched.
4557                        let (HydroNode::Fold { input, .. }
4558                        | HydroNode::FoldKeyed { input, .. }
4559                        | HydroNode::Scan { input, .. }
4560                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4561                        else {
4562                            unreachable!()
4563                        };
4564
                            // Persistence argument: `'static` for a top-level input, `'tick` otherwise.
4565                        let lifetime = if input.metadata().location_id.is_top_level() {
4566                            quote!('static)
4567                        } else {
4568                            quote!('tick)
4569                        };
4570
4571                        let input_ident = ident_stack.pop().unwrap();
4572
                            // Second destructuring, through a reborrow of `node`, for the two closures.
4573                        let (HydroNode::Fold { init, acc, .. }
4574                        | HydroNode::FoldKeyed { init, acc, .. }
4575                        | HydroNode::Scan { init, acc, .. }
4576                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4577                        else {
4578                            unreachable!()
4579                        };
4580
                            // `acc` is emitted before `init`, and both are handed `ident_stack`.
                            // NOTE(review): presumably this mirrors, in reverse, the order in which
                            // idents for their captured references were pushed — confirm before reordering.
4581                        let acc_tokens = acc.emit_tokens(&mut ident_stack);
4582                        let init_tokens = init.emit_tokens(&mut ident_stack);
4583
                            // The output stream is named `stream_<stmt_id>`.
4584                        let stmt_id = next_stmt_id.get_and_increment();
4585                        let fold_ident =
4586                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4587
4588                        match builders_or_callback {
4589                            BuildersOrCallback::Builders(graph_builders) => {
                                    // Case 1: `Fold` whose own location is top-level and not `Atomic`,
                                    // with `singleton_intermediates()` enabled and an unbounded output.
                                    // Instead of a plain fold, emit the initial value followed by a clone
                                    // of the state after every update.
4590                                if matches!(node, HydroNode::Fold { .. })
4591                                    && node.metadata().location_id.is_top_level()
4592                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4593                                    && graph_builders.singleton_intermediates()
4594                                    && !node.metadata().collection_kind.is_bounded()
4595                                {
4596                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
                                        // `emit_fold_hook` optionally returns a replacement input ident.
4597                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4598                                        &input.metadata().location_id,
4599                                        &input_ident,
4600                                        &input.metadata().collection_kind,
4601                                        &node.metadata().op,
4602                                    );
4603
                                        // With a hook, the wrapper takes a `Vec<_>` batch per item: it returns
                                        // `None` for an empty batch, otherwise applies the user accumulator to
                                        // every value in the batch and returns a clone of the state.
                                        // Without a hook, it applies the accumulator to each value and returns
                                        // a clone of the state every time.
4604                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4605                                        let acc: syn::Expr = parse_quote!({
4606                                            let mut __inner = #acc_tokens;
4607                                            move |__state, __batch: Vec<_>| {
4608                                                if __batch.is_empty() {
4609                                                    return None;
4610                                                }
4611                                                for __value in __batch {
4612                                                    __inner(__state, __value);
4613                                                }
4614                                                Some(__state.clone())
4615                                            }
4616                                        });
4617                                        (hooked, acc)
4618                                    } else {
4619                                        let acc: syn::Expr = parse_quote!({
4620                                            let mut __inner = #acc_tokens;
4621                                            move |__state, __value| {
4622                                                __inner(__state, __value);
4623                                                Some(__state.clone())
4624                                            }
4625                                        });
4626                                        (&input_ident, acc)
4627                                    };
4628
                                        // Generated graph: a one-element `source_iter` holding the result of
                                        // calling `init` feeds port 0 of a `chain()`, and the `scan` over the
                                        // (possibly hooked) input feeds port 1.
4629                                    let builder = graph_builders.get_dfir_mut(&out_location);
4630                                    builder.add_dfir(
4631                                        parse_quote! {
4632                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4633                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4634                                            #fold_ident = chain();
4635                                        },
4636                                        None,
4637                                        Some(&stmt_id.to_string()),
4638                                    );
4639
                                        // Record the output ident's name when the input was hooked.
4640                                    if hooked_input_ident.is_some() {
4641                                        fold_hooked_idents.insert(fold_ident.to_string());
4642                                    }
4643                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4644                                    && node.metadata().location_id.is_top_level()
4645                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4646                                    && graph_builders.singleton_intermediates()
4647                                    && !node.metadata().collection_kind.is_bounded()
4648                                {
                                        // Case 2: the keyed counterpart of case 1. A `scan` over a `HashMap`
                                        // of per-key states emits `(key, cloned state)` after each update.
4649                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4650                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4651                                        &input.metadata().location_id,
4652                                        &input_ident,
4653                                        &input.metadata().collection_kind,
4654                                        &node.metadata().op,
4655                                    );
4656                                    let builder = graph_builders.get_dfir_mut(&out_location);
4657
                                        // The wrapper looks up (or initialises via `init`) the entry for the
                                        // key, applies the user accumulator, and returns the key with a clone
                                        // of that entry's state.
4658                                    let wrapped_acc: syn::Expr = parse_quote!({
4659                                        let mut __init = #init_tokens;
4660                                        let mut __inner = #acc_tokens;
4661                                        move |__state, __kv: (_, _)| {
4662                                            // TODO(shadaj): we can avoid the clone when the entry exists
4663                                            let __state = __state
4664                                                .entry(::std::clone::Clone::clone(&__kv.0))
4665                                                .or_insert_with(|| (__init)());
4666                                            __inner(__state, __kv.1);
4667                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4668                                        }
4669                                    });
4670
                                        // A hooked input is passed through `flatten()` before the `scan`, and
                                        // the output ident is recorded; an unhooked input goes straight in.
4671                                    if let Some(hooked_input_ident) = hooked_input_ident {
4672                                        builder.add_dfir(
4673                                            parse_quote! {
4674                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4675                                            },
4676                                            None,
4677                                            Some(&stmt_id.to_string()),
4678                                        );
4679
4680                                        fold_hooked_idents.insert(fold_ident.to_string());
4681                                    } else {
4682                                        builder.add_dfir(
4683                                            parse_quote! {
4684                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4685                                            },
4686                                            None,
4687                                            Some(&stmt_id.to_string()),
4688                                        );
4689                                    }
4690                                } else if (matches!(node, HydroNode::Fold { .. })
4691                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4692                                    && !node.metadata().location_id.is_top_level()
4693                                    && graph_builders.singleton_intermediates()
4694                                {
                                        // Case 3: `Fold` / `FoldKeyed` at a non-top-level location with
                                        // `singleton_intermediates()` enabled. The regular operator is kept;
                                        // only its input may be swapped for the ident returned by the hook.
4695                                    let input_ref = match &*node {
4696                                        HydroNode::Fold { input, .. } => input,
4697                                        HydroNode::FoldKeyed { input, .. } => input,
4698                                        _ => unreachable!(),
4699                                    };
4700                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4701                                        &input_ref.metadata().location_id,
4702                                        &input_ident,
4703                                        &input_ref.metadata().collection_kind,
4704                                        &node.metadata().op,
4705                                    );
4706
4707                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4708                                    let builder = graph_builders.get_dfir_mut(&out_location);
4709                                    builder.add_dfir(
4710                                        parse_quote! {
4711                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4712                                        },
4713                                        None,
4714                                        Some(&stmt_id.to_string()),
4715                                    );
4716                                } else {
                                        // Case 4 (everything else, including `Scan` / `ScanAsyncBlocking`):
                                        // the selected operator applied directly to the input.
4717                                    let builder = graph_builders.get_dfir_mut(&out_location);
4718                                    builder.add_dfir(
4719                                        parse_quote! {
4720                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4721                                        },
4722                                        None,
4723                                        Some(&stmt_id.to_string()),
4724                                    );
4725                                }
4726                            }
4727                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: nothing is emitted; only the callback runs.
4728                                node_callback(node, next_stmt_id);
4729                            }
4730                        }
4731
4732                        ident_stack.push(fold_ident);
4733                    }
4734
4735                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
                            // Shared lowering for `Reduce` and `ReduceKeyed`.
                            //
                            // Operator selection (based on the *input's* metadata):
                            //   Reduce      -> `reduce_no_replay` if the input is top-level and bounded,
                            //                  otherwise `reduce`
                            //   ReduceKeyed -> `reduce_keyed`; a top-level bounded input hits `todo!`
4736                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4737                            if input.metadata().location_id.is_top_level()
4738                                && input.metadata().collection_kind.is_bounded()
4739                            {
4740                                parse_quote!(reduce_no_replay)
4741                            } else {
4742                                parse_quote!(reduce)
4743                            }
4744                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4745                            if input.metadata().location_id.is_top_level()
4746                                && input.metadata().collection_kind.is_bounded()
4747                            {
4748                                todo!(
4749                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4750                                )
4751                            } else {
4752                                parse_quote!(reduce_keyed)
4753                            }
4754                        } else {
4755                            unreachable!()
4756                        };
4757
                            // Re-destructure to get at `input` regardless of which variant matched.
4758                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4759                        else {
4760                            unreachable!()
4761                        };
4762
                            // Persistence argument: `'static` for a top-level input, `'tick` otherwise.
4763                        let lifetime = if input.metadata().location_id.is_top_level() {
4764                            quote!('static)
4765                        } else {
4766                            quote!('tick)
4767                        };
4768
4769                        let input_ident = ident_stack.pop().unwrap();
4770
                            // Second destructuring, through a reborrow of `node`, for the closure.
4771                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4772                        else {
4773                            unreachable!()
4774                        };
4775
4776                        let f_tokens = f.emit_tokens(&mut ident_stack);
4777
                            // The output stream is named `stream_<stmt_id>`.
4778                        let stmt_id = next_stmt_id.get_and_increment();
4779                        let reduce_ident =
4780                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4781
4782                        match builders_or_callback {
4783                            BuildersOrCallback::Builders(graph_builders) => {
                                    // A top-level, non-`Atomic`, unbounded output combined with
                                    // `singleton_intermediates()` is not implemented for either variant;
                                    // both guards below panic via `todo!`. Every other configuration
                                    // emits the selected operator directly.
4784                                if matches!(node, HydroNode::Reduce { .. })
4785                                    && node.metadata().location_id.is_top_level()
4786                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4787                                    && graph_builders.singleton_intermediates()
4788                                    && !node.metadata().collection_kind.is_bounded()
4789                                {
4790                                    todo!(
4791                                        "Reduce with optional intermediates is not yet supported in simulator"
4792                                    );
4793                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4794                                    && node.metadata().location_id.is_top_level()
4795                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4796                                    && graph_builders.singleton_intermediates()
4797                                    && !node.metadata().collection_kind.is_bounded()
4798                                {
4799                                    todo!(
4800                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4801                                    );
4802                                } else {
4803                                    let builder = graph_builders.get_dfir_mut(&out_location);
4804                                    builder.add_dfir(
4805                                        parse_quote! {
4806                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4807                                        },
4808                                        None,
4809                                        Some(&stmt_id.to_string()),
4810                                    );
4811                                }
4812                            }
4813                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: nothing is emitted; only the callback runs.
4814                                node_callback(node, next_stmt_id);
4815                            }
4816                        }
4817
4818                        ident_stack.push(reduce_ident);
4819                    }
4820
4821                    HydroNode::ReduceKeyedWatermark {
4822                        f,
4823                        input,
4824                        metadata,
4825                        ..
4826                    } => {
                            // Lowers a watermarked keyed reduce. The payload stream and the watermark
                            // stream are merged into one stream of `(Option<payload>, Option<watermark>)`
                            // pairs and folded into `(HashMap<key, value>, Option<current watermark>)`;
                            // the map's entries are then emitted via `flat_map`.
                            //
                            // Persistence argument: `'static` for a top-level input, `'tick` otherwise.
4827                        let lifetime = if input.metadata().location_id.is_top_level() {
4828                            quote!('static)
4829                        } else {
4830                            quote!('tick)
4831                        };
4832
4833                        // watermark is processed second, so it's on top
4834                        let watermark_ident = ident_stack.pop().unwrap();
4835                        let input_ident = ident_stack.pop().unwrap();
4836                        let f_tokens = f.emit_tokens(&mut ident_stack);
4837
                            // One statement id names both the helper `chain` node and the output stream.
4838                        let stmt_id = next_stmt_id.get_and_increment();
4839                        let chain_ident = syn::Ident::new(
4840                            &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4841                            Span::call_site(),
4842                        );
4843
4844                        let fold_ident =
4845                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4846
                            // `fold_no_replay` for a top-level bounded input, plain `fold` otherwise.
4847                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4848                            && input.metadata().collection_kind.is_bounded()
4849                        {
4850                            parse_quote!(fold_no_replay)
4851                        } else {
4852                            parse_quote!(fold)
4853                        };
4854
4855                        match builders_or_callback {
4856                            BuildersOrCallback::Builders(graph_builders) => {
                                    // Unsupported configuration: a top-level, non-`Atomic`, *unbounded*
                                    // output while `singleton_intermediates()` is enabled. This is the same
                                    // guard the `Reduce` / `ReduceKeyed` arms use, so the panic message is
                                    // worded the same way (it previously claimed the collection was bounded,
                                    // which is the opposite of what this condition checks).
4857                                if metadata.location_id.is_top_level()
4858                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4859                                    && graph_builders.singleton_intermediates()
4860                                    && !metadata.collection_kind.is_bounded()
4861                                {
4862                                    todo!(
4863                                        "Reduce keyed watermarked with optional intermediates is not yet supported in simulator"
4864                                    )
4865                                } else {
                                        // Generated fold step, per incoming pair:
                                        // - payload `(k, v)`: ignored when a watermark is set and `k` is below
                                        //   it; otherwise inserted for a new key or combined into the existing
                                        //   value with the user's reduce function.
                                        // - watermark: ignored when it is not greater than the current one;
                                        //   otherwise keys below it are removed and it becomes the current one.
4866                                    let builder = graph_builders.get_dfir_mut(&out_location);
4867                                    builder.add_dfir(
4868                                        parse_quote! {
4869                                            #chain_ident = chain();
4870                                            #input_ident
4871                                                -> map(|x| (Some(x), None))
4872                                                -> [0]#chain_ident;
4873                                            #watermark_ident
4874                                                -> map(|watermark| (None, Some(watermark)))
4875                                                -> [1]#chain_ident;
4876
4877                                            #fold_ident = #chain_ident
4878                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4879                                                    let __reduce_keyed_fn = #f_tokens;
4880                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4881                                                        if let Some((k, v)) = opt_payload {
4882                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4883                                                                if k < curr_watermark {
4884                                                                    return;
4885                                                                }
4886                                                            }
4887                                                            match map.entry(k) {
4888                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4889                                                                    e.insert(v);
4890                                                                }
4891                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4892                                                                    __reduce_keyed_fn(e.get_mut(), v);
4893                                                                }
4894                                                            }
4895                                                        } else {
4896                                                            let watermark = opt_watermark.unwrap();
4897                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4898                                                                if watermark <= curr_watermark {
4899                                                                    return;
4900                                                                }
4901                                                            }
4902                                                            map.retain(|k, _| *k >= watermark);
4903                                                            *opt_curr_watermark = Some(watermark);
4904                                                        }
4905                                                    }
4906                                                })
4907                                                -> flat_map(|(map, _curr_watermark)| map);
4908                                        },
4909                                        None,
4910                                        Some(&stmt_id.to_string()),
4911                                    );
4912                                }
4913                            }
4914                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: nothing is emitted; only the callback runs.
4915                                node_callback(node, next_stmt_id);
4916                            }
4917                        }
4918
4919                        ident_stack.push(fold_ident);
4920                    }
4921
4922                    HydroNode::Network {
4923                        networking_info,
4924                        serialize_fn: serialize_pipeline,
4925                        instantiate_fn,
4926                        deserialize_fn: deserialize_pipeline,
4927                        input,
4928                        ..
4929                    } => {
                            // Lowers a `Network` node. The sender-side input ident is consumed from
                            // `ident_stack`, and the ident of the receiver-side stream is pushed.
4930                        let input_ident = ident_stack.pop().unwrap();
4931
                            // The receiver stream is named `stream_<stmt_id>`.
4932                        let stmt_id = next_stmt_id.get_and_increment();
4933                        let receiver_stream_ident =
4934                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4935
4936                        match builders_or_callback {
4937                            BuildersOrCallback::Builders(graph_builders) => {
                                    // While the node is still `Building`, the placeholder expressions
                                    // `DUMMY_SINK` / `DUMMY_SOURCE` are used; once `Finalized`, the stored
                                    // sink and source expressions are cloned.
4938                                let (sink_expr, source_expr) = match instantiate_fn {
4939                                    DebugInstantiate::Building => (
4940                                        syn::parse_quote!(DUMMY_SINK),
4941                                        syn::parse_quote!(DUMMY_SOURCE),
4942                                    ),
4943
4944                                    DebugInstantiate::Finalized(finalized) => {
4945                                        (finalized.sink.clone(), finalized.source.clone())
4946                                    }
4947                                };
4948
                                    // The sender location comes from the input's metadata and the receiver
                                    // location is `out_location`; the actual statements are produced by
                                    // `create_network`.
4949                                graph_builders.create_network(
4950                                    &input.metadata().location_id,
4951                                    &out_location,
4952                                    input_ident,
4953                                    &receiver_stream_ident,
4954                                    serialize_pipeline.as_ref(),
4955                                    sink_expr,
4956                                    source_expr,
4957                                    deserialize_pipeline.as_ref(),
4958                                    stmt_id,
4959                                    networking_info,
4960                                );
4961                            }
4962                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: nothing is emitted; only the callback runs.
4963                                node_callback(node, next_stmt_id);
4964                            }
4965                        }
4966
4967                        ident_stack.push(receiver_stream_ident);
4968                    }
4969
4970                    HydroNode::ExternalInput {
4971                        instantiate_fn,
4972                        deserialize_fn: deserialize_pipeline,
4973                        ..
4974                    } => {
                            // Lowers an `ExternalInput` node. Nothing is popped from `ident_stack`
                            // here; the arm only pushes the ident of the stream it creates.
4975                        let stmt_id = next_stmt_id.get_and_increment();
4976                        let receiver_stream_ident =
4977                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4978
4979                        match builders_or_callback {
4980                            BuildersOrCallback::Builders(graph_builders) => {
                                    // Only the source expression is used; the sink half of the pair is
                                    // discarded. `DUMMY_SOURCE` is the placeholder while still `Building`.
4981                                let (_, source_expr) = match instantiate_fn {
4982                                    DebugInstantiate::Building => (
4983                                        syn::parse_quote!(DUMMY_SINK),
4984                                        syn::parse_quote!(DUMMY_SOURCE),
4985                                    ),
4986
4987                                    DebugInstantiate::Finalized(finalized) => {
4988                                        (finalized.sink.clone(), finalized.source.clone())
4989                                    }
4990                                };
4991
                                    // The actual statements are produced by `create_external_source`.
4992                                graph_builders.create_external_source(
4993                                    &out_location,
4994                                    source_expr,
4995                                    &receiver_stream_ident,
4996                                    deserialize_pipeline.as_ref(),
4997                                    stmt_id,
4998                                );
4999                            }
5000                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: nothing is emitted; only the callback runs.
5001                                node_callback(node, next_stmt_id);
5002                            }
5003                        }
5004
5005                        ident_stack.push(receiver_stream_ident);
5006                    }
5007
5008                    HydroNode::Counter {
5009                        tag,
5010                        duration,
5011                        prefix,
5012                        ..
5013                    } => {
                            // Lowers a `Counter` node to a DFIR `_counter(..)` statement.
                            // Consumes the input's ident from `ident_stack` and pushes the output ident.
5014                        let input_ident = ident_stack.pop().unwrap();
5015
                            // The output stream is named `stream_<stmt_id>`.
5016                        let stmt_id = next_stmt_id.get_and_increment();
5017                        let counter_ident =
5018                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5019
5020                        match builders_or_callback {
5021                            BuildersOrCallback::Builders(graph_builders) => {
                                    // First operator argument is the string `<prefix>(<tag>)`; the second
                                    // is `duration`, interpolated as-is.
5022                                let arg = format!("{}({})", prefix, tag);
5023                                let builder = graph_builders.get_dfir_mut(&out_location);
5024                                builder.add_dfir(
5025                                    parse_quote! {
5026                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
5027                                    },
5028                                    None,
5029                                    Some(&stmt_id.to_string()),
5030                                );
5031                            }
5032                            BuildersOrCallback::Callback(_, node_callback) => {
                                    // Callback mode: nothing is emitted; only the callback runs.
5033                                node_callback(node, next_stmt_id);
5034                            }
5035                        }
5036
5037                        ident_stack.push(counter_ident);
5038                    }
5039                }
5040            },
5041            seen_tees,
5042            false,
5043        );
5044
5045        let ret = ident_stack
5046            .pop()
5047            .expect("ident_stack should have exactly one element after traversal");
5048        assert!(
5049            ident_stack.is_empty(),
5050            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5051             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5052            ident_stack.len()
5053        );
5054        ret
5055    }
5056
5057    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5058        match self {
5059            HydroNode::Placeholder => {
5060                panic!()
5061            }
5062            HydroNode::Cast { .. }
5063            | HydroNode::ObserveNonDet { .. }
5064            | HydroNode::UnboundSingleton { .. }
5065            | HydroNode::AssertIsConsistent { .. } => {}
5066            HydroNode::Source { source, .. } => match source {
5067                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5068                HydroSource::ExternalNetwork()
5069                | HydroSource::Spin()
5070                | HydroSource::ClusterMembers(_, _)
5071                | HydroSource::Embedded(_)
5072                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
5073            },
5074            HydroNode::SingletonSource { value, .. } => {
5075                transform(value);
5076            }
5077            HydroNode::CycleSource { .. }
5078            | HydroNode::Tee { .. }
5079            | HydroNode::Reference { .. }
5080            | HydroNode::YieldConcat { .. }
5081            | HydroNode::BeginAtomic { .. }
5082            | HydroNode::EndAtomic { .. }
5083            | HydroNode::Batch { .. }
5084            | HydroNode::Chain { .. }
5085            | HydroNode::MergeOrdered { .. }
5086            | HydroNode::ChainFirst { .. }
5087            | HydroNode::CrossProduct { .. }
5088            | HydroNode::CrossSingleton { .. }
5089            | HydroNode::ResolveFutures { .. }
5090            | HydroNode::ResolveFuturesBlocking { .. }
5091            | HydroNode::ResolveFuturesOrdered { .. }
5092            | HydroNode::Join { .. }
5093            | HydroNode::JoinHalf { .. }
5094            | HydroNode::Difference { .. }
5095            | HydroNode::AntiJoin { .. }
5096            | HydroNode::DeferTick { .. }
5097            | HydroNode::Enumerate { .. }
5098            | HydroNode::Unique { .. }
5099            | HydroNode::Sort { .. } => {}
5100            HydroNode::Map { f, .. }
5101            | HydroNode::FlatMap { f, .. }
5102            | HydroNode::FlatMapStreamBlocking { f, .. }
5103            | HydroNode::Filter { f, .. }
5104            | HydroNode::FilterMap { f, .. }
5105            | HydroNode::Inspect { f, .. }
5106            | HydroNode::Partition { f, .. }
5107            | HydroNode::Reduce { f, .. }
5108            | HydroNode::ReduceKeyed { f, .. }
5109            | HydroNode::ReduceKeyedWatermark { f, .. } => {
5110                transform(&mut f.expr);
5111            }
5112            HydroNode::Fold { init, acc, .. }
5113            | HydroNode::Scan { init, acc, .. }
5114            | HydroNode::ScanAsyncBlocking { init, acc, .. }
5115            | HydroNode::FoldKeyed { init, acc, .. } => {
5116                transform(&mut init.expr);
5117                transform(&mut acc.expr);
5118            }
5119            HydroNode::Network {
5120                serialize_fn,
5121                deserialize_fn,
5122                ..
5123            } => {
5124                if let Some(serialize_fn) = serialize_fn {
5125                    transform(serialize_fn);
5126                }
5127                if let Some(deserialize_fn) = deserialize_fn {
5128                    transform(deserialize_fn);
5129                }
5130            }
5131            HydroNode::ExternalInput { deserialize_fn, .. } => {
5132                if let Some(deserialize_fn) = deserialize_fn {
5133                    transform(deserialize_fn);
5134                }
5135            }
5136            HydroNode::Counter { duration, .. } => {
5137                transform(duration);
5138            }
5139        }
5140    }
5141
5142    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5143        &self.metadata().op
5144    }
5145
5146    pub fn metadata(&self) -> &HydroIrMetadata {
5147        match self {
5148            HydroNode::Placeholder => {
5149                panic!()
5150            }
5151            HydroNode::Cast { metadata, .. }
5152            | HydroNode::ObserveNonDet { metadata, .. }
5153            | HydroNode::AssertIsConsistent { metadata, .. }
5154            | HydroNode::UnboundSingleton { metadata, .. }
5155            | HydroNode::Source { metadata, .. }
5156            | HydroNode::SingletonSource { metadata, .. }
5157            | HydroNode::CycleSource { metadata, .. }
5158            | HydroNode::Tee { metadata, .. }
5159            | HydroNode::Reference { metadata, .. }
5160            | HydroNode::Partition { metadata, .. }
5161            | HydroNode::YieldConcat { metadata, .. }
5162            | HydroNode::BeginAtomic { metadata, .. }
5163            | HydroNode::EndAtomic { metadata, .. }
5164            | HydroNode::Batch { metadata, .. }
5165            | HydroNode::Chain { metadata, .. }
5166            | HydroNode::MergeOrdered { metadata, .. }
5167            | HydroNode::ChainFirst { metadata, .. }
5168            | HydroNode::CrossProduct { metadata, .. }
5169            | HydroNode::CrossSingleton { metadata, .. }
5170            | HydroNode::Join { metadata, .. }
5171            | HydroNode::JoinHalf { metadata, .. }
5172            | HydroNode::Difference { metadata, .. }
5173            | HydroNode::AntiJoin { metadata, .. }
5174            | HydroNode::ResolveFutures { metadata, .. }
5175            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5176            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5177            | HydroNode::Map { metadata, .. }
5178            | HydroNode::FlatMap { metadata, .. }
5179            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5180            | HydroNode::Filter { metadata, .. }
5181            | HydroNode::FilterMap { metadata, .. }
5182            | HydroNode::DeferTick { metadata, .. }
5183            | HydroNode::Enumerate { metadata, .. }
5184            | HydroNode::Inspect { metadata, .. }
5185            | HydroNode::Unique { metadata, .. }
5186            | HydroNode::Sort { metadata, .. }
5187            | HydroNode::Scan { metadata, .. }
5188            | HydroNode::ScanAsyncBlocking { metadata, .. }
5189            | HydroNode::Fold { metadata, .. }
5190            | HydroNode::FoldKeyed { metadata, .. }
5191            | HydroNode::Reduce { metadata, .. }
5192            | HydroNode::ReduceKeyed { metadata, .. }
5193            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5194            | HydroNode::ExternalInput { metadata, .. }
5195            | HydroNode::Network { metadata, .. }
5196            | HydroNode::Counter { metadata, .. } => metadata,
5197        }
5198    }
5199
5200    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5201        &mut self.metadata_mut().op
5202    }
5203
5204    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5205        match self {
5206            HydroNode::Placeholder => {
5207                panic!()
5208            }
5209            HydroNode::Cast { metadata, .. }
5210            | HydroNode::ObserveNonDet { metadata, .. }
5211            | HydroNode::AssertIsConsistent { metadata, .. }
5212            | HydroNode::UnboundSingleton { metadata, .. }
5213            | HydroNode::Source { metadata, .. }
5214            | HydroNode::SingletonSource { metadata, .. }
5215            | HydroNode::CycleSource { metadata, .. }
5216            | HydroNode::Tee { metadata, .. }
5217            | HydroNode::Reference { metadata, .. }
5218            | HydroNode::Partition { metadata, .. }
5219            | HydroNode::YieldConcat { metadata, .. }
5220            | HydroNode::BeginAtomic { metadata, .. }
5221            | HydroNode::EndAtomic { metadata, .. }
5222            | HydroNode::Batch { metadata, .. }
5223            | HydroNode::Chain { metadata, .. }
5224            | HydroNode::MergeOrdered { metadata, .. }
5225            | HydroNode::ChainFirst { metadata, .. }
5226            | HydroNode::CrossProduct { metadata, .. }
5227            | HydroNode::CrossSingleton { metadata, .. }
5228            | HydroNode::Join { metadata, .. }
5229            | HydroNode::JoinHalf { metadata, .. }
5230            | HydroNode::Difference { metadata, .. }
5231            | HydroNode::AntiJoin { metadata, .. }
5232            | HydroNode::ResolveFutures { metadata, .. }
5233            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5234            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5235            | HydroNode::Map { metadata, .. }
5236            | HydroNode::FlatMap { metadata, .. }
5237            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5238            | HydroNode::Filter { metadata, .. }
5239            | HydroNode::FilterMap { metadata, .. }
5240            | HydroNode::DeferTick { metadata, .. }
5241            | HydroNode::Enumerate { metadata, .. }
5242            | HydroNode::Inspect { metadata, .. }
5243            | HydroNode::Unique { metadata, .. }
5244            | HydroNode::Sort { metadata, .. }
5245            | HydroNode::Scan { metadata, .. }
5246            | HydroNode::ScanAsyncBlocking { metadata, .. }
5247            | HydroNode::Fold { metadata, .. }
5248            | HydroNode::FoldKeyed { metadata, .. }
5249            | HydroNode::Reduce { metadata, .. }
5250            | HydroNode::ReduceKeyed { metadata, .. }
5251            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5252            | HydroNode::ExternalInput { metadata, .. }
5253            | HydroNode::Network { metadata, .. }
5254            | HydroNode::Counter { metadata, .. } => metadata,
5255        }
5256    }
5257
5258    pub fn input(&self) -> Vec<&HydroNode> {
5259        match self {
5260            HydroNode::Placeholder => {
5261                panic!()
5262            }
5263            HydroNode::Source { .. }
5264            | HydroNode::SingletonSource { .. }
5265            | HydroNode::ExternalInput { .. }
5266            | HydroNode::CycleSource { .. }
5267            | HydroNode::Tee { .. }
5268            | HydroNode::Reference { .. }
5269            | HydroNode::Partition { .. } => {
5270                // Tee/Partition should find their input in separate special ways
5271                vec![]
5272            }
5273            HydroNode::Cast { inner, .. }
5274            | HydroNode::ObserveNonDet { inner, .. }
5275            | HydroNode::YieldConcat { inner, .. }
5276            | HydroNode::BeginAtomic { inner, .. }
5277            | HydroNode::EndAtomic { inner, .. }
5278            | HydroNode::Batch { inner, .. }
5279            | HydroNode::UnboundSingleton { inner, .. }
5280            | HydroNode::AssertIsConsistent { inner, .. } => {
5281                vec![inner]
5282            }
5283            HydroNode::Chain { first, second, .. } => {
5284                vec![first, second]
5285            }
5286            HydroNode::MergeOrdered { first, second, .. } => {
5287                vec![first, second]
5288            }
5289            HydroNode::ChainFirst { first, second, .. } => {
5290                vec![first, second]
5291            }
5292            HydroNode::CrossProduct { left, right, .. }
5293            | HydroNode::CrossSingleton { left, right, .. }
5294            | HydroNode::Join { left, right, .. }
5295            | HydroNode::JoinHalf { left, right, .. } => {
5296                vec![left, right]
5297            }
5298            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5299                vec![pos, neg]
5300            }
5301            HydroNode::Map { input, .. }
5302            | HydroNode::FlatMap { input, .. }
5303            | HydroNode::FlatMapStreamBlocking { input, .. }
5304            | HydroNode::Filter { input, .. }
5305            | HydroNode::FilterMap { input, .. }
5306            | HydroNode::Sort { input, .. }
5307            | HydroNode::DeferTick { input, .. }
5308            | HydroNode::Enumerate { input, .. }
5309            | HydroNode::Inspect { input, .. }
5310            | HydroNode::Unique { input, .. }
5311            | HydroNode::Network { input, .. }
5312            | HydroNode::Counter { input, .. }
5313            | HydroNode::ResolveFutures { input, .. }
5314            | HydroNode::ResolveFuturesBlocking { input, .. }
5315            | HydroNode::ResolveFuturesOrdered { input, .. }
5316            | HydroNode::Fold { input, .. }
5317            | HydroNode::FoldKeyed { input, .. }
5318            | HydroNode::Reduce { input, .. }
5319            | HydroNode::ReduceKeyed { input, .. }
5320            | HydroNode::Scan { input, .. }
5321            | HydroNode::ScanAsyncBlocking { input, .. } => {
5322                vec![input]
5323            }
5324            HydroNode::ReduceKeyedWatermark {
5325                input, watermark, ..
5326            } => {
5327                vec![input, watermark]
5328            }
5329        }
5330    }
5331
5332    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5333        self.input()
5334            .iter()
5335            .map(|input_node| input_node.metadata())
5336            .collect()
5337    }
5338
5339    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5340    /// has other live references, meaning the upstream is already driven
5341    /// by another consumer and does not need a Null sink.
5342    pub fn is_shared_with_others(&self) -> bool {
5343        match self {
5344            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5345                Rc::strong_count(&inner.0) > 1
5346            }
5347            // A zero-output reference node is valid in DFIR (it drains itself at
5348            // end of tick), so it doesn't need to be driven by another consumer.
5349            HydroNode::Reference { .. } => false,
5350            _ => false,
5351        }
5352    }
5353
5354    pub fn print_root(&self) -> String {
5355        match self {
5356            HydroNode::Placeholder => {
5357                panic!()
5358            }
5359            HydroNode::Cast { .. } => "Cast()".to_owned(),
5360            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5361            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5362            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5363            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5364            HydroNode::SingletonSource {
5365                value,
5366                first_tick_only,
5367                ..
5368            } => format!(
5369                "SingletonSource({:?}, first_tick_only={})",
5370                value, first_tick_only
5371            ),
5372            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5373            HydroNode::Tee { inner, .. } => {
5374                format!("Tee({})", inner.0.borrow().print_root())
5375            }
5376            HydroNode::Reference { inner, kind, .. } => {
5377                format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5378            }
5379            HydroNode::Partition { f, is_true, .. } => {
5380                format!("Partition({:?}, is_true={})", f, is_true)
5381            }
5382            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5383            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5384            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5385            HydroNode::Batch { .. } => "Batch()".to_owned(),
5386            HydroNode::Chain { first, second, .. } => {
5387                format!("Chain({}, {})", first.print_root(), second.print_root())
5388            }
5389            HydroNode::MergeOrdered { first, second, .. } => {
5390                format!(
5391                    "MergeOrdered({}, {})",
5392                    first.print_root(),
5393                    second.print_root()
5394                )
5395            }
5396            HydroNode::ChainFirst { first, second, .. } => {
5397                format!(
5398                    "ChainFirst({}, {})",
5399                    first.print_root(),
5400                    second.print_root()
5401                )
5402            }
5403            HydroNode::CrossProduct { left, right, .. } => {
5404                format!(
5405                    "CrossProduct({}, {})",
5406                    left.print_root(),
5407                    right.print_root()
5408                )
5409            }
5410            HydroNode::CrossSingleton { left, right, .. } => {
5411                format!(
5412                    "CrossSingleton({}, {})",
5413                    left.print_root(),
5414                    right.print_root()
5415                )
5416            }
5417            HydroNode::Join { left, right, .. } => {
5418                format!("Join({}, {})", left.print_root(), right.print_root())
5419            }
5420            HydroNode::JoinHalf { left, right, .. } => {
5421                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5422            }
5423            HydroNode::Difference { pos, neg, .. } => {
5424                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5425            }
5426            HydroNode::AntiJoin { pos, neg, .. } => {
5427                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5428            }
5429            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5430            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5431            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5432            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5433            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5434            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5435            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5436            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5437            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5438            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5439            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5440            HydroNode::Unique { .. } => "Unique()".to_owned(),
5441            HydroNode::Sort { .. } => "Sort()".to_owned(),
5442            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5443            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5444            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5445                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5446            }
5447            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5448            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5449            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5450            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5451            HydroNode::Network { .. } => "Network()".to_owned(),
5452            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5453            HydroNode::Counter { tag, duration, .. } => {
5454                format!("Counter({:?}, {:?})", tag, duration)
5455            }
5456        }
5457    }
5458}
5459
/// Instantiates one network channel between two top-level locations.
///
/// The sender and receiver are looked up in `processes` / `clusters` according to the
/// variant of `from_location` / `to_location`. A fresh port is allocated on each side,
/// and the matching `Deploy` hooks are invoked: `o2o_*` for process to process, `o2m_*`
/// for process to cluster, `m2o_*` for cluster to process and `m2m_*` for cluster to
/// cluster.
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// `*_sink_source` hook and the closure produced by the `*_connect` hook.
///
/// # Panics
///
/// Panics if a referenced process or cluster is missing from its map, or if either
/// endpoint is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    /// Clones the node registered under `key`; `kind` ("process" or "cluster") is only
    /// used to word the panic raised when the key is absent.
    fn instantiated_node<N: Clone>(
        nodes: &SparseSecondaryMap<LocationKey, N>,
        key: LocationKey,
        kind: &str,
    ) -> N {
        nodes
            .get(key)
            .unwrap_or_else(|| {
                panic!("A {} used in the graph was not instantiated: {}", kind, key)
            })
            .clone()
    }

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = instantiated_node(processes, from, "process");
            let to_node = instantiated_node(processes, to, "process");

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = instantiated_node(processes, from, "process");
            let to_node = instantiated_node(clusters, to, "cluster");

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = instantiated_node(clusters, from, "cluster");
            let to_node = instantiated_node(processes, to, "process");

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = instantiated_node(clusters, from, "cluster");
            let to_node = instantiated_node(clusters, to, "cluster");

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only processes and clusters can be network endpoints. Arm order is kept so
        // that the sender-side check still wins when both endpoints are unsupported.
        (LocationId::Tick(_, _), _) => {
            panic!("cannot instantiate a network whose sender is a tick location")
        }
        (_, LocationId::Tick(_, _)) => {
            panic!("cannot instantiate a network whose receiver is a tick location")
        }
        (LocationId::Atomic(_), _) => {
            panic!("cannot instantiate a network whose sender is an atomic location")
        }
        (_, LocationId::Atomic(_)) => {
            panic!("cannot instantiate a network whose receiver is an atomic location")
        }
    };
    (sink, source, connect_fn)
}
5601
5602#[cfg(test)]
5603mod serde_test;
5604
// Unit tests for this module: layout-size pins for the IR enums and checks of
// `simplify_q_macro` on plain and stageleft-generated expressions.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Pins the in-memory size of `HydroNode`; any layout change shows up as a failure
    // here. Ignored without the `build` feature because feature-gated fields change
    // the size.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 264);
    }

    // Pins the in-memory size of `HydroRoot`, under the same feature condition as
    // the `HydroNode` size test.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    // An expression that does not come from `q!` must be returned unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        // Test basic non-q! expression
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Snapshot test: the simplified form of a spliced `q!` closure is compared
    // against the stored snapshot.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Test a simplified version of what a real stageleft call might look like
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        // This should be processed by our visitor and simplified to q!(...)
        // since we detect the stageleft::runtime_support::fn_* pattern
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Snapshot test for a `q!` body that is a block evaluating to a closure, so the
    // quoted tokens do not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        // Test a closure that does not start with a pipe
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}