Skip to main content

hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(feature = "tokio")]
21use crate::location::TopLevel;
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, DeferTick};
25use crate::location::{Location, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::prelude::KeyedSingleton;
28use crate::properties::{StreamMapFuncAlgebra, ValidMutCommutativityFor, ValidMutIdempotenceFor};
29
30/// A *nullable* Rust value that can asynchronously change over time.
31///
32/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
33/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
34/// asynchronously change over time, including becoming present of uninhabited.
35///
36/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
37/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
38///
39/// Type Parameters:
40/// - `Type`: the type of the value in this optional (when it is not null)
41/// - `Loc`: the [`Location`] where the optional is materialized
42/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
43pub struct Optional<Type, Loc, Bound: Boundedness> {
44    pub(crate) location: Loc,
45    pub(crate) ir_node: RefCell<HydroNode>,
46    pub(crate) flow_state: FlowState,
47
48    _phantom: PhantomData<(Type, Loc, Bound)>,
49}
50
51impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
52    fn drop(&mut self) {
53        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
54        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
55            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
56                input: Box::new(ir_node),
57                op_metadata: HydroIrOpMetadata::new(),
58            });
59        }
60    }
61}
62
63impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
64where
65    T: Clone,
66    L: Location<'a>,
67{
68    fn from(value: Optional<T, L, Bounded>) -> Self {
69        let tick = value.location().tick();
70        value.clone_into_tick(&tick).latest()
71    }
72}
73
74impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
75where
76    L: Location<'a>,
77{
78    fn defer_tick(self) -> Self {
79        Optional::defer_tick(self)
80    }
81}
82
83impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
84where
85    L: Location<'a>,
86{
87    type Location = Tick<L>;
88
89    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
90        Optional::new(
91            location.clone(),
92            HydroNode::CycleSource {
93                cycle_id,
94                metadata: location.new_node_metadata(Self::collection_kind()),
95            },
96        )
97    }
98}
99
100impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
101where
102    L: Location<'a>,
103{
104    type Location = Tick<L>;
105
106    fn location(&self) -> &Self::Location {
107        self.location()
108    }
109
110    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
111        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
112            location.clone(),
113            HydroNode::DeferTick {
114                input: Box::new(HydroNode::CycleSource {
115                    cycle_id,
116                    metadata: location.new_node_metadata(Self::collection_kind()),
117                }),
118                metadata: location
119                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
120            },
121        );
122
123        from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
124    }
125}
126
127impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
128where
129    L: Location<'a>,
130{
131    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
132        assert_eq!(
133            Location::id(&self.location),
134            expected_location,
135            "locations do not match"
136        );
137        self.location
138            .flow_state()
139            .borrow_mut()
140            .push_root(HydroRoot::CycleSink {
141                cycle_id,
142                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
143                op_metadata: HydroIrOpMetadata::new(),
144            });
145    }
146}
147
148impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
149where
150    L: Location<'a>,
151{
152    type Location = L;
153
154    fn create_source(cycle_id: CycleId, location: L) -> Self {
155        Optional::new(
156            location.clone(),
157            HydroNode::CycleSource {
158                cycle_id,
159                metadata: location.new_node_metadata(Self::collection_kind()),
160            },
161        )
162    }
163}
164
165impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
166where
167    L: Location<'a>,
168{
169    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
170        assert_eq!(
171            Location::id(&self.location),
172            expected_location,
173            "locations do not match"
174        );
175        self.location
176            .flow_state()
177            .borrow_mut()
178            .push_root(HydroRoot::CycleSink {
179                cycle_id,
180                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
181                op_metadata: HydroIrOpMetadata::new(),
182            });
183    }
184}
185
186impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
187where
188    L: Location<'a>,
189{
190    fn from(singleton: Singleton<T, L, B>) -> Self {
191        Optional::new(
192            singleton.location.clone(),
193            HydroNode::Cast {
194                inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
195                metadata: singleton
196                    .location
197                    .new_node_metadata(Self::collection_kind()),
198            },
199        )
200    }
201}
202
203#[cfg(stageleft_runtime)]
204pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
205    me: Optional<T, L, B>,
206    other: Optional<O, L, B>,
207) -> Optional<(T, O), L, B> {
208    check_matching_location(&me.location, &other.location);
209
210    Optional::new(
211        me.location.clone(),
212        HydroNode::CrossSingleton {
213            left: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
214            right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
215            metadata: me
216                .location
217                .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
218        },
219    )
220}
221
222#[cfg(stageleft_runtime)]
223fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
224    me: Optional<T, L, B>,
225    other: Optional<T, L, B>,
226) -> Optional<T, L, B> {
227    check_matching_location(&me.location, &other.location);
228
229    Optional::new(
230        me.location.clone(),
231        HydroNode::ChainFirst {
232            first: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
233            second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
234            metadata: me
235                .location
236                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
237        },
238    )
239}
240
241impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
242where
243    T: Clone,
244    L: Location<'a>,
245{
246    fn clone(&self) -> Self {
247        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
248            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
249            *self.ir_node.borrow_mut() = HydroNode::Tee {
250                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
251                metadata: self.location.new_node_metadata(Self::collection_kind()),
252            };
253        }
254
255        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
256            Optional {
257                location: self.location.clone(),
258                flow_state: self.flow_state.clone(),
259                ir_node: HydroNode::Tee {
260                    inner: SharedNode(inner.0.clone()),
261                    metadata: metadata.clone(),
262                }
263                .into(),
264                _phantom: PhantomData,
265            }
266        } else {
267            unreachable!()
268        }
269    }
270}
271
272impl<'a, T, L, B: Boundedness> Optional<T, L, B>
273where
274    L: Location<'a>,
275{
276    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
277        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
278        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
279        let flow_state = location.flow_state().clone();
280        Optional {
281            location,
282            flow_state,
283            ir_node: RefCell::new(ir_node),
284            _phantom: PhantomData,
285        }
286    }
287
288    pub(crate) fn collection_kind() -> CollectionKind {
289        CollectionKind::Optional {
290            bound: B::BOUND_KIND,
291            element_type: stageleft::quote_type::<T>().into(),
292        }
293    }
294
295    /// Returns the [`Location`] where this optional is being materialized.
296    pub fn location(&self) -> &L {
297        &self.location
298    }
299
300    /// Creates a shared reference handle to this optional that can be captured inside `q!()`
301    /// closures. The handle resolves to `&Option<T>` at runtime.
302    ///
303    /// The optional must be bounded, otherwise reading it would be non-deterministic.
304    pub fn by_ref(&self) -> crate::handoff_ref::OptionalRef<'a, '_, T, L>
305    where
306        B: IsBounded,
307    {
308        crate::handoff_ref::OptionalRef::new(&self.ir_node)
309    }
310
311    /// Returns a mutable reference handle to this optional that can be captured inside `q!()`
312    /// closures. The handle resolves to `&mut Option<T>` at runtime.
313    pub fn by_mut(&self) -> crate::handoff_ref::OptionalMut<'a, '_, T, L>
314    where
315        B: IsBounded,
316    {
317        crate::handoff_ref::OptionalMut::new(&self.ir_node)
318    }
319
320    /// Weakens the consistency of this live collection to not guarantee any consistency across
321    /// cluster members (if this collection is on a cluster).
322    pub fn weaken_consistency(self) -> Optional<T, L::DropConsistency, B>
323    where
324        L: Location<'a>,
325    {
326        if L::consistency()
327            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
328        {
329            // already no consistency
330            Optional::new(
331                self.location.drop_consistency(),
332                self.ir_node.replace(HydroNode::Placeholder),
333            )
334        } else {
335            Optional::new(
336                self.location.drop_consistency(),
337                HydroNode::Cast {
338                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
339                    metadata: self
340                        .location
341                        .clone()
342                        .drop_consistency()
343                        .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
344                },
345            )
346        }
347    }
348
349    /// Casts this live collection to have the consistency guarantees specified in the given
350    /// location type parameter. The developer must ensure that the strengthened consistency
351    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
352    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
353        self,
354        _proof: impl crate::properties::ConsistencyProof,
355    ) -> Optional<T, L2, B>
356    where
357        L: Location<'a>,
358    {
359        if L::consistency() == L2::consistency() {
360            Optional::new(
361                self.location.with_consistency_of(),
362                self.ir_node.replace(HydroNode::Placeholder),
363            )
364        } else {
365            Optional::new(
366                self.location.with_consistency_of(),
367                HydroNode::AssertIsConsistent {
368                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
369                    trusted: false,
370                    metadata: self
371                        .location
372                        .clone()
373                        .with_consistency_of::<L2>()
374                        .new_node_metadata(Optional::<T, L2, B>::collection_kind()),
375                },
376            )
377        }
378    }
379
380    /// Transforms the optional value by applying a function `f` to it,
381    /// continuously as the input is updated.
382    ///
383    /// Whenever the optional is empty, the output optional is also empty.
384    ///
385    /// # Example
386    /// ```rust
387    /// # #[cfg(feature = "deploy")] {
388    /// # use hydro_lang::prelude::*;
389    /// # use futures::StreamExt;
390    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
391    /// let tick = process.tick();
392    /// let optional = tick.optional_first_tick(q!(1));
393    /// optional.map(q!(|v| v + 1)).all_ticks()
394    /// # }, |mut stream| async move {
395    /// // 2
396    /// # assert_eq!(stream.next().await.unwrap(), 2);
397    /// # }));
398    /// # }
399    /// ```
400    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
401    where
402        F: Fn(T) -> U + 'a,
403    {
404        let f = f.splice_fn1_ctx(&self.location).into();
405        Optional::new(
406            self.location.clone(),
407            HydroNode::Map {
408                f,
409                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
410                metadata: self
411                    .location
412                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
413            },
414        )
415    }
416
417    /// Transforms the optional value by applying a function `f` to it and then flattening
418    /// the result into a stream, preserving the order of elements.
419    ///
420    /// If the optional is empty, the output stream is also empty. If the optional contains
421    /// a value, `f` is applied to produce an iterator, and all items from that iterator
422    /// are emitted in the output stream in deterministic order.
423    ///
424    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
425    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
426    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
427    ///
428    /// # Example
429    /// ```rust
430    /// # #[cfg(feature = "deploy")] {
431    /// # use hydro_lang::prelude::*;
432    /// # use futures::StreamExt;
433    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
434    /// let tick = process.tick();
435    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
436    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
437    /// # }, |mut stream| async move {
438    /// // 1, 2, 3
439    /// # for w in vec![1, 2, 3] {
440    /// #     assert_eq!(stream.next().await.unwrap(), w);
441    /// # }
442    /// # }));
443    /// # }
444    /// ```
445    pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
446        self,
447        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
448    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
449    where
450        B: IsBounded,
451        I: IntoIterator<Item = U>,
452        F: FnMut(T) -> I + 'a,
453        C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
454        Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
455    {
456        self.into_stream().flat_map_ordered(f)
457    }
458
459    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
460    /// for the output type `I` to produce items in any order.
461    ///
462    /// If the optional is empty, the output stream is also empty. If the optional contains
463    /// a value, `f` is applied to produce an iterator, and all items from that iterator
464    /// are emitted in the output stream in non-deterministic order.
465    ///
466    /// # Example
467    /// ```rust
468    /// # #[cfg(feature = "deploy")] {
469    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
470    /// # use futures::StreamExt;
471    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
472    /// let tick = process.tick();
473    /// let optional = tick.optional_first_tick(q!(
474    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
475    /// ));
476    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
477    /// # }, |mut stream| async move {
478    /// // 1, 2, 3, but in no particular order
479    /// # let mut results = Vec::new();
480    /// # for _ in 0..3 {
481    /// #     results.push(stream.next().await.unwrap());
482    /// # }
483    /// # results.sort();
484    /// # assert_eq!(results, vec![1, 2, 3]);
485    /// # }));
486    /// # }
487    /// ```
488    pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
489        self,
490        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
491    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
492    where
493        B: IsBounded,
494        I: IntoIterator<Item = U>,
495        F: FnMut(T) -> I + 'a,
496        C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
497        Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
498    {
499        self.into_stream().flat_map_unordered(f)
500    }
501
502    /// Flattens the optional value into a stream, preserving the order of elements.
503    ///
504    /// If the optional is empty, the output stream is also empty. If the optional contains
505    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
506    /// in the output stream in deterministic order.
507    ///
508    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
509    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
510    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
511    ///
512    /// # Example
513    /// ```rust
514    /// # #[cfg(feature = "deploy")] {
515    /// # use hydro_lang::prelude::*;
516    /// # use futures::StreamExt;
517    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
518    /// let tick = process.tick();
519    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
520    /// optional.flatten_ordered().all_ticks()
521    /// # }, |mut stream| async move {
522    /// // 1, 2, 3
523    /// # for w in vec![1, 2, 3] {
524    /// #     assert_eq!(stream.next().await.unwrap(), w);
525    /// # }
526    /// # }));
527    /// # }
528    /// ```
529    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
530    where
531        B: IsBounded,
532        T: IntoIterator<Item = U>,
533    {
534        self.flat_map_ordered(q!(|v| v))
535    }
536
537    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
538    /// for the element type `T` to produce items in any order.
539    ///
540    /// If the optional is empty, the output stream is also empty. If the optional contains
541    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
542    /// in the output stream in non-deterministic order.
543    ///
544    /// # Example
545    /// ```rust
546    /// # #[cfg(feature = "deploy")] {
547    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
548    /// # use futures::StreamExt;
549    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
550    /// let tick = process.tick();
551    /// let optional = tick.optional_first_tick(q!(
552    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
553    /// ));
554    /// optional.flatten_unordered().all_ticks()
555    /// # }, |mut stream| async move {
556    /// // 1, 2, 3, but in no particular order
557    /// # let mut results = Vec::new();
558    /// # for _ in 0..3 {
559    /// #     results.push(stream.next().await.unwrap());
560    /// # }
561    /// # results.sort();
562    /// # assert_eq!(results, vec![1, 2, 3]);
563    /// # }));
564    /// # }
565    /// ```
566    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
567    where
568        B: IsBounded,
569        T: IntoIterator<Item = U>,
570    {
571        self.flat_map_unordered(q!(|v| v))
572    }
573
574    /// Creates an optional containing only the value if it satisfies a predicate `f`.
575    ///
576    /// If the optional is empty, the output optional is also empty. If the optional contains
577    /// a value and the predicate returns `true`, the output optional contains the same value.
578    /// If the predicate returns `false`, the output optional is empty.
579    ///
580    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
581    /// not modify or take ownership of the value. If you need to modify the value while filtering
582    /// use [`Optional::filter_map`] instead.
583    ///
584    /// # Example
585    /// ```rust
586    /// # #[cfg(feature = "deploy")] {
587    /// # use hydro_lang::prelude::*;
588    /// # use futures::StreamExt;
589    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
590    /// let tick = process.tick();
591    /// let optional = tick.optional_first_tick(q!(5));
592    /// optional.filter(q!(|&x| x > 3)).all_ticks()
593    /// # }, |mut stream| async move {
594    /// // 5
595    /// # assert_eq!(stream.next().await.unwrap(), 5);
596    /// # }));
597    /// # }
598    /// ```
599    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
600    where
601        F: Fn(&T) -> bool + 'a,
602    {
603        let f = f.splice_fn1_borrow_ctx(&self.location).into();
604        Optional::new(
605            self.location.clone(),
606            HydroNode::Filter {
607                f,
608                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
609                metadata: self.location.new_node_metadata(Self::collection_kind()),
610            },
611        )
612    }
613
614    /// An operator that both filters and maps. It yields only the value if the supplied
615    /// closure `f` returns `Some(value)`.
616    ///
617    /// If the optional is empty, the output optional is also empty. If the optional contains
618    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
619    /// If the closure returns `None`, the output optional is empty.
620    ///
621    /// # Example
622    /// ```rust
623    /// # #[cfg(feature = "deploy")] {
624    /// # use hydro_lang::prelude::*;
625    /// # use futures::StreamExt;
626    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
627    /// let tick = process.tick();
628    /// let optional = tick.optional_first_tick(q!("42"));
629    /// optional
630    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
631    ///     .all_ticks()
632    /// # }, |mut stream| async move {
633    /// // 42
634    /// # assert_eq!(stream.next().await.unwrap(), 42);
635    /// # }));
636    /// # }
637    /// ```
638    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
639    where
640        F: Fn(T) -> Option<U> + 'a,
641    {
642        let f = f.splice_fn1_ctx(&self.location).into();
643        Optional::new(
644            self.location.clone(),
645            HydroNode::FilterMap {
646                f,
647                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
648                metadata: self
649                    .location
650                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
651            },
652        )
653    }
654
655    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
656    ///
657    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
658    /// non-null. This is useful for combining several pieces of state together.
659    ///
660    /// # Example
661    /// ```rust
662    /// # #[cfg(feature = "deploy")] {
663    /// # use hydro_lang::prelude::*;
664    /// # use futures::StreamExt;
665    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
666    /// let tick = process.tick();
667    /// let numbers = process
668    ///   .source_iter(q!(vec![123, 456, 789]))
669    ///   .batch(&tick, nondet!(/** test */));
670    /// let min = numbers.clone().min(); // Optional
671    /// let max = numbers.max(); // Optional
672    /// min.zip(max).all_ticks()
673    /// # }, |mut stream| async move {
674    /// // [(123, 789)]
675    /// # for w in vec![(123, 789)] {
676    /// #     assert_eq!(stream.next().await.unwrap(), w);
677    /// # }
678    /// # }));
679    /// # }
680    /// ```
681    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
682    where
683        B: IsBounded,
684    {
685        let other: Optional<O, L, B> = other.into();
686        check_matching_location(&self.location, &other.location);
687
688        if L::is_top_level()
689            && let Some(tick) = self.location.try_tick()
690        {
691            let self_location = self.location().clone();
692            let out = zip_inside_tick(
693                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
694                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
695            )
696            .latest();
697
698            Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
699        } else {
700            zip_inside_tick(self, other)
701        }
702    }
703
704    /// Passes through `self` when it has a value, otherwise passes through `other`.
705    ///
706    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
707    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
708    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
709    ///
710    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
711    /// of the inputs change (including to/from null states).
712    ///
713    /// # Example
714    /// ```rust
715    /// # #[cfg(feature = "deploy")] {
716    /// # use hydro_lang::prelude::*;
717    /// # use futures::StreamExt;
718    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
719    /// let tick = process.tick();
720    /// // ticks are lazy by default, forces the second tick to run
721    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
722    ///
723    /// let some_first_tick = tick.optional_first_tick(q!(123));
724    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
725    /// some_first_tick.or(some_second_tick).all_ticks()
726    /// # }, |mut stream| async move {
727    /// // [123 /* first tick */, 456 /* second tick */]
728    /// # for w in vec![123, 456] {
729    /// #     assert_eq!(stream.next().await.unwrap(), w);
730    /// # }
731    /// # }));
732    /// # }
733    /// ```
734    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
735        check_matching_location(&self.location, &other.location);
736
737        if L::is_top_level()
738            && !B::BOUNDED // only if unbounded we need to use a tick
739            && let Some(tick) = self.location.try_tick()
740        {
741            let self_location = self.location().clone();
742            let out = or_inside_tick(
743                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
744                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
745            )
746            .latest();
747
748            Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
749        } else {
750            Optional::new(
751                self.location.clone(),
752                HydroNode::ChainFirst {
753                    first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
754                    second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
755                    metadata: self.location.new_node_metadata(Self::collection_kind()),
756                },
757            )
758        }
759    }
760
761    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
762    ///
763    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
764    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
765    ///
766    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
767    /// of the inputs change (including to/from null states).
768    ///
769    /// # Example
770    /// ```rust
771    /// # #[cfg(feature = "deploy")] {
772    /// # use hydro_lang::prelude::*;
773    /// # use futures::StreamExt;
774    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
775    /// let tick = process.tick();
776    /// // ticks are lazy by default, forces the later ticks to run
777    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
778    ///
779    /// let some_first_tick = tick.optional_first_tick(q!(123));
780    /// some_first_tick
781    ///     .unwrap_or(tick.singleton(q!(456)))
782    ///     .all_ticks()
783    /// # }, |mut stream| async move {
784    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
785    /// # for w in vec![123, 456, 456, 456] {
786    /// #     assert_eq!(stream.next().await.unwrap(), w);
787    /// # }
788    /// # }));
789    /// # }
790    /// ```
791    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
792        let res_option = self.or(other.into());
793        Singleton::new(
794            res_option.location.clone(),
795            HydroNode::Cast {
796                inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
797                metadata: res_option
798                    .location
799                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
800            },
801        )
802    }
803
804    /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
805    ///
806    /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
807    /// [`Optional`] when the default value of the type is a suitable fallback.
808    ///
809    /// # Example
810    /// ```rust
811    /// # #[cfg(feature = "deploy")] {
812    /// # use hydro_lang::prelude::*;
813    /// # use futures::StreamExt;
814    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
815    /// let tick = process.tick();
816    /// // ticks are lazy by default, forces the later ticks to run
817    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
818    ///
819    /// let some_first_tick = tick.optional_first_tick(q!(123i32));
820    /// some_first_tick.unwrap_or_default().all_ticks()
821    /// # }, |mut stream| async move {
822    /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
823    /// # for w in vec![123, 0, 0, 0] {
824    /// #     assert_eq!(stream.next().await.unwrap(), w);
825    /// # }
826    /// # }));
827    /// # }
828    /// ```
829    pub fn unwrap_or_default(self) -> Singleton<T, L, B>
830    where
831        T: Default + Clone,
832    {
833        self.into_singleton().map(q!(|v| v.unwrap_or_default()))
834    }
835
836    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
837    ///
838    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
839    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
840    /// so that Hydro can skip any computation on null values.
841    ///
842    /// # Example
843    /// ```rust
844    /// # #[cfg(feature = "deploy")] {
845    /// # use hydro_lang::prelude::*;
846    /// # use futures::StreamExt;
847    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
848    /// let tick = process.tick();
849    /// // ticks are lazy by default, forces the later ticks to run
850    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
851    ///
852    /// let some_first_tick = tick.optional_first_tick(q!(123));
853    /// some_first_tick.into_singleton().all_ticks()
854    /// # }, |mut stream| async move {
855    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
856    /// # for w in vec![Some(123), None, None, None] {
857    /// #     assert_eq!(stream.next().await.unwrap(), w);
858    /// # }
859    /// # }));
860    /// # }
861    /// ```
862    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
863    where
864        T: Clone,
865    {
866        let none: syn::Expr = parse_quote!(::std::option::Option::None);
867
868        let none_singleton = Singleton::new(
869            self.location.clone(),
870            HydroNode::SingletonSource {
871                value: none.into(),
872                first_tick_only: false,
873                metadata: self
874                    .location
875                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
876            },
877        );
878
879        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
880    }
881
882    /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
883    ///
884    /// # Example
885    /// ```rust
886    /// # #[cfg(feature = "deploy")] {
887    /// # use hydro_lang::prelude::*;
888    /// # use futures::StreamExt;
889    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
890    /// let tick = process.tick();
891    /// // ticks are lazy by default, forces the second tick to run
892    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
893    ///
894    /// let some_first_tick = tick.optional_first_tick(q!(42));
895    /// some_first_tick.is_some().all_ticks()
896    /// # }, |mut stream| async move {
897    /// // [true /* first tick */, false /* second tick */, ...]
898    /// # for w in vec![true, false] {
899    /// #     assert_eq!(stream.next().await.unwrap(), w);
900    /// # }
901    /// # }));
902    /// # }
903    /// ```
904    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
905    pub fn is_some(self) -> Singleton<bool, L, B> {
906        self.map(q!(|_| ()))
907            .into_singleton()
908            .map(q!(|o| o.is_some()))
909    }
910
911    /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
912    ///
913    /// # Example
914    /// ```rust
915    /// # #[cfg(feature = "deploy")] {
916    /// # use hydro_lang::prelude::*;
917    /// # use futures::StreamExt;
918    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
919    /// let tick = process.tick();
920    /// // ticks are lazy by default, forces the second tick to run
921    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
922    ///
923    /// let some_first_tick = tick.optional_first_tick(q!(42));
924    /// some_first_tick.is_none().all_ticks()
925    /// # }, |mut stream| async move {
926    /// // [false /* first tick */, true /* second tick */, ...]
927    /// # for w in vec![false, true] {
928    /// #     assert_eq!(stream.next().await.unwrap(), w);
929    /// # }
930    /// # }));
931    /// # }
932    /// ```
933    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
934    pub fn is_none(self) -> Singleton<bool, L, B> {
935        self.map(q!(|_| ()))
936            .into_singleton()
937            .map(q!(|o| o.is_none()))
938    }
939
940    /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
941    /// values are equal, `false` otherwise (including when either is null).
942    ///
943    /// # Example
944    /// ```rust
945    /// # #[cfg(feature = "deploy")] {
946    /// # use hydro_lang::prelude::*;
947    /// # use futures::StreamExt;
948    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
949    /// let tick = process.tick();
950    /// // ticks are lazy by default, forces the second tick to run
951    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
952    ///
953    /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
954    /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
955    /// a.is_some_and_equals(b).all_ticks()
956    /// # }, |mut stream| async move {
957    /// // [true, false]
958    /// # for w in vec![true, false] {
959    /// #     assert_eq!(stream.next().await.unwrap(), w);
960    /// # }
961    /// # }));
962    /// # }
963    /// ```
964    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
965    pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
966    where
967        T: PartialEq + Clone,
968        B: IsBounded,
969    {
970        self.into_singleton()
971            .zip(other.into_singleton())
972            .map(q!(|(a, b)| a.is_some() && a == b))
973    }
974
975    /// An operator which allows you to "name" a `HydroNode`.
976    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
977    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
978        {
979            let mut node = self.ir_node.borrow_mut();
980            let metadata = node.metadata_mut();
981            metadata.tag = Some(name.to_owned());
982        }
983        self
984    }
985
986    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
987    /// implies that `B == Bounded`.
988    pub fn make_bounded(self) -> Optional<T, L, Bounded>
989    where
990        B: IsBounded,
991    {
992        Optional::new(
993            self.location.clone(),
994            self.ir_node.replace(HydroNode::Placeholder),
995        )
996    }
997
998    /// Clones this bounded optional into a tick, returning a optional that has the
999    /// same value as the outer optional. Because the outer optional is bounded, this
1000    /// is deterministic because there is only a single immutable version.
1001    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
1002    where
1003        B: IsBounded,
1004        T: Clone,
1005    {
1006        // TODO(shadaj): avoid printing simulator logs for this snapshot
1007        let inner = self.snapshot(
1008            tick,
1009            nondet!(/** bounded top-level optional so deterministic */),
1010        );
1011        Optional::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1012    }
1013
1014    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1015    /// non-null. Otherwise, the stream is empty.
1016    ///
1017    /// # Example
1018    /// ```rust
1019    /// # #[cfg(feature = "deploy")] {
1020    /// # use hydro_lang::prelude::*;
1021    /// # use futures::StreamExt;
1022    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1023    /// # let tick = process.tick();
1024    /// # // ticks are lazy by default, forces the second tick to run
1025    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1026    /// # let batch_first_tick = process
1027    /// #   .source_iter(q!(vec![]))
1028    /// #   .batch(&tick, nondet!(/** test */));
1029    /// # let batch_second_tick = process
1030    /// #   .source_iter(q!(vec![123, 456]))
1031    /// #   .batch(&tick, nondet!(/** test */))
1032    /// #   .defer_tick(); // appears on the second tick
1033    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1034    /// input_batch // first tick: [], second tick: [123, 456]
1035    ///     .clone()
1036    ///     .max()
1037    ///     .into_stream()
1038    ///     .chain(input_batch)
1039    ///     .all_ticks()
1040    /// # }, |mut stream| async move {
1041    /// // [456, 123, 456]
1042    /// # for w in vec![456, 123, 456] {
1043    /// #     assert_eq!(stream.next().await.unwrap(), w);
1044    /// # }
1045    /// # }));
1046    /// # }
1047    /// ```
1048    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1049    where
1050        B: IsBounded,
1051    {
1052        Stream::new(
1053            self.location.clone(),
1054            HydroNode::Cast {
1055                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1056                metadata: self.location.new_node_metadata(Stream::<
1057                    T,
1058                    Tick<L>,
1059                    Bounded,
1060                    TotalOrder,
1061                    ExactlyOnce,
1062                >::collection_kind()),
1063            },
1064        )
1065    }
1066
1067    /// Filters this optional, passing through the value if the boolean signal is `true`,
1068    /// otherwise the output is null.
1069    ///
1070    /// # Example
1071    /// ```rust
1072    /// # #[cfg(feature = "deploy")] {
1073    /// # use hydro_lang::prelude::*;
1074    /// # use futures::StreamExt;
1075    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1076    /// let tick = process.tick();
1077    /// // ticks are lazy by default, forces the second tick to run
1078    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1079    ///
1080    /// let some_first_tick = tick.optional_first_tick(q!(()));
1081    /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1082    /// let batch_first_tick = process
1083    ///   .source_iter(q!(vec![456]))
1084    ///   .batch(&tick, nondet!(/** test */));
1085    /// let batch_second_tick = process
1086    ///   .source_iter(q!(vec![789]))
1087    ///   .batch(&tick, nondet!(/** test */))
1088    ///   .defer_tick();
1089    /// batch_first_tick.chain(batch_second_tick).first()
1090    ///   .filter_if(signal)
1091    ///   .unwrap_or(tick.singleton(q!(0)))
1092    ///   .all_ticks()
1093    /// # }, |mut stream| async move {
1094    /// // [456, 0]
1095    /// # for w in vec![456, 0] {
1096    /// #     assert_eq!(stream.next().await.unwrap(), w);
1097    /// # }
1098    /// # }));
1099    /// # }
1100    /// ```
1101    pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1102    where
1103        B: IsBounded,
1104    {
1105        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1106    }
1107
1108    /// Filters this optional, passing through the optional value if it is non-null **and** the
1109    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1110    ///
1111    /// Useful for conditionally processing, such as only emitting an optional's value outside
1112    /// a tick if some other condition is satisfied.
1113    ///
1114    /// # Example
1115    /// ```rust
1116    /// # #[cfg(feature = "deploy")] {
1117    /// # use hydro_lang::prelude::*;
1118    /// # use futures::StreamExt;
1119    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1120    /// let tick = process.tick();
1121    /// // ticks are lazy by default, forces the second tick to run
1122    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1123    ///
1124    /// let batch_first_tick = process
1125    ///   .source_iter(q!(vec![]))
1126    ///   .batch(&tick, nondet!(/** test */));
1127    /// let batch_second_tick = process
1128    ///   .source_iter(q!(vec![456]))
1129    ///   .batch(&tick, nondet!(/** test */))
1130    ///   .defer_tick(); // appears on the second tick
1131    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1132    /// batch_first_tick.chain(batch_second_tick).first()
1133    ///   .filter_if_some(some_on_first_tick)
1134    ///   .unwrap_or(tick.singleton(q!(789)))
1135    ///   .all_ticks()
1136    /// # }, |mut stream| async move {
1137    /// // [789, 789]
1138    /// # for w in vec![789, 789] {
1139    /// #     assert_eq!(stream.next().await.unwrap(), w);
1140    /// # }
1141    /// # }));
1142    /// # }
1143    /// ```
1144    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1145    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1146    where
1147        B: IsBounded,
1148    {
1149        self.filter_if(signal.is_some())
1150    }
1151
1152    /// Filters this optional, passing through the optional value if it is non-null **and** the
1153    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1154    ///
1155    /// Useful for conditionally processing, such as only emitting an optional's value outside
1156    /// a tick if some other condition is satisfied.
1157    ///
1158    /// # Example
1159    /// ```rust
1160    /// # #[cfg(feature = "deploy")] {
1161    /// # use hydro_lang::prelude::*;
1162    /// # use futures::StreamExt;
1163    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1164    /// let tick = process.tick();
1165    /// // ticks are lazy by default, forces the second tick to run
1166    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1167    ///
1168    /// let batch_first_tick = process
1169    ///   .source_iter(q!(vec![]))
1170    ///   .batch(&tick, nondet!(/** test */));
1171    /// let batch_second_tick = process
1172    ///   .source_iter(q!(vec![456]))
1173    ///   .batch(&tick, nondet!(/** test */))
1174    ///   .defer_tick(); // appears on the second tick
1175    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1176    /// batch_first_tick.chain(batch_second_tick).first()
1177    ///   .filter_if_none(some_on_first_tick)
1178    ///   .unwrap_or(tick.singleton(q!(789)))
1179    ///   .all_ticks()
1180    /// # }, |mut stream| async move {
1181    /// // [789, 789]
1182    /// # for w in vec![789, 456] {
1183    /// #     assert_eq!(stream.next().await.unwrap(), w);
1184    /// # }
1185    /// # }));
1186    /// # }
1187    /// ```
1188    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1189    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1190    where
1191        B: IsBounded,
1192    {
1193        self.filter_if(other.is_none())
1194    }
1195
1196    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1197    ///
1198    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1199    /// having a value, such as only releasing a piece of state if the node is the leader.
1200    ///
1201    /// # Example
1202    /// ```rust
1203    /// # #[cfg(feature = "deploy")] {
1204    /// # use hydro_lang::prelude::*;
1205    /// # use futures::StreamExt;
1206    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1207    /// let tick = process.tick();
1208    /// // ticks are lazy by default, forces the second tick to run
1209    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1210    ///
1211    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1212    /// some_on_first_tick
1213    ///     .if_some_then(tick.singleton(q!(456)))
1214    ///     .unwrap_or(tick.singleton(q!(123)))
1215    /// # .all_ticks()
1216    /// # }, |mut stream| async move {
1217    /// // 456 (first tick) ~> 123 (second tick onwards)
1218    /// # for w in vec![456, 123, 123] {
1219    /// #     assert_eq!(stream.next().await.unwrap(), w);
1220    /// # }
1221    /// # }));
1222    /// # }
1223    /// ```
1224    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1225    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1226    where
1227        B: IsBounded,
1228    {
1229        value.filter_if(self.is_some())
1230    }
1231}
1232
1233impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1234where
1235    L: Location<'a>,
1236{
1237    /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1238    /// key-value pair of this [`Optional`].
1239    ///
1240    /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1241    /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1242    /// the entry will be updated and appear / disappear according to the state of the
1243    /// [`Optional`].
1244    pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1245        KeyedSingleton::new(
1246            self.location.clone(),
1247            HydroNode::Cast {
1248                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1249                metadata: self
1250                    .location
1251                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1252            },
1253        )
1254    }
1255}
1256
1257impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1258where
1259    L: Location<'a>,
1260{
1261    /// Returns an optional value corresponding to the latest snapshot of the optional
1262    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1263    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1264    /// all snapshots of this optional into the atomic-associated tick will observe the
1265    /// same value each tick.
1266    ///
1267    /// # Non-Determinism
1268    /// Because this picks a snapshot of a optional whose value is continuously changing,
1269    /// the output optional has a non-deterministic value since the snapshot can be at an
1270    /// arbitrary point in time.
1271    pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1272        self,
1273        tick: &Tick<L2>,
1274        _nondet: NonDet,
1275    ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1276        Optional::new(
1277            tick.drop_consistency(),
1278            HydroNode::Batch {
1279                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1280                metadata: tick
1281                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1282            },
1283        )
1284    }
1285
1286    /// Returns this optional back into a top-level, asynchronous execution context where updates
1287    /// to the value will be asynchronously propagated.
1288    pub fn end_atomic(self) -> Optional<T, L, B> {
1289        Optional::new(
1290            self.location.tick.l.clone(),
1291            HydroNode::EndAtomic {
1292                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1293                metadata: self
1294                    .location
1295                    .tick
1296                    .l
1297                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1298            },
1299        )
1300    }
1301}
1302
1303impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1304where
1305    L: Location<'a>,
1306{
1307    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1308    /// will observe the same version of the value and will be executed synchronously before any
1309    /// outputs are yielded (in [`Optional::end_atomic`]).
1310    ///
1311    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1312    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1313    /// a different version).
1314    pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1315        let id = self.location.flow_state().borrow_mut().next_clock_id();
1316        let out_location = Atomic {
1317            tick: Tick {
1318                id,
1319                l: self.location.clone(),
1320            },
1321        };
1322        Optional::new(
1323            out_location.clone(),
1324            HydroNode::BeginAtomic {
1325                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1326                metadata: out_location
1327                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1328            },
1329        )
1330    }
1331
1332    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1333    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1334    /// relevant data that contributed to the snapshot at tick `t`.
1335    ///
1336    /// # Non-Determinism
1337    /// Because this picks a snapshot of a optional whose value is continuously changing,
1338    /// the output optional has a non-deterministic value since the snapshot can be at an
1339    /// arbitrary point in time.
1340    pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1341        self,
1342        tick: &Tick<L2>,
1343        _nondet: NonDet,
1344    ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1345        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1346        Optional::new(
1347            tick.drop_consistency(),
1348            HydroNode::Batch {
1349                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1350                metadata: tick
1351                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1352            },
1353        )
1354    }
1355
1356    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1357    /// with order corresponding to increasing prefixes of data contributing to the optional.
1358    ///
1359    /// # Non-Determinism
1360    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1361    /// to non-deterministic batching and arrival of inputs, the output stream is
1362    /// non-deterministic.
1363    pub fn sample_eager(
1364        self,
1365        nondet: NonDet,
1366    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1367        let tick = self.location.tick();
1368        self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1369    }
1370
1371    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1372    /// value taken at various points in time. Because the input optional may be
1373    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1374    /// represent the value of the optional given some prefix of the streams leading up to
1375    /// it.
1376    ///
1377    /// # Non-Determinism
1378    /// The output stream is non-deterministic in which elements are sampled, since this
1379    /// is controlled by a clock.
1380    #[cfg(feature = "tokio")]
1381    pub fn sample_every(
1382        self,
1383        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1384        nondet: NonDet,
1385    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1386    where
1387        L: TopLevel<'a>,
1388    {
1389        let samples = self.location.source_interval(interval);
1390        let tick = self.location.tick();
1391
1392        self.snapshot(&tick, nondet)
1393            .filter_if(samples.batch(&tick, nondet).first().is_some())
1394            .all_ticks()
1395            .weaken_retries()
1396    }
1397}
1398
1399impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1400where
1401    L: Location<'a>,
1402{
1403    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1404    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1405    /// null values).
1406    ///
1407    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1408    /// producing one element in the output for each (non-null) tick. This is useful for batched
1409    /// computations, where the results from each tick must be combined together.
1410    ///
1411    /// # Example
1412    /// ```rust
1413    /// # #[cfg(feature = "deploy")] {
1414    /// # use hydro_lang::prelude::*;
1415    /// # use futures::StreamExt;
1416    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1417    /// # let tick = process.tick();
1418    /// # // ticks are lazy by default, forces the second tick to run
1419    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1420    /// # let batch_first_tick = process
1421    /// #   .source_iter(q!(vec![]))
1422    /// #   .batch(&tick, nondet!(/** test */));
1423    /// # let batch_second_tick = process
1424    /// #   .source_iter(q!(vec![1, 2, 3]))
1425    /// #   .batch(&tick, nondet!(/** test */))
1426    /// #   .defer_tick(); // appears on the second tick
1427    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1428    /// input_batch // first tick: [], second tick: [1, 2, 3]
1429    ///     .max()
1430    ///     .all_ticks()
1431    /// # }, |mut stream| async move {
1432    /// // [3]
1433    /// # for w in vec![3] {
1434    /// #     assert_eq!(stream.next().await.unwrap(), w);
1435    /// # }
1436    /// # }));
1437    /// # }
1438    /// ```
1439    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1440        self.into_stream().all_ticks()
1441    }
1442
1443    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1444    /// which will stream the value computed in _each_ tick as a separate stream element.
1445    ///
1446    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1447    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1448    /// optional's [`Tick`] context.
1449    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1450        self.into_stream().all_ticks_atomic()
1451    }
1452
1453    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1454    /// be asynchronously updated with the latest value of the optional inside the tick, including
1455    /// whether the optional is null or not.
1456    ///
1457    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1458    /// tick that tracks the inner value. This is useful for getting the value as of the
1459    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1460    ///
1461    /// # Example
1462    /// ```rust
1463    /// # #[cfg(feature = "deploy")] {
1464    /// # use hydro_lang::prelude::*;
1465    /// # use futures::StreamExt;
1466    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1467    /// # let tick = process.tick();
1468    /// # // ticks are lazy by default, forces the second tick to run
1469    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1470    /// # let batch_first_tick = process
1471    /// #   .source_iter(q!(vec![]))
1472    /// #   .batch(&tick, nondet!(/** test */));
1473    /// # let batch_second_tick = process
1474    /// #   .source_iter(q!(vec![1, 2, 3]))
1475    /// #   .batch(&tick, nondet!(/** test */))
1476    /// #   .defer_tick(); // appears on the second tick
1477    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1478    /// input_batch // first tick: [], second tick: [1, 2, 3]
1479    ///     .max()
1480    ///     .latest()
1481    /// # .into_singleton()
1482    /// # .sample_eager(nondet!(/** test */))
1483    /// # }, |mut stream| async move {
1484    /// // asynchronously changes from None ~> 3
1485    /// # for w in vec![None, Some(3)] {
1486    /// #     assert_eq!(stream.next().await.unwrap(), w);
1487    /// # }
1488    /// # }));
1489    /// # }
1490    /// ```
1491    pub fn latest(self) -> Optional<T, L, Unbounded> {
1492        Optional::new(
1493            self.location.outer().clone(),
1494            HydroNode::YieldConcat {
1495                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1496                metadata: self
1497                    .location
1498                    .outer()
1499                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1500            },
1501        )
1502    }
1503
1504    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1505    /// be updated with the latest value of the optional inside the tick.
1506    ///
1507    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1508    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1509    /// optional's [`Tick`] context.
1510    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1511        let out_location = Atomic {
1512            tick: self.location.clone(),
1513        };
1514
1515        Optional::new(
1516            out_location.clone(),
1517            HydroNode::YieldConcat {
1518                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1519                metadata: out_location
1520                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1521            },
1522        )
1523    }
1524
1525    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1526    /// always has the state of `self` at tick `T - 1`.
1527    ///
1528    /// At tick `0`, the output optional is null, since there is no previous tick.
1529    ///
1530    /// This operator enables stateful iterative processing with ticks, by sending data from one
1531    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1532    ///
1533    /// # Example
1534    /// ```rust
1535    /// # #[cfg(feature = "deploy")] {
1536    /// # use hydro_lang::prelude::*;
1537    /// # use futures::StreamExt;
1538    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1539    /// let tick = process.tick();
1540    /// // ticks are lazy by default, forces the second tick to run
1541    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1542    ///
1543    /// let batch_first_tick = process
1544    ///   .source_iter(q!(vec![1, 2]))
1545    ///   .batch(&tick, nondet!(/** test */));
1546    /// let batch_second_tick = process
1547    ///   .source_iter(q!(vec![3, 4]))
1548    ///   .batch(&tick, nondet!(/** test */))
1549    ///   .defer_tick(); // appears on the second tick
1550    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1551    ///   .reduce(q!(|state, v| *state += v));
1552    ///
1553    /// current_tick_sum.clone().into_singleton().zip(
1554    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1555    /// ).all_ticks()
1556    /// # }, |mut stream| async move {
1557    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1558    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1559    /// #     assert_eq!(stream.next().await.unwrap(), w);
1560    /// # }
1561    /// # }));
1562    /// # }
1563    /// ```
1564    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1565        Optional::new(
1566            self.location.clone(),
1567            HydroNode::DeferTick {
1568                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1569                metadata: self.location.new_node_metadata(Self::collection_kind()),
1570            },
1571        )
1572    }
1573}
1574
1575#[cfg(test)]
1576mod tests {
1577    #[cfg(feature = "deploy")]
1578    use futures::StreamExt;
1579    #[cfg(feature = "deploy")]
1580    use hydro_deploy::Deployment;
1581    #[cfg(any(feature = "deploy", feature = "sim"))]
1582    use stageleft::q;
1583
1584    #[cfg(feature = "deploy")]
1585    use super::Optional;
1586    #[cfg(any(feature = "deploy", feature = "sim"))]
1587    use crate::compile::builder::FlowBuilder;
1588    #[cfg(any(feature = "deploy", feature = "sim"))]
1589    use crate::location::Location;
1590    #[cfg(feature = "deploy")]
1591    use crate::nondet::nondet;
1592
1593    #[cfg(feature = "deploy")]
1594    #[tokio::test]
1595    async fn optional_or_cardinality() {
1596        let mut deployment = Deployment::new();
1597
1598        let mut flow = FlowBuilder::new();
1599        let node = flow.process::<()>();
1600        let external = flow.external::<()>();
1601
1602        let node_tick = node.tick();
1603        let tick_singleton = node_tick.singleton(q!(123));
1604        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1605        let counts = tick_optional_inhabited
1606            .clone()
1607            .or(tick_optional_inhabited)
1608            .into_stream()
1609            .count()
1610            .all_ticks()
1611            .send_bincode_external(&external);
1612
1613        let nodes = flow
1614            .with_process(&node, deployment.Localhost())
1615            .with_external(&external, deployment.Localhost())
1616            .deploy(&mut deployment);
1617
1618        deployment.deploy().await.unwrap();
1619
1620        let mut external_out = nodes.connect(counts).await;
1621
1622        deployment.start().await.unwrap();
1623
1624        assert_eq!(external_out.next().await.unwrap(), 1);
1625    }
1626
1627    #[cfg(feature = "deploy")]
1628    #[tokio::test]
1629    async fn into_singleton_top_level_none_cardinality() {
1630        let mut deployment = Deployment::new();
1631
1632        let mut flow = FlowBuilder::new();
1633        let node = flow.process::<()>();
1634        let external = flow.external::<()>();
1635
1636        let node_tick = node.tick();
1637        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1638        let into_singleton = top_level_none.into_singleton();
1639
1640        let tick_driver = node.spin();
1641
1642        let counts = into_singleton
1643            .snapshot(&node_tick, nondet!(/** test */))
1644            .into_stream()
1645            .count()
1646            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1647            .map(q!(|(c, _)| c))
1648            .all_ticks()
1649            .send_bincode_external(&external);
1650
1651        let nodes = flow
1652            .with_process(&node, deployment.Localhost())
1653            .with_external(&external, deployment.Localhost())
1654            .deploy(&mut deployment);
1655
1656        deployment.deploy().await.unwrap();
1657
1658        let mut external_out = nodes.connect(counts).await;
1659
1660        deployment.start().await.unwrap();
1661
1662        assert_eq!(external_out.next().await.unwrap(), 1);
1663        assert_eq!(external_out.next().await.unwrap(), 1);
1664        assert_eq!(external_out.next().await.unwrap(), 1);
1665    }
1666
1667    #[cfg(feature = "deploy")]
1668    #[tokio::test]
1669    async fn into_singleton_unbounded_top_level_none_cardinality() {
1670        let mut deployment = Deployment::new();
1671
1672        let mut flow = FlowBuilder::new();
1673        let node = flow.process::<()>();
1674        let external = flow.external::<()>();
1675
1676        let node_tick = node.tick();
1677        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1678        let into_singleton = top_level_none.into_singleton();
1679
1680        let tick_driver = node.spin();
1681
1682        let counts = into_singleton
1683            .snapshot(&node_tick, nondet!(/** test */))
1684            .into_stream()
1685            .count()
1686            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1687            .map(q!(|(c, _)| c))
1688            .all_ticks()
1689            .send_bincode_external(&external);
1690
1691        let nodes = flow
1692            .with_process(&node, deployment.Localhost())
1693            .with_external(&external, deployment.Localhost())
1694            .deploy(&mut deployment);
1695
1696        deployment.deploy().await.unwrap();
1697
1698        let mut external_out = nodes.connect(counts).await;
1699
1700        deployment.start().await.unwrap();
1701
1702        assert_eq!(external_out.next().await.unwrap(), 1);
1703        assert_eq!(external_out.next().await.unwrap(), 1);
1704        assert_eq!(external_out.next().await.unwrap(), 1);
1705    }
1706
1707    #[cfg(feature = "sim")]
1708    #[test]
1709    fn top_level_optional_some_into_stream_no_replay() {
1710        let mut flow = FlowBuilder::new();
1711        let node = flow.process::<()>();
1712
1713        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1714        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1715        let filtered_some = folded.filter(q!(|_| true));
1716
1717        let out_recv = filtered_some.into_stream().sim_output();
1718
1719        flow.sim().exhaustive(async || {
1720            out_recv.assert_yields_only([10]).await;
1721        });
1722    }
1723
1724    #[cfg(feature = "sim")]
1725    #[test]
1726    fn top_level_optional_none_into_stream_no_replay() {
1727        let mut flow = FlowBuilder::new();
1728        let node = flow.process::<()>();
1729
1730        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1731        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1732        let filtered_none = folded.filter(q!(|_| false));
1733
1734        let out_recv = filtered_none.into_stream().sim_output();
1735
1736        flow.sim().exhaustive(async || {
1737            out_recv.assert_yields_only([] as [i32; 0]).await;
1738        });
1739    }
1740
1741    #[cfg(feature = "deploy")]
1742    #[tokio::test]
1743    async fn test_optional_ref() {
1744        let mut deployment = Deployment::new();
1745
1746        let mut flow = FlowBuilder::new();
1747        let external = flow.external::<()>();
1748        let p1 = flow.process::<()>();
1749
1750        // Create an optional: reduce 0..5 => Some(10) (sum via reduce)
1751        let my_opt = p1.source_iter(q!(0..5i32)).reduce(q!(|a, b| *a += b));
1752
1753        let opt_ref = my_opt.by_ref();
1754
1755        // Use the optional ref in a map: unwrap_or(0) + x
1756        let out_port = p1
1757            .source_iter(q!(1..=3i32))
1758            .map(q!(|x| x + opt_ref.unwrap_or(0)))
1759            .send_bincode_external(&external);
1760
1761        let nodes = flow
1762            .with_default_optimize()
1763            .with_process(&p1, deployment.Localhost())
1764            .with_external(&external, deployment.Localhost())
1765            .deploy(&mut deployment);
1766
1767        deployment.deploy().await.unwrap();
1768
1769        let mut out_recv = nodes.connect(out_port).await;
1770
1771        deployment.start().await.unwrap();
1772
1773        let mut results = Vec::new();
1774        for _ in 0..3 {
1775            results.push(out_recv.next().await.unwrap());
1776        }
1777        results.sort();
1778        // reduce(0..5) = 10, so results should be 11, 12, 13
1779        assert_eq!(results, vec![11, 12, 13]);
1780    }
1781
1782    #[cfg(feature = "deploy")]
1783    #[tokio::test]
1784    async fn test_optional_ref_none() {
1785        let mut deployment = Deployment::new();
1786
1787        let mut flow = FlowBuilder::new();
1788        let external = flow.external::<()>();
1789        let p1 = flow.process::<()>();
1790
1791        // Create an optional from an empty source => None
1792        let my_opt = p1
1793            .source_iter(q!(std::iter::empty::<i32>()))
1794            .reduce(q!(|a, b| *a += b));
1795
1796        let opt_ref = my_opt.by_ref();
1797
1798        // Use the optional ref: should be None, so unwrap_or(99)
1799        let out_port = p1
1800            .source_iter(q!(1..=2i32))
1801            .map(q!(|x| x + opt_ref.unwrap_or(99)))
1802            .send_bincode_external(&external);
1803
1804        let nodes = flow
1805            .with_default_optimize()
1806            .with_process(&p1, deployment.Localhost())
1807            .with_external(&external, deployment.Localhost())
1808            .deploy(&mut deployment);
1809
1810        deployment.deploy().await.unwrap();
1811
1812        let mut out_recv = nodes.connect(out_port).await;
1813
1814        deployment.start().await.unwrap();
1815
1816        let mut results = Vec::new();
1817        for _ in 0..2 {
1818            results.push(out_recv.next().await.unwrap());
1819        }
1820        results.sort();
1821        // optional is None, so unwrap_or(99) => 100, 101
1822        assert_eq!(results, vec![100, 101]);
1823    }
1824
1825    #[cfg(feature = "deploy")]
1826    #[tokio::test]
1827    async fn test_optional_ref_and_consume() {
1828        let mut deployment = Deployment::new();
1829
1830        let mut flow = FlowBuilder::new();
1831        let external = flow.external::<()>();
1832        let p1 = flow.process::<()>();
1833
1834        // Use reduce to produce an Optional
1835        let my_opt = p1.source_iter(q!(0..5i32)).reduce(q!(|a, b| *a += b));
1836
1837        let opt_ref = my_opt.by_ref();
1838
1839        // Reference path
1840        let out_port_ref = p1
1841            .source_iter(q!(1..=2i32))
1842            .map(q!(|x| x + opt_ref.unwrap_or(0)))
1843            .send_bincode_external(&external);
1844
1845        let nodes = flow
1846            .with_default_optimize()
1847            .with_process(&p1, deployment.Localhost())
1848            .with_external(&external, deployment.Localhost())
1849            .deploy(&mut deployment);
1850
1851        deployment.deploy().await.unwrap();
1852
1853        let mut out_recv_ref = nodes.connect(out_port_ref).await;
1854
1855        deployment.start().await.unwrap();
1856
1857        let mut ref_results = Vec::new();
1858        for _ in 0..2 {
1859            ref_results.push(out_recv_ref.next().await.unwrap());
1860        }
1861        ref_results.sort();
1862        // reduce(0..5) = 10, so 1+10=11, 2+10=12
1863        assert_eq!(ref_results, vec![11, 12]);
1864    }
1865}