Skip to main content

hydro_lang/location/
tick.rs

1//! Clock domains for batching streaming data into discrete time steps.
2//!
3//! In Hydro, a [`Tick`] represents a logical clock that can be used to batch
4//! unbounded streaming data into discrete, bounded time steps. This is essential
5//! for implementing iterative algorithms, synchronizing data across multiple
6//! streams, and performing aggregations over windows of data.
7//!
8//! A tick is created from a top-level location (such as [`super::Process`] or [`super::Cluster`])
9//! using [`Location::tick`]. Once inside a tick, bounded live collections can be
10//! manipulated with operations like fold, reduce, and cross-product, and the
11//! results can be emitted back to the unbounded stream using methods like
12//! `all_ticks()`.
13//!
14//! The [`Atomic`] wrapper provides atomicity guarantees within a tick, ensuring
15//! that reads and writes within a tick are serialized.
16
17use stageleft::{QuotedWithContext, q};
18
19#[cfg(stageleft_runtime)]
20use super::dynamic::DynLocation;
21use super::{Location, LocationId};
22use crate::compile::builder::{ClockId, FlowState};
23use crate::compile::ir::{HydroNode, HydroSource};
24#[cfg(stageleft_runtime)]
25use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
26use crate::forward_handle::{TickCycle, TickCycleHandle};
27#[cfg(feature = "tokio")]
28use crate::live_collections::Singleton;
29use crate::live_collections::boundedness::Bounded;
30use crate::live_collections::optional::Optional;
31use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
32use crate::location::TopLevel;
33#[cfg(feature = "tokio")]
34use crate::nondet::NonDet;
35use crate::nondet::nondet;
36
37/// A location wrapper that provides atomicity guarantees within a [`Tick`].
38///
39/// An `Atomic` context establishes a happens-before relationship between operations:
40/// - Downstream computations from `atomic()` are associated with an internal tick
41/// - Outputs from `end_atomic()` are held until all computations in the tick complete
42/// - Snapshots via `use::atomic` are guaranteed to reflect all updates from associated `end_atomic()`
43///
44/// This ensures read-after-write consistency: if a client receives an acknowledgement
45/// from `end_atomic()`, any subsequent `use::atomic` snapshot will include the effects
46/// of that acknowledged operation.
47#[derive(Clone)]
48pub struct Atomic<Loc> {
49    pub(crate) tick: Tick<Loc>,
50}
51
52impl<L: DynLocation> DynLocation for Atomic<L> {
53    fn dyn_id(&self) -> LocationId {
54        LocationId::Atomic(Box::new(self.tick.dyn_id()))
55    }
56
57    fn flow_state(&self) -> &FlowState {
58        self.tick.flow_state()
59    }
60
61    fn is_top_level() -> bool {
62        L::is_top_level()
63    }
64
65    fn multiversioned(&self) -> bool {
66        self.tick.multiversioned()
67    }
68
69    fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
70        L::cluster_consistency()
71    }
72}
73
74impl<'a, L> Location<'a> for Atomic<L>
75where
76    L: Location<'a>,
77{
78    type Root = L::Root;
79
80    type DropConsistency = Atomic<L::DropConsistency>;
81
82    fn consistency() -> Option<super::dynamic::ClusterConsistency> {
83        L::consistency()
84    }
85
86    fn root(&self) -> Self::Root {
87        self.tick.root()
88    }
89
90    fn drop_consistency(&self) -> Self::DropConsistency {
91        Atomic {
92            tick: self.tick.drop_consistency(),
93        }
94    }
95
96    fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
97        Atomic {
98            tick: Tick::from_drop_consistency(l2.tick),
99        }
100    }
101}
102
103/// Trait for live collections that can be deferred by one tick.
104///
105/// When a collection implements `DeferTick`, calling `defer_tick` delays its
106/// values by one clock cycle. This is primarily used internally to implement
107/// tick-based cycles ([`Tick::cycle`]), ensuring that feedback loops advance
108/// by one tick to avoid infinite recursion within a single tick.
109pub trait DeferTick {
110    /// Returns a new collection whose values are delayed by one tick.
111    fn defer_tick(self) -> Self;
112}
113
114/// Marks the stream as being inside the single global clock domain.
115#[derive(Clone)]
116pub struct Tick<L> {
117    pub(crate) id: ClockId,
118    /// Location.
119    pub(crate) l: L,
120}
121
122impl<L: DynLocation> DynLocation for Tick<L> {
123    fn dyn_id(&self) -> LocationId {
124        LocationId::Tick(self.id, Box::new(self.l.dyn_id()))
125    }
126
127    fn flow_state(&self) -> &FlowState {
128        self.l.flow_state()
129    }
130
131    fn is_top_level() -> bool {
132        false
133    }
134
135    fn multiversioned(&self) -> bool {
136        self.l.multiversioned()
137    }
138
139    fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
140        L::cluster_consistency()
141    }
142}
143
144impl<'a, L> Location<'a> for Tick<L>
145where
146    L: Location<'a>,
147{
148    type Root = L::Root;
149
150    type DropConsistency = Tick<L::DropConsistency>;
151
152    fn consistency() -> Option<super::dynamic::ClusterConsistency> {
153        L::consistency()
154    }
155
156    fn root(&self) -> Self::Root {
157        self.l.root()
158    }
159
160    fn drop_consistency(&self) -> Self::DropConsistency {
161        Tick {
162            id: self.id,
163            l: self.l.drop_consistency(),
164        }
165    }
166
167    fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
168        Tick {
169            id: l2.id,
170            l: L::from_drop_consistency(l2.l),
171        }
172    }
173}
174
175impl<'a, L> Tick<L>
176where
177    L: Location<'a>,
178{
179    /// Returns a reference to the outer (parent) location that this tick is nested within.
180    ///
181    /// For example, if a `Tick` was created from a `Process`, this returns a reference
182    /// to that `Process`.
183    pub fn outer(&self) -> &L {
184        &self.l
185    }
186
187    /// Creates a bounded stream of `()` values inside this tick, with a fixed batch size.
188    ///
189    /// This is useful for driving computations inside a tick that need to process
190    /// a specific number of elements per tick. Each tick will produce exactly
191    /// `batch_size` unit values.
192    pub fn spin_batch(
193        &self,
194        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
195    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
196    where
197        L: TopLevel<'a>,
198    {
199        let out = self
200            .l
201            .spin()
202            .flat_map_ordered(q!(move |_| 0..batch_size))
203            .map(q!(|_| ()));
204
205        let inner = out.batch(self, nondet!(/** at runtime, `spin` produces a single value per tick, so each batch is guaranteed to be the same size. */));
206        Stream::new(self.clone(), inner.ir_node.replace(HydroNode::Placeholder))
207    }
208
209    /// Creates an [`Optional`] which has a null value on every tick.
210    ///
211    /// # Example
212    /// ```rust
213    /// # #[cfg(feature = "deploy")] {
214    /// # use hydro_lang::prelude::*;
215    /// # use futures::StreamExt;
216    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
217    /// let tick = process.tick();
218    /// let optional = tick.none::<i32>();
219    /// optional.unwrap_or(tick.singleton(q!(123)))
220    /// # .all_ticks()
221    /// # }, |mut stream| async move {
222    /// // 123
223    /// # assert_eq!(stream.next().await.unwrap(), 123);
224    /// # }));
225    /// # }
226    /// ```
227    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
228        let e = q!([]);
229        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
230
231        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
232            self.clone(),
233            HydroNode::Source {
234                source: HydroSource::Iter(e.into()),
235                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
236            },
237        );
238
239        unit_optional.map(q!(|_| unreachable!())) // always empty
240    }
241
242    /// Creates an [`Optional`] which will have the provided static value on the first tick, and be
243    /// null on all subsequent ticks.
244    ///
245    /// This is useful for bootstrapping stateful computations which need an initial value.
246    ///
247    /// # Example
248    /// ```rust
249    /// # #[cfg(feature = "deploy")] {
250    /// # use hydro_lang::prelude::*;
251    /// # use futures::StreamExt;
252    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
253    /// let tick = process.tick();
254    /// // ticks are lazy by default, forces the second tick to run
255    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
256    /// let optional = tick.optional_first_tick(q!(5));
257    /// optional.unwrap_or(tick.singleton(q!(123))).all_ticks()
258    /// # }, |mut stream| async move {
259    /// // 5, 123, 123, 123, ...
260    /// # assert_eq!(stream.next().await.unwrap(), 5);
261    /// # assert_eq!(stream.next().await.unwrap(), 123);
262    /// # assert_eq!(stream.next().await.unwrap(), 123);
263    /// # assert_eq!(stream.next().await.unwrap(), 123);
264    /// # }));
265    /// # }
266    /// ```
267    pub fn optional_first_tick<T: Clone>(
268        &self,
269        e: impl QuotedWithContext<'a, T, Tick<L>>,
270    ) -> Optional<T, Self, Bounded> {
271        let e = e.splice_untyped_ctx(self);
272
273        Optional::new(
274            self.clone(),
275            HydroNode::SingletonSource {
276                value: e.into(),
277                first_tick_only: true,
278                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
279            },
280        )
281    }
282
283    /// Returns the current wall-clock time as a [`Singleton`] containing a
284    /// [`tokio::time::Instant`].
285    ///
286    /// # Non-Determinism
287    /// Reading wall-clock time is inherently non-deterministic because the
288    /// value depends on when the tick executes. A [`NonDet`] guard is required
289    /// to acknowledge this.
290    #[cfg(feature = "tokio")]
291    pub fn current_tick_instant(
292        &self,
293        _nondet: NonDet,
294    ) -> Singleton<tokio::time::Instant, Tick<L::DropConsistency>, Bounded>
295    where
296        Self: Sized,
297    {
298        // TODO(shadaj): this is a simulator hole, should be reported as unsupported until it is
299        self.singleton(q!(tokio::time::Instant::now()))
300    }
301
302    /// Creates a feedback cycle within this tick for implementing iterative computations.
303    ///
304    /// Returns a handle that must be completed with the actual collection, and a placeholder
305    /// collection that represents the output of the previous tick (deferred by one tick).
306    /// This is useful for implementing fixed-point computations where the output of one
307    /// tick feeds into the input of the next.
308    ///
309    /// The cycle automatically defers values by one tick to prevent infinite recursion.
310    #[expect(
311        private_bounds,
312        reason = "only Hydro collections can implement ReceiverComplete"
313    )]
314    pub fn cycle<S, L2: Location<'a, DropConsistency = Tick<L::DropConsistency>>>(
315        &self,
316    ) -> (TickCycleHandle<'a, S>, S)
317    where
318        S: CycleCollection<'a, TickCycle, Location = L2> + DeferTick,
319    {
320        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
321        (
322            TickCycleHandle::new(cycle_id, Location::id(self)),
323            S::create_source(cycle_id, self.clone().with_consistency_of()).defer_tick(),
324        )
325    }
326
327    /// Creates a feedback cycle with an initial value for the first tick.
328    ///
329    /// Similar to [`Tick::cycle`], but allows providing an initial collection
330    /// that will be used as the value on the first tick before any feedback
331    /// is available. This is useful for bootstrapping iterative computations
332    /// that need a starting state.
333    #[expect(
334        private_bounds,
335        reason = "only Hydro collections can implement ReceiverComplete"
336    )]
337    pub fn cycle_with_initial<S, L2: Location<'a, DropConsistency = Tick<L::DropConsistency>>>(
338        &self,
339        initial: S,
340    ) -> (TickCycleHandle<'a, S>, S)
341    where
342        S: CycleCollectionWithInitial<'a, TickCycle, Location = L2>,
343    {
344        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
345        (
346            TickCycleHandle::new(cycle_id, Location::id(self)),
347            // no need to defer_tick, create_source_with_initial does it for us
348            S::create_source_with_initial(cycle_id, initial, self.clone().with_consistency_of()),
349        )
350    }
351}
352
353#[cfg(test)]
354mod tests {
355    #[cfg(feature = "sim")]
356    use stageleft::q;
357
358    #[cfg(feature = "sim")]
359    use crate::live_collections::sliced::sliced;
360    #[cfg(feature = "sim")]
361    use crate::location::Location;
362    #[cfg(feature = "sim")]
363    use crate::nondet::nondet;
364    #[cfg(feature = "sim")]
365    use crate::prelude::FlowBuilder;
366
367    #[cfg(feature = "sim")]
368    #[test]
369    fn sim_atomic_stream() {
370        let mut flow = FlowBuilder::new();
371        let node = flow.process::<()>();
372
373        let (write_send, write_req) = node.sim_input();
374        let (read_send, read_req) = node.sim_input::<(), _, _>();
375
376        let atomic_write = write_req.atomic();
377        let current_state = atomic_write.clone().fold(
378            q!(|| 0),
379            q!(|state: &mut i32, v: i32| {
380                *state += v;
381            }),
382        );
383
384        let write_ack_recv = atomic_write.end_atomic().sim_output();
385        let read_response_recv = sliced! {
386            let batch_of_req = use(read_req, nondet!(/** test */));
387            let latest_singleton = use::atomic(current_state, nondet!(/** test */));
388            batch_of_req.cross_singleton(latest_singleton)
389        }
390        .sim_output();
391
392        let sim_compiled = flow.sim().compiled();
393        let instances = sim_compiled.exhaustive(async || {
394            write_send.send(1);
395            write_ack_recv.assert_yields([1]).await;
396            read_send.send(());
397            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
398        });
399
400        assert_eq!(instances, 1);
401
402        let instances_read_before_write = sim_compiled.exhaustive(async || {
403            write_send.send(1);
404            read_send.send(());
405            write_ack_recv.assert_yields([1]).await;
406            let _ = read_response_recv.next().await;
407        });
408
409        assert_eq!(instances_read_before_write, 3); // read before write, write before read, both in same tick
410    }
411
412    #[cfg(feature = "sim")]
413    #[test]
414    #[should_panic]
415    fn sim_non_atomic_stream() {
416        // shows that atomic is necessary
417        let mut flow = FlowBuilder::new();
418        let node = flow.process::<()>();
419
420        let (write_send, write_req) = node.sim_input();
421        let (read_send, read_req) = node.sim_input::<(), _, _>();
422
423        let current_state = write_req.clone().fold(
424            q!(|| 0),
425            q!(|state: &mut i32, v: i32| {
426                *state += v;
427            }),
428        );
429
430        let write_ack_recv = write_req.sim_output();
431
432        let read_response_recv = sliced! {
433            let batch_of_req = use(read_req, nondet!(/** test */));
434            let latest_singleton = use(current_state, nondet!(/** test */));
435            batch_of_req.cross_singleton(latest_singleton)
436        }
437        .sim_output();
438
439        flow.sim().exhaustive(async || {
440            write_send.send(1);
441            write_ack_recv.assert_yields([1]).await;
442            read_send.send(());
443
444            if let Some((_, v)) = read_response_recv.next().await {
445                assert_eq!(v, 1);
446            }
447        });
448    }
449}