hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(feature = "tokio")]
21use crate::location::TopLevel;
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, DeferTick};
25use crate::location::{Location, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::prelude::KeyedSingleton;
28use crate::properties::{StreamMapFuncAlgebra, ValidMutCommutativityFor, ValidMutIdempotenceFor};
29
30/// A *nullable* Rust value that can asynchronously change over time.
31///
32/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
33/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
34/// asynchronously change over time, including becoming present of uninhabited.
35///
36/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
37/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
38///
39/// Type Parameters:
40/// - `Type`: the type of the value in this optional (when it is not null)
41/// - `Loc`: the [`Location`] where the optional is materialized
42/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
43pub struct Optional<Type, Loc, Bound: Boundedness> {
44 pub(crate) location: Loc,
45 pub(crate) ir_node: RefCell<HydroNode>,
46 pub(crate) flow_state: FlowState,
47
48 _phantom: PhantomData<(Type, Loc, Bound)>,
49}
50
51impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
52 fn drop(&mut self) {
53 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
54 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
55 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
56 input: Box::new(ir_node),
57 op_metadata: HydroIrOpMetadata::new(),
58 });
59 }
60 }
61}
62
63impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
64where
65 T: Clone,
66 L: Location<'a>,
67{
68 fn from(value: Optional<T, L, Bounded>) -> Self {
69 let tick = value.location().tick();
70 value.clone_into_tick(&tick).latest()
71 }
72}
73
74impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
75where
76 L: Location<'a>,
77{
78 fn defer_tick(self) -> Self {
79 Optional::defer_tick(self)
80 }
81}
82
83impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
84where
85 L: Location<'a>,
86{
87 type Location = Tick<L>;
88
89 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
90 Optional::new(
91 location.clone(),
92 HydroNode::CycleSource {
93 cycle_id,
94 metadata: location.new_node_metadata(Self::collection_kind()),
95 },
96 )
97 }
98}
99
100impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
101where
102 L: Location<'a>,
103{
104 type Location = Tick<L>;
105
106 fn location(&self) -> &Self::Location {
107 self.location()
108 }
109
110 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
111 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
112 location.clone(),
113 HydroNode::DeferTick {
114 input: Box::new(HydroNode::CycleSource {
115 cycle_id,
116 metadata: location.new_node_metadata(Self::collection_kind()),
117 }),
118 metadata: location
119 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
120 },
121 );
122
123 from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
124 }
125}
126
127impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
128where
129 L: Location<'a>,
130{
131 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
132 assert_eq!(
133 Location::id(&self.location),
134 expected_location,
135 "locations do not match"
136 );
137 self.location
138 .flow_state()
139 .borrow_mut()
140 .push_root(HydroRoot::CycleSink {
141 cycle_id,
142 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
143 op_metadata: HydroIrOpMetadata::new(),
144 });
145 }
146}
147
148impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
149where
150 L: Location<'a>,
151{
152 type Location = L;
153
154 fn create_source(cycle_id: CycleId, location: L) -> Self {
155 Optional::new(
156 location.clone(),
157 HydroNode::CycleSource {
158 cycle_id,
159 metadata: location.new_node_metadata(Self::collection_kind()),
160 },
161 )
162 }
163}
164
165impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
166where
167 L: Location<'a>,
168{
169 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
170 assert_eq!(
171 Location::id(&self.location),
172 expected_location,
173 "locations do not match"
174 );
175 self.location
176 .flow_state()
177 .borrow_mut()
178 .push_root(HydroRoot::CycleSink {
179 cycle_id,
180 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
181 op_metadata: HydroIrOpMetadata::new(),
182 });
183 }
184}
185
186impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
187where
188 L: Location<'a>,
189{
190 fn from(singleton: Singleton<T, L, B>) -> Self {
191 Optional::new(
192 singleton.location.clone(),
193 HydroNode::Cast {
194 inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
195 metadata: singleton
196 .location
197 .new_node_metadata(Self::collection_kind()),
198 },
199 )
200 }
201}
202
203#[cfg(stageleft_runtime)]
204pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
205 me: Optional<T, L, B>,
206 other: Optional<O, L, B>,
207) -> Optional<(T, O), L, B> {
208 check_matching_location(&me.location, &other.location);
209
210 Optional::new(
211 me.location.clone(),
212 HydroNode::CrossSingleton {
213 left: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
214 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
215 metadata: me
216 .location
217 .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
218 },
219 )
220}
221
222#[cfg(stageleft_runtime)]
223fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
224 me: Optional<T, L, B>,
225 other: Optional<T, L, B>,
226) -> Optional<T, L, B> {
227 check_matching_location(&me.location, &other.location);
228
229 Optional::new(
230 me.location.clone(),
231 HydroNode::ChainFirst {
232 first: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
233 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
234 metadata: me
235 .location
236 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
237 },
238 )
239}
240
241impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
242where
243 T: Clone,
244 L: Location<'a>,
245{
246 fn clone(&self) -> Self {
247 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
248 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
249 *self.ir_node.borrow_mut() = HydroNode::Tee {
250 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
251 metadata: self.location.new_node_metadata(Self::collection_kind()),
252 };
253 }
254
255 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
256 Optional {
257 location: self.location.clone(),
258 flow_state: self.flow_state.clone(),
259 ir_node: HydroNode::Tee {
260 inner: SharedNode(inner.0.clone()),
261 metadata: metadata.clone(),
262 }
263 .into(),
264 _phantom: PhantomData,
265 }
266 } else {
267 unreachable!()
268 }
269 }
270}
271
272impl<'a, T, L, B: Boundedness> Optional<T, L, B>
273where
274 L: Location<'a>,
275{
276 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
277 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
278 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
279 let flow_state = location.flow_state().clone();
280 Optional {
281 location,
282 flow_state,
283 ir_node: RefCell::new(ir_node),
284 _phantom: PhantomData,
285 }
286 }
287
288 pub(crate) fn collection_kind() -> CollectionKind {
289 CollectionKind::Optional {
290 bound: B::BOUND_KIND,
291 element_type: stageleft::quote_type::<T>().into(),
292 }
293 }
294
295 /// Returns the [`Location`] where this optional is being materialized.
296 pub fn location(&self) -> &L {
297 &self.location
298 }
299
300 /// Creates a shared reference handle to this optional that can be captured inside `q!()`
301 /// closures. The handle resolves to `&Option<T>` at runtime.
302 ///
303 /// The optional must be bounded, otherwise reading it would be non-deterministic.
304 pub fn by_ref(&self) -> crate::handoff_ref::OptionalRef<'a, '_, T, L>
305 where
306 B: IsBounded,
307 {
308 crate::handoff_ref::OptionalRef::new(&self.ir_node)
309 }
310
311 /// Returns a mutable reference handle to this optional that can be captured inside `q!()`
312 /// closures. The handle resolves to `&mut Option<T>` at runtime.
313 pub fn by_mut(&self) -> crate::handoff_ref::OptionalMut<'a, '_, T, L>
314 where
315 B: IsBounded,
316 {
317 crate::handoff_ref::OptionalMut::new(&self.ir_node)
318 }
319
320 /// Weakens the consistency of this live collection to not guarantee any consistency across
321 /// cluster members (if this collection is on a cluster).
322 pub fn weaken_consistency(self) -> Optional<T, L::DropConsistency, B>
323 where
324 L: Location<'a>,
325 {
326 if L::consistency()
327 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
328 {
329 // already no consistency
330 Optional::new(
331 self.location.drop_consistency(),
332 self.ir_node.replace(HydroNode::Placeholder),
333 )
334 } else {
335 Optional::new(
336 self.location.drop_consistency(),
337 HydroNode::Cast {
338 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
339 metadata: self
340 .location
341 .clone()
342 .drop_consistency()
343 .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
344 },
345 )
346 }
347 }
348
349 /// Casts this live collection to have the consistency guarantees specified in the given
350 /// location type parameter. The developer must ensure that the strengthened consistency
351 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
352 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
353 self,
354 _proof: impl crate::properties::ConsistencyProof,
355 ) -> Optional<T, L2, B>
356 where
357 L: Location<'a>,
358 {
359 if L::consistency() == L2::consistency() {
360 Optional::new(
361 self.location.with_consistency_of(),
362 self.ir_node.replace(HydroNode::Placeholder),
363 )
364 } else {
365 Optional::new(
366 self.location.with_consistency_of(),
367 HydroNode::AssertIsConsistent {
368 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
369 trusted: false,
370 metadata: self
371 .location
372 .clone()
373 .with_consistency_of::<L2>()
374 .new_node_metadata(Optional::<T, L2, B>::collection_kind()),
375 },
376 )
377 }
378 }
379
380 /// Transforms the optional value by applying a function `f` to it,
381 /// continuously as the input is updated.
382 ///
383 /// Whenever the optional is empty, the output optional is also empty.
384 ///
385 /// # Example
386 /// ```rust
387 /// # #[cfg(feature = "deploy")] {
388 /// # use hydro_lang::prelude::*;
389 /// # use futures::StreamExt;
390 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
391 /// let tick = process.tick();
392 /// let optional = tick.optional_first_tick(q!(1));
393 /// optional.map(q!(|v| v + 1)).all_ticks()
394 /// # }, |mut stream| async move {
395 /// // 2
396 /// # assert_eq!(stream.next().await.unwrap(), 2);
397 /// # }));
398 /// # }
399 /// ```
400 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
401 where
402 F: Fn(T) -> U + 'a,
403 {
404 let f = f.splice_fn1_ctx(&self.location).into();
405 Optional::new(
406 self.location.clone(),
407 HydroNode::Map {
408 f,
409 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
410 metadata: self
411 .location
412 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
413 },
414 )
415 }
416
417 /// Transforms the optional value by applying a function `f` to it and then flattening
418 /// the result into a stream, preserving the order of elements.
419 ///
420 /// If the optional is empty, the output stream is also empty. If the optional contains
421 /// a value, `f` is applied to produce an iterator, and all items from that iterator
422 /// are emitted in the output stream in deterministic order.
423 ///
424 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
425 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
426 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
427 ///
428 /// # Example
429 /// ```rust
430 /// # #[cfg(feature = "deploy")] {
431 /// # use hydro_lang::prelude::*;
432 /// # use futures::StreamExt;
433 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
434 /// let tick = process.tick();
435 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
436 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
437 /// # }, |mut stream| async move {
438 /// // 1, 2, 3
439 /// # for w in vec![1, 2, 3] {
440 /// # assert_eq!(stream.next().await.unwrap(), w);
441 /// # }
442 /// # }));
443 /// # }
444 /// ```
445 pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
446 self,
447 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
448 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
449 where
450 B: IsBounded,
451 I: IntoIterator<Item = U>,
452 F: FnMut(T) -> I + 'a,
453 C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
454 Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
455 {
456 self.into_stream().flat_map_ordered(f)
457 }
458
459 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
460 /// for the output type `I` to produce items in any order.
461 ///
462 /// If the optional is empty, the output stream is also empty. If the optional contains
463 /// a value, `f` is applied to produce an iterator, and all items from that iterator
464 /// are emitted in the output stream in non-deterministic order.
465 ///
466 /// # Example
467 /// ```rust
468 /// # #[cfg(feature = "deploy")] {
469 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
470 /// # use futures::StreamExt;
471 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
472 /// let tick = process.tick();
473 /// let optional = tick.optional_first_tick(q!(
474 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
475 /// ));
476 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
477 /// # }, |mut stream| async move {
478 /// // 1, 2, 3, but in no particular order
479 /// # let mut results = Vec::new();
480 /// # for _ in 0..3 {
481 /// # results.push(stream.next().await.unwrap());
482 /// # }
483 /// # results.sort();
484 /// # assert_eq!(results, vec![1, 2, 3]);
485 /// # }));
486 /// # }
487 /// ```
488 pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
489 self,
490 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
491 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
492 where
493 B: IsBounded,
494 I: IntoIterator<Item = U>,
495 F: FnMut(T) -> I + 'a,
496 C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
497 Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
498 {
499 self.into_stream().flat_map_unordered(f)
500 }
501
502 /// Flattens the optional value into a stream, preserving the order of elements.
503 ///
504 /// If the optional is empty, the output stream is also empty. If the optional contains
505 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
506 /// in the output stream in deterministic order.
507 ///
508 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
509 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
510 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
511 ///
512 /// # Example
513 /// ```rust
514 /// # #[cfg(feature = "deploy")] {
515 /// # use hydro_lang::prelude::*;
516 /// # use futures::StreamExt;
517 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
518 /// let tick = process.tick();
519 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
520 /// optional.flatten_ordered().all_ticks()
521 /// # }, |mut stream| async move {
522 /// // 1, 2, 3
523 /// # for w in vec![1, 2, 3] {
524 /// # assert_eq!(stream.next().await.unwrap(), w);
525 /// # }
526 /// # }));
527 /// # }
528 /// ```
529 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
530 where
531 B: IsBounded,
532 T: IntoIterator<Item = U>,
533 {
534 self.flat_map_ordered(q!(|v| v))
535 }
536
537 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
538 /// for the element type `T` to produce items in any order.
539 ///
540 /// If the optional is empty, the output stream is also empty. If the optional contains
541 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
542 /// in the output stream in non-deterministic order.
543 ///
544 /// # Example
545 /// ```rust
546 /// # #[cfg(feature = "deploy")] {
547 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
548 /// # use futures::StreamExt;
549 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
550 /// let tick = process.tick();
551 /// let optional = tick.optional_first_tick(q!(
552 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
553 /// ));
554 /// optional.flatten_unordered().all_ticks()
555 /// # }, |mut stream| async move {
556 /// // 1, 2, 3, but in no particular order
557 /// # let mut results = Vec::new();
558 /// # for _ in 0..3 {
559 /// # results.push(stream.next().await.unwrap());
560 /// # }
561 /// # results.sort();
562 /// # assert_eq!(results, vec![1, 2, 3]);
563 /// # }));
564 /// # }
565 /// ```
566 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
567 where
568 B: IsBounded,
569 T: IntoIterator<Item = U>,
570 {
571 self.flat_map_unordered(q!(|v| v))
572 }
573
574 /// Creates an optional containing only the value if it satisfies a predicate `f`.
575 ///
576 /// If the optional is empty, the output optional is also empty. If the optional contains
577 /// a value and the predicate returns `true`, the output optional contains the same value.
578 /// If the predicate returns `false`, the output optional is empty.
579 ///
580 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
581 /// not modify or take ownership of the value. If you need to modify the value while filtering
582 /// use [`Optional::filter_map`] instead.
583 ///
584 /// # Example
585 /// ```rust
586 /// # #[cfg(feature = "deploy")] {
587 /// # use hydro_lang::prelude::*;
588 /// # use futures::StreamExt;
589 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
590 /// let tick = process.tick();
591 /// let optional = tick.optional_first_tick(q!(5));
592 /// optional.filter(q!(|&x| x > 3)).all_ticks()
593 /// # }, |mut stream| async move {
594 /// // 5
595 /// # assert_eq!(stream.next().await.unwrap(), 5);
596 /// # }));
597 /// # }
598 /// ```
599 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
600 where
601 F: Fn(&T) -> bool + 'a,
602 {
603 let f = f.splice_fn1_borrow_ctx(&self.location).into();
604 Optional::new(
605 self.location.clone(),
606 HydroNode::Filter {
607 f,
608 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
609 metadata: self.location.new_node_metadata(Self::collection_kind()),
610 },
611 )
612 }
613
614 /// An operator that both filters and maps. It yields only the value if the supplied
615 /// closure `f` returns `Some(value)`.
616 ///
617 /// If the optional is empty, the output optional is also empty. If the optional contains
618 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
619 /// If the closure returns `None`, the output optional is empty.
620 ///
621 /// # Example
622 /// ```rust
623 /// # #[cfg(feature = "deploy")] {
624 /// # use hydro_lang::prelude::*;
625 /// # use futures::StreamExt;
626 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
627 /// let tick = process.tick();
628 /// let optional = tick.optional_first_tick(q!("42"));
629 /// optional
630 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
631 /// .all_ticks()
632 /// # }, |mut stream| async move {
633 /// // 42
634 /// # assert_eq!(stream.next().await.unwrap(), 42);
635 /// # }));
636 /// # }
637 /// ```
638 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
639 where
640 F: Fn(T) -> Option<U> + 'a,
641 {
642 let f = f.splice_fn1_ctx(&self.location).into();
643 Optional::new(
644 self.location.clone(),
645 HydroNode::FilterMap {
646 f,
647 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
648 metadata: self
649 .location
650 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
651 },
652 )
653 }
654
655 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
656 ///
657 /// If the other value is a [`Optional`], the output will be non-null only if the argument is
658 /// non-null. This is useful for combining several pieces of state together.
659 ///
660 /// # Example
661 /// ```rust
662 /// # #[cfg(feature = "deploy")] {
663 /// # use hydro_lang::prelude::*;
664 /// # use futures::StreamExt;
665 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
666 /// let tick = process.tick();
667 /// let numbers = process
668 /// .source_iter(q!(vec![123, 456, 789]))
669 /// .batch(&tick, nondet!(/** test */));
670 /// let min = numbers.clone().min(); // Optional
671 /// let max = numbers.max(); // Optional
672 /// min.zip(max).all_ticks()
673 /// # }, |mut stream| async move {
674 /// // [(123, 789)]
675 /// # for w in vec![(123, 789)] {
676 /// # assert_eq!(stream.next().await.unwrap(), w);
677 /// # }
678 /// # }));
679 /// # }
680 /// ```
681 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
682 where
683 B: IsBounded,
684 {
685 let other: Optional<O, L, B> = other.into();
686 check_matching_location(&self.location, &other.location);
687
688 if L::is_top_level()
689 && let Some(tick) = self.location.try_tick()
690 {
691 let self_location = self.location().clone();
692 let out = zip_inside_tick(
693 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
694 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
695 )
696 .latest();
697
698 Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
699 } else {
700 zip_inside_tick(self, other)
701 }
702 }
703
704 /// Passes through `self` when it has a value, otherwise passes through `other`.
705 ///
706 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
707 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
708 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
709 ///
710 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
711 /// of the inputs change (including to/from null states).
712 ///
713 /// # Example
714 /// ```rust
715 /// # #[cfg(feature = "deploy")] {
716 /// # use hydro_lang::prelude::*;
717 /// # use futures::StreamExt;
718 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
719 /// let tick = process.tick();
720 /// // ticks are lazy by default, forces the second tick to run
721 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
722 ///
723 /// let some_first_tick = tick.optional_first_tick(q!(123));
724 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
725 /// some_first_tick.or(some_second_tick).all_ticks()
726 /// # }, |mut stream| async move {
727 /// // [123 /* first tick */, 456 /* second tick */]
728 /// # for w in vec![123, 456] {
729 /// # assert_eq!(stream.next().await.unwrap(), w);
730 /// # }
731 /// # }));
732 /// # }
733 /// ```
734 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
735 check_matching_location(&self.location, &other.location);
736
737 if L::is_top_level()
738 && !B::BOUNDED // only if unbounded we need to use a tick
739 && let Some(tick) = self.location.try_tick()
740 {
741 let self_location = self.location().clone();
742 let out = or_inside_tick(
743 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
744 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
745 )
746 .latest();
747
748 Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
749 } else {
750 Optional::new(
751 self.location.clone(),
752 HydroNode::ChainFirst {
753 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
754 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
755 metadata: self.location.new_node_metadata(Self::collection_kind()),
756 },
757 )
758 }
759 }
760
761 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
762 ///
763 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
764 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
765 ///
766 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
767 /// of the inputs change (including to/from null states).
768 ///
769 /// # Example
770 /// ```rust
771 /// # #[cfg(feature = "deploy")] {
772 /// # use hydro_lang::prelude::*;
773 /// # use futures::StreamExt;
774 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
775 /// let tick = process.tick();
776 /// // ticks are lazy by default, forces the later ticks to run
777 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
778 ///
779 /// let some_first_tick = tick.optional_first_tick(q!(123));
780 /// some_first_tick
781 /// .unwrap_or(tick.singleton(q!(456)))
782 /// .all_ticks()
783 /// # }, |mut stream| async move {
784 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
785 /// # for w in vec![123, 456, 456, 456] {
786 /// # assert_eq!(stream.next().await.unwrap(), w);
787 /// # }
788 /// # }));
789 /// # }
790 /// ```
791 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
792 let res_option = self.or(other.into());
793 Singleton::new(
794 res_option.location.clone(),
795 HydroNode::Cast {
796 inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
797 metadata: res_option
798 .location
799 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
800 },
801 )
802 }
803
804 /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
805 ///
806 /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
807 /// [`Optional`] when the default value of the type is a suitable fallback.
808 ///
809 /// # Example
810 /// ```rust
811 /// # #[cfg(feature = "deploy")] {
812 /// # use hydro_lang::prelude::*;
813 /// # use futures::StreamExt;
814 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
815 /// let tick = process.tick();
816 /// // ticks are lazy by default, forces the later ticks to run
817 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
818 ///
819 /// let some_first_tick = tick.optional_first_tick(q!(123i32));
820 /// some_first_tick.unwrap_or_default().all_ticks()
821 /// # }, |mut stream| async move {
822 /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
823 /// # for w in vec![123, 0, 0, 0] {
824 /// # assert_eq!(stream.next().await.unwrap(), w);
825 /// # }
826 /// # }));
827 /// # }
828 /// ```
829 pub fn unwrap_or_default(self) -> Singleton<T, L, B>
830 where
831 T: Default + Clone,
832 {
833 self.into_singleton().map(q!(|v| v.unwrap_or_default()))
834 }
835
836 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
837 ///
838 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
839 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
840 /// so that Hydro can skip any computation on null values.
841 ///
842 /// # Example
843 /// ```rust
844 /// # #[cfg(feature = "deploy")] {
845 /// # use hydro_lang::prelude::*;
846 /// # use futures::StreamExt;
847 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
848 /// let tick = process.tick();
849 /// // ticks are lazy by default, forces the later ticks to run
850 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
851 ///
852 /// let some_first_tick = tick.optional_first_tick(q!(123));
853 /// some_first_tick.into_singleton().all_ticks()
854 /// # }, |mut stream| async move {
855 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
856 /// # for w in vec![Some(123), None, None, None] {
857 /// # assert_eq!(stream.next().await.unwrap(), w);
858 /// # }
859 /// # }));
860 /// # }
861 /// ```
862 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
863 where
864 T: Clone,
865 {
866 let none: syn::Expr = parse_quote!(::std::option::Option::None);
867
868 let none_singleton = Singleton::new(
869 self.location.clone(),
870 HydroNode::SingletonSource {
871 value: none.into(),
872 first_tick_only: false,
873 metadata: self
874 .location
875 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
876 },
877 );
878
879 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
880 }
881
882 /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
883 ///
884 /// # Example
885 /// ```rust
886 /// # #[cfg(feature = "deploy")] {
887 /// # use hydro_lang::prelude::*;
888 /// # use futures::StreamExt;
889 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
890 /// let tick = process.tick();
891 /// // ticks are lazy by default, forces the second tick to run
892 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
893 ///
894 /// let some_first_tick = tick.optional_first_tick(q!(42));
895 /// some_first_tick.is_some().all_ticks()
896 /// # }, |mut stream| async move {
897 /// // [true /* first tick */, false /* second tick */, ...]
898 /// # for w in vec![true, false] {
899 /// # assert_eq!(stream.next().await.unwrap(), w);
900 /// # }
901 /// # }));
902 /// # }
903 /// ```
904 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
905 pub fn is_some(self) -> Singleton<bool, L, B> {
906 self.map(q!(|_| ()))
907 .into_singleton()
908 .map(q!(|o| o.is_some()))
909 }
910
911 /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
912 ///
913 /// # Example
914 /// ```rust
915 /// # #[cfg(feature = "deploy")] {
916 /// # use hydro_lang::prelude::*;
917 /// # use futures::StreamExt;
918 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
919 /// let tick = process.tick();
920 /// // ticks are lazy by default, forces the second tick to run
921 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
922 ///
923 /// let some_first_tick = tick.optional_first_tick(q!(42));
924 /// some_first_tick.is_none().all_ticks()
925 /// # }, |mut stream| async move {
926 /// // [false /* first tick */, true /* second tick */, ...]
927 /// # for w in vec![false, true] {
928 /// # assert_eq!(stream.next().await.unwrap(), w);
929 /// # }
930 /// # }));
931 /// # }
932 /// ```
933 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
934 pub fn is_none(self) -> Singleton<bool, L, B> {
935 self.map(q!(|_| ()))
936 .into_singleton()
937 .map(q!(|o| o.is_none()))
938 }
939
940 /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
941 /// values are equal, `false` otherwise (including when either is null).
942 ///
943 /// # Example
944 /// ```rust
945 /// # #[cfg(feature = "deploy")] {
946 /// # use hydro_lang::prelude::*;
947 /// # use futures::StreamExt;
948 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
949 /// let tick = process.tick();
950 /// // ticks are lazy by default, forces the second tick to run
951 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
952 ///
953 /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
954 /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
955 /// a.is_some_and_equals(b).all_ticks()
956 /// # }, |mut stream| async move {
957 /// // [true, false]
958 /// # for w in vec![true, false] {
959 /// # assert_eq!(stream.next().await.unwrap(), w);
960 /// # }
961 /// # }));
962 /// # }
963 /// ```
964 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
965 pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
966 where
967 T: PartialEq + Clone,
968 B: IsBounded,
969 {
970 self.into_singleton()
971 .zip(other.into_singleton())
972 .map(q!(|(a, b)| a.is_some() && a == b))
973 }
974
975 /// An operator which allows you to "name" a `HydroNode`.
976 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
977 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
978 {
979 let mut node = self.ir_node.borrow_mut();
980 let metadata = node.metadata_mut();
981 metadata.tag = Some(name.to_owned());
982 }
983 self
984 }
985
986 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
987 /// implies that `B == Bounded`.
988 pub fn make_bounded(self) -> Optional<T, L, Bounded>
989 where
990 B: IsBounded,
991 {
992 Optional::new(
993 self.location.clone(),
994 self.ir_node.replace(HydroNode::Placeholder),
995 )
996 }
997
998 /// Clones this bounded optional into a tick, returning a optional that has the
999 /// same value as the outer optional. Because the outer optional is bounded, this
1000 /// is deterministic because there is only a single immutable version.
1001 pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
1002 where
1003 B: IsBounded,
1004 T: Clone,
1005 {
1006 // TODO(shadaj): avoid printing simulator logs for this snapshot
1007 let inner = self.snapshot(
1008 tick,
1009 nondet!(/** bounded top-level optional so deterministic */),
1010 );
1011 Optional::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1012 }
1013
1014 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1015 /// non-null. Otherwise, the stream is empty.
1016 ///
1017 /// # Example
1018 /// ```rust
1019 /// # #[cfg(feature = "deploy")] {
1020 /// # use hydro_lang::prelude::*;
1021 /// # use futures::StreamExt;
1022 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1023 /// # let tick = process.tick();
1024 /// # // ticks are lazy by default, forces the second tick to run
1025 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1026 /// # let batch_first_tick = process
1027 /// # .source_iter(q!(vec![]))
1028 /// # .batch(&tick, nondet!(/** test */));
1029 /// # let batch_second_tick = process
1030 /// # .source_iter(q!(vec![123, 456]))
1031 /// # .batch(&tick, nondet!(/** test */))
1032 /// # .defer_tick(); // appears on the second tick
1033 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1034 /// input_batch // first tick: [], second tick: [123, 456]
1035 /// .clone()
1036 /// .max()
1037 /// .into_stream()
1038 /// .chain(input_batch)
1039 /// .all_ticks()
1040 /// # }, |mut stream| async move {
1041 /// // [456, 123, 456]
1042 /// # for w in vec![456, 123, 456] {
1043 /// # assert_eq!(stream.next().await.unwrap(), w);
1044 /// # }
1045 /// # }));
1046 /// # }
1047 /// ```
1048 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1049 where
1050 B: IsBounded,
1051 {
1052 Stream::new(
1053 self.location.clone(),
1054 HydroNode::Cast {
1055 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1056 metadata: self.location.new_node_metadata(Stream::<
1057 T,
1058 Tick<L>,
1059 Bounded,
1060 TotalOrder,
1061 ExactlyOnce,
1062 >::collection_kind()),
1063 },
1064 )
1065 }
1066
1067 /// Filters this optional, passing through the value if the boolean signal is `true`,
1068 /// otherwise the output is null.
1069 ///
1070 /// # Example
1071 /// ```rust
1072 /// # #[cfg(feature = "deploy")] {
1073 /// # use hydro_lang::prelude::*;
1074 /// # use futures::StreamExt;
1075 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1076 /// let tick = process.tick();
1077 /// // ticks are lazy by default, forces the second tick to run
1078 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1079 ///
1080 /// let some_first_tick = tick.optional_first_tick(q!(()));
1081 /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1082 /// let batch_first_tick = process
1083 /// .source_iter(q!(vec![456]))
1084 /// .batch(&tick, nondet!(/** test */));
1085 /// let batch_second_tick = process
1086 /// .source_iter(q!(vec![789]))
1087 /// .batch(&tick, nondet!(/** test */))
1088 /// .defer_tick();
1089 /// batch_first_tick.chain(batch_second_tick).first()
1090 /// .filter_if(signal)
1091 /// .unwrap_or(tick.singleton(q!(0)))
1092 /// .all_ticks()
1093 /// # }, |mut stream| async move {
1094 /// // [456, 0]
1095 /// # for w in vec![456, 0] {
1096 /// # assert_eq!(stream.next().await.unwrap(), w);
1097 /// # }
1098 /// # }));
1099 /// # }
1100 /// ```
1101 pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1102 where
1103 B: IsBounded,
1104 {
1105 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1106 }
1107
1108 /// Filters this optional, passing through the optional value if it is non-null **and** the
1109 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1110 ///
1111 /// Useful for conditionally processing, such as only emitting an optional's value outside
1112 /// a tick if some other condition is satisfied.
1113 ///
1114 /// # Example
1115 /// ```rust
1116 /// # #[cfg(feature = "deploy")] {
1117 /// # use hydro_lang::prelude::*;
1118 /// # use futures::StreamExt;
1119 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1120 /// let tick = process.tick();
1121 /// // ticks are lazy by default, forces the second tick to run
1122 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1123 ///
1124 /// let batch_first_tick = process
1125 /// .source_iter(q!(vec![]))
1126 /// .batch(&tick, nondet!(/** test */));
1127 /// let batch_second_tick = process
1128 /// .source_iter(q!(vec![456]))
1129 /// .batch(&tick, nondet!(/** test */))
1130 /// .defer_tick(); // appears on the second tick
1131 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1132 /// batch_first_tick.chain(batch_second_tick).first()
1133 /// .filter_if_some(some_on_first_tick)
1134 /// .unwrap_or(tick.singleton(q!(789)))
1135 /// .all_ticks()
1136 /// # }, |mut stream| async move {
1137 /// // [789, 789]
1138 /// # for w in vec![789, 789] {
1139 /// # assert_eq!(stream.next().await.unwrap(), w);
1140 /// # }
1141 /// # }));
1142 /// # }
1143 /// ```
1144 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1145 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1146 where
1147 B: IsBounded,
1148 {
1149 self.filter_if(signal.is_some())
1150 }
1151
1152 /// Filters this optional, passing through the optional value if it is non-null **and** the
1153 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1154 ///
1155 /// Useful for conditionally processing, such as only emitting an optional's value outside
1156 /// a tick if some other condition is satisfied.
1157 ///
1158 /// # Example
1159 /// ```rust
1160 /// # #[cfg(feature = "deploy")] {
1161 /// # use hydro_lang::prelude::*;
1162 /// # use futures::StreamExt;
1163 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1164 /// let tick = process.tick();
1165 /// // ticks are lazy by default, forces the second tick to run
1166 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1167 ///
1168 /// let batch_first_tick = process
1169 /// .source_iter(q!(vec![]))
1170 /// .batch(&tick, nondet!(/** test */));
1171 /// let batch_second_tick = process
1172 /// .source_iter(q!(vec![456]))
1173 /// .batch(&tick, nondet!(/** test */))
1174 /// .defer_tick(); // appears on the second tick
1175 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1176 /// batch_first_tick.chain(batch_second_tick).first()
1177 /// .filter_if_none(some_on_first_tick)
1178 /// .unwrap_or(tick.singleton(q!(789)))
1179 /// .all_ticks()
1180 /// # }, |mut stream| async move {
1181 /// // [789, 789]
1182 /// # for w in vec![789, 456] {
1183 /// # assert_eq!(stream.next().await.unwrap(), w);
1184 /// # }
1185 /// # }));
1186 /// # }
1187 /// ```
1188 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1189 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1190 where
1191 B: IsBounded,
1192 {
1193 self.filter_if(other.is_none())
1194 }
1195
1196 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1197 ///
1198 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1199 /// having a value, such as only releasing a piece of state if the node is the leader.
1200 ///
1201 /// # Example
1202 /// ```rust
1203 /// # #[cfg(feature = "deploy")] {
1204 /// # use hydro_lang::prelude::*;
1205 /// # use futures::StreamExt;
1206 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1207 /// let tick = process.tick();
1208 /// // ticks are lazy by default, forces the second tick to run
1209 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1210 ///
1211 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1212 /// some_on_first_tick
1213 /// .if_some_then(tick.singleton(q!(456)))
1214 /// .unwrap_or(tick.singleton(q!(123)))
1215 /// # .all_ticks()
1216 /// # }, |mut stream| async move {
1217 /// // 456 (first tick) ~> 123 (second tick onwards)
1218 /// # for w in vec![456, 123, 123] {
1219 /// # assert_eq!(stream.next().await.unwrap(), w);
1220 /// # }
1221 /// # }));
1222 /// # }
1223 /// ```
1224 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1225 pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1226 where
1227 B: IsBounded,
1228 {
1229 value.filter_if(self.is_some())
1230 }
1231}
1232
1233impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1234where
1235 L: Location<'a>,
1236{
1237 /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1238 /// key-value pair of this [`Optional`].
1239 ///
1240 /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1241 /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1242 /// the entry will be updated and appear / disappear according to the state of the
1243 /// [`Optional`].
1244 pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1245 KeyedSingleton::new(
1246 self.location.clone(),
1247 HydroNode::Cast {
1248 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1249 metadata: self
1250 .location
1251 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1252 },
1253 )
1254 }
1255}
1256
1257impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1258where
1259 L: Location<'a>,
1260{
1261 /// Returns an optional value corresponding to the latest snapshot of the optional
1262 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1263 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1264 /// all snapshots of this optional into the atomic-associated tick will observe the
1265 /// same value each tick.
1266 ///
1267 /// # Non-Determinism
1268 /// Because this picks a snapshot of a optional whose value is continuously changing,
1269 /// the output optional has a non-deterministic value since the snapshot can be at an
1270 /// arbitrary point in time.
1271 pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1272 self,
1273 tick: &Tick<L2>,
1274 _nondet: NonDet,
1275 ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1276 Optional::new(
1277 tick.drop_consistency(),
1278 HydroNode::Batch {
1279 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1280 metadata: tick
1281 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1282 },
1283 )
1284 }
1285
1286 /// Returns this optional back into a top-level, asynchronous execution context where updates
1287 /// to the value will be asynchronously propagated.
1288 pub fn end_atomic(self) -> Optional<T, L, B> {
1289 Optional::new(
1290 self.location.tick.l.clone(),
1291 HydroNode::EndAtomic {
1292 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1293 metadata: self
1294 .location
1295 .tick
1296 .l
1297 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1298 },
1299 )
1300 }
1301}
1302
1303impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1304where
1305 L: Location<'a>,
1306{
1307 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1308 /// will observe the same version of the value and will be executed synchronously before any
1309 /// outputs are yielded (in [`Optional::end_atomic`]).
1310 ///
1311 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1312 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1313 /// a different version).
1314 pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1315 let id = self.location.flow_state().borrow_mut().next_clock_id();
1316 let out_location = Atomic {
1317 tick: Tick {
1318 id,
1319 l: self.location.clone(),
1320 },
1321 };
1322 Optional::new(
1323 out_location.clone(),
1324 HydroNode::BeginAtomic {
1325 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1326 metadata: out_location
1327 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1328 },
1329 )
1330 }
1331
1332 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1333 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1334 /// relevant data that contributed to the snapshot at tick `t`.
1335 ///
1336 /// # Non-Determinism
1337 /// Because this picks a snapshot of a optional whose value is continuously changing,
1338 /// the output optional has a non-deterministic value since the snapshot can be at an
1339 /// arbitrary point in time.
1340 pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1341 self,
1342 tick: &Tick<L2>,
1343 _nondet: NonDet,
1344 ) -> Optional<T, Tick<L::DropConsistency>, Bounded> {
1345 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1346 Optional::new(
1347 tick.drop_consistency(),
1348 HydroNode::Batch {
1349 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1350 metadata: tick
1351 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1352 },
1353 )
1354 }
1355
1356 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1357 /// with order corresponding to increasing prefixes of data contributing to the optional.
1358 ///
1359 /// # Non-Determinism
1360 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1361 /// to non-deterministic batching and arrival of inputs, the output stream is
1362 /// non-deterministic.
1363 pub fn sample_eager(
1364 self,
1365 nondet: NonDet,
1366 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1367 let tick = self.location.tick();
1368 self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1369 }
1370
1371 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1372 /// value taken at various points in time. Because the input optional may be
1373 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1374 /// represent the value of the optional given some prefix of the streams leading up to
1375 /// it.
1376 ///
1377 /// # Non-Determinism
1378 /// The output stream is non-deterministic in which elements are sampled, since this
1379 /// is controlled by a clock.
1380 #[cfg(feature = "tokio")]
1381 pub fn sample_every(
1382 self,
1383 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1384 nondet: NonDet,
1385 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1386 where
1387 L: TopLevel<'a>,
1388 {
1389 let samples = self.location.source_interval(interval);
1390 let tick = self.location.tick();
1391
1392 self.snapshot(&tick, nondet)
1393 .filter_if(samples.batch(&tick, nondet).first().is_some())
1394 .all_ticks()
1395 .weaken_retries()
1396 }
1397}
1398
1399impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1400where
1401 L: Location<'a>,
1402{
1403 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1404 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1405 /// null values).
1406 ///
1407 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1408 /// producing one element in the output for each (non-null) tick. This is useful for batched
1409 /// computations, where the results from each tick must be combined together.
1410 ///
1411 /// # Example
1412 /// ```rust
1413 /// # #[cfg(feature = "deploy")] {
1414 /// # use hydro_lang::prelude::*;
1415 /// # use futures::StreamExt;
1416 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1417 /// # let tick = process.tick();
1418 /// # // ticks are lazy by default, forces the second tick to run
1419 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1420 /// # let batch_first_tick = process
1421 /// # .source_iter(q!(vec![]))
1422 /// # .batch(&tick, nondet!(/** test */));
1423 /// # let batch_second_tick = process
1424 /// # .source_iter(q!(vec![1, 2, 3]))
1425 /// # .batch(&tick, nondet!(/** test */))
1426 /// # .defer_tick(); // appears on the second tick
1427 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1428 /// input_batch // first tick: [], second tick: [1, 2, 3]
1429 /// .max()
1430 /// .all_ticks()
1431 /// # }, |mut stream| async move {
1432 /// // [3]
1433 /// # for w in vec![3] {
1434 /// # assert_eq!(stream.next().await.unwrap(), w);
1435 /// # }
1436 /// # }));
1437 /// # }
1438 /// ```
1439 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1440 self.into_stream().all_ticks()
1441 }
1442
1443 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1444 /// which will stream the value computed in _each_ tick as a separate stream element.
1445 ///
1446 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1447 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1448 /// optional's [`Tick`] context.
1449 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1450 self.into_stream().all_ticks_atomic()
1451 }
1452
1453 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1454 /// be asynchronously updated with the latest value of the optional inside the tick, including
1455 /// whether the optional is null or not.
1456 ///
1457 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1458 /// tick that tracks the inner value. This is useful for getting the value as of the
1459 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1460 ///
1461 /// # Example
1462 /// ```rust
1463 /// # #[cfg(feature = "deploy")] {
1464 /// # use hydro_lang::prelude::*;
1465 /// # use futures::StreamExt;
1466 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1467 /// # let tick = process.tick();
1468 /// # // ticks are lazy by default, forces the second tick to run
1469 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1470 /// # let batch_first_tick = process
1471 /// # .source_iter(q!(vec![]))
1472 /// # .batch(&tick, nondet!(/** test */));
1473 /// # let batch_second_tick = process
1474 /// # .source_iter(q!(vec![1, 2, 3]))
1475 /// # .batch(&tick, nondet!(/** test */))
1476 /// # .defer_tick(); // appears on the second tick
1477 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1478 /// input_batch // first tick: [], second tick: [1, 2, 3]
1479 /// .max()
1480 /// .latest()
1481 /// # .into_singleton()
1482 /// # .sample_eager(nondet!(/** test */))
1483 /// # }, |mut stream| async move {
1484 /// // asynchronously changes from None ~> 3
1485 /// # for w in vec![None, Some(3)] {
1486 /// # assert_eq!(stream.next().await.unwrap(), w);
1487 /// # }
1488 /// # }));
1489 /// # }
1490 /// ```
1491 pub fn latest(self) -> Optional<T, L, Unbounded> {
1492 Optional::new(
1493 self.location.outer().clone(),
1494 HydroNode::YieldConcat {
1495 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1496 metadata: self
1497 .location
1498 .outer()
1499 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1500 },
1501 )
1502 }
1503
1504 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1505 /// be updated with the latest value of the optional inside the tick.
1506 ///
1507 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1508 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1509 /// optional's [`Tick`] context.
1510 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1511 let out_location = Atomic {
1512 tick: self.location.clone(),
1513 };
1514
1515 Optional::new(
1516 out_location.clone(),
1517 HydroNode::YieldConcat {
1518 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1519 metadata: out_location
1520 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1521 },
1522 )
1523 }
1524
1525 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1526 /// always has the state of `self` at tick `T - 1`.
1527 ///
1528 /// At tick `0`, the output optional is null, since there is no previous tick.
1529 ///
1530 /// This operator enables stateful iterative processing with ticks, by sending data from one
1531 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1532 ///
1533 /// # Example
1534 /// ```rust
1535 /// # #[cfg(feature = "deploy")] {
1536 /// # use hydro_lang::prelude::*;
1537 /// # use futures::StreamExt;
1538 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1539 /// let tick = process.tick();
1540 /// // ticks are lazy by default, forces the second tick to run
1541 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1542 ///
1543 /// let batch_first_tick = process
1544 /// .source_iter(q!(vec![1, 2]))
1545 /// .batch(&tick, nondet!(/** test */));
1546 /// let batch_second_tick = process
1547 /// .source_iter(q!(vec![3, 4]))
1548 /// .batch(&tick, nondet!(/** test */))
1549 /// .defer_tick(); // appears on the second tick
1550 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1551 /// .reduce(q!(|state, v| *state += v));
1552 ///
1553 /// current_tick_sum.clone().into_singleton().zip(
1554 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1555 /// ).all_ticks()
1556 /// # }, |mut stream| async move {
1557 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1558 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1559 /// # assert_eq!(stream.next().await.unwrap(), w);
1560 /// # }
1561 /// # }));
1562 /// # }
1563 /// ```
1564 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1565 Optional::new(
1566 self.location.clone(),
1567 HydroNode::DeferTick {
1568 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1569 metadata: self.location.new_node_metadata(Self::collection_kind()),
1570 },
1571 )
1572 }
1573}
1574
1575#[cfg(test)]
1576mod tests {
1577 #[cfg(feature = "deploy")]
1578 use futures::StreamExt;
1579 #[cfg(feature = "deploy")]
1580 use hydro_deploy::Deployment;
1581 #[cfg(any(feature = "deploy", feature = "sim"))]
1582 use stageleft::q;
1583
1584 #[cfg(feature = "deploy")]
1585 use super::Optional;
1586 #[cfg(any(feature = "deploy", feature = "sim"))]
1587 use crate::compile::builder::FlowBuilder;
1588 #[cfg(any(feature = "deploy", feature = "sim"))]
1589 use crate::location::Location;
1590 #[cfg(feature = "deploy")]
1591 use crate::nondet::nondet;
1592
1593 #[cfg(feature = "deploy")]
1594 #[tokio::test]
1595 async fn optional_or_cardinality() {
1596 let mut deployment = Deployment::new();
1597
1598 let mut flow = FlowBuilder::new();
1599 let node = flow.process::<()>();
1600 let external = flow.external::<()>();
1601
1602 let node_tick = node.tick();
1603 let tick_singleton = node_tick.singleton(q!(123));
1604 let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1605 let counts = tick_optional_inhabited
1606 .clone()
1607 .or(tick_optional_inhabited)
1608 .into_stream()
1609 .count()
1610 .all_ticks()
1611 .send_bincode_external(&external);
1612
1613 let nodes = flow
1614 .with_process(&node, deployment.Localhost())
1615 .with_external(&external, deployment.Localhost())
1616 .deploy(&mut deployment);
1617
1618 deployment.deploy().await.unwrap();
1619
1620 let mut external_out = nodes.connect(counts).await;
1621
1622 deployment.start().await.unwrap();
1623
1624 assert_eq!(external_out.next().await.unwrap(), 1);
1625 }
1626
1627 #[cfg(feature = "deploy")]
1628 #[tokio::test]
1629 async fn into_singleton_top_level_none_cardinality() {
1630 let mut deployment = Deployment::new();
1631
1632 let mut flow = FlowBuilder::new();
1633 let node = flow.process::<()>();
1634 let external = flow.external::<()>();
1635
1636 let node_tick = node.tick();
1637 let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1638 let into_singleton = top_level_none.into_singleton();
1639
1640 let tick_driver = node.spin();
1641
1642 let counts = into_singleton
1643 .snapshot(&node_tick, nondet!(/** test */))
1644 .into_stream()
1645 .count()
1646 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1647 .map(q!(|(c, _)| c))
1648 .all_ticks()
1649 .send_bincode_external(&external);
1650
1651 let nodes = flow
1652 .with_process(&node, deployment.Localhost())
1653 .with_external(&external, deployment.Localhost())
1654 .deploy(&mut deployment);
1655
1656 deployment.deploy().await.unwrap();
1657
1658 let mut external_out = nodes.connect(counts).await;
1659
1660 deployment.start().await.unwrap();
1661
1662 assert_eq!(external_out.next().await.unwrap(), 1);
1663 assert_eq!(external_out.next().await.unwrap(), 1);
1664 assert_eq!(external_out.next().await.unwrap(), 1);
1665 }
1666
1667 #[cfg(feature = "deploy")]
1668 #[tokio::test]
1669 async fn into_singleton_unbounded_top_level_none_cardinality() {
1670 let mut deployment = Deployment::new();
1671
1672 let mut flow = FlowBuilder::new();
1673 let node = flow.process::<()>();
1674 let external = flow.external::<()>();
1675
1676 let node_tick = node.tick();
1677 let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1678 let into_singleton = top_level_none.into_singleton();
1679
1680 let tick_driver = node.spin();
1681
1682 let counts = into_singleton
1683 .snapshot(&node_tick, nondet!(/** test */))
1684 .into_stream()
1685 .count()
1686 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1687 .map(q!(|(c, _)| c))
1688 .all_ticks()
1689 .send_bincode_external(&external);
1690
1691 let nodes = flow
1692 .with_process(&node, deployment.Localhost())
1693 .with_external(&external, deployment.Localhost())
1694 .deploy(&mut deployment);
1695
1696 deployment.deploy().await.unwrap();
1697
1698 let mut external_out = nodes.connect(counts).await;
1699
1700 deployment.start().await.unwrap();
1701
1702 assert_eq!(external_out.next().await.unwrap(), 1);
1703 assert_eq!(external_out.next().await.unwrap(), 1);
1704 assert_eq!(external_out.next().await.unwrap(), 1);
1705 }
1706
1707 #[cfg(feature = "sim")]
1708 #[test]
1709 fn top_level_optional_some_into_stream_no_replay() {
1710 let mut flow = FlowBuilder::new();
1711 let node = flow.process::<()>();
1712
1713 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1714 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1715 let filtered_some = folded.filter(q!(|_| true));
1716
1717 let out_recv = filtered_some.into_stream().sim_output();
1718
1719 flow.sim().exhaustive(async || {
1720 out_recv.assert_yields_only([10]).await;
1721 });
1722 }
1723
1724 #[cfg(feature = "sim")]
1725 #[test]
1726 fn top_level_optional_none_into_stream_no_replay() {
1727 let mut flow = FlowBuilder::new();
1728 let node = flow.process::<()>();
1729
1730 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1731 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1732 let filtered_none = folded.filter(q!(|_| false));
1733
1734 let out_recv = filtered_none.into_stream().sim_output();
1735
1736 flow.sim().exhaustive(async || {
1737 out_recv.assert_yields_only([] as [i32; 0]).await;
1738 });
1739 }
1740
1741 #[cfg(feature = "deploy")]
1742 #[tokio::test]
1743 async fn test_optional_ref() {
1744 let mut deployment = Deployment::new();
1745
1746 let mut flow = FlowBuilder::new();
1747 let external = flow.external::<()>();
1748 let p1 = flow.process::<()>();
1749
1750 // Create an optional: reduce 0..5 => Some(10) (sum via reduce)
1751 let my_opt = p1.source_iter(q!(0..5i32)).reduce(q!(|a, b| *a += b));
1752
1753 let opt_ref = my_opt.by_ref();
1754
1755 // Use the optional ref in a map: unwrap_or(0) + x
1756 let out_port = p1
1757 .source_iter(q!(1..=3i32))
1758 .map(q!(|x| x + opt_ref.unwrap_or(0)))
1759 .send_bincode_external(&external);
1760
1761 let nodes = flow
1762 .with_default_optimize()
1763 .with_process(&p1, deployment.Localhost())
1764 .with_external(&external, deployment.Localhost())
1765 .deploy(&mut deployment);
1766
1767 deployment.deploy().await.unwrap();
1768
1769 let mut out_recv = nodes.connect(out_port).await;
1770
1771 deployment.start().await.unwrap();
1772
1773 let mut results = Vec::new();
1774 for _ in 0..3 {
1775 results.push(out_recv.next().await.unwrap());
1776 }
1777 results.sort();
1778 // reduce(0..5) = 10, so results should be 11, 12, 13
1779 assert_eq!(results, vec![11, 12, 13]);
1780 }
1781
1782 #[cfg(feature = "deploy")]
1783 #[tokio::test]
1784 async fn test_optional_ref_none() {
1785 let mut deployment = Deployment::new();
1786
1787 let mut flow = FlowBuilder::new();
1788 let external = flow.external::<()>();
1789 let p1 = flow.process::<()>();
1790
1791 // Create an optional from an empty source => None
1792 let my_opt = p1
1793 .source_iter(q!(std::iter::empty::<i32>()))
1794 .reduce(q!(|a, b| *a += b));
1795
1796 let opt_ref = my_opt.by_ref();
1797
1798 // Use the optional ref: should be None, so unwrap_or(99)
1799 let out_port = p1
1800 .source_iter(q!(1..=2i32))
1801 .map(q!(|x| x + opt_ref.unwrap_or(99)))
1802 .send_bincode_external(&external);
1803
1804 let nodes = flow
1805 .with_default_optimize()
1806 .with_process(&p1, deployment.Localhost())
1807 .with_external(&external, deployment.Localhost())
1808 .deploy(&mut deployment);
1809
1810 deployment.deploy().await.unwrap();
1811
1812 let mut out_recv = nodes.connect(out_port).await;
1813
1814 deployment.start().await.unwrap();
1815
1816 let mut results = Vec::new();
1817 for _ in 0..2 {
1818 results.push(out_recv.next().await.unwrap());
1819 }
1820 results.sort();
1821 // optional is None, so unwrap_or(99) => 100, 101
1822 assert_eq!(results, vec![100, 101]);
1823 }
1824
1825 #[cfg(feature = "deploy")]
1826 #[tokio::test]
1827 async fn test_optional_ref_and_consume() {
1828 let mut deployment = Deployment::new();
1829
1830 let mut flow = FlowBuilder::new();
1831 let external = flow.external::<()>();
1832 let p1 = flow.process::<()>();
1833
1834 // Use reduce to produce an Optional
1835 let my_opt = p1.source_iter(q!(0..5i32)).reduce(q!(|a, b| *a += b));
1836
1837 let opt_ref = my_opt.by_ref();
1838
1839 // Reference path
1840 let out_port_ref = p1
1841 .source_iter(q!(1..=2i32))
1842 .map(q!(|x| x + opt_ref.unwrap_or(0)))
1843 .send_bincode_external(&external);
1844
1845 let nodes = flow
1846 .with_default_optimize()
1847 .with_process(&p1, deployment.Localhost())
1848 .with_external(&external, deployment.Localhost())
1849 .deploy(&mut deployment);
1850
1851 deployment.deploy().await.unwrap();
1852
1853 let mut out_recv_ref = nodes.connect(out_port_ref).await;
1854
1855 deployment.start().await.unwrap();
1856
1857 let mut ref_results = Vec::new();
1858 for _ in 0..2 {
1859 ref_results.push(out_recv_ref.next().await.unwrap());
1860 }
1861 ref_results.sort();
1862 // reduce(0..5) = 10, so 1+10=11, 2+10=12
1863 assert_eq!(ref_results, vec![11, 12]);
1864 }
1865}