hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::{KeyedSingleton, MonotonicKeys};
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19use crate::location::cluster::{ClusterIds, Consistency, EventualConsistency, NoConsistency};
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::DynLocation;
22use crate::location::external_process::ExternalBincodeStream;
23use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, Process};
24use crate::networking::{NetworkFor, TCP};
25use crate::nondet::{NonDet, nondet};
26use crate::properties::manual_proof;
27#[cfg(feature = "sim")]
28use crate::sim::SimReceiver;
29use crate::staging_util::get_this_crate;
30
31// same as the one in `hydro_std`, but internal use only
32fn track_membership<'a, C, L: Location<'a>>(
33 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
34) -> KeyedSingleton<MemberId<C>, bool, L, MonotonicKeys> {
35 membership.fold(
36 q!(|| false),
37 q!(|present, event| {
38 match event {
39 MembershipEvent::Joined => *present = true,
40 MembershipEvent::Left => *present = false,
41 }
42 }),
43 )
44}
45
46fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
47 let root = get_this_crate();
48
49 if is_demux {
50 parse_quote! {
51 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
52 |(id, data)| {
53 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
54 }
55 )
56 }
57 } else {
58 parse_quote! {
59 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
60 |data| {
61 #root::runtime_support::bincode::serialize(&data).unwrap().into()
62 }
63 )
64 }
65 }
66}
67
68pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
69 serialize_bincode_with_type(is_demux, "e_type::<T>())
70}
71
72fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
73 let root = get_this_crate();
74 if let Some(c_type) = tagged {
75 parse_quote! {
76 |res| {
77 let (id, b) = res.unwrap();
78 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
79 }
80 }
81 } else {
82 parse_quote! {
83 |res| {
84 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
85 }
86 }
87 }
88}
89
90pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
91 deserialize_bincode_with_type(tagged, "e_type::<T>())
92}
93
94impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
95 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
96 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
97 /// using [`bincode`] to serialize/deserialize messages.
98 ///
99 /// The returned stream captures the elements received at the destination, where values will
100 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
101 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
102 /// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
103 /// dropped no further messages will be sent.
104 ///
105 /// # Example
106 /// ```rust
107 /// # #[cfg(feature = "deploy")] {
108 /// # use hydro_lang::prelude::*;
109 /// # use futures::StreamExt;
110 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
111 /// let p1 = flow.process::<()>();
112 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
113 /// let p2 = flow.process::<()>();
114 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
115 /// // 1, 2, 3
116 /// # on_p2.send_bincode(&p_out)
117 /// # }, |mut stream| async move {
118 /// # for w in 1..=3 {
119 /// # assert_eq!(stream.next().await, Some(w));
120 /// # }
121 /// # }));
122 /// # }
123 /// ```
124 pub fn send_bincode<L2>(
125 self,
126 other: &Process<'a, L2>,
127 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
128 where
129 T: Serialize + DeserializeOwned,
130 {
131 self.send(other, TCP.fail_stop().bincode())
132 }
133
134 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
135 /// using the configuration in `via` to set up the message transport.
136 ///
137 /// The returned stream captures the elements received at the destination, where values will
138 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
139 /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
140 /// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
141 /// dropped no further messages will be sent.
142 ///
143 /// # Example
144 /// ```rust
145 /// # #[cfg(feature = "deploy")] {
146 /// # use hydro_lang::prelude::*;
147 /// # use futures::StreamExt;
148 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
149 /// let p1 = flow.process::<()>();
150 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
151 /// let p2 = flow.process::<()>();
152 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
153 /// // 1, 2, 3
154 /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
155 /// # }, |mut stream| async move {
156 /// # for w in 1..=3 {
157 /// # assert_eq!(stream.next().await, Some(w));
158 /// # }
159 /// # }));
160 /// # }
161 /// ```
162 pub fn send<L2, N: NetworkFor<T>>(
163 self,
164 to: &Process<'a, L2>,
165 via: N,
166 ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
167 where
168 T: Serialize + DeserializeOwned,
169 O: MinOrder<N::OrderingGuarantee>,
170 {
171 let serialize_pipeline = Some(N::serialize_thunk(false));
172 let deserialize_pipeline = Some(N::deserialize_thunk(None));
173
174 let name = via.name();
175 if to.multiversioned() && name.is_none() {
176 panic!(
177 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
178 );
179 }
180
181 Stream::new(
182 to.clone(),
183 HydroNode::Network {
184 name: name.map(ToOwned::to_owned),
185 networking_info: N::networking_info(),
186 serialize_fn: serialize_pipeline.map(|e| e.into()),
187 instantiate_fn: DebugInstantiate::Building,
188 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
189 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
190 metadata: to.new_node_metadata(Stream::<
191 T,
192 Process<'a, L2>,
193 Unbounded,
194 <O as MinOrder<N::OrderingGuarantee>>::Min,
195 R,
196 >::collection_kind()),
197 },
198 )
199 }
200
201 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
202 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
203 /// using [`bincode`] to serialize/deserialize messages.
204 ///
205 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
206 /// membership information. This is a common pattern in distributed systems for broadcasting data to
207 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
208 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
209 /// each element to all cluster members.
210 ///
211 /// # Non-Determinism
212 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
213 /// to the current cluster members _at that point in time_. Depending on when we are notified of
214 /// membership changes, we will broadcast each element to different members.
215 ///
216 /// # Example
217 /// ```rust
218 /// # #[cfg(feature = "deploy")] {
219 /// # use hydro_lang::prelude::*;
220 /// # use futures::StreamExt;
221 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
222 /// let p1 = flow.process::<()>();
223 /// let workers: Cluster<()> = flow.cluster::<()>();
224 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
225 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
226 /// # on_worker.send_bincode(&p2).entries()
227 /// // if there are 4 members in the cluster, each receives one element
228 /// // - MemberId::<()>(0): [123]
229 /// // - MemberId::<()>(1): [123]
230 /// // - MemberId::<()>(2): [123]
231 /// // - MemberId::<()>(3): [123]
232 /// # }, |mut stream| async move {
233 /// # let mut results = Vec::new();
234 /// # for w in 0..4 {
235 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
236 /// # }
237 /// # results.sort();
238 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
239 /// # }));
240 /// # }
241 /// ```
242 pub fn broadcast_bincode<L2: 'a>(
243 self,
244 other: &Cluster<'a, L2>,
245 nondet_membership: NonDet,
246 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
247 where
248 T: Clone + Serialize + DeserializeOwned,
249 {
250 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
251 }
252
253 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
254 /// using the configuration in `via` to set up the message transport.
255 ///
256 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
257 /// membership information. This is a common pattern in distributed systems for broadcasting data to
258 /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
259 /// target specific members, `broadcast` takes a stream of **only data elements** and sends
260 /// each element to all cluster members.
261 ///
262 /// # Non-Determinism
263 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
264 /// to the current cluster members _at that point in time_. Depending on when we are notified of
265 /// membership changes, we will broadcast each element to different members.
266 ///
267 /// # Example
268 /// ```rust
269 /// # #[cfg(feature = "deploy")] {
270 /// # use hydro_lang::prelude::*;
271 /// # use futures::StreamExt;
272 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
273 /// let p1 = flow.process::<()>();
274 /// let workers: Cluster<()> = flow.cluster::<()>();
275 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
276 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
277 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
278 /// // if there are 4 members in the cluster, each receives one element
279 /// // - MemberId::<()>(0): [123]
280 /// // - MemberId::<()>(1): [123]
281 /// // - MemberId::<()>(2): [123]
282 /// // - MemberId::<()>(3): [123]
283 /// # }, |mut stream| async move {
284 /// # let mut results = Vec::new();
285 /// # for w in 0..4 {
286 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
287 /// # }
288 /// # results.sort();
289 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
290 /// # }));
291 /// # }
292 /// ```
293 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
294 self,
295 to: &Cluster<'a, L2>,
296 via: N,
297 nondet_membership: NonDet,
298 ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
299 where
300 T: Clone + Serialize + DeserializeOwned,
301 O: MinOrder<N::OrderingGuarantee>,
302 {
303 let ids = track_membership(self.location.source_cluster_membership_stream(
304 to,
305 nondet!(/** dropped prefixes don't affect broadcast */),
306 ));
307 sliced! {
308 let members_snapshot = use(ids, nondet_membership);
309 let elements = use(self, nondet_membership);
310
311 let current_members = members_snapshot.filter(q!(|b| *b));
312 elements.repeat_with_keys(current_members)
313 }
314 .demux(to, via)
315 }
316
317 /// Broadcasts elements of this stream to all members of a cluster,
318 /// assuming membership is closed (fixed at deploy time).
319 ///
320 /// Unlike [`Stream::broadcast`], this does not require a [`NonDet`] guard.
321 /// The membership set is obtained from deploy metadata via
322 /// [`ClusterIds`], producing a
323 /// `Bounded` stream. The cross-product of data × members is fully
324 /// deterministic.
325 ///
326 /// This is only available in deployment targets with static cluster
327 /// membership (legacy Hydro Deploy and simulation). There are no late
328 /// joiners in that context, so broadcast receivers are guaranteed to
329 /// get data from the start of the stream. On dynamic targets
330 /// (e.g. ECS), use [`Stream::broadcast`] instead.
331 ///
332 /// # Example
333 /// ```rust
334 /// # #[cfg(feature = "deploy")] {
335 /// # use hydro_lang::prelude::*;
336 /// # use futures::StreamExt;
337 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
338 /// let p1 = flow.process::<()>();
339 /// let workers: Cluster<()> = flow.cluster::<()>();
340 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
341 /// let on_worker = numbers.broadcast_closed(&workers, TCP.fail_stop().bincode());
342 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
343 /// // each of the 4 cluster members receives 123
344 /// # }, |mut stream| async move {
345 /// # let mut results = Vec::new();
346 /// # for _ in 0..4 {
347 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
348 /// # }
349 /// # results.sort();
350 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
351 /// # }));
352 /// # }
353 /// ```
354 pub fn broadcast_closed<L2: 'a, N: NetworkFor<T>>(
355 self,
356 to: &Cluster<'a, L2>,
357 via: N,
358 ) -> Stream<
359 T,
360 Cluster<'a, L2, EventualConsistency>,
361 Unbounded,
362 <O as MinOrder<N::OrderingGuarantee>>::Min,
363 R,
364 >
365 where
366 T: Clone + Serialize + DeserializeOwned,
367 O: MinOrder<N::OrderingGuarantee>,
368 {
369 let cluster_ids = ClusterIds {
370 key: to.key,
371 _phantom: PhantomData,
372 };
373 let member_ids = self.location.source_iter(q!(cluster_ids
374 .iter()
375 .map(|id| MemberId::from_tagless(id.clone()))));
376
377 // Late joiners will receive no data from this broadcast, which is
378 // future-monotone and eventually consistent (a safe under-approximation).
379 self.cross_product(member_ids)
380 .map(q!(|(data, member_id)| (member_id, data)))
381 .into_keyed()
382 .demux(to, via)
383 .assert_has_consistency_of_trusted(manual_proof!(/** closed broadcast will materialze the same elements on each member */))
384 }
385
386 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
387 /// serialization. The external process can receive these elements by establishing a TCP
388 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
389 ///
390 /// # Example
391 /// ```rust
392 /// # #[cfg(feature = "deploy")] {
393 /// # use hydro_lang::prelude::*;
394 /// # use futures::StreamExt;
395 /// # tokio_test::block_on(async move {
396 /// let mut flow = FlowBuilder::new();
397 /// let process = flow.process::<()>();
398 /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
399 /// let external = flow.external::<()>();
400 /// let external_handle = numbers.send_bincode_external(&external);
401 ///
402 /// let mut deployment = hydro_deploy::Deployment::new();
403 /// let nodes = flow
404 /// .with_process(&process, deployment.Localhost())
405 /// .with_external(&external, deployment.Localhost())
406 /// .deploy(&mut deployment);
407 ///
408 /// deployment.deploy().await.unwrap();
409 /// // establish the TCP connection
410 /// let mut external_recv_stream = nodes.connect(external_handle).await;
411 /// deployment.start().await.unwrap();
412 ///
413 /// for w in 1..=3 {
414 /// assert_eq!(external_recv_stream.next().await, Some(w));
415 /// }
416 /// # });
417 /// # }
418 /// ```
419 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
420 where
421 T: Serialize + DeserializeOwned,
422 {
423 let serialize_pipeline = Some(serialize_bincode::<T>(false));
424
425 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
426
427 let external_port_id = flow_state_borrow.next_external_port();
428
429 flow_state_borrow.push_root(HydroRoot::SendExternal {
430 to_external_key: other.key,
431 to_port_id: external_port_id,
432 to_many: false,
433 unpaired: true,
434 serialize_fn: serialize_pipeline.map(|e| e.into()),
435 instantiate_fn: DebugInstantiate::Building,
436 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
437 op_metadata: HydroIrOpMetadata::new(),
438 });
439
440 ExternalBincodeStream {
441 process_key: other.key,
442 port_id: external_port_id,
443 _phantom: PhantomData,
444 }
445 }
446
447 #[cfg(feature = "sim")]
448 /// Sets up a simulation output port for this stream, allowing test code to receive elements
449 /// sent to this stream during simulation.
450 pub fn sim_output(self) -> SimReceiver<T, O, R>
451 where
452 T: Serialize + DeserializeOwned,
453 {
454 let external_location: External<'a, ()> = External {
455 key: LocationKey::FIRST,
456 flow_state: self.location.flow_state().clone(),
457 _phantom: PhantomData,
458 };
459
460 let external = self.send_bincode_external(&external_location);
461
462 SimReceiver(external.port_id, PhantomData)
463 }
464}
465
466impl<'a, T, L: Location<'a>, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
467 /// Creates an external output for embedded deployment mode.
468 ///
469 /// The `name` parameter specifies the name of the field in the generated
470 /// `EmbeddedOutputs` struct that will receive elements from this stream.
471 /// The generated function will accept an `EmbeddedOutputs` struct with an
472 /// `impl FnMut(T)` field with this name.
473 pub fn embedded_output(self, name: impl Into<String>) {
474 let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
475
476 self.location
477 .flow_state()
478 .borrow_mut()
479 .push_root(HydroRoot::EmbeddedOutput {
480 ident,
481 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
482 op_metadata: HydroIrOpMetadata::new(),
483 });
484 }
485}
486
487impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
488 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
489{
490 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
491 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
492 /// using [`bincode`] to serialize/deserialize messages.
493 ///
494 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
495 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
496 /// this API allows precise targeting of specific cluster members rather than broadcasting to
497 /// all members.
498 ///
499 /// # Example
500 /// ```rust
501 /// # #[cfg(feature = "deploy")] {
502 /// # use hydro_lang::prelude::*;
503 /// # use futures::StreamExt;
504 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
505 /// let p1 = flow.process::<()>();
506 /// let workers: Cluster<()> = flow.cluster::<()>();
507 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
508 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
509 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
510 /// .demux_bincode(&workers);
511 /// # on_worker.send_bincode(&p2).entries()
512 /// // if there are 4 members in the cluster, each receives one element
513 /// // - MemberId::<()>(0): [0]
514 /// // - MemberId::<()>(1): [1]
515 /// // - MemberId::<()>(2): [2]
516 /// // - MemberId::<()>(3): [3]
517 /// # }, |mut stream| async move {
518 /// # let mut results = Vec::new();
519 /// # for w in 0..4 {
520 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
521 /// # }
522 /// # results.sort();
523 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
524 /// # }));
525 /// # }
526 /// ```
527 pub fn demux_bincode(
528 self,
529 other: &Cluster<'a, L2>,
530 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
531 where
532 T: Serialize + DeserializeOwned,
533 {
534 self.demux(other, TCP.fail_stop().bincode())
535 }
536
537 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
538 /// using the configuration in `via` to set up the message transport.
539 ///
540 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
541 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
542 /// this API allows precise targeting of specific cluster members rather than broadcasting to
543 /// all members.
544 ///
545 /// # Example
546 /// ```rust
547 /// # #[cfg(feature = "deploy")] {
548 /// # use hydro_lang::prelude::*;
549 /// # use futures::StreamExt;
550 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
551 /// let p1 = flow.process::<()>();
552 /// let workers: Cluster<()> = flow.cluster::<()>();
553 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
554 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
555 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
556 /// .demux(&workers, TCP.fail_stop().bincode());
557 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
558 /// // if there are 4 members in the cluster, each receives one element
559 /// // - MemberId::<()>(0): [0]
560 /// // - MemberId::<()>(1): [1]
561 /// // - MemberId::<()>(2): [2]
562 /// // - MemberId::<()>(3): [3]
563 /// # }, |mut stream| async move {
564 /// # let mut results = Vec::new();
565 /// # for w in 0..4 {
566 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
567 /// # }
568 /// # results.sort();
569 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
570 /// # }));
571 /// # }
572 /// ```
573 pub fn demux<N: NetworkFor<T>>(
574 self,
575 to: &Cluster<'a, L2>,
576 via: N,
577 ) -> Stream<
578 T,
579 Cluster<'a, L2, NoConsistency>,
580 Unbounded,
581 <O as MinOrder<N::OrderingGuarantee>>::Min,
582 R,
583 >
584 where
585 T: Serialize + DeserializeOwned,
586 O: MinOrder<N::OrderingGuarantee>,
587 {
588 self.into_keyed().demux(to, via)
589 }
590}
591
592impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
593 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
594 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
595 /// [`bincode`] to serialize/deserialize messages.
596 ///
597 /// This provides load balancing by evenly distributing work across cluster members. The
598 /// distribution is deterministic based on element order - the first element goes to member 0,
599 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
600 ///
601 /// # Non-Determinism
602 /// The set of cluster members may asynchronously change over time. Each element is distributed
603 /// based on the current cluster membership _at that point in time_. Depending on when cluster
604 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
605 /// membership is stable, the order of members in the round-robin pattern may change across runs.
606 ///
607 /// # Ordering Requirements
608 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
609 /// order of messages and retries affects the round-robin pattern.
610 ///
611 /// # Example
612 /// ```rust
613 /// # #[cfg(feature = "deploy")] {
614 /// # use hydro_lang::prelude::*;
615 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
616 /// # use futures::StreamExt;
617 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
618 /// let p1 = flow.process::<()>();
619 /// let workers: Cluster<()> = flow.cluster::<()>();
620 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
621 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
622 /// on_worker.send_bincode(&p2)
623 /// # .first().values() // we use first to assert that each member gets one element
624 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
625 /// // - MemberId::<()>(?): [1]
626 /// // - MemberId::<()>(?): [2]
627 /// // - MemberId::<()>(?): [3]
628 /// // - MemberId::<()>(?): [4]
629 /// # }, |mut stream| async move {
630 /// # let mut results = Vec::new();
631 /// # for w in 0..4 {
632 /// # results.push(stream.next().await.unwrap());
633 /// # }
634 /// # results.sort();
635 /// # assert_eq!(results, vec![1, 2, 3, 4]);
636 /// # }));
637 /// # }
638 /// ```
639 pub fn round_robin_bincode<L2: 'a>(
640 self,
641 other: &Cluster<'a, L2>,
642 nondet_membership: NonDet,
643 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
644 where
645 T: Serialize + DeserializeOwned,
646 {
647 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
648 }
649
650 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
651 /// the configuration in `via` to set up the message transport.
652 ///
653 /// This provides load balancing by evenly distributing work across cluster members. The
654 /// distribution is deterministic based on element order - the first element goes to member 0,
655 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
656 ///
657 /// # Non-Determinism
658 /// The set of cluster members may asynchronously change over time. Each element is distributed
659 /// based on the current cluster membership _at that point in time_. Depending on when cluster
660 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
661 /// membership is stable, the order of members in the round-robin pattern may change across runs.
662 ///
663 /// # Ordering Requirements
664 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
665 /// order of messages and retries affects the round-robin pattern.
666 ///
667 /// # Example
668 /// ```rust
669 /// # #[cfg(feature = "deploy")] {
670 /// # use hydro_lang::prelude::*;
671 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
672 /// # use futures::StreamExt;
673 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
674 /// let p1 = flow.process::<()>();
675 /// let workers: Cluster<()> = flow.cluster::<()>();
676 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
677 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
678 /// on_worker.send(&p2, TCP.fail_stop().bincode())
679 /// # .first().values() // we use first to assert that each member gets one element
680 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
681 /// // - MemberId::<()>(?): [1]
682 /// // - MemberId::<()>(?): [2]
683 /// // - MemberId::<()>(?): [3]
684 /// // - MemberId::<()>(?): [4]
685 /// # }, |mut stream| async move {
686 /// # let mut results = Vec::new();
687 /// # for w in 0..4 {
688 /// # results.push(stream.next().await.unwrap());
689 /// # }
690 /// # results.sort();
691 /// # assert_eq!(results, vec![1, 2, 3, 4]);
692 /// # }));
693 /// # }
694 /// ```
695 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
696 self,
697 to: &Cluster<'a, L2>,
698 via: N,
699 nondet_membership: NonDet,
700 ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
701 where
702 T: Serialize + DeserializeOwned,
703 {
704 let ids = track_membership(self.location.source_cluster_membership_stream(
705 to,
706 nondet!(/** dropped prefixes don't affect broadcast */),
707 ));
708 sliced! {
709 let members_snapshot = use(ids, nondet_membership);
710 let elements = use(self.enumerate(), nondet_membership);
711
712 let current_members = members_snapshot
713 .filter(q!(|b| *b))
714 .keys()
715 .assume_ordering::<TotalOrder>(nondet_membership)
716 .collect_vec();
717
718 elements
719 .cross_singleton(current_members)
720 .filter_map(q!(|(data, members)| {
721 if members.is_empty() {
722 None
723 } else {
724 Some((members[data.0 % members.len()].clone(), data.1))
725 }
726 }))
727 }
728 .demux(to, via)
729 }
730}
731
732impl<'a, T, L, B: Boundedness, C: Consistency>
733 Stream<T, Cluster<'a, L, C>, B, TotalOrder, ExactlyOnce>
734{
735 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
736 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
737 /// [`bincode`] to serialize/deserialize messages.
738 ///
739 /// This provides load balancing by evenly distributing work across cluster members. The
740 /// distribution is deterministic based on element order - the first element goes to member 0,
741 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
742 ///
743 /// # Non-Determinism
744 /// The set of cluster members may asynchronously change over time. Each element is distributed
745 /// based on the current cluster membership _at that point in time_. Depending on when cluster
746 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
747 /// membership is stable, the order of members in the round-robin pattern may change across runs.
748 ///
749 /// # Ordering Requirements
750 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
751 /// order of messages and retries affects the round-robin pattern.
752 ///
753 /// # Example
754 /// ```rust
755 /// # #[cfg(feature = "deploy")] {
756 /// # use hydro_lang::prelude::*;
757 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
758 /// # use hydro_lang::location::MemberId;
759 /// # use futures::StreamExt;
760 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
761 /// let p1 = flow.process::<()>();
762 /// let workers1: Cluster<()> = flow.cluster::<()>();
763 /// let workers2: Cluster<()> = flow.cluster::<()>();
764 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
765 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
766 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
767 /// on_worker2.send_bincode(&p2)
768 /// # .entries()
769 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
770 /// # }, |mut stream| async move {
771 /// # let mut results = Vec::new();
772 /// # let mut locations = std::collections::HashSet::new();
773 /// # for w in 0..=16 {
774 /// # let (location, v) = stream.next().await.unwrap();
775 /// # locations.insert(location);
776 /// # results.push(v);
777 /// # }
778 /// # results.sort();
779 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
780 /// # assert_eq!(locations.len(), 16);
781 /// # }));
782 /// # }
783 /// ```
784 pub fn round_robin_bincode<L2: 'a>(
785 self,
786 other: &Cluster<'a, L2>,
787 nondet_membership: NonDet,
788 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
789 where
790 T: Serialize + DeserializeOwned,
791 {
792 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
793 }
794
795 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
796 /// the configuration in `via` to set up the message transport.
797 ///
798 /// This provides load balancing by evenly distributing work across cluster members. The
799 /// distribution is deterministic based on element order - the first element goes to member 0,
800 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
801 ///
802 /// # Non-Determinism
803 /// The set of cluster members may asynchronously change over time. Each element is distributed
804 /// based on the current cluster membership _at that point in time_. Depending on when cluster
805 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
806 /// membership is stable, the order of members in the round-robin pattern may change across runs.
807 ///
808 /// # Ordering Requirements
809 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
810 /// order of messages and retries affects the round-robin pattern.
811 ///
812 /// # Example
813 /// ```rust
814 /// # #[cfg(feature = "deploy")] {
815 /// # use hydro_lang::prelude::*;
816 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
817 /// # use hydro_lang::location::MemberId;
818 /// # use futures::StreamExt;
819 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
820 /// let p1 = flow.process::<()>();
821 /// let workers1: Cluster<()> = flow.cluster::<()>();
822 /// let workers2: Cluster<()> = flow.cluster::<()>();
823 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
824 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
825 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
826 /// on_worker2.send(&p2, TCP.fail_stop().bincode())
827 /// # .entries()
828 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
829 /// # }, |mut stream| async move {
830 /// # let mut results = Vec::new();
831 /// # let mut locations = std::collections::HashSet::new();
832 /// # for w in 0..=16 {
833 /// # let (location, v) = stream.next().await.unwrap();
834 /// # locations.insert(location);
835 /// # results.push(v);
836 /// # }
837 /// # results.sort();
838 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
839 /// # assert_eq!(locations.len(), 16);
840 /// # }));
841 /// # }
842 /// ```
    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
        self,
        to: &Cluster<'a, L2>,
        via: N,
        nondet_membership: NonDet,
    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
    where
        T: Serialize + DeserializeOwned,
    {
        // Overview: tag every element with one destination member chosen by
        // `index % member_count`, then ship the tagged stream with `demux` over `via`.

        // `track_membership` folds join/leave events into one presence flag per member of `to`.
        // NOTE(review): the justification below mentions "broadcast" although this is the
        // round-robin path; it looks copy-pasted from `broadcast` -- confirm and reword.
        let ids = track_membership(self.location.source_cluster_membership_stream(
            to,
            nondet!(/** dropped prefixes don't affect broadcast */),
        ));
        sliced! {
            // Both inputs are brought into the slice under the caller-supplied
            // `nondet_membership` guard.
            let members_snapshot = use(ids, nondet_membership);
            // `data.0` below is used as the element's position and `data.1` as the payload
            // (presumably the pair produced by `enumerate` -- confirm against its docs).
            let elements = use(self.enumerate(), nondet_membership);

            // Keep only members whose presence flag is `true` and collect their ids into a
            // single vector. The key order is asserted to be total under the same guard, so
            // the position of a member in this vector is not otherwise guaranteed.
            let current_members = members_snapshot
                .filter(q!(|b| *b))
                .keys()
                .assume_ordering::<TotalOrder>(nondet_membership)
                .collect_vec();

            // Pair every element with the member vector and pick the target by index modulo
            // the vector length. When the vector is empty the element is dropped (`None`).
            elements
                .cross_singleton(current_members)
                .filter_map(q!(|(data, members)| {
                    if members.is_empty() {
                        None
                    } else {
                        Some((members[data.0 % members.len()].clone(), data.1))
                    }
                }))
        }
        .demux(to, via)
    }
878}
879
880impl<'a, T, L, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
881 Stream<T, Cluster<'a, L, C>, B, O, R>
882{
883 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
884 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
885 /// using [`bincode`] to serialize/deserialize messages.
886 ///
887 /// Each cluster member sends its local stream elements, and they are collected at the destination
888 /// as a [`KeyedStream`] where keys identify the source cluster member.
889 ///
890 /// # Example
891 /// ```rust
892 /// # #[cfg(feature = "deploy")] {
893 /// # use hydro_lang::prelude::*;
894 /// # use futures::StreamExt;
895 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
896 /// let workers: Cluster<()> = flow.cluster::<()>();
897 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
898 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
899 /// # all_received.entries()
900 /// # }, |mut stream| async move {
901 /// // if there are 4 members in the cluster, we should receive 4 elements
902 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
903 /// # let mut results = Vec::new();
904 /// # for w in 0..4 {
905 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
906 /// # }
907 /// # results.sort();
908 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
909 /// # }));
910 /// # }
911 /// ```
912 ///
913 /// If you don't need to know the source for each element, you can use `.values()`
914 /// to get just the data:
915 /// ```rust
916 /// # #[cfg(feature = "deploy")] {
917 /// # use hydro_lang::prelude::*;
918 /// # use hydro_lang::live_collections::stream::NoOrder;
919 /// # use futures::StreamExt;
920 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
921 /// # let workers: Cluster<()> = flow.cluster::<()>();
922 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
923 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
924 /// # values
925 /// # }, |mut stream| async move {
926 /// # let mut results = Vec::new();
927 /// # for w in 0..4 {
928 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
929 /// # }
930 /// # results.sort();
931 /// // if there are 4 members in the cluster, we should receive 4 elements
932 /// // 1, 1, 1, 1
933 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
934 /// # }));
935 /// # }
936 /// ```
937 pub fn send_bincode<L2>(
938 self,
939 other: &Process<'a, L2>,
940 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
941 where
942 T: Serialize + DeserializeOwned,
943 {
944 self.send(other, TCP.fail_stop().bincode())
945 }
946
947 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
948 /// using the configuration in `via` to set up the message transport.
949 ///
950 /// Each cluster member sends its local stream elements, and they are collected at the destination
951 /// as a [`KeyedStream`] where keys identify the source cluster member.
952 ///
953 /// # Example
954 /// ```rust
955 /// # #[cfg(feature = "deploy")] {
956 /// # use hydro_lang::prelude::*;
957 /// # use futures::StreamExt;
958 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
959 /// let workers: Cluster<()> = flow.cluster::<()>();
960 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
961 /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
962 /// # all_received.entries()
963 /// # }, |mut stream| async move {
964 /// // if there are 4 members in the cluster, we should receive 4 elements
965 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
966 /// # let mut results = Vec::new();
967 /// # for w in 0..4 {
968 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
969 /// # }
970 /// # results.sort();
971 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
972 /// # }));
973 /// # }
974 /// ```
975 ///
976 /// If you don't need to know the source for each element, you can use `.values()`
977 /// to get just the data:
978 /// ```rust
979 /// # #[cfg(feature = "deploy")] {
980 /// # use hydro_lang::prelude::*;
981 /// # use hydro_lang::live_collections::stream::NoOrder;
982 /// # use futures::StreamExt;
983 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
984 /// # let workers: Cluster<()> = flow.cluster::<()>();
985 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
986 /// let values: Stream<i32, _, _, NoOrder> =
987 /// numbers.send(&process, TCP.fail_stop().bincode()).values();
988 /// # values
989 /// # }, |mut stream| async move {
990 /// # let mut results = Vec::new();
991 /// # for w in 0..4 {
992 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
993 /// # }
994 /// # results.sort();
995 /// // if there are 4 members in the cluster, we should receive 4 elements
996 /// // 1, 1, 1, 1
997 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
998 /// # }));
999 /// # }
1000 /// ```
1001 pub fn send<L2, N: NetworkFor<T>>(
1002 self,
1003 to: &Process<'a, L2>,
1004 via: N,
1005 ) -> KeyedStream<
1006 MemberId<L>,
1007 T,
1008 Process<'a, L2>,
1009 Unbounded,
1010 <O as MinOrder<N::OrderingGuarantee>>::Min,
1011 R,
1012 >
1013 where
1014 T: Serialize + DeserializeOwned,
1015 O: MinOrder<N::OrderingGuarantee>,
1016 {
1017 let serialize_pipeline = Some(N::serialize_thunk(false));
1018
1019 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
1020
1021 let name = via.name();
1022 if to.multiversioned() && name.is_none() {
1023 panic!(
1024 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
1025 );
1026 }
1027
1028 let raw_stream: Stream<
1029 (MemberId<L>, T),
1030 Process<'a, L2>,
1031 Unbounded,
1032 <O as MinOrder<N::OrderingGuarantee>>::Min,
1033 R,
1034 > = Stream::new(
1035 to.clone(),
1036 HydroNode::Network {
1037 name: name.map(ToOwned::to_owned),
1038 networking_info: N::networking_info(),
1039 serialize_fn: serialize_pipeline.map(|e| e.into()),
1040 instantiate_fn: DebugInstantiate::Building,
1041 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
1042 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1043 metadata: to.new_node_metadata(Stream::<
1044 (MemberId<L>, T),
1045 Process<'a, L2>,
1046 Unbounded,
1047 <O as MinOrder<N::OrderingGuarantee>>::Min,
1048 R,
1049 >::collection_kind()),
1050 },
1051 );
1052
1053 raw_stream.into_keyed()
1054 }
1055
1056 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
1057 /// Broadcasts elements of this stream at each source member to all members of a destination
1058 /// cluster, using [`bincode`] to serialize/deserialize messages.
1059 ///
1060 /// Each source member sends each of its stream elements to **every** member of the cluster
1061 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
1062 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
1063 /// **only data elements** and sends each element to all cluster members.
1064 ///
1065 /// # Non-Determinism
1066 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1067 /// to the current cluster members known _at that point in time_ at the source member. Depending
1068 /// on when each source member is notified of membership changes, it will broadcast each element
1069 /// to different members.
1070 ///
1071 /// # Example
1072 /// ```rust
1073 /// # #[cfg(feature = "deploy")] {
1074 /// # use hydro_lang::prelude::*;
1075 /// # use hydro_lang::location::MemberId;
1076 /// # use futures::StreamExt;
1077 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1078 /// # type Source = ();
1079 /// # type Destination = ();
1080 /// let source: Cluster<Source> = flow.cluster::<Source>();
1081 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1082 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1083 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
1084 /// # on_destination.entries().send_bincode(&p2).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
1086 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1087 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1088 /// // - ...
1089 /// # }, |mut stream| async move {
1090 /// # let mut results = Vec::new();
1091 /// # for w in 0..16 {
1092 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1093 /// # }
1094 /// # results.sort();
1095 /// # assert_eq!(results, vec![
1096 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1097 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1098 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1099 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1100 /// # ]);
1101 /// # }));
1102 /// # }
1103 /// ```
1104 pub fn broadcast_bincode<L2: 'a>(
1105 self,
1106 other: &Cluster<'a, L2>,
1107 nondet_membership: NonDet,
1108 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1109 where
1110 T: Clone + Serialize + DeserializeOwned,
1111 {
1112 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1113 }
1114
1115 /// Broadcasts elements of this stream at each source member to all members of a destination
1116 /// cluster, using the configuration in `via` to set up the message transport.
1117 ///
1118 /// Each source member sends each of its stream elements to **every** member of the cluster
1119 /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1120 /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1121 /// **only data elements** and sends each element to all cluster members.
1122 ///
1123 /// # Non-Determinism
1124 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1125 /// to the current cluster members known _at that point in time_ at the source member. Depending
1126 /// on when each source member is notified of membership changes, it will broadcast each element
1127 /// to different members.
1128 ///
1129 /// # Example
1130 /// ```rust
1131 /// # #[cfg(feature = "deploy")] {
1132 /// # use hydro_lang::prelude::*;
1133 /// # use hydro_lang::location::MemberId;
1134 /// # use futures::StreamExt;
1135 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1136 /// # type Source = ();
1137 /// # type Destination = ();
1138 /// let source: Cluster<Source> = flow.cluster::<Source>();
1139 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1140 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1141 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1142 /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
1144 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1145 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1146 /// // - ...
1147 /// # }, |mut stream| async move {
1148 /// # let mut results = Vec::new();
1149 /// # for w in 0..16 {
1150 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1151 /// # }
1152 /// # results.sort();
1153 /// # assert_eq!(results, vec![
1154 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1155 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1156 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1157 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1158 /// # ]);
1159 /// # }));
1160 /// # }
1161 /// ```
    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
        self,
        to: &Cluster<'a, L2>,
        via: N,
        nondet_membership: NonDet,
    ) -> KeyedStream<
        MemberId<L>,
        T,
        Cluster<'a, L2>,
        Unbounded,
        <O as MinOrder<N::OrderingGuarantee>>::Min,
        R,
    >
    where
        T: Clone + Serialize + DeserializeOwned,
        O: MinOrder<N::OrderingGuarantee>,
    {
        // Overview: key every element by the destination members currently marked present,
        // then ship the keyed stream with `demux` over `via`.

        // `track_membership` folds join/leave events into one presence flag per member of `to`.
        let ids = track_membership(self.location.source_cluster_membership_stream(
            to,
            nondet!(/** dropped prefixes don't affect broadcast */),
        ));
        sliced! {
            // Both inputs are brought into the slice under the caller-supplied
            // `nondet_membership` guard.
            let members_snapshot = use(ids, nondet_membership);
            let elements = use(self, nondet_membership);

            // Keep only members whose presence flag is `true`.
            let current_members = members_snapshot.filter(q!(|b| *b));
            // Presumably emits one copy of every element under each remaining member key
            // (hence the `T: Clone` bound) -- confirm against `repeat_with_keys`.
            elements.repeat_with_keys(current_members)
        }
        .demux(to, via)
    }
1192
1193 /// Broadcasts elements of this stream at each source member to all members of a destination
1194 /// cluster, assuming membership is closed (fixed at deploy time).
1195 ///
1196 /// Unlike [`Stream::broadcast`], this does not require a [`NonDet`] guard.
1197 /// The membership set is obtained from deploy metadata via [`ClusterIds`], making the
1198 /// broadcast fully deterministic. Since all source members send to all destination members
1199 /// and membership is fixed, every destination member receives the same set of elements
1200 /// from each source, guaranteeing [`EventualConsistency`].
1201 ///
1202 /// This is only available in deployment targets with static cluster membership
1203 /// (legacy Hydro Deploy and simulation). On dynamic targets, use [`Stream::broadcast`].
    pub fn broadcast_closed<L2: 'a, N: NetworkFor<T>>(
        self,
        to: &Cluster<'a, L2>,
        via: N,
    ) -> KeyedStream<
        MemberId<L>,
        T,
        Cluster<'a, L2, EventualConsistency>,
        Unbounded,
        <O as MinOrder<N::OrderingGuarantee>>::Min,
        R,
    >
    where
        T: Clone + Serialize + DeserializeOwned,
        O: MinOrder<N::OrderingGuarantee>,
    {
        // Handle to the id list of the destination cluster, looked up by the cluster's key.
        // It is referenced by name inside the `q!` expression below.
        let cluster_ids = ClusterIds {
            key: to.key,
            _phantom: PhantomData,
        };
        // Stream of destination member ids, produced on the source location by iterating
        // `cluster_ids` and converting each id with `MemberId::from_tagless`. The consistency
        // of this stream is asserted manually; the proof text is the justification.
        let member_ids = self
            .location
            .source_iter(q!(cluster_ids
                .iter()
                .map(|id| MemberId::from_tagless(id.clone()))))
            .assert_has_consistency_of_trusted::<Cluster<'a, L, C>>(manual_proof!(
                /// ClusterIds is deploy-time metadata, identical on every cluster member.
            ));

        // Pair every element with every destination id, put the id first so it can act as the
        // routing key, send with `demux`, and finally assert the consistency level promised by
        // the return type.
        self.cross_product(member_ids)
            .map(q!(|(data, member_id)| (member_id, data)))
            .into_keyed()
            .demux(to, via)
            .assert_has_consistency_of_trusted(manual_proof!(
                /// Closed broadcast with fixed membership: every source member sends to every
                /// destination member, so all destinations materialize the same elements.
            ))
    }
1242
1243 #[cfg(feature = "sim")]
1244 /// Sends elements of this cluster stream to an external location using bincode serialization.
1245 fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
1246 where
1247 T: Serialize + DeserializeOwned,
1248 {
1249 let serialize_pipeline = Some(serialize_bincode::<T>(false));
1250
1251 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1252
1253 let external_port_id = flow_state_borrow.next_external_port();
1254
1255 flow_state_borrow.push_root(HydroRoot::SendExternal {
1256 to_external_key: other.key,
1257 to_port_id: external_port_id,
1258 to_many: false,
1259 unpaired: true,
1260 serialize_fn: serialize_pipeline.map(|e| e.into()),
1261 instantiate_fn: DebugInstantiate::Building,
1262 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1263 op_metadata: HydroIrOpMetadata::new(),
1264 });
1265
1266 ExternalBincodeStream {
1267 process_key: other.key,
1268 port_id: external_port_id,
1269 _phantom: PhantomData,
1270 }
1271 }
1272
1273 #[cfg(feature = "sim")]
1274 /// Sets up a simulation output port for this cluster stream, allowing test code
1275 /// to receive `(member_id, T)` pairs during simulation.
1276 pub fn sim_cluster_output(self) -> crate::sim::SimClusterReceiver<T, O, R>
1277 where
1278 T: Serialize + DeserializeOwned,
1279 {
1280 let external_location: External<'a, ()> = External {
1281 key: LocationKey::FIRST,
1282 flow_state: self.location.flow_state().clone(),
1283 _phantom: PhantomData,
1284 };
1285
1286 let external = self.send_bincode_external(&external_location);
1287
1288 crate::sim::SimClusterReceiver(external.port_id, PhantomData)
1289 }
1290}
1291
1292impl<'a, T, L, L2, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
1293 Stream<(MemberId<L2>, T), Cluster<'a, L, C>, B, O, R>
1294{
1295 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1296 /// Sends elements of this stream at each source member to specific members of a destination
1297 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1298 ///
1299 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1300 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1301 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1302 /// all members.
1303 ///
1304 /// Each cluster member sends its local stream elements, and they are collected at each
1305 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1306 ///
1307 /// # Example
1308 /// ```rust
1309 /// # #[cfg(feature = "deploy")] {
1310 /// # use hydro_lang::prelude::*;
1311 /// # use futures::StreamExt;
1312 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1313 /// # type Source = ();
1314 /// # type Destination = ();
1315 /// let source: Cluster<Source> = flow.cluster::<Source>();
1316 /// let to_send: Stream<_, Cluster<_>, _> = source
1317 /// .source_iter(q!(vec![0, 1, 2, 3]))
1318 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1319 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1320 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1321 /// # all_received.entries().send_bincode(&p2).entries()
1322 /// # }, |mut stream| async move {
1323 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1324 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1325 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1326 /// // - ...
1327 /// # let mut results = Vec::new();
1328 /// # for w in 0..16 {
1329 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1330 /// # }
1331 /// # results.sort();
1332 /// # assert_eq!(results, vec![
1333 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1334 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1335 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1336 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1337 /// # ]);
1338 /// # }));
1339 /// # }
1340 /// ```
1341 pub fn demux_bincode(
1342 self,
1343 other: &Cluster<'a, L2>,
1344 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1345 where
1346 T: Serialize + DeserializeOwned,
1347 {
1348 self.demux(other, TCP.fail_stop().bincode())
1349 }
1350
1351 /// Sends elements of this stream at each source member to specific members of a destination
1352 /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1353 /// message transport.
1354 ///
1355 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1356 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1357 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1358 /// all members.
1359 ///
1360 /// Each cluster member sends its local stream elements, and they are collected at each
1361 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1362 ///
1363 /// # Example
1364 /// ```rust
1365 /// # #[cfg(feature = "deploy")] {
1366 /// # use hydro_lang::prelude::*;
1367 /// # use futures::StreamExt;
1368 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1369 /// # type Source = ();
1370 /// # type Destination = ();
1371 /// let source: Cluster<Source> = flow.cluster::<Source>();
1372 /// let to_send: Stream<_, Cluster<_>, _> = source
1373 /// .source_iter(q!(vec![0, 1, 2, 3]))
1374 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1375 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1376 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1377 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1378 /// # }, |mut stream| async move {
1379 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1380 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1381 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1382 /// // - ...
1383 /// # let mut results = Vec::new();
1384 /// # for w in 0..16 {
1385 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1386 /// # }
1387 /// # results.sort();
1388 /// # assert_eq!(results, vec![
1389 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1390 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1391 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1392 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1393 /// # ]);
1394 /// # }));
1395 /// # }
1396 /// ```
1397 pub fn demux<N: NetworkFor<T>>(
1398 self,
1399 to: &Cluster<'a, L2>,
1400 via: N,
1401 ) -> KeyedStream<
1402 MemberId<L>,
1403 T,
1404 Cluster<'a, L2, NoConsistency>,
1405 Unbounded,
1406 <O as MinOrder<N::OrderingGuarantee>>::Min,
1407 R,
1408 >
1409 where
1410 T: Serialize + DeserializeOwned,
1411 O: MinOrder<N::OrderingGuarantee>,
1412 {
1413 self.into_keyed().demux(to, via)
1414 }
1415}
1416
1417#[cfg(test)]
1418mod tests {
1419 #[cfg(feature = "sim")]
1420 use stageleft::q;
1421
1422 #[cfg(feature = "sim")]
1423 use crate::live_collections::sliced::sliced;
1424 #[cfg(feature = "sim")]
1425 use crate::location::{Location, MemberId};
1426 #[cfg(feature = "sim")]
1427 use crate::networking::TCP;
1428 #[cfg(feature = "sim")]
1429 use crate::nondet::nondet;
1430 #[cfg(feature = "sim")]
1431 use crate::prelude::FlowBuilder;
1432
1433 #[cfg(feature = "sim")]
1434 #[test]
1435 fn sim_send_bincode_o2o() {
1436 use crate::networking::TCP;
1437
1438 let mut flow = FlowBuilder::new();
1439 let node = flow.process::<()>();
1440 let node2 = flow.process::<()>();
1441
1442 let (in_send, input) = node.sim_input();
1443
1444 let out_recv = input
1445 .send(&node2, TCP.fail_stop().bincode())
1446 .batch(&node2.tick(), nondet!(/** test */))
1447 .count()
1448 .all_ticks()
1449 .sim_output();
1450
1451 let instances = flow.sim().exhaustive(async || {
1452 in_send.send(());
1453 in_send.send(());
1454 in_send.send(());
1455
1456 let received = out_recv.collect::<Vec<_>>().await;
1457 assert!(received.into_iter().sum::<usize>() == 3);
1458 });
1459
1460 assert_eq!(instances, 4); // 2^{3 - 1}
1461 }
1462
/// Every member of a cluster (simulated with four members) sends the value `1` to a
/// single process over fail-stop TCP; the keyed entries are batched per tick on that
/// process.
///
/// Each execution must yield exactly one `(member, 1)` pair per cluster member, in any
/// order, and the simulator is expected to explore 75 executions.
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_m2o() {
    let mut builder = FlowBuilder::new();
    let senders = builder.cluster::<()>();
    let receiver = builder.process::<()>();

    let arrivals = senders
        .source_iter(q!(vec![1]))
        .send(&receiver, TCP.fail_stop().bincode());
    let batched_entries = arrivals
        .entries()
        .batch(&receiver.tick(), nondet!(/** test */))
        .all_ticks();
    let out_recv = batched_entries.sim_output();

    let explored = builder
        .sim()
        .with_cluster_size(&senders, 4)
        .exhaustive(async || {
            let expected = vec![
                (MemberId::from_raw_id(0), 1),
                (MemberId::from_raw_id(1), 1),
                (MemberId::from_raw_id(2), 1),
                (MemberId::from_raw_id(3), 1),
            ];
            out_recv.assert_yields_only_unordered(expected).await
        });

    assert_eq!(explored, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
}
1495
/// Two independent clusters (simulated with three and four members) each send a
/// constant (`1` and `2` respectively) to the same process over fail-stop TCP.
///
/// Both outputs are checked for exactly one entry per member, in any order, and the
/// simulator is expected to explore a single execution.
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_multiple_m2o() {
    let mut builder = FlowBuilder::new();
    let first_cluster = builder.cluster::<()>();
    let second_cluster = builder.cluster::<()>();
    let receiver = builder.process::<()>();

    let from_first = first_cluster
        .source_iter(q!(vec![1]))
        .send(&receiver, TCP.fail_stop().bincode());
    let out_recv_1 = from_first.entries().sim_output();

    let from_second = second_cluster
        .source_iter(q!(vec![2]))
        .send(&receiver, TCP.fail_stop().bincode());
    let out_recv_2 = from_second.entries().sim_output();

    let explored = builder
        .sim()
        .with_cluster_size(&first_cluster, 3)
        .with_cluster_size(&second_cluster, 4)
        .exhaustive(async || {
            let expected_from_first = vec![
                (MemberId::from_raw_id(0), 1),
                (MemberId::from_raw_id(1), 1),
                (MemberId::from_raw_id(2), 1),
            ];
            out_recv_1
                .assert_yields_only_unordered(expected_from_first)
                .await;

            let expected_from_second = vec![
                (MemberId::from_raw_id(0), 2),
                (MemberId::from_raw_id(1), 2),
                (MemberId::from_raw_id(2), 2),
                (MemberId::from_raw_id(3), 2),
            ];
            out_recv_2
                .assert_yields_only_unordered(expected_from_second)
                .await;
        });

    assert_eq!(explored, 1);
}
1541
/// A process demuxes `123` to cluster member 0 and `456` to member 1; each member adds
/// one and sends the result back to the process over fail-stop TCP.
///
/// With a simulated cluster of four members, the process must observe exactly
/// `(member 0, 124)` and `(member 1, 457)`, in any order.
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_o2m() {
    let mut builder = FlowBuilder::new();
    let workers = builder.cluster::<()>();
    let coordinator = builder.process::<()>();

    let addressed = coordinator.source_iter(q!(vec![
        (MemberId::from_raw_id(0), 123),
        (MemberId::from_raw_id(1), 456),
    ]));

    let incremented = addressed
        .demux(&workers, TCP.fail_stop().bincode())
        .map(q!(|x| x + 1));
    let out_recv = incremented
        .send(&coordinator, TCP.fail_stop().bincode())
        .entries()
        .sim_output();

    builder
        .sim()
        .with_cluster_size(&workers, 4)
        .exhaustive(async || {
            let expected = vec![
                (MemberId::from_raw_id(0), 124),
                (MemberId::from_raw_id(1), 457),
            ];
            out_recv.assert_yields_only_unordered(expected).await
        });
}
1572
/// A process broadcasts `123` then `456` to a cluster (simulated with two members);
/// each member adds one and sends the result back to the process.
///
/// Within every execution, a member that reported `124` must also have reported `457`.
/// Across all executions, each member must report both values at least once, and
/// member 0 must at least once report `457` without `124`.
#[cfg(feature = "sim")]
#[test]
fn sim_broadcast_bincode_o2m() {
    let mut builder = FlowBuilder::new();
    let members = builder.cluster::<()>();
    let origin = builder.process::<()>();

    let incremented = origin
        .source_iter(q!(vec![123, 456]))
        .broadcast(&members, TCP.fail_stop().bincode(), nondet!(/** test */))
        .map(q!(|x| x + 1));
    let out_recv = incremented
        .send(&origin, TCP.fail_stop().bincode())
        .entries()
        .sim_output();

    // Flags accumulated across all explored executions.
    let mut c_1_produced = false;
    let mut c_2_produced = false;
    let mut c_1_saw_457_but_not_124 = false;

    builder
        .sim()
        .with_cluster_size(&members, 2)
        .exhaustive(async || {
            let all_out = out_recv.collect_sorted::<Vec<_>>().await;

            let first_member_has_124 = all_out.contains(&(MemberId::from_raw_id(0), 124));
            let first_member_has_457 = all_out.contains(&(MemberId::from_raw_id(0), 457));
            let second_member_has_124 = all_out.contains(&(MemberId::from_raw_id(1), 124));

            // check that order is preserved
            if first_member_has_124 {
                assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
                c_1_produced = true;
            }

            if second_member_has_124 {
                assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
                c_2_produced = true;
            }

            if first_member_has_457 && !first_member_has_124 {
                c_1_saw_457_but_not_124 = true;
            }
        });

    assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages

    // in at least one execution, the cluster member received 457 but not 124, this tests
    // that the simulator properly explores dynamic membership additions (a member that joins after 123 is broadcast)
    assert!(c_1_saw_457_but_not_124);
}
1622
/// A process demuxes `123` to cluster member 0 and `456` to member 1. Each member adds
/// one, re-addresses the result to members 0 and 1 of the same cluster, and the
/// receiving members forward what they got (with its sender id) back to the process.
///
/// With a simulated cluster of four members, the process must observe, for each of
/// members 0 and 1, the entries `(member 0, 124)` and `(member 1, 457)`.
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_m2m() {
    let mut builder = FlowBuilder::new();
    let members = builder.cluster::<()>();
    let origin = builder.process::<()>();

    let addressed = origin.source_iter(q!(vec![
        (MemberId::from_raw_id(0), 123),
        (MemberId::from_raw_id(1), 456),
    ]));

    // First hop: process -> cluster, then fan each value out to members 0 and 1.
    let readdressed = addressed
        .demux(&members, TCP.fail_stop().bincode())
        .map(q!(|x| x + 1))
        .flat_map_ordered(q!(|x| vec![
            (MemberId::from_raw_id(0), x),
            (MemberId::from_raw_id(1), x),
        ]));

    // Second hop: cluster -> cluster, then everything is reported back to the process.
    let out_recv = readdressed
        .demux(&members, TCP.fail_stop().bincode())
        .entries()
        .send(&origin, TCP.fail_stop().bincode())
        .entries()
        .sim_output();

    builder
        .sim()
        .with_cluster_size(&members, 4)
        .exhaustive(async || {
            let expected = vec![
                (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
                (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
                (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
                (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
            ];
            out_recv.assert_yields_only_unordered(expected).await
        });
}
1661
/// Sends `0..3` from one process to another over `TCP.lossy_delayed_forever()`, folds
/// the received values into a set, and inspects snapshots of that set.
///
/// Running in safety-only mode, the test requires that at least one explored execution
/// produces a snapshot whose contents skip a value (a non-contiguous subset of inputs).
#[cfg(feature = "sim")]
#[test]
fn sim_lossy_delayed_forever_o2o() {
    use std::collections::HashSet;

    use crate::properties::manual_proof;

    let mut builder = FlowBuilder::new();
    let sender = builder.process::<()>();
    let receiver = builder.process::<()>();

    let delivered = sender
        .source_iter(q!(0..3_u32))
        .send(&receiver, TCP.lossy_delayed_forever().bincode());

    let received = delivered.fold(
        q!(|| std::collections::HashSet::<u32>::new()),
        q!(
            |set, v| {
                set.insert(v);
            },
            commutative = manual_proof!(/** set insert is commutative */)
        ),
    );

    let out_recv = sliced! {
        let snapshot = use(received, nondet!(/** test */));
        snapshot.into_stream()
    }
    .sim_output();

    // Set as soon as any execution exposes a snapshot with a gap.
    let mut gap_observed = false;

    builder.sim().test_safety_only().exhaustive(async || {
        let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;

        // Check each individual snapshot for a non-contiguous subset.
        for observed in &snapshots {
            #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
            if observed.len() == 2 {
                let lowest = *observed.iter().min().unwrap();
                let highest = *observed.iter().max().unwrap();
                // Fewer elements than the width of the covered range means a value is missing.
                let covered_width = (highest - lowest + 1) as usize;
                gap_observed |= observed.len() < covered_width;
            }
        }
    });

    assert!(
        gap_observed,
        "Expected at least one execution with a non-contiguous subset of inputs"
    );
}
1715
/// A process sends `123` and `456` to a cluster (simulated with two members) via
/// `broadcast_closed`, and every member forwards what it received back to the process.
///
/// The process must observe both values from both members, in any order.
#[cfg(feature = "sim")]
#[test]
fn sim_broadcast_closed_o2m() {
    let mut builder = FlowBuilder::new();
    let members = builder.cluster::<()>();
    let origin = builder.process::<()>();

    let at_members = origin
        .source_iter(q!(vec![123, 456]))
        .broadcast_closed(&members, TCP.fail_stop().bincode());
    let out_recv = at_members
        .send(&origin, TCP.fail_stop().bincode())
        .entries()
        .sim_output();

    builder
        .sim()
        .with_cluster_size(&members, 2)
        .exhaustive(async || {
            let expected = vec![
                (MemberId::from_raw_id(0), 123),
                (MemberId::from_raw_id(0), 456),
                (MemberId::from_raw_id(1), 123),
                (MemberId::from_raw_id(1), 456),
            ];
            out_recv.assert_yields_only_unordered(expected).await
        });
}
1744
/// Every member of a source cluster sends `123` to every member of a destination
/// cluster via `broadcast_closed`; the destination members forward each received entry
/// (with its sender id) to a process.
///
/// With both clusters simulated at two members, the process must observe one entry per
/// (destination member, source member) pair, in any order.
#[cfg(feature = "sim")]
#[test]
fn sim_broadcast_closed_m2m() {
    let mut builder = FlowBuilder::new();
    let source = builder.cluster::<()>();
    let dest = builder.cluster::<()>();
    let collector = builder.process::<()>();

    // Broadcast from source cluster to dest cluster, then collect at a process.
    let at_dest = source
        .source_iter(q!(vec![123]))
        .broadcast_closed(&dest, TCP.fail_stop().bincode());
    let out_recv = at_dest
        .entries()
        .send(&collector, TCP.fail_stop().bincode())
        .entries()
        .sim_output();

    builder
        .sim()
        .with_cluster_size(&source, 2)
        .with_cluster_size(&dest, 2)
        .exhaustive(async || {
            // Each source member (0, 1) broadcasts 123 to each dest member (0, 1).
            // The dest members then send to the process keyed by dest member id.
            // Each dest member receives (source_0, 123) and (source_1, 123).
            let expected = vec![
                (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 123)),
                (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 123)),
                (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 123)),
                (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 123)),
            ];
            out_recv.assert_yields_only_unordered(expected).await
        });
}
1780}