Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(feature = "tokio")]
23use crate::location::TopLevel;
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::Atomic;
27use crate::location::{Location, Tick, check_matching_location};
28use crate::nondet::{NonDet, nondet};
29use crate::properties::{
30    ApplyMonotoneStream, ApplyOrderPreservingSingleton, Proved, SingletonMapFuncAlgebra,
31    StreamMapFuncAlgebra, ValidMutCommutativityFor, ValidMutIdempotenceFor,
32};
33
34/// A marker trait indicating which components of a [`Singleton`] may change.
35///
36/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
37/// includes an additional variant [`Monotonic`], which means that the value will only grow.
38pub trait SingletonBound {
39    /// The [`Boundedness`] that this [`Singleton`] would be erased to.
40    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;
41
42    /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
43    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;
44
45    /// Returns the [`SingletonBoundKind`] corresponding to this type.
46    fn bound_kind() -> SingletonBoundKind;
47}
48
49impl SingletonBound for Unbounded {
50    type UnderlyingBound = Unbounded;
51
52    type StreamToMonotone = Monotonic;
53
54    fn bound_kind() -> SingletonBoundKind {
55        SingletonBoundKind::Unbounded
56    }
57}
58
59impl SingletonBound for Bounded {
60    type UnderlyingBound = Bounded;
61
62    type StreamToMonotone = Bounded;
63
64    fn bound_kind() -> SingletonBoundKind {
65        SingletonBoundKind::Bounded
66    }
67}
68
69/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
70pub struct Monotonic;
71
72impl SingletonBound for Monotonic {
73    type UnderlyingBound = Unbounded;
74
75    type StreamToMonotone = Monotonic;
76
77    fn bound_kind() -> SingletonBoundKind {
78        SingletonBoundKind::Monotonic
79    }
80}
81
82#[sealed]
83#[diagnostic::on_unimplemented(
84    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
85    label = "required here",
86    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
87)]
88/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee.
89pub trait IsMonotonic: SingletonBound {}
90
91#[sealed]
92#[diagnostic::do_not_recommend]
93impl IsMonotonic for Monotonic {}
94
95#[sealed]
96#[diagnostic::do_not_recommend]
97impl<B: IsBounded> IsMonotonic for B {}
98
99/// A single Rust value that can asynchronously change over time.
100///
101/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
102/// [`Unbounded`], the value will asynchronously change over time.
103///
104/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
105/// a single number that will asynchronously change as events are processed. Singletons also appear
106/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
107/// such as getting the length of a batch of requests.
108///
109/// Type Parameters:
110/// - `Type`: the type of the value in this singleton
111/// - `Loc`: the [`Location`] where the singleton is materialized
112/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
113pub struct Singleton<Type, Loc, Bound: SingletonBound> {
114    pub(crate) location: Loc,
115    pub(crate) ir_node: RefCell<HydroNode>,
116    pub(crate) flow_state: FlowState,
117
118    _phantom: PhantomData<(Type, Loc, Bound)>,
119}
120
121impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
122    fn drop(&mut self) {
123        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
124        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
125            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
126                input: Box::new(ir_node),
127                op_metadata: HydroIrOpMetadata::new(),
128            });
129        }
130    }
131}
132
133impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
134where
135    T: Clone,
136    L: Location<'a>,
137{
138    fn from(value: Singleton<T, L, Bounded>) -> Self {
139        let location = value.location().clone();
140        Singleton::new(
141            location.clone(),
142            HydroNode::UnboundSingleton {
143                inner: Box::new(value.ir_node.replace(HydroNode::Placeholder)),
144                metadata: location
145                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
146            },
147        )
148    }
149}
150
151impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
152where
153    L: Location<'a>,
154{
155    type Location = Tick<L>;
156
157    fn location(&self) -> &Self::Location {
158        self.location()
159    }
160
161    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
162        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
163            location.clone(),
164            HydroNode::DeferTick {
165                input: Box::new(HydroNode::CycleSource {
166                    cycle_id,
167                    metadata: location.new_node_metadata(Self::collection_kind()),
168                }),
169                metadata: location
170                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
171            },
172        );
173
174        from_previous_tick.unwrap_or(initial)
175    }
176}
177
178impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
179where
180    L: Location<'a>,
181{
182    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
183        assert_eq!(
184            Location::id(&self.location),
185            expected_location,
186            "locations do not match"
187        );
188        self.location
189            .flow_state()
190            .borrow_mut()
191            .push_root(HydroRoot::CycleSink {
192                cycle_id,
193                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
194                op_metadata: HydroIrOpMetadata::new(),
195            });
196    }
197}
198
199impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
200where
201    L: Location<'a>,
202{
203    type Location = L;
204
205    fn create_source(cycle_id: CycleId, location: L) -> Self {
206        Singleton::new(
207            location.clone(),
208            HydroNode::CycleSource {
209                cycle_id,
210                metadata: location.new_node_metadata(Self::collection_kind()),
211            },
212        )
213    }
214}
215
216impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
217where
218    L: Location<'a>,
219{
220    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
221        assert_eq!(
222            Location::id(&self.location),
223            expected_location,
224            "locations do not match"
225        );
226        self.location
227            .flow_state()
228            .borrow_mut()
229            .push_root(HydroRoot::CycleSink {
230                cycle_id,
231                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
232                op_metadata: HydroIrOpMetadata::new(),
233            });
234    }
235}
236
237impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
238where
239    T: Clone,
240    L: Location<'a>,
241{
242    fn clone(&self) -> Self {
243        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
244            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
245            *self.ir_node.borrow_mut() = HydroNode::Tee {
246                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
247                metadata: self.location.new_node_metadata(Self::collection_kind()),
248            };
249        }
250
251        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
252            Singleton {
253                location: self.location.clone(),
254                flow_state: self.flow_state.clone(),
255                ir_node: HydroNode::Tee {
256                    inner: SharedNode(inner.0.clone()),
257                    metadata: metadata.clone(),
258                }
259                .into(),
260                _phantom: PhantomData,
261            }
262        } else {
263            unreachable!()
264        }
265    }
266}
267
268#[cfg(stageleft_runtime)]
269fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
270    me: Singleton<T, Tick<L>, B>,
271    other: Optional<O, Tick<L>, B::UnderlyingBound>,
272) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
273    let me_as_optional: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
274    super::optional::zip_inside_tick(me_as_optional, other)
275}
276
277impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
278where
279    L: Location<'a>,
280{
281    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
282        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
283        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
284        let flow_state = location.flow_state().clone();
285        Singleton {
286            location,
287            flow_state,
288            ir_node: RefCell::new(ir_node),
289            _phantom: PhantomData,
290        }
291    }
292
293    pub(crate) fn collection_kind() -> CollectionKind {
294        CollectionKind::Singleton {
295            bound: B::bound_kind(),
296            element_type: stageleft::quote_type::<T>().into(),
297        }
298    }
299
300    /// Returns the [`Location`] where this singleton is being materialized.
301    pub fn location(&self) -> &L {
302        &self.location
303    }
304
305    /// Creates a lightweight reference handle to this singleton that can be captured
306    /// inside `q!()` closures. The handle resolves to `&T` at runtime.
307    ///
308    /// The singleton must be bounded, otherwise reading it would be non-deterministic.
309    ///
310    /// ```rust
311    /// # #[cfg(feature = "deploy")] {
312    /// # use hydro_lang::prelude::*;
313    /// # use futures::StreamExt;
314    /// # tokio_test::block_on(async {
315    /// # let mut deployment = hydro_deploy::Deployment::new();
316    /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
317    /// # let process = builder.process::<()>();
318    /// # let external = builder.external::<()>();
319    /// let my_count = process
320    ///     .source_iter(q!(0..5i32))
321    ///     .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
322    /// let count_ref = my_count.by_ref();
323    /// let out_port = process
324    ///     .source_iter(q!(1..=3i32))
325    ///     .map(q!(|x| x + *count_ref))
326    ///     .send_bincode_external(&external);
327    /// # let nodes = builder
328    /// #     .with_default_optimize()
329    /// #     .with_process(&process, deployment.Localhost())
330    /// #     .with_external(&external, deployment.Localhost())
331    /// #     .deploy(&mut deployment);
332    /// # deployment.deploy().await.unwrap();
333    /// # let mut out_recv = nodes.connect(out_port).await;
334    /// # deployment.start().await.unwrap();
335    /// # let mut results = Vec::new();
336    /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
337    /// # results.sort();
338    /// // fold(0..5) = 10, so results are 11, 12, 13
339    /// # assert_eq!(results, vec![11, 12, 13]);
340    /// # });
341    /// # }
342    /// ```
343    pub fn by_ref(&self) -> crate::handoff_ref::SingletonRef<'a, '_, T, L>
344    where
345        B: IsBounded,
346    {
347        crate::handoff_ref::SingletonRef::new(&self.ir_node)
348    }
349
350    /// Returns a mutable reference handle to this singleton that can be captured inside `q!()`
351    /// closures. The handle resolves to `&mut T` at runtime.
352    ///
353    /// Mutable references are ordered via access groups in the generated DFIR code, ensuring
354    /// exclusive access at each point in the execution order.
355    ///
356    /// ```rust
357    /// # #[cfg(feature = "deploy")] {
358    /// # use hydro_lang::prelude::*;
359    /// # use futures::StreamExt;
360    /// # tokio_test::block_on(async {
361    /// # let mut deployment = hydro_deploy::Deployment::new();
362    /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
363    /// # let process = builder.process::<()>();
364    /// # let external = builder.external::<()>();
365    /// let my_count = process
366    ///     .source_iter(q!(0..5i32))
367    ///     .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
368    /// let count_mut = my_count.by_mut();
369    /// let out_port = process
370    ///     .source_iter(q!(1..=3i32))
371    ///     .map(q!(|x| {
372    ///         *count_mut += x;
373    ///         *count_mut
374    ///     }))
375    ///     .send_bincode_external(&external);
376    /// # let nodes = builder
377    /// #     .with_default_optimize()
378    /// #     .with_process(&process, deployment.Localhost())
379    /// #     .with_external(&external, deployment.Localhost())
380    /// #     .deploy(&mut deployment);
381    /// # deployment.deploy().await.unwrap();
382    /// # let mut out_recv = nodes.connect(out_port).await;
383    /// # deployment.start().await.unwrap();
384    /// # let mut results = Vec::new();
385    /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
386    /// # results.sort();
387    /// // fold(0..5) = 10, then each map adds x: results are 11, 13, 16
388    /// # assert_eq!(results, vec![11, 13, 16]);
389    /// # });
390    /// # }
391    /// ```
392    pub fn by_mut(&self) -> crate::handoff_ref::SingletonMut<'a, '_, T, L>
393    where
394        B: IsBounded,
395    {
396        crate::handoff_ref::SingletonMut::new(&self.ir_node)
397    }
398
399    /// Weakens the consistency of this live collection to not guarantee any consistency across
400    /// cluster members (if this collection is on a cluster).
401    pub fn weaken_consistency(self) -> Singleton<T, L::DropConsistency, B>
402    where
403        L: Location<'a>,
404    {
405        if L::consistency()
406            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
407        {
408            // already no consistency
409            Singleton::new(
410                self.location.drop_consistency(),
411                self.ir_node.replace(HydroNode::Placeholder),
412            )
413        } else {
414            Singleton::new(
415                self.location.drop_consistency(),
416                HydroNode::Cast {
417                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
418                    metadata:
419                        self.location
420                            .clone()
421                            .drop_consistency()
422                            .new_node_metadata(
423                                Singleton::<T, L::DropConsistency, B>::collection_kind(),
424                            ),
425                },
426            )
427        }
428    }
429
430    /// Casts this live collection to have the consistency guarantees specified in the given
431    /// location type parameter. The developer must ensure that the strengthened consistency
432    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
433    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
434        self,
435        _proof: impl crate::properties::ConsistencyProof,
436    ) -> Singleton<T, L2, B>
437    where
438        L: Location<'a>,
439    {
440        if L::consistency() == L2::consistency() {
441            Singleton::new(
442                self.location.with_consistency_of(),
443                self.ir_node.replace(HydroNode::Placeholder),
444            )
445        } else {
446            Singleton::new(
447                self.location.with_consistency_of(),
448                HydroNode::AssertIsConsistent {
449                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
450                    trusted: false,
451                    metadata: self
452                        .location
453                        .clone()
454                        .with_consistency_of::<L2>()
455                        .new_node_metadata(Singleton::<T, L2, B>::collection_kind()),
456                },
457            )
458        }
459    }
460
461    /// Drops the monotonicity property of the [`Singleton`].
462    pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
463        if B::bound_kind() == B::UnderlyingBound::bound_kind() {
464            Singleton::new(
465                self.location.clone(),
466                self.ir_node.replace(HydroNode::Placeholder),
467            )
468        } else {
469            Singleton::new(
470                self.location.clone(),
471                HydroNode::Cast {
472                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
473                    metadata:
474                        self.location.new_node_metadata(
475                            Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
476                        ),
477                },
478            )
479        }
480    }
481
482    /// Transforms the singleton value by applying a function `f` to it,
483    /// continuously as the input is updated.
484    ///
485    /// # Example
486    /// ```rust
487    /// # #[cfg(feature = "deploy")] {
488    /// # use hydro_lang::prelude::*;
489    /// # use futures::StreamExt;
490    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
491    /// let tick = process.tick();
492    /// let singleton = tick.singleton(q!(5));
493    /// singleton.map(q!(|v| v * 2)).all_ticks()
494    /// # }, |mut stream| async move {
495    /// // 10
496    /// # assert_eq!(stream.next().await.unwrap(), 10);
497    /// # }));
498    /// # }
499    /// ```
500    pub fn map<U, F, OP, B2: SingletonBound>(
501        self,
502        f: impl IntoQuotedMut<'a, F, L, SingletonMapFuncAlgebra<OP>>,
503    ) -> Singleton<U, L, B2>
504    where
505        F: Fn(T) -> U + 'a,
506        B: ApplyOrderPreservingSingleton<OP, B2>,
507    {
508        let (f, proof) = f.splice_fn1_ctx_props(&self.location);
509        proof.register_proof(&f);
510        let f = f.into();
511        Singleton::new(
512            self.location.clone(),
513            HydroNode::Map {
514                f,
515                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
516                metadata: self
517                    .location
518                    .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
519            },
520        )
521    }
522
523    /// Transforms the singleton value by applying a function `f` to it and then flattening
524    /// the result into a stream, preserving the order of elements.
525    ///
526    /// The function `f` is applied to the singleton value to produce an iterator, and all items
527    /// from that iterator are emitted in the output stream in deterministic order.
528    ///
529    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
530    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
531    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
532    ///
533    /// # Example
534    /// ```rust
535    /// # #[cfg(feature = "deploy")] {
536    /// # use hydro_lang::prelude::*;
537    /// # use futures::StreamExt;
538    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
539    /// let tick = process.tick();
540    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
541    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
542    /// # }, |mut stream| async move {
543    /// // 1, 2, 3
544    /// # for w in vec![1, 2, 3] {
545    /// #     assert_eq!(stream.next().await.unwrap(), w);
546    /// # }
547    /// # }));
548    /// # }
549    /// ```
550    pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
551        self,
552        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
553    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
554    where
555        B: IsBounded,
556        I: IntoIterator<Item = U>,
557        F: FnMut(T) -> I + 'a,
558        C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
559        Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
560    {
561        self.into_stream().flat_map_ordered(f)
562    }
563
564    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
565    /// for the output type `I` to produce items in any order.
566    ///
567    /// The function `f` is applied to the singleton value to produce an iterator, and all items
568    /// from that iterator are emitted in the output stream in non-deterministic order.
569    ///
570    /// # Example
571    /// ```rust
572    /// # #[cfg(feature = "deploy")] {
573    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
574    /// # use futures::StreamExt;
575    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
576    /// let tick = process.tick();
577    /// let singleton = tick.singleton(q!(
578    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
579    /// ));
580    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
581    /// # }, |mut stream| async move {
582    /// // 1, 2, 3, but in no particular order
583    /// # let mut results = Vec::new();
584    /// # for _ in 0..3 {
585    /// #     results.push(stream.next().await.unwrap());
586    /// # }
587    /// # results.sort();
588    /// # assert_eq!(results, vec![1, 2, 3]);
589    /// # }));
590    /// # }
591    /// ```
592    pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
593        self,
594        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
595    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
596    where
597        B: IsBounded,
598        I: IntoIterator<Item = U>,
599        F: FnMut(T) -> I + 'a,
600        C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
601        Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
602    {
603        self.into_stream().flat_map_unordered(f)
604    }
605
606    /// Flattens the singleton value into a stream, preserving the order of elements.
607    ///
608    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
609    /// are emitted in the output stream in deterministic order.
610    ///
611    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
612    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
613    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
614    ///
615    /// # Example
616    /// ```rust
617    /// # #[cfg(feature = "deploy")] {
618    /// # use hydro_lang::prelude::*;
619    /// # use futures::StreamExt;
620    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
621    /// let tick = process.tick();
622    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
623    /// singleton.flatten_ordered().all_ticks()
624    /// # }, |mut stream| async move {
625    /// // 1, 2, 3
626    /// # for w in vec![1, 2, 3] {
627    /// #     assert_eq!(stream.next().await.unwrap(), w);
628    /// # }
629    /// # }));
630    /// # }
631    /// ```
632    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
633    where
634        B: IsBounded,
635        T: IntoIterator<Item = U>,
636    {
637        self.flat_map_ordered(q!(|x| x))
638    }
639
640    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
641    /// for the element type `T` to produce items in any order.
642    ///
643    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
644    /// are emitted in the output stream in non-deterministic order.
645    ///
646    /// # Example
647    /// ```rust
648    /// # #[cfg(feature = "deploy")] {
649    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
650    /// # use futures::StreamExt;
651    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
652    /// let tick = process.tick();
653    /// let singleton = tick.singleton(q!(
654    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
655    /// ));
656    /// singleton.flatten_unordered().all_ticks()
657    /// # }, |mut stream| async move {
658    /// // 1, 2, 3, but in no particular order
659    /// # let mut results = Vec::new();
660    /// # for _ in 0..3 {
661    /// #     results.push(stream.next().await.unwrap());
662    /// # }
663    /// # results.sort();
664    /// # assert_eq!(results, vec![1, 2, 3]);
665    /// # }));
666    /// # }
667    /// ```
668    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
669    where
670        B: IsBounded,
671        T: IntoIterator<Item = U>,
672    {
673        self.flat_map_unordered(q!(|x| x))
674    }
675
676    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
677    ///
678    /// If the predicate returns `true`, the output optional contains the same value.
679    /// If the predicate returns `false`, the output optional is empty.
680    ///
681    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
682    /// not modify or take ownership of the value. If you need to modify the value while filtering
683    /// use [`Singleton::filter_map`] instead.
684    ///
685    /// # Example
686    /// ```rust
687    /// # #[cfg(feature = "deploy")] {
688    /// # use hydro_lang::prelude::*;
689    /// # use futures::StreamExt;
690    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
691    /// let tick = process.tick();
692    /// let singleton = tick.singleton(q!(5));
693    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
694    /// # }, |mut stream| async move {
695    /// // 5
696    /// # assert_eq!(stream.next().await.unwrap(), 5);
697    /// # }));
698    /// # }
699    /// ```
700    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
701    where
702        F: Fn(&T) -> bool + 'a,
703    {
704        let f = f.splice_fn1_borrow_ctx(&self.location).into();
705        Optional::new(
706            self.location.clone(),
707            HydroNode::Filter {
708                f,
709                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
710                metadata: self
711                    .location
712                    .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
713            },
714        )
715    }
716
717    /// An operator that both filters and maps. It yields the value only if the supplied
718    /// closure `f` returns `Some(value)`.
719    ///
720    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
721    /// If the closure returns `None`, the output optional is empty.
722    ///
723    /// # Example
724    /// ```rust
725    /// # #[cfg(feature = "deploy")] {
726    /// # use hydro_lang::prelude::*;
727    /// # use futures::StreamExt;
728    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
729    /// let tick = process.tick();
730    /// let singleton = tick.singleton(q!("42"));
731    /// singleton
732    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
733    ///     .all_ticks()
734    /// # }, |mut stream| async move {
735    /// // 42
736    /// # assert_eq!(stream.next().await.unwrap(), 42);
737    /// # }));
738    /// # }
739    /// ```
740    pub fn filter_map<U, F>(
741        self,
742        f: impl IntoQuotedMut<'a, F, L>,
743    ) -> Optional<U, L, B::UnderlyingBound>
744    where
745        F: Fn(T) -> Option<U> + 'a,
746    {
747        let f = f.splice_fn1_ctx(&self.location).into();
748        Optional::new(
749            self.location.clone(),
750            HydroNode::FilterMap {
751                f,
752                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
753                metadata: self
754                    .location
755                    .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
756            },
757        )
758    }
759
760    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
761    ///
762    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
763    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
764    /// non-null. This is useful for combining several pieces of state together.
765    ///
766    /// # Example
767    /// ```rust
768    /// # #[cfg(feature = "deploy")] {
769    /// # use hydro_lang::prelude::*;
770    /// # use futures::StreamExt;
771    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
772    /// let tick = process.tick();
773    /// let numbers = process
774    ///   .source_iter(q!(vec![123, 456]))
775    ///   .batch(&tick, nondet!(/** test */));
776    /// let count = numbers.clone().count(); // Singleton
777    /// let max = numbers.max(); // Optional
778    /// count.zip(max).all_ticks()
779    /// # }, |mut stream| async move {
780    /// // [(2, 456)]
781    /// # for w in vec![(2, 456)] {
782    /// #     assert_eq!(stream.next().await.unwrap(), w);
783    /// # }
784    /// # }));
785    /// # }
786    /// ```
787    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
788    where
789        Self: ZipResult<'a, O, Location = L>,
790        B: IsBounded,
791    {
792        check_matching_location(&self.location, &Self::other_location(&other));
793
794        if L::is_top_level()
795            && let Some(tick) = self.location.try_tick()
796        {
797            let self_location = self.location().clone();
798            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
799            let out = zip_inside_tick(
800                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
801                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
802                    other_location.clone(),
803                    HydroNode::Cast {
804                        inner: Box::new(Self::other_ir_node(other)),
805                        metadata: other_location.new_node_metadata(Optional::<
806                            <Self as ZipResult<'a, O>>::OtherType,
807                            Tick<L>,
808                            Bounded,
809                        >::collection_kind(
810                        )),
811                    },
812                )
813                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
814            )
815            .latest();
816
817            Self::make(self_location, out.ir_node.replace(HydroNode::Placeholder))
818        } else {
819            Self::make(
820                self.location.clone(),
821                HydroNode::CrossSingleton {
822                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
823                    right: Box::new(Self::other_ir_node(other)),
824                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
825                        bound: B::BOUND_KIND,
826                        element_type: stageleft::quote_type::<
827                            <Self as ZipResult<'a, O>>::ElementType,
828                        >()
829                        .into(),
830                    }),
831                },
832            )
833        }
834    }
835
836    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
837    /// boolean signal is `true`, otherwise the output is null.
838    ///
839    /// # Example
840    /// ```rust
841    /// # #[cfg(feature = "deploy")] {
842    /// # use hydro_lang::prelude::*;
843    /// # use futures::StreamExt;
844    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
845    /// let tick = process.tick();
846    /// // ticks are lazy by default, forces the second tick to run
847    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
848    ///
849    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
850    /// let batch_first_tick = process
851    ///   .source_iter(q!(vec![1]))
852    ///   .batch(&tick, nondet!(/** test */));
853    /// let batch_second_tick = process
854    ///   .source_iter(q!(vec![1, 2, 3]))
855    ///   .batch(&tick, nondet!(/** test */))
856    ///   .defer_tick();
857    /// batch_first_tick.chain(batch_second_tick).count()
858    ///   .filter_if(signal)
859    ///   .all_ticks()
860    /// # }, |mut stream| async move {
861    /// // [1]
862    /// # for w in vec![1] {
863    /// #     assert_eq!(stream.next().await.unwrap(), w);
864    /// # }
865    /// # }));
866    /// # }
867    /// ```
868    pub fn filter_if(
869        self,
870        signal: Singleton<bool, L, B>,
871    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
872    where
873        B: IsBounded,
874    {
875        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
876    }
877
878    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
879    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
880    ///
881    /// Useful for conditionally processing, such as only emitting a singleton's value outside
882    /// a tick if some other condition is satisfied.
883    ///
884    /// # Example
885    /// ```rust
886    /// # #[cfg(feature = "deploy")] {
887    /// # use hydro_lang::prelude::*;
888    /// # use futures::StreamExt;
889    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
890    /// let tick = process.tick();
891    /// // ticks are lazy by default, forces the second tick to run
892    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
893    ///
894    /// let batch_first_tick = process
895    ///   .source_iter(q!(vec![1]))
896    ///   .batch(&tick, nondet!(/** test */));
897    /// let batch_second_tick = process
898    ///   .source_iter(q!(vec![1, 2, 3]))
899    ///   .batch(&tick, nondet!(/** test */))
900    ///   .defer_tick(); // appears on the second tick
901    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
902    /// batch_first_tick.chain(batch_second_tick).count()
903    ///   .filter_if_some(some_on_first_tick)
904    ///   .all_ticks()
905    /// # }, |mut stream| async move {
906    /// // [1]
907    /// # for w in vec![1] {
908    /// #     assert_eq!(stream.next().await.unwrap(), w);
909    /// # }
910    /// # }));
911    /// # }
912    /// ```
913    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
914    pub fn filter_if_some<U>(
915        self,
916        signal: Optional<U, L, B>,
917    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
918    where
919        B: IsBounded,
920    {
921        self.filter_if(signal.is_some())
922    }
923
924    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
925    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
926    ///
927    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
928    /// the condition.
929    ///
930    /// # Example
931    /// ```rust
932    /// # #[cfg(feature = "deploy")] {
933    /// # use hydro_lang::prelude::*;
934    /// # use futures::StreamExt;
935    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
936    /// let tick = process.tick();
937    /// // ticks are lazy by default, forces the second tick to run
938    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
939    ///
940    /// let batch_first_tick = process
941    ///   .source_iter(q!(vec![1]))
942    ///   .batch(&tick, nondet!(/** test */));
943    /// let batch_second_tick = process
944    ///   .source_iter(q!(vec![1, 2, 3]))
945    ///   .batch(&tick, nondet!(/** test */))
946    ///   .defer_tick(); // appears on the second tick
947    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
948    /// batch_first_tick.chain(batch_second_tick).count()
949    ///   .filter_if_none(some_on_first_tick)
950    ///   .all_ticks()
951    /// # }, |mut stream| async move {
952    /// // [3]
953    /// # for w in vec![3] {
954    /// #     assert_eq!(stream.next().await.unwrap(), w);
955    /// # }
956    /// # }));
957    /// # }
958    /// ```
959    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
960    pub fn filter_if_none<U>(
961        self,
962        other: Optional<U, L, B>,
963    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
964    where
965        B: IsBounded,
966    {
967        self.filter_if(other.is_none())
968    }
969
970    /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
971    ///
972    /// # Example
973    /// ```rust
974    /// # #[cfg(feature = "deploy")] {
975    /// # use hydro_lang::prelude::*;
976    /// # use futures::StreamExt;
977    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
978    /// let tick = process.tick();
979    /// let a = tick.singleton(q!(5));
980    /// let b = tick.singleton(q!(5));
981    /// a.equals(b).all_ticks()
982    /// # }, |mut stream| async move {
983    /// // [true]
984    /// # assert_eq!(stream.next().await.unwrap(), true);
985    /// # }));
986    /// # }
987    /// ```
988    pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
989    where
990        T: PartialEq,
991        B: IsBounded,
992    {
993        self.zip(other).map(q!(|(a, b)| a == b))
994    }
995
996    /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
997    /// greater than or equal to the provided threshold. The event will have the value of the
998    /// given threshold.
999    ///
1000    /// This requires the incoming singleton to be monotonic, because otherwise the detection of
1001    /// the threshold would be non-deterministic.
1002    ///
1003    /// # Example
1004    /// ```rust
1005    /// # #[cfg(feature = "deploy")] {
1006    /// # use hydro_lang::prelude::*;
1007    /// # use futures::StreamExt;
1008    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1009    /// let a = // singleton 1 ~> 5 ~> 10
1010    /// # process.singleton(q!(5));
1011    /// let b = process.singleton(q!(4));
1012    /// a.threshold_greater_or_equal(b)
1013    /// # }, |mut stream| async move {
1014    /// // [4]
1015    /// # assert_eq!(stream.next().await.unwrap(), 4);
1016    /// # }));
1017    /// # }
1018    /// ```
1019    pub fn threshold_greater_or_equal<B2: IsBounded>(
1020        self,
1021        threshold: Singleton<T, L, B2>,
1022    ) -> Stream<T, L, B::UnderlyingBound>
1023    where
1024        T: Clone + PartialOrd,
1025        B: IsMonotonic,
1026    {
1027        let threshold = threshold.make_bounded();
1028        let self_location = self.location().clone();
1029        match self.try_make_bounded() {
1030            Ok(bounded) => {
1031                let uncasted = threshold
1032                    .zip(bounded)
1033                    .into_stream()
1034                    .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
1035
1036                Stream::new(
1037                    uncasted.location.clone(),
1038                    uncasted.ir_node.replace(HydroNode::Placeholder),
1039                )
1040            }
1041            Err(me) => {
1042                let uncasted = sliced! {
1043                    let me = use(me, nondet!(/** thresholds are deterministic */));
1044                    let mut remaining_threshold = use::state(|l| {
1045                        let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
1046                        as_option
1047                    });
1048
1049                    let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
1050                    remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
1051                    passed.map(q!(|(t, _)| t))
1052                };
1053
1054                Stream::new(
1055                    self_location,
1056                    uncasted.ir_node.replace(HydroNode::Placeholder),
1057                )
1058            }
1059        }
1060    }
1061
1062    /// An operator which allows you to "name" a `HydroNode`.
1063    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1064    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
1065        {
1066            let mut node = self.ir_node.borrow_mut();
1067            let metadata = node.metadata_mut();
1068            metadata.tag = Some(name.to_owned());
1069        }
1070        self
1071    }
1072}
1073
1074impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
1075    type Output = Singleton<bool, L, B::UnderlyingBound>;
1076
1077    fn not(self) -> Self::Output {
1078        self.map(q!(|b| !b))
1079    }
1080}
1081
1082impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
1083where
1084    L: Location<'a>,
1085{
1086    /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
1087    /// the inner `Option`.
1088    ///
1089    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
1090    /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
1091    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
1092    ///
1093    /// # Example
1094    /// ```rust
1095    /// # #[cfg(feature = "deploy")] {
1096    /// # use hydro_lang::prelude::*;
1097    /// # use futures::StreamExt;
1098    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1099    /// let tick = process.tick();
1100    /// let singleton = tick.singleton(q!(Some(42)));
1101    /// singleton.into_optional().all_ticks()
1102    /// # }, |mut stream| async move {
1103    /// // 42
1104    /// # assert_eq!(stream.next().await.unwrap(), 42);
1105    /// # }));
1106    /// # }
1107    /// ```
1108    pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
1109        self.filter_map(q!(|v| v))
1110    }
1111}
1112
1113impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1114where
1115    L: Location<'a>,
1116{
1117    /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1118    ///
1119    /// # Example
1120    /// ```rust
1121    /// # #[cfg(feature = "deploy")] {
1122    /// # use hydro_lang::prelude::*;
1123    /// # use futures::StreamExt;
1124    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1125    /// let tick = process.tick();
1126    /// // ticks are lazy by default, forces the second tick to run
1127    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1128    ///
1129    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1130    /// let b = tick.singleton(q!(true)); // true, true
1131    /// a.and(b).all_ticks()
1132    /// # }, |mut stream| async move {
1133    /// // [true, false]
1134    /// # for w in vec![true, false] {
1135    /// #     assert_eq!(stream.next().await.unwrap(), w);
1136    /// # }
1137    /// # }));
1138    /// # }
1139    /// ```
1140    pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1141    where
1142        B: IsBounded,
1143    {
1144        self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1145    }
1146
1147    /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1148    ///
1149    /// # Example
1150    /// ```rust
1151    /// # #[cfg(feature = "deploy")] {
1152    /// # use hydro_lang::prelude::*;
1153    /// # use futures::StreamExt;
1154    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1155    /// let tick = process.tick();
1156    /// // ticks are lazy by default, forces the second tick to run
1157    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1158    ///
1159    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1160    /// let b = tick.singleton(q!(false)); // false, false
1161    /// a.or(b).all_ticks()
1162    /// # }, |mut stream| async move {
1163    /// // [true, false]
1164    /// # for w in vec![true, false] {
1165    /// #     assert_eq!(stream.next().await.unwrap(), w);
1166    /// # }
1167    /// # }));
1168    /// # }
1169    /// ```
1170    pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1171    where
1172        B: IsBounded,
1173    {
1174        self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1175    }
1176}
1177
1178impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1179where
1180    L: Location<'a>,
1181{
1182    /// Returns a singleton value corresponding to the latest snapshot of the singleton
1183    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1184    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1185    /// all snapshots of this singleton into the atomic-associated tick will observe the
1186    /// same value each tick.
1187    ///
1188    /// # Non-Determinism
1189    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1190    /// the output singleton has a non-deterministic value since the snapshot can be at an
1191    /// arbitrary point in time.
1192    pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1193        self,
1194        tick: &Tick<L2>,
1195        _nondet: NonDet,
1196    ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1197        Singleton::new(
1198            tick.drop_consistency(),
1199            HydroNode::Batch {
1200                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1201                metadata: tick
1202                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1203            },
1204        )
1205    }
1206
1207    /// Returns this singleton back into a top-level, asynchronous execution context where updates
1208    /// to the value will be asynchronously propagated.
1209    pub fn end_atomic(self) -> Singleton<T, L, B> {
1210        Singleton::new(
1211            self.location.tick.l.clone(),
1212            HydroNode::EndAtomic {
1213                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1214                metadata: self
1215                    .location
1216                    .tick
1217                    .l
1218                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1219            },
1220        )
1221    }
1222}
1223
1224impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1225where
1226    L: Location<'a>,
1227{
1228    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1229    /// will observe the same version of the value and will be executed synchronously before any
1230    /// outputs are yielded (in [`Optional::end_atomic`]).
1231    ///
1232    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1233    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1234    /// a different version).
1235    pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1236        let id = self.location.flow_state().borrow_mut().next_clock_id();
1237        let out_location = Atomic {
1238            tick: Tick {
1239                id,
1240                l: self.location.clone(),
1241            },
1242        };
1243        Singleton::new(
1244            out_location.clone(),
1245            HydroNode::BeginAtomic {
1246                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1247                metadata: out_location
1248                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1249            },
1250        )
1251    }
1252
1253    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1254    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1255    /// relevant data that contributed to the snapshot at tick `t`.
1256    ///
1257    /// # Non-Determinism
1258    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1259    /// the output singleton has a non-deterministic value since the snapshot can be at an
1260    /// arbitrary point in time.
1261    pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1262        self,
1263        tick: &Tick<L2>,
1264        _nondet: NonDet,
1265    ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1266        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1267        Singleton::new(
1268            tick.drop_consistency(),
1269            HydroNode::Batch {
1270                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1271                metadata: tick
1272                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1273            },
1274        )
1275    }
1276
1277    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1278    /// with order corresponding to increasing prefixes of data contributing to the singleton.
1279    ///
1280    /// # Non-Determinism
1281    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1282    /// to non-deterministic batching and arrival of inputs, the output stream is
1283    /// non-deterministic.
1284    pub fn sample_eager(
1285        self,
1286        nondet: NonDet,
1287    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1288        sliced! {
1289            let snapshot = use(self, nondet);
1290            snapshot.into_stream()
1291        }
1292        .weaken_retries()
1293    }
1294
1295    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1296    /// value taken at various points in time. Because the input singleton may be
1297    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1298    /// represent the value of the singleton given some prefix of the streams leading up to
1299    /// it.
1300    ///
1301    /// # Non-Determinism
1302    /// The output stream is non-deterministic in which elements are sampled, since this
1303    /// is controlled by a clock.
1304    #[cfg(feature = "tokio")]
1305    pub fn sample_every(
1306        self,
1307        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1308        nondet: NonDet,
1309    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1310    where
1311        L: TopLevel<'a>,
1312    {
1313        let samples = self.location.source_interval(interval);
1314        sliced! {
1315            let snapshot = use(self, nondet);
1316            let sample_batch = use(samples, nondet);
1317
1318            snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1319        }
1320        .weaken_retries()
1321    }
1322
1323    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1324    /// implies that `B == Bounded`.
1325    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1326    where
1327        B: IsBounded,
1328    {
1329        Singleton::new(
1330            self.location.clone(),
1331            self.ir_node.replace(HydroNode::Placeholder),
1332        )
1333    }
1334
1335    #[expect(clippy::result_large_err, reason = "internal use only")]
1336    fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1337        if B::UnderlyingBound::BOUNDED {
1338            Ok(Singleton::new(
1339                self.location.clone(),
1340                self.ir_node.replace(HydroNode::Placeholder),
1341            ))
1342        } else {
1343            Err(self)
1344        }
1345    }
1346
1347    /// Clones this bounded singleton into a tick, returning a singleton that has the
1348    /// same value as the outer singleton. Because the outer singleton is bounded, this
1349    /// is deterministic because there is only a single immutable version.
1350    pub fn clone_into_tick<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1351        self,
1352        tick: &Tick<L2>,
1353    ) -> Singleton<T, Tick<L2>, Bounded>
1354    where
1355        B: IsBounded,
1356        T: Clone,
1357    {
1358        // TODO(shadaj): avoid printing simulator logs for this snapshot
1359        let inner = self.snapshot(
1360            tick,
1361            nondet!(/** bounded top-level singleton so deterministic */),
1362        );
1363        Singleton::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1364    }
1365
1366    /// Converts this singleton into a [`Stream`] containing a single element, the value.
1367    ///
1368    /// # Example
1369    /// ```rust
1370    /// # #[cfg(feature = "deploy")] {
1371    /// # use hydro_lang::prelude::*;
1372    /// # use futures::StreamExt;
1373    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1374    /// let tick = process.tick();
1375    /// let batch_input = process
1376    ///   .source_iter(q!(vec![123, 456]))
1377    ///   .batch(&tick, nondet!(/** test */));
1378    /// batch_input.clone().chain(
1379    ///   batch_input.count().into_stream()
1380    /// ).all_ticks()
1381    /// # }, |mut stream| async move {
1382    /// // [123, 456, 2]
1383    /// # for w in vec![123, 456, 2] {
1384    /// #     assert_eq!(stream.next().await.unwrap(), w);
1385    /// # }
1386    /// # }));
1387    /// # }
1388    /// ```
1389    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1390    where
1391        B: IsBounded,
1392    {
1393        Stream::new(
1394            self.location.clone(),
1395            HydroNode::Cast {
1396                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1397                metadata: self.location.new_node_metadata(Stream::<
1398                    T,
1399                    Tick<L>,
1400                    Bounded,
1401                    TotalOrder,
1402                    ExactlyOnce,
1403                >::collection_kind()),
1404            },
1405        )
1406    }
1407
1408    /// Resolves the singleton's [`Future`] value by blocking until it completes,
1409    /// producing a singleton of the resolved output.
1410    ///
1411    /// This is useful when the singleton contains an async computation that must
1412    /// be awaited before further processing. The future is polled to completion
1413    /// before the output value is emitted.
1414    ///
1415    /// # Example
1416    /// ```rust
1417    /// # #[cfg(feature = "deploy")] {
1418    /// # use hydro_lang::prelude::*;
1419    /// # use futures::StreamExt;
1420    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1421    /// let tick = process.tick();
1422    /// let singleton = tick.singleton(q!(5));
1423    /// singleton
1424    ///     .map(q!(|v| async move { v * 2 }))
1425    ///     .resolve_future_blocking()
1426    ///     .all_ticks()
1427    /// # }, |mut stream| async move {
1428    /// // 10
1429    /// # assert_eq!(stream.next().await.unwrap(), 10);
1430    /// # }));
1431    /// # }
1432    /// ```
1433    pub fn resolve_future_blocking(
1434        self,
1435    ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1436    where
1437        T: Future,
1438        B: IsBounded,
1439    {
1440        Singleton::new(
1441            self.location.clone(),
1442            HydroNode::ResolveFuturesBlocking {
1443                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1444                metadata: self
1445                    .location
1446                    .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1447            },
1448        )
1449    }
1450}
1451
1452impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1453where
1454    L: Location<'a>,
1455{
1456    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1457    /// which will stream the value computed in _each_ tick as a separate stream element.
1458    ///
1459    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1460    /// producing one element in the output for each tick. This is useful for batched computations,
1461    /// where the results from each tick must be combined together.
1462    ///
1463    /// # Example
1464    /// ```rust
1465    /// # #[cfg(feature = "deploy")] {
1466    /// # use hydro_lang::prelude::*;
1467    /// # use futures::StreamExt;
1468    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1469    /// let tick = process.tick();
1470    /// # // ticks are lazy by default, forces the second tick to run
1471    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1472    /// # let batch_first_tick = process
1473    /// #   .source_iter(q!(vec![1]))
1474    /// #   .batch(&tick, nondet!(/** test */));
1475    /// # let batch_second_tick = process
1476    /// #   .source_iter(q!(vec![1, 2, 3]))
1477    /// #   .batch(&tick, nondet!(/** test */))
1478    /// #   .defer_tick(); // appears on the second tick
1479    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1480    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1481    ///     .count()
1482    ///     .all_ticks()
1483    /// # }, |mut stream| async move {
1484    /// // [1, 3]
1485    /// # for w in vec![1, 3] {
1486    /// #     assert_eq!(stream.next().await.unwrap(), w);
1487    /// # }
1488    /// # }));
1489    /// # }
1490    /// ```
1491    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1492        self.into_stream().all_ticks()
1493    }
1494
1495    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1496    /// which will stream the value computed in _each_ tick as a separate stream element.
1497    ///
1498    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1499    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1500    /// singleton's [`Tick`] context.
1501    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1502        self.into_stream().all_ticks_atomic()
1503    }
1504
1505    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1506    /// be asynchronously updated with the latest value of the singleton inside the tick.
1507    ///
1508    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1509    /// tick that tracks the inner value. This is useful for getting the value as of the
1510    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1511    ///
1512    /// # Example
1513    /// ```rust
1514    /// # #[cfg(feature = "deploy")] {
1515    /// # use hydro_lang::prelude::*;
1516    /// # use futures::StreamExt;
1517    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1518    /// let tick = process.tick();
1519    /// # // ticks are lazy by default, forces the second tick to run
1520    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1521    /// # let batch_first_tick = process
1522    /// #   .source_iter(q!(vec![1]))
1523    /// #   .batch(&tick, nondet!(/** test */));
1524    /// # let batch_second_tick = process
1525    /// #   .source_iter(q!(vec![1, 2, 3]))
1526    /// #   .batch(&tick, nondet!(/** test */))
1527    /// #   .defer_tick(); // appears on the second tick
1528    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1529    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1530    ///     .count()
1531    ///     .latest()
1532    /// # .sample_eager(nondet!(/** test */))
1533    /// # }, |mut stream| async move {
1534    /// // asynchronously changes from 1 ~> 3
1535    /// # for w in vec![1, 3] {
1536    /// #     assert_eq!(stream.next().await.unwrap(), w);
1537    /// # }
1538    /// # }));
1539    /// # }
1540    /// ```
1541    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1542        Singleton::new(
1543            self.location.outer().clone(),
1544            HydroNode::YieldConcat {
1545                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1546                metadata: self
1547                    .location
1548                    .outer()
1549                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1550            },
1551        )
1552    }
1553
1554    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1555    /// be updated with the latest value of the singleton inside the tick.
1556    ///
1557    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1558    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1559    /// singleton's [`Tick`] context.
1560    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1561        let out_location = Atomic {
1562            tick: self.location.clone(),
1563        };
1564        Singleton::new(
1565            out_location.clone(),
1566            HydroNode::YieldConcat {
1567                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1568                metadata: out_location
1569                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1570            },
1571        )
1572    }
1573}
1574
1575#[doc(hidden)]
1576/// Helper trait that determines the output collection type for [`Singleton::zip`].
1577///
1578/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1579/// [`Singleton`].
1580#[sealed::sealed]
1581pub trait ZipResult<'a, Other> {
1582    /// The output collection type.
1583    type Out;
1584    /// The type of the tupled output value.
1585    type ElementType;
1586    /// The type of the other collection's value.
1587    type OtherType;
1588    /// The location where the tupled result will be materialized.
1589    type Location: Location<'a>;
1590
1591    /// The location of the second input to the `zip`.
1592    fn other_location(other: &Other) -> Self::Location;
1593    /// The IR node of the second input to the `zip`.
1594    fn other_ir_node(other: Other) -> HydroNode;
1595
1596    /// Constructs the output live collection given an IR node containing the zip result.
1597    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1598}
1599
1600#[sealed::sealed]
1601impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1602where
1603    L: Location<'a>,
1604{
1605    type Out = Singleton<(T, U), L, B>;
1606    type ElementType = (T, U);
1607    type OtherType = U;
1608    type Location = L;
1609
1610    fn other_location(other: &Singleton<U, L, B>) -> L {
1611        other.location.clone()
1612    }
1613
1614    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1615        other.ir_node.replace(HydroNode::Placeholder)
1616    }
1617
1618    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1619        Singleton::new(
1620            location.clone(),
1621            HydroNode::Cast {
1622                inner: Box::new(ir_node),
1623                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1624            },
1625        )
1626    }
1627}
1628
1629#[sealed::sealed]
1630impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1631    for Singleton<T, L, B>
1632where
1633    L: Location<'a>,
1634{
1635    type Out = Optional<(T, U), L, B::UnderlyingBound>;
1636    type ElementType = (T, U);
1637    type OtherType = U;
1638    type Location = L;
1639
1640    fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1641        other.location.clone()
1642    }
1643
1644    fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1645        other.ir_node.replace(HydroNode::Placeholder)
1646    }
1647
1648    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1649        Optional::new(location, ir_node)
1650    }
1651}
1652
1653#[cfg(test)]
1654mod tests {
1655    #[cfg(feature = "deploy")]
1656    use futures::{SinkExt, StreamExt};
1657    #[cfg(feature = "deploy")]
1658    use hydro_deploy::Deployment;
1659    #[cfg(any(feature = "deploy", feature = "sim"))]
1660    use stageleft::q;
1661
1662    #[cfg(any(feature = "deploy", feature = "sim"))]
1663    use crate::compile::builder::FlowBuilder;
1664    #[cfg(feature = "deploy")]
1665    use crate::live_collections::stream::ExactlyOnce;
1666    #[cfg(any(feature = "deploy", feature = "sim"))]
1667    use crate::location::Location;
1668    #[cfg(any(feature = "deploy", feature = "sim"))]
1669    use crate::nondet::nondet;
1670
1671    #[cfg(feature = "deploy")]
1672    #[tokio::test]
1673    async fn tick_cycle_cardinality() {
1674        let mut deployment = Deployment::new();
1675
1676        let mut flow = FlowBuilder::new();
1677        let node = flow.process::<()>();
1678        let external = flow.external::<()>();
1679
1680        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1681
1682        let node_tick = node.tick();
1683        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1684        let counts = singleton
1685            .clone()
1686            .into_stream()
1687            .count()
1688            .filter_if(
1689                input
1690                    .batch(&node_tick, nondet!(/** testing */))
1691                    .first()
1692                    .is_some(),
1693            )
1694            .all_ticks()
1695            .send_bincode_external(&external);
1696        complete_cycle.complete_next_tick(singleton);
1697
1698        let nodes = flow
1699            .with_process(&node, deployment.Localhost())
1700            .with_external(&external, deployment.Localhost())
1701            .deploy(&mut deployment);
1702
1703        deployment.deploy().await.unwrap();
1704
1705        let mut tick_trigger = nodes.connect(input_send).await;
1706        let mut external_out = nodes.connect(counts).await;
1707
1708        deployment.start().await.unwrap();
1709
1710        tick_trigger.send(()).await.unwrap();
1711
1712        assert_eq!(external_out.next().await.unwrap(), 1);
1713
1714        tick_trigger.send(()).await.unwrap();
1715
1716        assert_eq!(external_out.next().await.unwrap(), 1);
1717    }
1718
1719    #[cfg(feature = "sim")]
1720    #[test]
1721    #[should_panic]
1722    fn sim_fold_intermediate_states() {
1723        let mut flow = FlowBuilder::new();
1724        let node = flow.process::<()>();
1725
1726        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1727        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1728
1729        let tick = node.tick();
1730        let batch = folded.snapshot(&tick, nondet!(/** test */));
1731        let out_recv = batch.all_ticks().sim_output();
1732
1733        flow.sim().exhaustive(async || {
1734            assert_eq!(out_recv.next().await.unwrap(), 10);
1735        });
1736    }
1737
1738    #[cfg(feature = "sim")]
1739    #[test]
1740    fn sim_fold_intermediate_state_count() {
1741        let mut flow = FlowBuilder::new();
1742        let node = flow.process::<()>();
1743
1744        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1745        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1746
1747        let tick = node.tick();
1748        let batch = folded.snapshot(&tick, nondet!(/** test */));
1749        let out_recv = batch.all_ticks().sim_output();
1750
1751        let instance_count = flow.sim().exhaustive(async || {
1752            let out = out_recv.collect::<Vec<_>>().await;
1753            assert_eq!(out.last(), Some(&10));
1754        });
1755
1756        assert_eq!(
1757            instance_count,
1758            16 // 2^4 possible subsets of intermediates (including initial state)
1759        )
1760    }
1761
1762    #[cfg(feature = "sim")]
1763    #[test]
1764    fn sim_fold_no_repeat_initial() {
1765        // check that we don't repeat the initial state of the fold in autonomous decisions
1766
1767        let mut flow = FlowBuilder::new();
1768        let node = flow.process::<()>();
1769
1770        let (in_port, input) = node.sim_input();
1771        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1772
1773        let tick = node.tick();
1774        let batch = folded.snapshot(&tick, nondet!(/** test */));
1775        let out_recv = batch.all_ticks().sim_output();
1776
1777        flow.sim().exhaustive(async || {
1778            assert_eq!(out_recv.next().await.unwrap(), 0);
1779
1780            in_port.send(123);
1781
1782            assert_eq!(out_recv.next().await.unwrap(), 123);
1783        });
1784    }
1785
1786    #[cfg(feature = "sim")]
1787    #[test]
1788    #[should_panic]
1789    fn sim_fold_repeats_snapshots() {
1790        // when the tick is driven by a snapshot AND something else, the snapshot can
1791        // "stutter" and repeat the same state multiple times
1792
1793        let mut flow = FlowBuilder::new();
1794        let node = flow.process::<()>();
1795
1796        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1797        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1798
1799        let tick = node.tick();
1800        let batch = source
1801            .batch(&tick, nondet!(/** test */))
1802            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1803        let out_recv = batch.all_ticks().sim_output();
1804
1805        flow.sim().exhaustive(async || {
1806            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1807            {
1808                panic!("repeated snapshot");
1809            }
1810        });
1811    }
1812
1813    #[cfg(feature = "sim")]
1814    #[test]
1815    fn sim_fold_repeats_snapshots_count() {
1816        // check the number of instances
1817        let mut flow = FlowBuilder::new();
1818        let node = flow.process::<()>();
1819
1820        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1821        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1822
1823        let tick = node.tick();
1824        let batch = source
1825            .batch(&tick, nondet!(/** test */))
1826            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1827        let out_recv = batch.all_ticks().sim_output();
1828
1829        let count = flow.sim().exhaustive(async || {
1830            let _ = out_recv.collect::<Vec<_>>().await;
1831        });
1832
1833        assert_eq!(count, 52);
1834        // don't have a combinatorial explanation for this number yet, but checked via logs
1835    }
1836
1837    #[cfg(feature = "sim")]
1838    #[test]
1839    fn sim_top_level_singleton_exhaustive() {
1840        // ensures that top-level singletons have only one snapshot
1841        let mut flow = FlowBuilder::new();
1842        let node = flow.process::<()>();
1843
1844        let singleton = node.singleton(q!(1));
1845        let tick = node.tick();
1846        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1847        let out_recv = batch.all_ticks().sim_output();
1848
1849        let count = flow.sim().exhaustive(async || {
1850            let _ = out_recv.collect::<Vec<_>>().await;
1851        });
1852
1853        assert_eq!(count, 1);
1854    }
1855
1856    #[cfg(feature = "sim")]
1857    #[test]
1858    fn sim_top_level_singleton_join_count() {
1859        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1860        // exploration
1861
1862        let mut flow = FlowBuilder::new();
1863        let node = flow.process::<()>();
1864
1865        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1866        let tick = node.tick();
1867        let batch = source_iter
1868            .batch(&tick, nondet!(/** test */))
1869            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1870        let out_recv = batch.all_ticks().sim_output();
1871
1872        let instance_count = flow.sim().exhaustive(async || {
1873            let _ = out_recv.collect::<Vec<_>>().await;
1874        });
1875
1876        assert_eq!(
1877            instance_count,
1878            16 // 2^4 ways to split up (including a possibly empty first batch)
1879        )
1880    }
1881
1882    #[cfg(feature = "sim")]
1883    #[test]
1884    fn top_level_singleton_into_stream_no_replay() {
1885        let mut flow = FlowBuilder::new();
1886        let node = flow.process::<()>();
1887
1888        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1889        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1890
1891        let out_recv = folded.into_stream().sim_output();
1892
1893        flow.sim().exhaustive(async || {
1894            out_recv.assert_yields_only([10]).await;
1895        });
1896    }
1897
1898    #[cfg(feature = "sim")]
1899    #[test]
1900    fn inside_tick_singleton_zip() {
1901        use crate::live_collections::Stream;
1902        use crate::live_collections::sliced::sliced;
1903
1904        let mut flow = FlowBuilder::new();
1905        let node = flow.process::<()>();
1906
1907        let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1908        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1909
1910        let out_recv = sliced! {
1911            let v = use(folded, nondet!(/** test */));
1912            v.clone().zip(v).into_stream()
1913        }
1914        .sim_output();
1915
1916        let count = flow.sim().exhaustive(async || {
1917            let out = out_recv.collect::<Vec<_>>().await;
1918            assert_eq!(out.last(), Some(&(3, 3)));
1919        });
1920
1921        assert_eq!(count, 4);
1922    }
1923
1924    /// Reproducer for simulator hang when using cross_singleton on a top-level
1925    /// unbounded stream (not inside sliced!). The exhaustive simulator hangs
1926    /// after the first iteration.
1927    #[cfg(feature = "sim")]
1928    #[test]
1929    fn sim_cross_singleton_top_level_unbounded_hang() {
1930        let mut flow = FlowBuilder::new();
1931        let node = flow.process::<()>();
1932
1933        let (cmd_port, input) = node.sim_input::<String, _, _>();
1934
1935        let top_level_singleton = node.singleton(q!(123));
1936
1937        // cross_singleton on a top-level stream - bug trigger
1938        let crossed = input.cross_singleton(top_level_singleton);
1939
1940        // Output directly
1941        let resp_port = crossed.sim_output();
1942
1943        let count = flow.sim().exhaustive(async || {
1944            cmd_port.send("abc".to_owned());
1945
1946            let responses: Vec<_> = resp_port.collect().await;
1947            assert!(!responses.is_empty());
1948        });
1949
1950        assert_eq!(count, 1);
1951    }
1952
1953    #[cfg(feature = "sim")]
1954    #[test]
1955    fn sim_top_level_singleton_state_count() {
1956        let mut flow = FlowBuilder::new();
1957        let process = flow.process::<()>();
1958
1959        let (cmd_port, input) = process.sim_input();
1960        {
1961            // increases exhaustive inputs from 1 to 2 before we optimized `From`
1962            use super::Singleton;
1963            use crate::live_collections::boundedness::Unbounded;
1964            let _singleton: Singleton<_, _, Unbounded> = process.singleton(q!(false)).into();
1965        }
1966        let tick = process.tick();
1967        let batched_unbatched = input.batch(&tick, nondet!(/** */)).all_ticks();
1968        let resp_port = batched_unbatched.sim_output();
1969
1970        let count = flow.sim().exhaustive(async || {
1971            cmd_port.send(());
1972            let _responses: Vec<_> = resp_port.collect().await;
1973        });
1974
1975        assert_eq!(count, 1);
1976    }
1977
1978    /// Regression test for #2939: singleton mut access-group counter resets per root.
1979    /// Two sequential `by_mut` captures on the same singleton, consumed by separate
1980    /// `for_each` roots, should get distinct access groups and build successfully.
1981    #[cfg(feature = "sim")]
1982    #[test]
1983    #[expect(unused_mut, reason = "sliced! macro generates mut bindings for state")]
1984    fn sim_mut_access_group_across_roots() {
1985        use crate::live_collections::sliced::sliced;
1986
1987        let mut flow = FlowBuilder::new();
1988        let node = flow.process::<()>();
1989
1990        let source = node.source_iter(q!(vec![1i32, 2, 3]));
1991
1992        let (first, second) = sliced! {
1993            let batch = use(source, nondet!(/** test */));
1994            let mut total = use::state(|l| l.singleton(q!(0i32)));
1995            let total_mut = total.by_mut();
1996
1997            let first = batch.clone().map(q!(|x| {
1998                *total_mut += x;
1999                *total_mut
2000            }));
2001            let second = batch.map(q!(|x| {
2002                *total_mut += x;
2003                *total_mut
2004            }));
2005            (first, second)
2006        };
2007
2008        let first_recv = first.sim_output();
2009        let second_recv = second.sim_output();
2010
2011        flow.sim().exhaustive(async || {
2012            // Both outputs should produce values without panicking.
2013            // The exact values depend on ordering, but the graph must build.
2014            let _first: Vec<i32> = first_recv.collect().await;
2015            let _second: Vec<i32> = second_recv.collect().await;
2016        });
2017    }
2018
2019    /// Regression test for #2940: access groups must follow code (staging) order,
2020    /// not IR traversal order. When `second.chain(first)` reverses the consumption
2021    /// order, the mutations must still execute in the order they were staged.
2022    #[cfg(feature = "sim")]
2023    #[test]
2024    #[expect(unused_mut, reason = "sliced! macro generates mut bindings for state")]
2025    fn sim_mut_access_groups_follow_code_order() {
2026        use crate::live_collections::sliced::sliced;
2027
2028        let mut flow = FlowBuilder::new();
2029        let node = flow.process::<()>();
2030
2031        let source = node.source_iter(q!(vec![3i32]));
2032
2033        let out_recv = sliced! {
2034            let batch = use(source, nondet!(/** test */));
2035            let mut total = use::state(|l| l.singleton(q!(0i32)));
2036            let total_mut = total.by_mut();
2037
2038            // Defined FIRST in code: addition
2039            let first = batch.clone().map(q!(|x| {
2040                *total_mut += x;
2041                *total_mut
2042            }));
2043            // Defined SECOND in code: doubling
2044            let second = batch.map(q!(|_x| {
2045                *total_mut *= 2;
2046                *total_mut
2047            }));
2048            // Chain in OPPOSITE order of definition — must not affect mutation order.
2049            second.chain(first)
2050        }
2051        .sim_output();
2052
2053        flow.sim().exhaustive(async || {
2054            let results: Vec<i32> = out_recv.collect().await;
2055            // Code-order semantics: first runs (total = 0 + 3 = 3), then second
2056            // runs (total = 3 * 2 = 6). Output is second.chain(first) => [6, 3].
2057            assert_eq!(results, vec![6, 3]);
2058        });
2059    }
2060}