1use stageleft::{QuotedWithContext, q};
18
19#[cfg(stageleft_runtime)]
20use super::dynamic::DynLocation;
21use super::{Location, LocationId};
22use crate::compile::builder::{ClockId, FlowState};
23use crate::compile::ir::{HydroNode, HydroSource};
24#[cfg(stageleft_runtime)]
25use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
26use crate::forward_handle::{TickCycle, TickCycleHandle};
27#[cfg(feature = "tokio")]
28use crate::live_collections::Singleton;
29use crate::live_collections::boundedness::Bounded;
30use crate::live_collections::optional::Optional;
31use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
32use crate::location::TopLevel;
33#[cfg(feature = "tokio")]
34use crate::nondet::NonDet;
35use crate::nondet::nondet;
36
/// A location wrapper built around a [`Tick`] over the inner location `Loc`.
///
/// Its dynamic id is the wrapped tick's id nested inside
/// `LocationId::Atomic`; the remaining location queries are answered by the
/// wrapped tick or by the inner location type.
#[derive(Clone)]
pub struct Atomic<Loc> {
    /// The tick that this atomic location wraps.
    pub(crate) tick: Tick<Loc>,
}
51
52impl<L: DynLocation> DynLocation for Atomic<L> {
53 fn dyn_id(&self) -> LocationId {
54 LocationId::Atomic(Box::new(self.tick.dyn_id()))
55 }
56
57 fn flow_state(&self) -> &FlowState {
58 self.tick.flow_state()
59 }
60
61 fn is_top_level() -> bool {
62 L::is_top_level()
63 }
64
65 fn multiversioned(&self) -> bool {
66 self.tick.multiversioned()
67 }
68
69 fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
70 L::cluster_consistency()
71 }
72}
73
74impl<'a, L> Location<'a> for Atomic<L>
75where
76 L: Location<'a>,
77{
78 type Root = L::Root;
79
80 type DropConsistency = Atomic<L::DropConsistency>;
81
82 fn consistency() -> Option<super::dynamic::ClusterConsistency> {
83 L::consistency()
84 }
85
86 fn root(&self) -> Self::Root {
87 self.tick.root()
88 }
89
90 fn drop_consistency(&self) -> Self::DropConsistency {
91 Atomic {
92 tick: self.tick.drop_consistency(),
93 }
94 }
95
96 fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
97 Atomic {
98 tick: Tick::from_drop_consistency(l2.tick),
99 }
100 }
101}
102
/// A by-value transformation that consumes `self` and returns a value of the
/// same type.
///
/// `Tick::cycle` requires this bound on its cycle collection and calls
/// `defer_tick` on the freshly created cycle source.
///
/// NOTE(review): presumably the result carries the same contents delayed to a
/// later tick — confirm against the implementors, which are not in this file
/// section.
pub trait DeferTick {
    /// Consumes `self` and returns its deferred form.
    fn defer_tick(self) -> Self;
}
113
/// A location nested inside an outer location `L` and identified by a
/// [`ClockId`].
///
/// Its dynamic id is `LocationId::Tick(id, <outer location id>)`, and it never
/// reports itself as top-level.
#[derive(Clone)]
pub struct Tick<L> {
    /// Identifier of the clock associated with this tick.
    pub(crate) id: ClockId,
    /// The outer location that this tick is nested in.
    pub(crate) l: L,
}
121
122impl<L: DynLocation> DynLocation for Tick<L> {
123 fn dyn_id(&self) -> LocationId {
124 LocationId::Tick(self.id, Box::new(self.l.dyn_id()))
125 }
126
127 fn flow_state(&self) -> &FlowState {
128 self.l.flow_state()
129 }
130
131 fn is_top_level() -> bool {
132 false
133 }
134
135 fn multiversioned(&self) -> bool {
136 self.l.multiversioned()
137 }
138
139 fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
140 L::cluster_consistency()
141 }
142}
143
144impl<'a, L> Location<'a> for Tick<L>
145where
146 L: Location<'a>,
147{
148 type Root = L::Root;
149
150 type DropConsistency = Tick<L::DropConsistency>;
151
152 fn consistency() -> Option<super::dynamic::ClusterConsistency> {
153 L::consistency()
154 }
155
156 fn root(&self) -> Self::Root {
157 self.l.root()
158 }
159
160 fn drop_consistency(&self) -> Self::DropConsistency {
161 Tick {
162 id: self.id,
163 l: self.l.drop_consistency(),
164 }
165 }
166
167 fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
168 Tick {
169 id: l2.id,
170 l: L::from_drop_consistency(l2.l),
171 }
172 }
173}
174
175impl<'a, L> Tick<L>
176where
177 L: Location<'a>,
178{
179 pub fn outer(&self) -> &L {
184 &self.l
185 }
186
187 pub fn spin_batch(
193 &self,
194 batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
195 ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
196 where
197 L: TopLevel<'a>,
198 {
199 let out = self
200 .l
201 .spin()
202 .flat_map_ordered(q!(move |_| 0..batch_size))
203 .map(q!(|_| ()));
204
205 let inner = out.batch(self, nondet!());
206 Stream::new(self.clone(), inner.ir_node.replace(HydroNode::Placeholder))
207 }
208
209 pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
228 let e = q!([]);
229 let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
230
231 let unit_optional: Optional<(), Self, Bounded> = Optional::new(
232 self.clone(),
233 HydroNode::Source {
234 source: HydroSource::Iter(e.into()),
235 metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
236 },
237 );
238
239 unit_optional.map(q!(|_| unreachable!())) }
241
242 pub fn optional_first_tick<T: Clone>(
268 &self,
269 e: impl QuotedWithContext<'a, T, Tick<L>>,
270 ) -> Optional<T, Self, Bounded> {
271 let e = e.splice_untyped_ctx(self);
272
273 Optional::new(
274 self.clone(),
275 HydroNode::SingletonSource {
276 value: e.into(),
277 first_tick_only: true,
278 metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
279 },
280 )
281 }
282
283 #[cfg(feature = "tokio")]
291 pub fn current_tick_instant(
292 &self,
293 _nondet: NonDet,
294 ) -> Singleton<tokio::time::Instant, Tick<L::DropConsistency>, Bounded>
295 where
296 Self: Sized,
297 {
298 self.singleton(q!(tokio::time::Instant::now()))
300 }
301
302 #[expect(
311 private_bounds,
312 reason = "only Hydro collections can implement ReceiverComplete"
313 )]
314 pub fn cycle<S, L2: Location<'a, DropConsistency = Tick<L::DropConsistency>>>(
315 &self,
316 ) -> (TickCycleHandle<'a, S>, S)
317 where
318 S: CycleCollection<'a, TickCycle, Location = L2> + DeferTick,
319 {
320 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
321 (
322 TickCycleHandle::new(cycle_id, Location::id(self)),
323 S::create_source(cycle_id, self.clone().with_consistency_of()).defer_tick(),
324 )
325 }
326
327 #[expect(
334 private_bounds,
335 reason = "only Hydro collections can implement ReceiverComplete"
336 )]
337 pub fn cycle_with_initial<S, L2: Location<'a, DropConsistency = Tick<L::DropConsistency>>>(
338 &self,
339 initial: S,
340 ) -> (TickCycleHandle<'a, S>, S)
341 where
342 S: CycleCollectionWithInitial<'a, TickCycle, Location = L2>,
343 {
344 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
345 (
346 TickCycleHandle::new(cycle_id, Location::id(self)),
347 S::create_source_with_initial(cycle_id, initial, self.clone().with_consistency_of()),
349 )
350 }
351}
352
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use crate::live_collections::sliced::sliced;
    #[cfg(feature = "sim")]
    use crate::location::Location;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// With the write stream made atomic, a read issued after the write has
    /// been acknowledged must observe that write.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_stream() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (write_send, write_req) = node.sim_input();
        let (read_send, read_req) = node.sim_input::<(), _, _>();

        // Running sum of all writes, folded over the atomic view of the stream.
        let atomic_write = write_req.atomic();
        let current_state = atomic_write.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        // Acks leave the atomic section; reads snapshot the folded state.
        let write_ack_recv = atomic_write.end_atomic().sim_output();
        let read_response_recv = sliced! {
            let batch_of_req = use(read_req, nondet!(/** test */));
            let latest_singleton = use::atomic(current_state, nondet!(/** test */));
            batch_of_req.cross_singleton(latest_singleton)
        }
        .sim_output();

        let sim_compiled = flow.sim().compiled();
        let instances = sim_compiled.exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());
            // Exactly one write of `1` was sent and acknowledged, so the read
            // must see exactly `1`; a looser bound would also accept a write
            // that was applied more than once.
            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v == 1));
        });

        assert_eq!(instances, 1);

        let instances_read_before_write = sim_compiled.exhaustive(async || {
            write_send.send(1);
            read_send.send(());
            write_ack_recv.assert_yields([1]).await;
            let _ = read_response_recv.next().await;
        });

        // NOTE(review): presumably the three explored schedules are "read
        // before write", "write before read", and "both in the same tick" —
        // confirm against the simulator's exploration rules.
        assert_eq!(instances_read_before_write, 3);
    }

    /// Same pipeline without the atomic section. The test is expected to
    /// panic: the exhaustive exploration should reach a schedule where the
    /// read does not yet observe the acknowledged write, failing the
    /// `assert_eq!` below.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_non_atomic_stream() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (write_send, write_req) = node.sim_input();
        let (read_send, read_req) = node.sim_input::<(), _, _>();

        // Running sum of all writes, folded directly over the input stream.
        let current_state = write_req.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        // Acks are emitted straight from the input, independent of the fold.
        let write_ack_recv = write_req.sim_output();

        let read_response_recv = sliced! {
            let batch_of_req = use(read_req, nondet!(/** test */));
            let latest_singleton = use(current_state, nondet!(/** test */));
            batch_of_req.cross_singleton(latest_singleton)
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());

            if let Some((_, v)) = read_response_recv.next().await {
                assert_eq!(v, 1);
            }
        });
    }
}