hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::manual_expr::ManualExpr;
31use crate::nondet::{NonDet, nondet};
32use crate::prelude::manual_proof;
33use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
34
35pub mod networking;
36
37/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
38#[sealed::sealed]
39pub trait Ordering:
40 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
41{
42 /// The [`StreamOrder`] corresponding to this type.
43 const ORDERING_KIND: StreamOrder;
44}
45
46/// Marks the stream as being totally ordered, which means that there are
47/// no sources of non-determinism (other than intentional ones) that will
48/// affect the order of elements.
49pub enum TotalOrder {}
50
51#[sealed::sealed]
52impl Ordering for TotalOrder {
53 const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
54}
55
56/// Marks the stream as having no order, which means that the order of
57/// elements may be affected by non-determinism.
58///
59/// This restricts certain operators, such as `fold` and `reduce`, to only
60/// be used with commutative aggregation functions.
61pub enum NoOrder {}
62
63#[sealed::sealed]
64impl Ordering for NoOrder {
65 const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
66}
67
68/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
69/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
70/// have `Self` guarantees instead.
71#[sealed::sealed]
72pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
73#[sealed::sealed]
74impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
75
76/// Helper trait for determining the weakest of two orderings.
77#[sealed::sealed]
78pub trait MinOrder<Other: ?Sized> {
79 /// The weaker of the two orderings.
80 type Min: Ordering;
81}
82
83#[sealed::sealed]
84impl<O: Ordering> MinOrder<O> for TotalOrder {
85 type Min = O;
86}
87
88#[sealed::sealed]
89impl<O: Ordering> MinOrder<O> for NoOrder {
90 type Min = NoOrder;
91}
92
93/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
94#[sealed::sealed]
95pub trait Retries:
96 MinRetries<Self, Min = Self>
97 + MinRetries<ExactlyOnce, Min = Self>
98 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
99{
100 /// The [`StreamRetry`] corresponding to this type.
101 const RETRIES_KIND: StreamRetry;
102}
103
104/// Marks the stream as having deterministic message cardinality, with no
105/// possibility of duplicates.
106pub enum ExactlyOnce {}
107
108#[sealed::sealed]
109impl Retries for ExactlyOnce {
110 const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
111}
112
113/// Marks the stream as having non-deterministic message cardinality, which
114/// means that duplicates may occur, but messages will not be dropped.
115pub enum AtLeastOnce {}
116
117#[sealed::sealed]
118impl Retries for AtLeastOnce {
119 const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
120}
121
122/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
123/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
124/// have `Self` guarantees instead.
125#[sealed::sealed]
126pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
127#[sealed::sealed]
128impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
129
130/// Helper trait for determining the weakest of two retry guarantees.
131#[sealed::sealed]
132pub trait MinRetries<Other: ?Sized> {
133 /// The weaker of the two retry guarantees.
134 type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
135}
136
137#[sealed::sealed]
138impl<R: Retries> MinRetries<R> for ExactlyOnce {
139 type Min = R;
140}
141
142#[sealed::sealed]
143impl<R: Retries> MinRetries<R> for AtLeastOnce {
144 type Min = AtLeastOnce;
145}
146
147#[sealed::sealed]
148#[diagnostic::on_unimplemented(
149 message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
150 label = "required here",
151 note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
152)]
153/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
154pub trait IsOrdered: Ordering {}
155
156#[sealed::sealed]
157#[diagnostic::do_not_recommend]
158impl IsOrdered for TotalOrder {}
159
160#[sealed::sealed]
161#[diagnostic::on_unimplemented(
162 message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
163 label = "required here",
164 note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
165)]
166/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
167pub trait IsExactlyOnce: Retries {}
168
169#[sealed::sealed]
170#[diagnostic::do_not_recommend]
171impl IsExactlyOnce for ExactlyOnce {}
172
173/// Streaming sequence of elements with type `Type`.
174///
175/// This live collection represents a growing sequence of elements, with new elements being
176/// asynchronously appended to the end of the sequence. This can be used to model the arrival
177/// of network input, such as API requests, or streaming ingestion.
178///
179/// By default, all streams have deterministic ordering and each element is materialized exactly
180/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
181/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
182/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
183///
184/// Type Parameters:
185/// - `Type`: the type of elements in the stream
186/// - `Loc`: the location where the stream is being materialized
187/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
188/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
189/// (default is [`TotalOrder`])
190/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
191/// [`AtLeastOnce`] (default is [`ExactlyOnce`])
192pub struct Stream<
193 Type,
194 Loc,
195 Bound: Boundedness = Unbounded,
196 Order: Ordering = TotalOrder,
197 Retry: Retries = ExactlyOnce,
198> {
199 pub(crate) location: Loc,
200 pub(crate) ir_node: RefCell<HydroNode>,
201 pub(crate) flow_state: FlowState,
202
203 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
204}
205
206impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
207 fn drop(&mut self) {
208 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
209 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
210 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
211 input: Box::new(ir_node),
212 op_metadata: HydroIrOpMetadata::new(),
213 });
214 }
215 }
216}
217
218impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
219 for Stream<T, L, Unbounded, O, R>
220where
221 L: Location<'a>,
222{
223 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
224 let new_meta = stream
225 .location
226 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
227
228 Stream {
229 location: stream.location.clone(),
230 flow_state: stream.flow_state.clone(),
231 ir_node: RefCell::new(HydroNode::Cast {
232 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
233 metadata: new_meta,
234 }),
235 _phantom: PhantomData,
236 }
237 }
238}
239
240impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
241 for Stream<T, L, B, NoOrder, R>
242where
243 L: Location<'a>,
244{
245 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
246 stream.weaken_ordering()
247 }
248}
249
250impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
251 for Stream<T, L, B, O, AtLeastOnce>
252where
253 L: Location<'a>,
254{
255 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
256 stream.weaken_retries()
257 }
258}
259
260impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
261where
262 L: Location<'a>,
263{
264 fn defer_tick(self) -> Self {
265 Stream::defer_tick(self)
266 }
267}
268
269impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
270 for Stream<T, Tick<L>, Bounded, O, R>
271where
272 L: Location<'a>,
273{
274 type Location = Tick<L>;
275
276 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
277 Stream::new(
278 location.clone(),
279 HydroNode::CycleSource {
280 cycle_id,
281 metadata: location.new_node_metadata(Self::collection_kind()),
282 },
283 )
284 }
285}
286
287impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
288 for Stream<T, Tick<L>, Bounded, O, R>
289where
290 L: Location<'a>,
291{
292 type Location = Tick<L>;
293
294 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
295 let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
296 location.clone(),
297 HydroNode::DeferTick {
298 input: Box::new(HydroNode::CycleSource {
299 cycle_id,
300 metadata: location.new_node_metadata(Self::collection_kind()),
301 }),
302 metadata: location.new_node_metadata(Self::collection_kind()),
303 },
304 );
305
306 from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
307 }
308}
309
310impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
311 for Stream<T, Tick<L>, Bounded, O, R>
312where
313 L: Location<'a>,
314{
315 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
316 assert_eq!(
317 Location::id(&self.location),
318 expected_location,
319 "locations do not match"
320 );
321 self.location
322 .flow_state()
323 .borrow_mut()
324 .push_root(HydroRoot::CycleSink {
325 cycle_id,
326 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
327 op_metadata: HydroIrOpMetadata::new(),
328 });
329 }
330}
331
332impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
333 for Stream<T, L, B, O, R>
334where
335 L: Location<'a> + NoTick,
336{
337 type Location = L;
338
339 fn create_source(cycle_id: CycleId, location: L) -> Self {
340 Stream::new(
341 location.clone(),
342 HydroNode::CycleSource {
343 cycle_id,
344 metadata: location.new_node_metadata(Self::collection_kind()),
345 },
346 )
347 }
348}
349
350impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
351 for Stream<T, L, B, O, R>
352where
353 L: Location<'a> + NoTick,
354{
355 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
356 assert_eq!(
357 Location::id(&self.location),
358 expected_location,
359 "locations do not match"
360 );
361 self.location
362 .flow_state()
363 .borrow_mut()
364 .push_root(HydroRoot::CycleSink {
365 cycle_id,
366 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
367 op_metadata: HydroIrOpMetadata::new(),
368 });
369 }
370}
371
372impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
373where
374 T: Clone,
375 L: Location<'a>,
376{
377 fn clone(&self) -> Self {
378 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
379 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
380 *self.ir_node.borrow_mut() = HydroNode::Tee {
381 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
382 metadata: self.location.new_node_metadata(Self::collection_kind()),
383 };
384 }
385
386 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
387 Stream {
388 location: self.location.clone(),
389 flow_state: self.flow_state.clone(),
390 ir_node: HydroNode::Tee {
391 inner: SharedNode(inner.0.clone()),
392 metadata: metadata.clone(),
393 }
394 .into(),
395 _phantom: PhantomData,
396 }
397 } else {
398 unreachable!()
399 }
400 }
401}
402
403impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
404where
405 L: Location<'a>,
406{
407 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
408 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
409 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
410
411 let flow_state = location.flow_state().clone();
412 Stream {
413 location,
414 flow_state,
415 ir_node: RefCell::new(ir_node),
416 _phantom: PhantomData,
417 }
418 }
419
420 /// Returns the [`Location`] where this stream is being materialized.
421 pub fn location(&self) -> &L {
422 &self.location
423 }
424
425 pub(crate) fn collection_kind() -> CollectionKind {
426 CollectionKind::Stream {
427 bound: B::BOUND_KIND,
428 order: O::ORDERING_KIND,
429 retry: R::RETRIES_KIND,
430 element_type: quote_type::<T>().into(),
431 }
432 }
433
434 /// Produces a stream based on invoking `f` on each element.
435 /// If you do not want to modify the stream and instead only want to view
436 /// each item use [`Stream::inspect`] instead.
437 ///
438 /// # Example
439 /// ```rust
440 /// # #[cfg(feature = "deploy")] {
441 /// # use hydro_lang::prelude::*;
442 /// # use futures::StreamExt;
443 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
444 /// let words = process.source_iter(q!(vec!["hello", "world"]));
445 /// words.map(q!(|x| x.to_uppercase()))
446 /// # }, |mut stream| async move {
447 /// # for w in vec!["HELLO", "WORLD"] {
448 /// # assert_eq!(stream.next().await.unwrap(), w);
449 /// # }
450 /// # }));
451 /// # }
452 /// ```
453 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
454 where
455 F: Fn(T) -> U + 'a,
456 {
457 let f = f.splice_fn1_ctx(&self.location).into();
458 Stream::new(
459 self.location.clone(),
460 HydroNode::Map {
461 f,
462 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
463 metadata: self
464 .location
465 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
466 },
467 )
468 }
469
470 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
471 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
472 /// for the output type `U` must produce items in a **deterministic** order.
473 ///
474 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
475 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
476 ///
477 /// # Example
478 /// ```rust
479 /// # #[cfg(feature = "deploy")] {
480 /// # use hydro_lang::prelude::*;
481 /// # use futures::StreamExt;
482 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
483 /// process
484 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
485 /// .flat_map_ordered(q!(|x| x))
486 /// # }, |mut stream| async move {
487 /// // 1, 2, 3, 4
488 /// # for w in (1..5) {
489 /// # assert_eq!(stream.next().await.unwrap(), w);
490 /// # }
491 /// # }));
492 /// # }
493 /// ```
494 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
495 where
496 I: IntoIterator<Item = U>,
497 F: Fn(T) -> I + 'a,
498 {
499 let f = f.splice_fn1_ctx(&self.location).into();
500 Stream::new(
501 self.location.clone(),
502 HydroNode::FlatMap {
503 f,
504 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
505 metadata: self
506 .location
507 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
508 },
509 )
510 }
511
512 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
513 /// for the output type `U` to produce items in any order.
514 ///
515 /// # Example
516 /// ```rust
517 /// # #[cfg(feature = "deploy")] {
518 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
519 /// # use futures::StreamExt;
520 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
521 /// process
522 /// .source_iter(q!(vec![
523 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
524 /// std::collections::HashSet::from_iter(vec![3, 4]),
525 /// ]))
526 /// .flat_map_unordered(q!(|x| x))
527 /// # }, |mut stream| async move {
528 /// // 1, 2, 3, 4, but in no particular order
529 /// # let mut results = Vec::new();
530 /// # for w in (1..5) {
531 /// # results.push(stream.next().await.unwrap());
532 /// # }
533 /// # results.sort();
534 /// # assert_eq!(results, vec![1, 2, 3, 4]);
535 /// # }));
536 /// # }
537 /// ```
538 pub fn flat_map_unordered<U, I, F>(
539 self,
540 f: impl IntoQuotedMut<'a, F, L>,
541 ) -> Stream<U, L, B, NoOrder, R>
542 where
543 I: IntoIterator<Item = U>,
544 F: Fn(T) -> I + 'a,
545 {
546 let f = f.splice_fn1_ctx(&self.location).into();
547 Stream::new(
548 self.location.clone(),
549 HydroNode::FlatMap {
550 f,
551 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
552 metadata: self
553 .location
554 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
555 },
556 )
557 }
558
559 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
560 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
561 ///
562 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
563 /// not deterministic, use [`Stream::flatten_unordered`] instead.
564 ///
565 /// ```rust
566 /// # #[cfg(feature = "deploy")] {
567 /// # use hydro_lang::prelude::*;
568 /// # use futures::StreamExt;
569 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
570 /// process
571 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
572 /// .flatten_ordered()
573 /// # }, |mut stream| async move {
574 /// // 1, 2, 3, 4
575 /// # for w in (1..5) {
576 /// # assert_eq!(stream.next().await.unwrap(), w);
577 /// # }
578 /// # }));
579 /// # }
580 /// ```
581 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
582 where
583 T: IntoIterator<Item = U>,
584 {
585 self.flat_map_ordered(q!(|d| d))
586 }
587
588 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
589 /// for the element type `T` to produce items in any order.
590 ///
591 /// # Example
592 /// ```rust
593 /// # #[cfg(feature = "deploy")] {
594 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
595 /// # use futures::StreamExt;
596 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
597 /// process
598 /// .source_iter(q!(vec![
599 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
600 /// std::collections::HashSet::from_iter(vec![3, 4]),
601 /// ]))
602 /// .flatten_unordered()
603 /// # }, |mut stream| async move {
604 /// // 1, 2, 3, 4, but in no particular order
605 /// # let mut results = Vec::new();
606 /// # for w in (1..5) {
607 /// # results.push(stream.next().await.unwrap());
608 /// # }
609 /// # results.sort();
610 /// # assert_eq!(results, vec![1, 2, 3, 4]);
611 /// # }));
612 /// # }
613 /// ```
614 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
615 where
616 T: IntoIterator<Item = U>,
617 {
618 self.flat_map_unordered(q!(|d| d))
619 }
620
621 /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
622 /// then emit the elements of that stream one by one. When the inner stream yields
623 /// `Pending`, this operator yields as well.
624 pub fn flat_map_stream_blocking<U, S, F>(
625 self,
626 f: impl IntoQuotedMut<'a, F, L>,
627 ) -> Stream<U, L, B, O, R>
628 where
629 S: futures::Stream<Item = U>,
630 F: Fn(T) -> S + 'a,
631 {
632 let f = f.splice_fn1_ctx(&self.location).into();
633 Stream::new(
634 self.location.clone(),
635 HydroNode::FlatMapStreamBlocking {
636 f,
637 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
638 metadata: self
639 .location
640 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
641 },
642 )
643 }
644
645 /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
646 /// emit its elements one by one. When the inner stream yields `Pending`, this operator
647 /// yields as well.
648 pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
649 where
650 T: futures::Stream<Item = U>,
651 {
652 self.flat_map_stream_blocking(q!(|d| d))
653 }
654
655 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
656 /// `f`, preserving the order of the elements.
657 ///
658 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
659 /// not modify or take ownership of the values. If you need to modify the values while filtering
660 /// use [`Stream::filter_map`] instead.
661 ///
662 /// # Example
663 /// ```rust
664 /// # #[cfg(feature = "deploy")] {
665 /// # use hydro_lang::prelude::*;
666 /// # use futures::StreamExt;
667 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
668 /// process
669 /// .source_iter(q!(vec![1, 2, 3, 4]))
670 /// .filter(q!(|&x| x > 2))
671 /// # }, |mut stream| async move {
672 /// // 3, 4
673 /// # for w in (3..5) {
674 /// # assert_eq!(stream.next().await.unwrap(), w);
675 /// # }
676 /// # }));
677 /// # }
678 /// ```
679 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
680 where
681 F: Fn(&T) -> bool + 'a,
682 {
683 let f = f.splice_fn1_borrow_ctx(&self.location).into();
684 Stream::new(
685 self.location.clone(),
686 HydroNode::Filter {
687 f,
688 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
689 metadata: self.location.new_node_metadata(Self::collection_kind()),
690 },
691 )
692 }
693
694 /// Splits the stream into two streams based on a predicate, without cloning elements.
695 ///
696 /// Elements for which `f` returns `true` are sent to the first output stream,
697 /// and elements for which `f` returns `false` are sent to the second output stream.
698 ///
699 /// Unlike using `filter` twice, this only evaluates the predicate once per element
700 /// and does not require `T: Clone`.
701 ///
702 /// The closure `f` receives a reference `&T` rather than an owned value `T` because
703 /// the predicate is only used for routing; the element itself is moved to the
704 /// appropriate output stream.
705 ///
706 /// # Example
707 /// ```rust
708 /// # #[cfg(feature = "deploy")] {
709 /// # use hydro_lang::prelude::*;
710 /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
711 /// # use futures::StreamExt;
712 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
713 /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
714 /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
715 /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
716 /// evens.map(q!(|x| (x, true)))
717 /// .merge_unordered(odds.map(q!(|x| (x, false))))
718 /// # }, |mut stream| async move {
719 /// # let mut results = Vec::new();
720 /// # for _ in 0..6 {
721 /// # results.push(stream.next().await.unwrap());
722 /// # }
723 /// # results.sort();
724 /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
725 /// # }));
726 /// # }
727 /// ```
728 #[expect(
729 clippy::type_complexity,
730 reason = "return type mirrors the input stream type"
731 )]
732 pub fn partition<F>(
733 self,
734 f: impl IntoQuotedMut<'a, F, L>,
735 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
736 where
737 F: Fn(&T) -> bool + 'a,
738 {
739 let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
740 let shared = SharedNode(Rc::new(RefCell::new(
741 self.ir_node.replace(HydroNode::Placeholder),
742 )));
743
744 let true_stream = Stream::new(
745 self.location.clone(),
746 HydroNode::Partition {
747 inner: SharedNode(shared.0.clone()),
748 f: f.clone(),
749 is_true: true,
750 metadata: self.location.new_node_metadata(Self::collection_kind()),
751 },
752 );
753
754 let false_stream = Stream::new(
755 self.location.clone(),
756 HydroNode::Partition {
757 inner: SharedNode(shared.0),
758 f,
759 is_true: false,
760 metadata: self.location.new_node_metadata(Self::collection_kind()),
761 },
762 );
763
764 (true_stream, false_stream)
765 }
766
767 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
768 ///
769 /// # Example
770 /// ```rust
771 /// # #[cfg(feature = "deploy")] {
772 /// # use hydro_lang::prelude::*;
773 /// # use futures::StreamExt;
774 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
775 /// process
776 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
777 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
778 /// # }, |mut stream| async move {
779 /// // 1, 2
780 /// # for w in (1..3) {
781 /// # assert_eq!(stream.next().await.unwrap(), w);
782 /// # }
783 /// # }));
784 /// # }
785 /// ```
786 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
787 where
788 F: Fn(T) -> Option<U> + 'a,
789 {
790 let f = f.splice_fn1_ctx(&self.location).into();
791 Stream::new(
792 self.location.clone(),
793 HydroNode::FilterMap {
794 f,
795 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
796 metadata: self
797 .location
798 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
799 },
800 )
801 }
802
803 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
804 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
805 /// If `other` is an empty [`Optional`], no values will be produced.
806 ///
807 /// # Example
808 /// ```rust
809 /// # #[cfg(feature = "deploy")] {
810 /// # use hydro_lang::prelude::*;
811 /// # use futures::StreamExt;
812 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
813 /// let tick = process.tick();
814 /// let batch = process
815 /// .source_iter(q!(vec![1, 2, 3, 4]))
816 /// .batch(&tick, nondet!(/** test */));
817 /// let count = batch.clone().count(); // `count()` returns a singleton
818 /// batch.cross_singleton(count).all_ticks()
819 /// # }, |mut stream| async move {
820 /// // (1, 4), (2, 4), (3, 4), (4, 4)
821 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
822 /// # assert_eq!(stream.next().await.unwrap(), w);
823 /// # }
824 /// # }));
825 /// # }
826 /// ```
827 pub fn cross_singleton<O2>(
828 self,
829 other: impl Into<Optional<O2, L, Bounded>>,
830 ) -> Stream<(T, O2), L, B, O, R>
831 where
832 O2: Clone,
833 {
834 let other: Optional<O2, L, Bounded> = other.into();
835 check_matching_location(&self.location, &other.location);
836
837 Stream::new(
838 self.location.clone(),
839 HydroNode::CrossSingleton {
840 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
841 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
842 metadata: self
843 .location
844 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
845 },
846 )
847 }
848
849 /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
850 ///
851 /// # Example
852 /// ```rust
853 /// # #[cfg(feature = "deploy")] {
854 /// # use hydro_lang::prelude::*;
855 /// # use futures::StreamExt;
856 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
857 /// let tick = process.tick();
858 /// // ticks are lazy by default, forces the second tick to run
859 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
860 ///
861 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
862 /// let batch_first_tick = process
863 /// .source_iter(q!(vec![1, 2, 3, 4]))
864 /// .batch(&tick, nondet!(/** test */));
865 /// let batch_second_tick = process
866 /// .source_iter(q!(vec![5, 6, 7, 8]))
867 /// .batch(&tick, nondet!(/** test */))
868 /// .defer_tick();
869 /// batch_first_tick.chain(batch_second_tick)
870 /// .filter_if(signal)
871 /// .all_ticks()
872 /// # }, |mut stream| async move {
873 /// // [1, 2, 3, 4]
874 /// # for w in vec![1, 2, 3, 4] {
875 /// # assert_eq!(stream.next().await.unwrap(), w);
876 /// # }
877 /// # }));
878 /// # }
879 /// ```
880 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
881 self.cross_singleton(signal.filter(q!(|b| *b)))
882 .map(q!(|(d, _)| d))
883 }
884
885 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
886 ///
887 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
888 /// leader of a cluster.
889 ///
890 /// # Example
891 /// ```rust
892 /// # #[cfg(feature = "deploy")] {
893 /// # use hydro_lang::prelude::*;
894 /// # use futures::StreamExt;
895 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
896 /// let tick = process.tick();
897 /// // ticks are lazy by default, forces the second tick to run
898 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
899 ///
900 /// let batch_first_tick = process
901 /// .source_iter(q!(vec![1, 2, 3, 4]))
902 /// .batch(&tick, nondet!(/** test */));
903 /// let batch_second_tick = process
904 /// .source_iter(q!(vec![5, 6, 7, 8]))
905 /// .batch(&tick, nondet!(/** test */))
906 /// .defer_tick(); // appears on the second tick
907 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
908 /// batch_first_tick.chain(batch_second_tick)
909 /// .filter_if_some(some_on_first_tick)
910 /// .all_ticks()
911 /// # }, |mut stream| async move {
912 /// // [1, 2, 3, 4]
913 /// # for w in vec![1, 2, 3, 4] {
914 /// # assert_eq!(stream.next().await.unwrap(), w);
915 /// # }
916 /// # }));
917 /// # }
918 /// ```
919 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
920 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
921 self.filter_if(signal.is_some())
922 }
923
924 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
925 ///
926 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
927 /// some local state.
928 ///
929 /// # Example
930 /// ```rust
931 /// # #[cfg(feature = "deploy")] {
932 /// # use hydro_lang::prelude::*;
933 /// # use futures::StreamExt;
934 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
935 /// let tick = process.tick();
936 /// // ticks are lazy by default, forces the second tick to run
937 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
938 ///
939 /// let batch_first_tick = process
940 /// .source_iter(q!(vec![1, 2, 3, 4]))
941 /// .batch(&tick, nondet!(/** test */));
942 /// let batch_second_tick = process
943 /// .source_iter(q!(vec![5, 6, 7, 8]))
944 /// .batch(&tick, nondet!(/** test */))
945 /// .defer_tick(); // appears on the second tick
946 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
947 /// batch_first_tick.chain(batch_second_tick)
948 /// .filter_if_none(some_on_first_tick)
949 /// .all_ticks()
950 /// # }, |mut stream| async move {
951 /// // [5, 6, 7, 8]
952 /// # for w in vec![5, 6, 7, 8] {
953 /// # assert_eq!(stream.next().await.unwrap(), w);
954 /// # }
955 /// # }));
956 /// # }
957 /// ```
958 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
959 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
960 self.filter_if(other.is_none())
961 }
962
963 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
964 /// tupled pairs in a non-deterministic order.
965 ///
966 /// # Example
967 /// ```rust
968 /// # #[cfg(feature = "deploy")] {
969 /// # use hydro_lang::prelude::*;
970 /// # use std::collections::HashSet;
971 /// # use futures::StreamExt;
972 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
973 /// let tick = process.tick();
974 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
975 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
976 /// stream1.cross_product(stream2)
977 /// # }, |mut stream| async move {
978 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
979 /// # stream.map(|i| assert!(expected.contains(&i)));
980 /// # }));
981 /// # }
982 /// ```
983 pub fn cross_product<T2, O2: Ordering>(
984 self,
985 other: Stream<T2, L, B, O2, R>,
986 ) -> Stream<(T, T2), L, B, NoOrder, R>
987 where
988 T: Clone,
989 T2: Clone,
990 {
991 check_matching_location(&self.location, &other.location);
992
993 Stream::new(
994 self.location.clone(),
995 HydroNode::CrossProduct {
996 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
997 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
998 metadata: self
999 .location
1000 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
1001 },
1002 )
1003 }
1004
1005 /// Takes one stream as input and filters out any duplicate occurrences. The output
1006 /// contains all unique values from the input.
1007 ///
1008 /// # Example
1009 /// ```rust
1010 /// # #[cfg(feature = "deploy")] {
1011 /// # use hydro_lang::prelude::*;
1012 /// # use futures::StreamExt;
1013 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1014 /// let tick = process.tick();
1015 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1016 /// # }, |mut stream| async move {
1017 /// # for w in vec![1, 2, 3, 4] {
1018 /// # assert_eq!(stream.next().await.unwrap(), w);
1019 /// # }
1020 /// # }));
1021 /// # }
1022 /// ```
1023 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1024 where
1025 T: Eq + Hash,
1026 {
1027 Stream::new(
1028 self.location.clone(),
1029 HydroNode::Unique {
1030 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1031 metadata: self
1032 .location
1033 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1034 },
1035 )
1036 }
1037
1038 /// Outputs everything in this stream that is *not* contained in the `other` stream.
1039 ///
1040 /// The `other` stream must be [`Bounded`], since this function will wait until
1041 /// all its elements are available before producing any output.
1042 /// # Example
1043 /// ```rust
1044 /// # #[cfg(feature = "deploy")] {
1045 /// # use hydro_lang::prelude::*;
1046 /// # use futures::StreamExt;
1047 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1048 /// let tick = process.tick();
1049 /// let stream = process
1050 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1051 /// .batch(&tick, nondet!(/** test */));
1052 /// let batch = process
1053 /// .source_iter(q!(vec![1, 2]))
1054 /// .batch(&tick, nondet!(/** test */));
1055 /// stream.filter_not_in(batch).all_ticks()
1056 /// # }, |mut stream| async move {
1057 /// # for w in vec![3, 4] {
1058 /// # assert_eq!(stream.next().await.unwrap(), w);
1059 /// # }
1060 /// # }));
1061 /// # }
1062 /// ```
1063 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1064 where
1065 T: Eq + Hash,
1066 B2: IsBounded,
1067 {
1068 check_matching_location(&self.location, &other.location);
1069
1070 Stream::new(
1071 self.location.clone(),
1072 HydroNode::Difference {
1073 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1074 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1075 metadata: self
1076 .location
1077 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1078 },
1079 )
1080 }
1081
1082 /// An operator which allows you to "inspect" each element of a stream without
1083 /// modifying it. The closure `f` is called on a reference to each item. This is
1084 /// mainly useful for debugging, and should not be used to generate side-effects.
1085 ///
1086 /// # Example
1087 /// ```rust
1088 /// # #[cfg(feature = "deploy")] {
1089 /// # use hydro_lang::prelude::*;
1090 /// # use futures::StreamExt;
1091 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1092 /// let nums = process.source_iter(q!(vec![1, 2]));
1093 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1094 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1095 /// # }, |mut stream| async move {
1096 /// # for w in vec![1, 2] {
1097 /// # assert_eq!(stream.next().await.unwrap(), w);
1098 /// # }
1099 /// # }));
1100 /// # }
1101 /// ```
1102 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1103 where
1104 F: Fn(&T) + 'a,
1105 {
1106 let f = f.splice_fn1_borrow_ctx(&self.location).into();
1107
1108 Stream::new(
1109 self.location.clone(),
1110 HydroNode::Inspect {
1111 f,
1112 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1113 metadata: self.location.new_node_metadata(Self::collection_kind()),
1114 },
1115 )
1116 }
1117
1118 /// Executes the provided closure for every element in this stream.
1119 ///
1120 /// Because the closure may have side effects, the stream must have deterministic order
1121 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1122 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1123 /// [`Stream::assume_retries`] with an explanation for why this is the case.
1124 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1125 where
1126 O: IsOrdered,
1127 R: IsExactlyOnce,
1128 {
1129 let f = f.splice_fn1_ctx(&self.location).into();
1130 self.location
1131 .flow_state()
1132 .borrow_mut()
1133 .push_root(HydroRoot::ForEach {
1134 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1135 f,
1136 op_metadata: HydroIrOpMetadata::new(),
1137 });
1138 }
1139
1140 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1141 /// TCP socket to some other server. You should _not_ use this API for interacting with
1142 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1143 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1144 /// interaction with asynchronous sinks.
1145 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1146 where
1147 O: IsOrdered,
1148 R: IsExactlyOnce,
1149 S: 'a + futures::Sink<T> + Unpin,
1150 {
1151 self.location
1152 .flow_state()
1153 .borrow_mut()
1154 .push_root(HydroRoot::DestSink {
1155 sink: sink.splice_typed_ctx(&self.location).into(),
1156 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1157 op_metadata: HydroIrOpMetadata::new(),
1158 });
1159 }
1160
1161 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1162 ///
1163 /// # Example
1164 /// ```rust
1165 /// # #[cfg(feature = "deploy")] {
1166 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1167 /// # use futures::StreamExt;
1168 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1169 /// let tick = process.tick();
1170 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1171 /// numbers.enumerate()
1172 /// # }, |mut stream| async move {
1173 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1174 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1175 /// # assert_eq!(stream.next().await.unwrap(), w);
1176 /// # }
1177 /// # }));
1178 /// # }
1179 /// ```
1180 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1181 where
1182 O: IsOrdered,
1183 R: IsExactlyOnce,
1184 {
1185 Stream::new(
1186 self.location.clone(),
1187 HydroNode::Enumerate {
1188 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1189 metadata: self.location.new_node_metadata(Stream::<
1190 (usize, T),
1191 L,
1192 B,
1193 TotalOrder,
1194 ExactlyOnce,
1195 >::collection_kind()),
1196 },
1197 )
1198 }
1199
1200 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1201 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1202 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1203 ///
1204 /// Depending on the input stream guarantees, the closure may need to be commutative
1205 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1206 ///
1207 /// # Example
1208 /// ```rust
1209 /// # #[cfg(feature = "deploy")] {
1210 /// # use hydro_lang::prelude::*;
1211 /// # use futures::StreamExt;
1212 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1213 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1214 /// words
1215 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1216 /// .into_stream()
1217 /// # }, |mut stream| async move {
1218 /// // "HELLOWORLD"
1219 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1220 /// # }));
1221 /// # }
1222 /// ```
1223 pub fn fold<A, I, F, C, Idemp>(
1224 self,
1225 init: impl IntoQuotedMut<'a, I, L>,
1226 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1227 ) -> Singleton<A, L, B>
1228 where
1229 I: Fn() -> A + 'a,
1230 F: Fn(&mut A, T),
1231 C: ValidCommutativityFor<O>,
1232 Idemp: ValidIdempotenceFor<R>,
1233 {
1234 let init = init.splice_fn0_ctx(&self.location).into();
1235 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1236 proof.register_proof(&comb);
1237
1238 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1239 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1240
1241 let core = HydroNode::Fold {
1242 init,
1243 acc: comb.into(),
1244 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1245 metadata: ordered_etc
1246 .location
1247 .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1248 };
1249
1250 Singleton::new(ordered_etc.location.clone(), core)
1251 }
1252
1253 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1254 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1255 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1256 /// reference, so that it can be modified in place.
1257 ///
1258 /// Depending on the input stream guarantees, the closure may need to be commutative
1259 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1260 ///
1261 /// # Example
1262 /// ```rust
1263 /// # #[cfg(feature = "deploy")] {
1264 /// # use hydro_lang::prelude::*;
1265 /// # use futures::StreamExt;
1266 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1267 /// let bools = process.source_iter(q!(vec![false, true, false]));
1268 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1269 /// # }, |mut stream| async move {
1270 /// // true
1271 /// # assert_eq!(stream.next().await.unwrap(), true);
1272 /// # }));
1273 /// # }
1274 /// ```
1275 pub fn reduce<F, C, Idemp>(
1276 self,
1277 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1278 ) -> Optional<T, L, B>
1279 where
1280 F: Fn(&mut T, T) + 'a,
1281 C: ValidCommutativityFor<O>,
1282 Idemp: ValidIdempotenceFor<R>,
1283 {
1284 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1285 proof.register_proof(&f);
1286
1287 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1288 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1289
1290 let core = HydroNode::Reduce {
1291 f: f.into(),
1292 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1293 metadata: ordered_etc
1294 .location
1295 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1296 };
1297
1298 Optional::new(ordered_etc.location.clone(), core)
1299 }
1300
1301 /// Computes the maximum element in the stream as an [`Optional`], which
1302 /// will be empty until the first element in the input arrives.
1303 ///
1304 /// # Example
1305 /// ```rust
1306 /// # #[cfg(feature = "deploy")] {
1307 /// # use hydro_lang::prelude::*;
1308 /// # use futures::StreamExt;
1309 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1310 /// let tick = process.tick();
1311 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1312 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1313 /// batch.max().all_ticks()
1314 /// # }, |mut stream| async move {
1315 /// // 4
1316 /// # assert_eq!(stream.next().await.unwrap(), 4);
1317 /// # }));
1318 /// # }
1319 /// ```
1320 pub fn max(self) -> Optional<T, L, B>
1321 where
1322 T: Ord,
1323 {
1324 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1325 .assume_ordering_trusted_bounded::<TotalOrder>(
1326 nondet!(/** max is commutative, but order affects intermediates */),
1327 )
1328 .reduce(q!(|curr, new| {
1329 if new > *curr {
1330 *curr = new;
1331 }
1332 }))
1333 }
1334
1335 /// Computes the minimum element in the stream as an [`Optional`], which
1336 /// will be empty until the first element in the input arrives.
1337 ///
1338 /// # Example
1339 /// ```rust
1340 /// # #[cfg(feature = "deploy")] {
1341 /// # use hydro_lang::prelude::*;
1342 /// # use futures::StreamExt;
1343 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1344 /// let tick = process.tick();
1345 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1346 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1347 /// batch.min().all_ticks()
1348 /// # }, |mut stream| async move {
1349 /// // 1
1350 /// # assert_eq!(stream.next().await.unwrap(), 1);
1351 /// # }));
1352 /// # }
1353 /// ```
1354 pub fn min(self) -> Optional<T, L, B>
1355 where
1356 T: Ord,
1357 {
1358 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1359 .assume_ordering_trusted_bounded::<TotalOrder>(
1360 nondet!(/** max is commutative, but order affects intermediates */),
1361 )
1362 .reduce(q!(|curr, new| {
1363 if new < *curr {
1364 *curr = new;
1365 }
1366 }))
1367 }
1368
1369 /// Computes the first element in the stream as an [`Optional`], which
1370 /// will be empty until the first element in the input arrives.
1371 ///
1372 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1373 /// re-ordering of elements may cause the first element to change.
1374 ///
1375 /// # Example
1376 /// ```rust
1377 /// # #[cfg(feature = "deploy")] {
1378 /// # use hydro_lang::prelude::*;
1379 /// # use futures::StreamExt;
1380 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1381 /// let tick = process.tick();
1382 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1383 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1384 /// batch.first().all_ticks()
1385 /// # }, |mut stream| async move {
1386 /// // 1
1387 /// # assert_eq!(stream.next().await.unwrap(), 1);
1388 /// # }));
1389 /// # }
1390 /// ```
1391 pub fn first(self) -> Optional<T, L, B>
1392 where
1393 O: IsOrdered,
1394 {
1395 self.make_totally_ordered()
1396 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1397 .reduce(q!(|_, _| {}))
1398 }
1399
1400 /// Computes the last element in the stream as an [`Optional`], which
1401 /// will be empty until an element in the input arrives.
1402 ///
1403 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1404 /// re-ordering of elements may cause the last element to change.
1405 ///
1406 /// # Example
1407 /// ```rust
1408 /// # #[cfg(feature = "deploy")] {
1409 /// # use hydro_lang::prelude::*;
1410 /// # use futures::StreamExt;
1411 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1412 /// let tick = process.tick();
1413 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1414 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1415 /// batch.last().all_ticks()
1416 /// # }, |mut stream| async move {
1417 /// // 4
1418 /// # assert_eq!(stream.next().await.unwrap(), 4);
1419 /// # }));
1420 /// # }
1421 /// ```
1422 pub fn last(self) -> Optional<T, L, B>
1423 where
1424 O: IsOrdered,
1425 {
1426 self.make_totally_ordered()
1427 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1428 .reduce(q!(|curr, new| *curr = new))
1429 }
1430
1431 /// Collects all the elements of this stream into a single [`Vec`] element.
1432 ///
1433 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1434 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1435 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1436 /// the vector at an arbitrary point in time.
1437 ///
1438 /// # Example
1439 /// ```rust
1440 /// # #[cfg(feature = "deploy")] {
1441 /// # use hydro_lang::prelude::*;
1442 /// # use futures::StreamExt;
1443 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1444 /// let tick = process.tick();
1445 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1446 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1447 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1448 /// # }, |mut stream| async move {
1449 /// // [ vec![1, 2, 3, 4] ]
1450 /// # for w in vec![vec![1, 2, 3, 4]] {
1451 /// # assert_eq!(stream.next().await.unwrap(), w);
1452 /// # }
1453 /// # }));
1454 /// # }
1455 /// ```
1456 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1457 where
1458 O: IsOrdered,
1459 R: IsExactlyOnce,
1460 {
1461 self.make_totally_ordered().make_exactly_once().fold(
1462 q!(|| vec![]),
1463 q!(|acc, v| {
1464 acc.push(v);
1465 }),
1466 )
1467 }
1468
1469 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1470 /// and emitting each intermediate result.
1471 ///
1472 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1473 /// containing all intermediate accumulated values. The scan operation can also terminate early
1474 /// by returning `None`.
1475 ///
1476 /// The function takes a mutable reference to the accumulator and the current element, and returns
1477 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1478 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1479 ///
1480 /// # Examples
1481 ///
1482 /// Basic usage - running sum:
1483 /// ```rust
1484 /// # #[cfg(feature = "deploy")] {
1485 /// # use hydro_lang::prelude::*;
1486 /// # use futures::StreamExt;
1487 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1488 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1489 /// q!(|| 0),
1490 /// q!(|acc, x| {
1491 /// *acc += x;
1492 /// Some(*acc)
1493 /// }),
1494 /// )
1495 /// # }, |mut stream| async move {
1496 /// // Output: 1, 3, 6, 10
1497 /// # for w in vec![1, 3, 6, 10] {
1498 /// # assert_eq!(stream.next().await.unwrap(), w);
1499 /// # }
1500 /// # }));
1501 /// # }
1502 /// ```
1503 ///
1504 /// Early termination example:
1505 /// ```rust
1506 /// # #[cfg(feature = "deploy")] {
1507 /// # use hydro_lang::prelude::*;
1508 /// # use futures::StreamExt;
1509 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1510 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1511 /// q!(|| 1),
1512 /// q!(|state, x| {
1513 /// *state = *state * x;
1514 /// if *state > 6 {
1515 /// None // Terminate the stream
1516 /// } else {
1517 /// Some(-*state)
1518 /// }
1519 /// }),
1520 /// )
1521 /// # }, |mut stream| async move {
1522 /// // Output: -1, -2, -6
1523 /// # for w in vec![-1, -2, -6] {
1524 /// # assert_eq!(stream.next().await.unwrap(), w);
1525 /// # }
1526 /// # }));
1527 /// # }
1528 /// ```
1529 pub fn scan<A, U, I, F>(
1530 self,
1531 init: impl IntoQuotedMut<'a, I, L>,
1532 f: impl IntoQuotedMut<'a, F, L>,
1533 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1534 where
1535 O: IsOrdered,
1536 R: IsExactlyOnce,
1537 I: Fn() -> A + 'a,
1538 F: Fn(&mut A, T) -> Option<U> + 'a,
1539 {
1540 let init = init.splice_fn0_ctx(&self.location).into();
1541 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1542
1543 Stream::new(
1544 self.location.clone(),
1545 HydroNode::Scan {
1546 init,
1547 acc: f,
1548 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1549 metadata: self.location.new_node_metadata(
1550 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1551 ),
1552 },
1553 )
1554 }
1555
1556 /// Iteratively processes the elements of the stream using a state machine that can yield
1557 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1558 /// syntax in Rust, without requiring special syntax.
1559 ///
1560 /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1561 /// state. The second argument defines the processing logic, taking in a mutable reference
1562 /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1563 /// variants define what is emitted and whether further inputs should be processed.
1564 ///
1565 /// # Example
1566 /// ```rust
1567 /// # #[cfg(feature = "deploy")] {
1568 /// # use hydro_lang::prelude::*;
1569 /// # use futures::StreamExt;
1570 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1571 /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1572 /// q!(|| 0),
1573 /// q!(|acc, x| {
1574 /// *acc += x;
1575 /// if *acc > 100 {
1576 /// hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1577 /// } else if *acc % 2 == 0 {
1578 /// hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1579 /// } else {
1580 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1581 /// }
1582 /// }),
1583 /// )
1584 /// # }, |mut stream| async move {
1585 /// // Output: "even", "done!"
1586 /// # let mut results = Vec::new();
1587 /// # for _ in 0..2 {
1588 /// # results.push(stream.next().await.unwrap());
1589 /// # }
1590 /// # results.sort();
1591 /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1592 /// # }));
1593 /// # }
1594 /// ```
1595 pub fn generator<A, U, I, F>(
1596 self,
1597 init: impl IntoQuotedMut<'a, I, L> + Copy,
1598 f: impl IntoQuotedMut<'a, F, L> + Copy,
1599 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1600 where
1601 O: IsOrdered,
1602 R: IsExactlyOnce,
1603 I: Fn() -> A + 'a,
1604 F: Fn(&mut A, T) -> Generate<U> + 'a,
1605 {
1606 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1607 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1608
1609 let this = self.make_totally_ordered().make_exactly_once();
1610
1611 // State is Option<Option<A>>:
1612 // None = not yet initialized
1613 // Some(Some(a)) = active with state a
1614 // Some(None) = terminated
1615 let scan_init = q!(|| None)
1616 .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1617 .into();
1618 let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1619 if state.is_none() {
1620 *state = Some(Some(init()));
1621 }
1622 match state {
1623 Some(Some(state_value)) => match f(state_value, v) {
1624 Generate::Yield(out) => Some(Some(out)),
1625 Generate::Return(out) => {
1626 *state = Some(None);
1627 Some(Some(out))
1628 }
1629 // Unlike KeyedStream, we can terminate the scan directly on
1630 // Break/Return because there is only one state (no other keys
1631 // that still need processing).
1632 Generate::Break => None,
1633 Generate::Continue => Some(None),
1634 },
1635 // State is Some(None) after Return; terminate the scan.
1636 _ => None,
1637 }
1638 })
1639 .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1640 .into();
1641
1642 let scan_node = HydroNode::Scan {
1643 init: scan_init,
1644 acc: scan_f,
1645 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1646 metadata: this.location.new_node_metadata(Stream::<
1647 Option<U>,
1648 L,
1649 B,
1650 TotalOrder,
1651 ExactlyOnce,
1652 >::collection_kind()),
1653 };
1654
1655 let flatten_f = q!(|d| d)
1656 .splice_fn1_ctx::<Option<U>, _>(&this.location)
1657 .into();
1658 let flatten_node = HydroNode::FlatMap {
1659 f: flatten_f,
1660 input: Box::new(scan_node),
1661 metadata: this
1662 .location
1663 .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1664 };
1665
1666 Stream::new(this.location.clone(), flatten_node)
1667 }
1668
1669 /// Given a time interval, returns a stream corresponding to samples taken from the
1670 /// stream roughly at that interval. The output will have elements in the same order
1671 /// as the input, but with arbitrary elements skipped between samples. There is also
1672 /// no guarantee on the exact timing of the samples.
1673 ///
1674 /// # Non-Determinism
1675 /// The output stream is non-deterministic in which elements are sampled, since this
1676 /// is controlled by a clock.
1677 pub fn sample_every(
1678 self,
1679 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1680 nondet: NonDet,
1681 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1682 where
1683 L: NoTick + NoAtomic,
1684 {
1685 let samples = self.location.source_interval(interval, nondet);
1686
1687 let tick = self.location.tick();
1688 self.batch(&tick, nondet)
1689 .filter_if(samples.batch(&tick, nondet).first().is_some())
1690 .all_ticks()
1691 .weaken_retries()
1692 }
1693
1694 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
1695 /// stream has not emitted a value since that duration.
1696 ///
1697 /// # Non-Determinism
1698 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1699 /// samples take place, timeouts may be non-deterministically generated or missed,
1700 /// and the notification of the timeout may be delayed as well. There is also no
1701 /// guarantee on how long the [`Optional`] will have a value after the timeout is
1702 /// detected based on when the next sample is taken.
1703 pub fn timeout(
1704 self,
1705 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1706 nondet: NonDet,
1707 ) -> Optional<(), L, Unbounded>
1708 where
1709 L: NoTick + NoAtomic,
1710 {
1711 let tick = self.location.tick();
1712
1713 let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1714 q!(|| None),
1715 q!(
1716 |latest, _| {
1717 *latest = Some(Instant::now());
1718 },
1719 commutative = manual_proof!(/** TODO */)
1720 ),
1721 );
1722
1723 latest_received
1724 .snapshot(&tick, nondet)
1725 .filter_map(q!(move |latest_received| {
1726 if let Some(latest_received) = latest_received {
1727 if Instant::now().duration_since(latest_received) > duration {
1728 Some(())
1729 } else {
1730 None
1731 }
1732 } else {
1733 Some(())
1734 }
1735 }))
1736 .latest()
1737 }
1738
1739 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1740 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1741 ///
1742 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1743 /// processed before an acknowledgement is emitted.
1744 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1745 let id = self.location.flow_state().borrow_mut().next_clock_id();
1746 let out_location = Atomic {
1747 tick: Tick {
1748 id,
1749 l: self.location.clone(),
1750 },
1751 };
1752 Stream::new(
1753 out_location.clone(),
1754 HydroNode::BeginAtomic {
1755 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1756 metadata: out_location
1757 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1758 },
1759 )
1760 }
1761
1762 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1763 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1764 /// the order of the input. The output stream will execute in the [`Tick`] that was
1765 /// used to create the atomic section.
1766 ///
1767 /// # Non-Determinism
1768 /// The batch boundaries are non-deterministic and may change across executions.
1769 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1770 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1771 Stream::new(
1772 tick.clone(),
1773 HydroNode::Batch {
1774 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1775 metadata: tick
1776 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1777 },
1778 )
1779 }
1780
1781 /// An operator which allows you to "name" a `HydroNode`.
1782 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1783 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1784 {
1785 let mut node = self.ir_node.borrow_mut();
1786 let metadata = node.metadata_mut();
1787 metadata.tag = Some(name.to_owned());
1788 }
1789 self
1790 }
1791
1792 /// Explicitly "casts" the stream to a type with a different ordering
1793 /// guarantee. Useful in unsafe code where the ordering cannot be proven
1794 /// by the type-system.
1795 ///
1796 /// # Non-Determinism
1797 /// This function is used as an escape hatch, and any mistakes in the
1798 /// provided ordering guarantee will propagate into the guarantees
1799 /// for the rest of the program.
1800 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1801 if O::ORDERING_KIND == O2::ORDERING_KIND {
1802 Stream::new(
1803 self.location.clone(),
1804 self.ir_node.replace(HydroNode::Placeholder),
1805 )
1806 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1807 // We can always weaken the ordering guarantee
1808 Stream::new(
1809 self.location.clone(),
1810 HydroNode::Cast {
1811 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1812 metadata: self
1813 .location
1814 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1815 },
1816 )
1817 } else {
1818 Stream::new(
1819 self.location.clone(),
1820 HydroNode::ObserveNonDet {
1821 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1822 trusted: false,
1823 metadata: self
1824 .location
1825 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1826 },
1827 )
1828 }
1829 }
1830
1831 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1832 // intermediate states will not be revealed
1833 fn assume_ordering_trusted_bounded<O2: Ordering>(
1834 self,
1835 nondet: NonDet,
1836 ) -> Stream<T, L, B, O2, R> {
1837 if B::BOUNDED {
1838 self.assume_ordering_trusted(nondet)
1839 } else {
1840 self.assume_ordering(nondet)
1841 }
1842 }
1843
1844 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1845 // is not observable
1846 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1847 self,
1848 _nondet: NonDet,
1849 ) -> Stream<T, L, B, O2, R> {
1850 if O::ORDERING_KIND == O2::ORDERING_KIND {
1851 Stream::new(
1852 self.location.clone(),
1853 self.ir_node.replace(HydroNode::Placeholder),
1854 )
1855 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1856 // We can always weaken the ordering guarantee
1857 Stream::new(
1858 self.location.clone(),
1859 HydroNode::Cast {
1860 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1861 metadata: self
1862 .location
1863 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1864 },
1865 )
1866 } else {
1867 Stream::new(
1868 self.location.clone(),
1869 HydroNode::ObserveNonDet {
1870 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1871 trusted: true,
1872 metadata: self
1873 .location
1874 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1875 },
1876 )
1877 }
1878 }
1879
1880 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1881 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1882 /// which is always safe because that is the weakest possible guarantee.
1883 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1884 self.weaken_ordering::<NoOrder>()
1885 }
1886
1887 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1888 /// enforcing that `O2` is weaker than the input ordering guarantee.
1889 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1890 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1891 self.assume_ordering::<O2>(nondet)
1892 }
1893
1894 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1895 /// implies that `O == TotalOrder`.
1896 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1897 where
1898 O: IsOrdered,
1899 {
1900 self.assume_ordering(nondet!(/** no-op */))
1901 }
1902
1903 /// Explicitly "casts" the stream to a type with a different retries
1904 /// guarantee. Useful in unsafe code where the lack of retries cannot
1905 /// be proven by the type-system.
1906 ///
1907 /// # Non-Determinism
1908 /// This function is used as an escape hatch, and any mistakes in the
1909 /// provided retries guarantee will propagate into the guarantees
1910 /// for the rest of the program.
1911 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1912 if R::RETRIES_KIND == R2::RETRIES_KIND {
1913 Stream::new(
1914 self.location.clone(),
1915 self.ir_node.replace(HydroNode::Placeholder),
1916 )
1917 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1918 // We can always weaken the retries guarantee
1919 Stream::new(
1920 self.location.clone(),
1921 HydroNode::Cast {
1922 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1923 metadata: self
1924 .location
1925 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1926 },
1927 )
1928 } else {
1929 Stream::new(
1930 self.location.clone(),
1931 HydroNode::ObserveNonDet {
1932 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1933 trusted: false,
1934 metadata: self
1935 .location
1936 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1937 },
1938 )
1939 }
1940 }
1941
1942 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1943 // is not observable
1944 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1945 if R::RETRIES_KIND == R2::RETRIES_KIND {
1946 Stream::new(
1947 self.location.clone(),
1948 self.ir_node.replace(HydroNode::Placeholder),
1949 )
1950 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1951 // We can always weaken the retries guarantee
1952 Stream::new(
1953 self.location.clone(),
1954 HydroNode::Cast {
1955 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1956 metadata: self
1957 .location
1958 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1959 },
1960 )
1961 } else {
1962 Stream::new(
1963 self.location.clone(),
1964 HydroNode::ObserveNonDet {
1965 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1966 trusted: true,
1967 metadata: self
1968 .location
1969 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1970 },
1971 )
1972 }
1973 }
1974
1975 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1976 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1977 /// which is always safe because that is the weakest possible guarantee.
1978 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1979 self.weaken_retries::<AtLeastOnce>()
1980 }
1981
1982 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1983 /// enforcing that `R2` is weaker than the input retries guarantee.
1984 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1985 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1986 self.assume_retries::<R2>(nondet)
1987 }
1988
1989 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1990 /// implies that `R == ExactlyOnce`.
1991 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
1992 where
1993 R: IsExactlyOnce,
1994 {
1995 self.assume_retries(nondet!(/** no-op */))
1996 }
1997
1998 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1999 /// implies that `B == Bounded`.
2000 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2001 where
2002 B: IsBounded,
2003 {
2004 Stream::new(
2005 self.location.clone(),
2006 self.ir_node.replace(HydroNode::Placeholder),
2007 )
2008 }
2009}
2010
2011impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2012where
2013 L: Location<'a>,
2014{
2015 /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2016 ///
2017 /// # Example
2018 /// ```rust
2019 /// # #[cfg(feature = "deploy")] {
2020 /// # use hydro_lang::prelude::*;
2021 /// # use futures::StreamExt;
2022 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2023 /// process.source_iter(q!(&[1, 2, 3])).cloned()
2024 /// # }, |mut stream| async move {
2025 /// // 1, 2, 3
2026 /// # for w in vec![1, 2, 3] {
2027 /// # assert_eq!(stream.next().await.unwrap(), w);
2028 /// # }
2029 /// # }));
2030 /// # }
2031 /// ```
2032 pub fn cloned(self) -> Stream<T, L, B, O, R>
2033 where
2034 T: Clone,
2035 {
2036 self.map(q!(|d| d.clone()))
2037 }
2038}
2039
2040impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2041where
2042 L: Location<'a>,
2043{
2044 /// Computes the number of elements in the stream as a [`Singleton`].
2045 ///
2046 /// # Example
2047 /// ```rust
2048 /// # #[cfg(feature = "deploy")] {
2049 /// # use hydro_lang::prelude::*;
2050 /// # use futures::StreamExt;
2051 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2052 /// let tick = process.tick();
2053 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2054 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2055 /// batch.count().all_ticks()
2056 /// # }, |mut stream| async move {
2057 /// // 4
2058 /// # assert_eq!(stream.next().await.unwrap(), 4);
2059 /// # }));
2060 /// # }
2061 /// ```
2062 pub fn count(self) -> Singleton<usize, L, B> {
2063 self.assume_ordering_trusted::<TotalOrder>(nondet!(
2064 /// Order does not affect eventual count, and also does not affect intermediate states.
2065 ))
2066 .fold(q!(|| 0usize), q!(|count, _| *count += 1))
2067 }
2068}
2069
2070impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2071 /// Produces a new stream that merges the elements of the two input streams.
2072 /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2073 ///
2074 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2075 /// [`Bounded`], you can use [`Stream::chain`] instead.
2076 ///
2077 /// # Example
2078 /// ```rust
2079 /// # #[cfg(feature = "deploy")] {
2080 /// # use hydro_lang::prelude::*;
2081 /// # use futures::StreamExt;
2082 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2083 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2084 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2085 /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2086 /// # }, |mut stream| async move {
2087 /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2088 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2089 /// # assert_eq!(stream.next().await.unwrap(), w);
2090 /// # }
2091 /// # }));
2092 /// # }
2093 /// ```
2094 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2095 self,
2096 other: Stream<T, L, Unbounded, O2, R2>,
2097 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2098 where
2099 R: MinRetries<R2>,
2100 {
2101 Stream::new(
2102 self.location.clone(),
2103 HydroNode::Chain {
2104 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2105 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2106 metadata: self.location.new_node_metadata(Stream::<
2107 T,
2108 L,
2109 Unbounded,
2110 NoOrder,
2111 <R as MinRetries<R2>>::Min,
2112 >::collection_kind()),
2113 },
2114 )
2115 }
2116
2117 /// Deprecated: use [`Stream::merge_unordered`] instead.
2118 #[deprecated(note = "use `merge_unordered` instead")]
2119 pub fn interleave<O2: Ordering, R2: Retries>(
2120 self,
2121 other: Stream<T, L, Unbounded, O2, R2>,
2122 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2123 where
2124 R: MinRetries<R2>,
2125 {
2126 self.merge_unordered(other)
2127 }
2128}
2129
2130impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
2131 /// Produces a new stream that combines the elements of the two input streams,
2132 /// preserving the relative order of elements within each input.
2133 ///
2134 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2135 /// [`Bounded`], you can use [`Stream::chain`] instead.
2136 ///
2137 /// # Non-Determinism
2138 /// The order in which elements *across* the two streams will be interleaved is
2139 /// non-deterministic, so the order of elements will vary across runs. If the output order
2140 /// is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic but emits an
2141 /// unordered stream.
2142 ///
2143 /// # Example
2144 /// ```rust
2145 /// # #[cfg(feature = "deploy")] {
2146 /// # use hydro_lang::prelude::*;
2147 /// # use futures::StreamExt;
2148 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2149 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2150 /// # process.source_iter(q!(vec![1, 3])).into();
2151 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2152 /// # }, |mut stream| async move {
2153 /// // 1, 3 and 2, 4 in some order, preserving the original local order
2154 /// # for w in vec![1, 3, 2, 4] {
2155 /// # assert_eq!(stream.next().await.unwrap(), w);
2156 /// # }
2157 /// # }));
2158 /// # }
2159 /// ```
2160 pub fn merge_ordered<R2: Retries>(
2161 self,
2162 other: Stream<T, L, Unbounded, TotalOrder, R2>,
2163 _nondet: NonDet,
2164 ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
2165 where
2166 R: MinRetries<R2>,
2167 {
2168 Stream::new(
2169 self.location.clone(),
2170 HydroNode::Chain {
2171 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2172 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2173 metadata: self.location.new_node_metadata(Stream::<
2174 T,
2175 L,
2176 Unbounded,
2177 TotalOrder,
2178 <R as MinRetries<R2>>::Min,
2179 >::collection_kind()),
2180 },
2181 )
2182 }
2183}
2184
2185impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2186where
2187 L: Location<'a>,
2188{
2189 /// Produces a new stream that emits the input elements in sorted order.
2190 ///
2191 /// The input stream can have any ordering guarantee, but the output stream
2192 /// will have a [`TotalOrder`] guarantee. This operator will block until all
2193 /// elements in the input stream are available, so it requires the input stream
2194 /// to be [`Bounded`].
2195 ///
2196 /// # Example
2197 /// ```rust
2198 /// # #[cfg(feature = "deploy")] {
2199 /// # use hydro_lang::prelude::*;
2200 /// # use futures::StreamExt;
2201 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2202 /// let tick = process.tick();
2203 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2204 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2205 /// batch.sort().all_ticks()
2206 /// # }, |mut stream| async move {
2207 /// // 1, 2, 3, 4
2208 /// # for w in (1..5) {
2209 /// # assert_eq!(stream.next().await.unwrap(), w);
2210 /// # }
2211 /// # }));
2212 /// # }
2213 /// ```
2214 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2215 where
2216 B: IsBounded,
2217 T: Ord,
2218 {
2219 let this = self.make_bounded();
2220 Stream::new(
2221 this.location.clone(),
2222 HydroNode::Sort {
2223 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2224 metadata: this
2225 .location
2226 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2227 },
2228 )
2229 }
2230
2231 /// Produces a new stream that first emits the elements of the `self` stream,
2232 /// and then emits the elements of the `other` stream. The output stream has
2233 /// a [`TotalOrder`] guarantee if and only if both input streams have a
2234 /// [`TotalOrder`] guarantee.
2235 ///
2236 /// Currently, both input streams must be [`Bounded`]. This operator will block
2237 /// on the first stream until all its elements are available. In a future version,
2238 /// we will relax the requirement on the `other` stream.
2239 ///
2240 /// # Example
2241 /// ```rust
2242 /// # #[cfg(feature = "deploy")] {
2243 /// # use hydro_lang::prelude::*;
2244 /// # use futures::StreamExt;
2245 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2246 /// let tick = process.tick();
2247 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2248 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2249 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2250 /// # }, |mut stream| async move {
2251 /// // 2, 3, 4, 5, 1, 2, 3, 4
2252 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2253 /// # assert_eq!(stream.next().await.unwrap(), w);
2254 /// # }
2255 /// # }));
2256 /// # }
2257 /// ```
2258 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2259 self,
2260 other: Stream<T, L, B2, O2, R2>,
2261 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2262 where
2263 B: IsBounded,
2264 O: MinOrder<O2>,
2265 R: MinRetries<R2>,
2266 {
2267 check_matching_location(&self.location, &other.location);
2268
2269 Stream::new(
2270 self.location.clone(),
2271 HydroNode::Chain {
2272 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2273 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2274 metadata: self.location.new_node_metadata(Stream::<
2275 T,
2276 L,
2277 B2,
2278 <O as MinOrder<O2>>::Min,
2279 <R as MinRetries<R2>>::Min,
2280 >::collection_kind()),
2281 },
2282 )
2283 }
2284
2285 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2286 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2287 /// because this is compiled into a nested loop.
2288 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2289 self,
2290 other: Stream<T2, L, Bounded, O2, R>,
2291 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2292 where
2293 B: IsBounded,
2294 T: Clone,
2295 T2: Clone,
2296 {
2297 let this = self.make_bounded();
2298 check_matching_location(&this.location, &other.location);
2299
2300 Stream::new(
2301 this.location.clone(),
2302 HydroNode::CrossProduct {
2303 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2304 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2305 metadata: this.location.new_node_metadata(Stream::<
2306 (T, T2),
2307 L,
2308 Bounded,
2309 <O2 as MinOrder<O>>::Min,
2310 R,
2311 >::collection_kind()),
2312 },
2313 )
2314 }
2315
2316 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2317 /// `self` used as the values for *each* key.
2318 ///
2319 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2320 /// values. For example, it can be used to send the same set of elements to several cluster
2321 /// members, if the membership information is available as a [`KeyedSingleton`].
2322 ///
2323 /// # Example
2324 /// ```rust
2325 /// # #[cfg(feature = "deploy")] {
2326 /// # use hydro_lang::prelude::*;
2327 /// # use futures::StreamExt;
2328 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2329 /// # let tick = process.tick();
2330 /// let keyed_singleton = // { 1: (), 2: () }
2331 /// # process
2332 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
2333 /// # .into_keyed()
2334 /// # .batch(&tick, nondet!(/** test */))
2335 /// # .first();
2336 /// let stream = // [ "a", "b" ]
2337 /// # process
2338 /// # .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2339 /// # .batch(&tick, nondet!(/** test */));
2340 /// stream.repeat_with_keys(keyed_singleton)
2341 /// # .entries().all_ticks()
2342 /// # }, |mut stream| async move {
2343 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2344 /// # let mut results = Vec::new();
2345 /// # for _ in 0..4 {
2346 /// # results.push(stream.next().await.unwrap());
2347 /// # }
2348 /// # results.sort();
2349 /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2350 /// # }));
2351 /// # }
2352 /// ```
2353 pub fn repeat_with_keys<K, V2>(
2354 self,
2355 keys: KeyedSingleton<K, V2, L, Bounded>,
2356 ) -> KeyedStream<K, T, L, Bounded, O, R>
2357 where
2358 B: IsBounded,
2359 K: Clone,
2360 T: Clone,
2361 {
2362 keys.keys()
2363 .weaken_retries()
2364 .assume_ordering_trusted::<TotalOrder>(
2365 nondet!(/** keyed stream does not depend on ordering of keys */),
2366 )
2367 .cross_product_nested_loop(self.make_bounded())
2368 .into_keyed()
2369 }
2370
2371 /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2372 /// execution until all results are available. The output order is based on when futures
2373 /// complete, and may be different than the input order.
2374 ///
2375 /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2376 /// while futures are pending, this variant blocks until the futures resolve.
2377 ///
2378 /// # Example
2379 /// ```rust
2380 /// # #[cfg(feature = "deploy")] {
2381 /// # use std::collections::HashSet;
2382 /// # use futures::StreamExt;
2383 /// # use hydro_lang::prelude::*;
2384 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2385 /// process
2386 /// .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2387 /// .map(q!(|x| async move {
2388 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2389 /// x
2390 /// }))
2391 /// .resolve_futures_blocking()
2392 /// # },
2393 /// # |mut stream| async move {
2394 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2395 /// # let mut output = HashSet::new();
2396 /// # for _ in 1..10 {
2397 /// # output.insert(stream.next().await.unwrap());
2398 /// # }
2399 /// # assert_eq!(
2400 /// # output,
2401 /// # HashSet::<i32>::from_iter(1..10)
2402 /// # );
2403 /// # },
2404 /// # ));
2405 /// # }
2406 /// ```
2407 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2408 where
2409 T: Future,
2410 {
2411 Stream::new(
2412 self.location.clone(),
2413 HydroNode::ResolveFuturesBlocking {
2414 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2415 metadata: self
2416 .location
2417 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2418 },
2419 )
2420 }
2421
2422 /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2423 ///
2424 /// # Example
2425 /// ```rust
2426 /// # #[cfg(feature = "deploy")] {
2427 /// # use hydro_lang::prelude::*;
2428 /// # use futures::StreamExt;
2429 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2430 /// let tick = process.tick();
2431 /// let empty: Stream<i32, _, Bounded> = process
2432 /// .source_iter(q!(Vec::<i32>::new()))
2433 /// .batch(&tick, nondet!(/** test */));
2434 /// empty.is_empty().all_ticks()
2435 /// # }, |mut stream| async move {
2436 /// // true
2437 /// # assert_eq!(stream.next().await.unwrap(), true);
2438 /// # }));
2439 /// # }
2440 /// ```
2441 #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2442 pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2443 where
2444 B: IsBounded,
2445 {
2446 self.make_bounded()
2447 .assume_ordering_trusted::<TotalOrder>(
2448 nondet!(/** is_empty intermediates unaffected by order */),
2449 )
2450 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** is_empty is idempotent */))
2451 .fold(q!(|| true), q!(|empty, _| { *empty = false },))
2452 }
2453}
2454
2455impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2456where
2457 L: Location<'a>,
2458{
2459 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2460 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2461 /// by equi-joining the two streams on the key attribute `K`.
2462 ///
2463 /// # Example
2464 /// ```rust
2465 /// # #[cfg(feature = "deploy")] {
2466 /// # use hydro_lang::prelude::*;
2467 /// # use std::collections::HashSet;
2468 /// # use futures::StreamExt;
2469 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2470 /// let tick = process.tick();
2471 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2472 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2473 /// stream1.join(stream2)
2474 /// # }, |mut stream| async move {
2475 /// // (1, ('a', 'x')), (2, ('b', 'y'))
2476 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2477 /// # stream.map(|i| assert!(expected.contains(&i)));
2478 /// # }));
2479 /// # }
2480 pub fn join<V2, O2: Ordering, R2: Retries>(
2481 self,
2482 n: Stream<(K, V2), L, B, O2, R2>,
2483 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2484 where
2485 K: Eq + Hash,
2486 R: MinRetries<R2>,
2487 {
2488 check_matching_location(&self.location, &n.location);
2489
2490 Stream::new(
2491 self.location.clone(),
2492 HydroNode::Join {
2493 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2494 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2495 metadata: self.location.new_node_metadata(Stream::<
2496 (K, (V1, V2)),
2497 L,
2498 B,
2499 NoOrder,
2500 <R as MinRetries<R2>>::Min,
2501 >::collection_kind()),
2502 },
2503 )
2504 }
2505
2506 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2507 /// computes the anti-join of the items in the input -- i.e. returns
2508 /// unique items in the first input that do not have a matching key
2509 /// in the second input.
2510 ///
2511 /// # Example
2512 /// ```rust
2513 /// # #[cfg(feature = "deploy")] {
2514 /// # use hydro_lang::prelude::*;
2515 /// # use futures::StreamExt;
2516 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2517 /// let tick = process.tick();
2518 /// let stream = process
2519 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2520 /// .batch(&tick, nondet!(/** test */));
2521 /// let batch = process
2522 /// .source_iter(q!(vec![1, 2]))
2523 /// .batch(&tick, nondet!(/** test */));
2524 /// stream.anti_join(batch).all_ticks()
2525 /// # }, |mut stream| async move {
2526 /// # for w in vec![(3, 'c'), (4, 'd')] {
2527 /// # assert_eq!(stream.next().await.unwrap(), w);
2528 /// # }
2529 /// # }));
2530 /// # }
2531 pub fn anti_join<O2: Ordering, R2: Retries>(
2532 self,
2533 n: Stream<K, L, Bounded, O2, R2>,
2534 ) -> Stream<(K, V1), L, B, O, R>
2535 where
2536 K: Eq + Hash,
2537 {
2538 check_matching_location(&self.location, &n.location);
2539
2540 Stream::new(
2541 self.location.clone(),
2542 HydroNode::AntiJoin {
2543 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2544 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2545 metadata: self
2546 .location
2547 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2548 },
2549 )
2550 }
2551}
2552
2553impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2554 Stream<(K, V), L, B, O, R>
2555{
2556 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2557 /// is used as the key and the second element is added to the entries associated with that key.
2558 ///
2559 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2560 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2561 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2562 /// total ordering _within_ each group but no ordering _across_ groups.
2563 ///
2564 /// # Example
2565 /// ```rust
2566 /// # #[cfg(feature = "deploy")] {
2567 /// # use hydro_lang::prelude::*;
2568 /// # use futures::StreamExt;
2569 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2570 /// process
2571 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2572 /// .into_keyed()
2573 /// # .entries()
2574 /// # }, |mut stream| async move {
2575 /// // { 1: [2, 3], 2: [4] }
2576 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2577 /// # assert_eq!(stream.next().await.unwrap(), w);
2578 /// # }
2579 /// # }));
2580 /// # }
2581 /// ```
2582 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2583 KeyedStream::new(
2584 self.location.clone(),
2585 HydroNode::Cast {
2586 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2587 metadata: self
2588 .location
2589 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2590 },
2591 )
2592 }
2593}
2594
2595impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2596where
2597 K: Eq + Hash,
2598 L: Location<'a>,
2599{
2600 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2601 /// # Example
2602 /// ```rust
2603 /// # #[cfg(feature = "deploy")] {
2604 /// # use hydro_lang::prelude::*;
2605 /// # use futures::StreamExt;
2606 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2607 /// let tick = process.tick();
2608 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2609 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2610 /// batch.keys().all_ticks()
2611 /// # }, |mut stream| async move {
2612 /// // 1, 2
2613 /// # assert_eq!(stream.next().await.unwrap(), 1);
2614 /// # assert_eq!(stream.next().await.unwrap(), 2);
2615 /// # }));
2616 /// # }
2617 /// ```
2618 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2619 self.into_keyed()
2620 .fold(
2621 q!(|| ()),
2622 q!(
2623 |_, _| {},
2624 commutative = manual_proof!(/** values are ignored */),
2625 idempotent = manual_proof!(/** values are ignored */)
2626 ),
2627 )
2628 .keys()
2629 }
2630}
2631
2632impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2633where
2634 L: Location<'a> + NoTick,
2635{
2636 /// Returns a stream corresponding to the latest batch of elements being atomically
2637 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2638 /// the order of the input.
2639 ///
2640 /// # Non-Determinism
2641 /// The batch boundaries are non-deterministic and may change across executions.
2642 pub fn batch_atomic(
2643 self,
2644 tick: &Tick<L>,
2645 _nondet: NonDet,
2646 ) -> Stream<T, Tick<L>, Bounded, O, R> {
2647 Stream::new(
2648 tick.clone(),
2649 HydroNode::Batch {
2650 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2651 metadata: tick
2652 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2653 },
2654 )
2655 }
2656
2657 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2658 /// See [`Stream::atomic`] for more details.
2659 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2660 Stream::new(
2661 self.location.tick.l.clone(),
2662 HydroNode::EndAtomic {
2663 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2664 metadata: self
2665 .location
2666 .tick
2667 .l
2668 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2669 },
2670 )
2671 }
2672}
2673
2674impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2675where
2676 L: Location<'a> + NoTick + NoAtomic,
2677 F: Future<Output = T>,
2678{
2679 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2680 /// Future outputs are produced as available, regardless of input arrival order.
2681 ///
2682 /// # Example
2683 /// ```rust
2684 /// # #[cfg(feature = "deploy")] {
2685 /// # use std::collections::HashSet;
2686 /// # use futures::StreamExt;
2687 /// # use hydro_lang::prelude::*;
2688 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2689 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2690 /// .map(q!(|x| async move {
2691 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2692 /// x
2693 /// }))
2694 /// .resolve_futures()
2695 /// # },
2696 /// # |mut stream| async move {
2697 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2698 /// # let mut output = HashSet::new();
2699 /// # for _ in 1..10 {
2700 /// # output.insert(stream.next().await.unwrap());
2701 /// # }
2702 /// # assert_eq!(
2703 /// # output,
2704 /// # HashSet::<i32>::from_iter(1..10)
2705 /// # );
2706 /// # },
2707 /// # ));
2708 /// # }
2709 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2710 Stream::new(
2711 self.location.clone(),
2712 HydroNode::ResolveFutures {
2713 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2714 metadata: self
2715 .location
2716 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2717 },
2718 )
2719 }
2720
2721 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2722 /// Future outputs are produced in the same order as the input stream.
2723 ///
2724 /// # Example
2725 /// ```rust
2726 /// # #[cfg(feature = "deploy")] {
2727 /// # use std::collections::HashSet;
2728 /// # use futures::StreamExt;
2729 /// # use hydro_lang::prelude::*;
2730 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2731 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2732 /// .map(q!(|x| async move {
2733 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2734 /// x
2735 /// }))
2736 /// .resolve_futures_ordered()
2737 /// # },
2738 /// # |mut stream| async move {
2739 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2740 /// # let mut output = Vec::new();
2741 /// # for _ in 1..10 {
2742 /// # output.push(stream.next().await.unwrap());
2743 /// # }
2744 /// # assert_eq!(
2745 /// # output,
2746 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2747 /// # );
2748 /// # },
2749 /// # ));
2750 /// # }
2751 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2752 Stream::new(
2753 self.location.clone(),
2754 HydroNode::ResolveFuturesOrdered {
2755 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2756 metadata: self
2757 .location
2758 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2759 },
2760 )
2761 }
2762}
2763
2764impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2765where
2766 L: Location<'a>,
2767{
2768 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2769 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2770 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2771 Stream::new(
2772 self.location.outer().clone(),
2773 HydroNode::YieldConcat {
2774 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2775 metadata: self
2776 .location
2777 .outer()
2778 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2779 },
2780 )
2781 }
2782
2783 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2784 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2785 ///
2786 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2787 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2788 /// stream's [`Tick`] context.
2789 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2790 let out_location = Atomic {
2791 tick: self.location.clone(),
2792 };
2793
2794 Stream::new(
2795 out_location.clone(),
2796 HydroNode::YieldConcat {
2797 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2798 metadata: out_location
2799 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2800 },
2801 )
2802 }
2803
2804 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2805 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2806 /// input.
2807 ///
2808 /// This API is particularly useful for stateful computation on batches of data, such as
2809 /// maintaining an accumulated state that is up to date with the current batch.
2810 ///
2811 /// # Example
2812 /// ```rust
2813 /// # #[cfg(feature = "deploy")] {
2814 /// # use hydro_lang::prelude::*;
2815 /// # use futures::StreamExt;
2816 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2817 /// let tick = process.tick();
2818 /// # // ticks are lazy by default, forces the second tick to run
2819 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2820 /// # let batch_first_tick = process
2821 /// # .source_iter(q!(vec![1, 2, 3, 4]))
2822 /// # .batch(&tick, nondet!(/** test */));
2823 /// # let batch_second_tick = process
2824 /// # .source_iter(q!(vec![5, 6, 7]))
2825 /// # .batch(&tick, nondet!(/** test */))
2826 /// # .defer_tick(); // appears on the second tick
2827 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2828 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2829 ///
2830 /// input.batch(&tick, nondet!(/** test */))
2831 /// .across_ticks(|s| s.count()).all_ticks()
2832 /// # }, |mut stream| async move {
2833 /// // [4, 7]
2834 /// assert_eq!(stream.next().await.unwrap(), 4);
2835 /// assert_eq!(stream.next().await.unwrap(), 7);
2836 /// # }));
2837 /// # }
2838 /// ```
2839 pub fn across_ticks<Out: BatchAtomic>(
2840 self,
2841 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2842 ) -> Out::Batched {
2843 thunk(self.all_ticks_atomic()).batched_atomic()
2844 }
2845
2846 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2847 /// always has the elements of `self` at tick `T - 1`.
2848 ///
2849 /// At tick `0`, the output stream is empty, since there is no previous tick.
2850 ///
2851 /// This operator enables stateful iterative processing with ticks, by sending data from one
2852 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2853 ///
2854 /// # Example
2855 /// ```rust
2856 /// # #[cfg(feature = "deploy")] {
2857 /// # use hydro_lang::prelude::*;
2858 /// # use futures::StreamExt;
2859 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2860 /// let tick = process.tick();
2861 /// // ticks are lazy by default, forces the second tick to run
2862 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2863 ///
2864 /// let batch_first_tick = process
2865 /// .source_iter(q!(vec![1, 2, 3, 4]))
2866 /// .batch(&tick, nondet!(/** test */));
2867 /// let batch_second_tick = process
2868 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2869 /// .batch(&tick, nondet!(/** test */))
2870 /// .defer_tick(); // appears on the second tick
2871 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2872 ///
2873 /// changes_across_ticks.clone().filter_not_in(
2874 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2875 /// ).all_ticks()
2876 /// # }, |mut stream| async move {
2877 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2878 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2879 /// # assert_eq!(stream.next().await.unwrap(), w);
2880 /// # }
2881 /// # }));
2882 /// # }
2883 /// ```
2884 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2885 Stream::new(
2886 self.location.clone(),
2887 HydroNode::DeferTick {
2888 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2889 metadata: self
2890 .location
2891 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2892 },
2893 )
2894 }
2895}
2896
2897#[cfg(test)]
2898mod tests {
2899 #[cfg(feature = "deploy")]
2900 use futures::{SinkExt, StreamExt};
2901 #[cfg(feature = "deploy")]
2902 use hydro_deploy::Deployment;
2903 #[cfg(feature = "deploy")]
2904 use serde::{Deserialize, Serialize};
2905 #[cfg(any(feature = "deploy", feature = "sim"))]
2906 use stageleft::q;
2907
2908 #[cfg(any(feature = "deploy", feature = "sim"))]
2909 use crate::compile::builder::FlowBuilder;
2910 #[cfg(feature = "deploy")]
2911 use crate::live_collections::sliced::sliced;
2912 #[cfg(feature = "deploy")]
2913 use crate::live_collections::stream::ExactlyOnce;
2914 #[cfg(feature = "sim")]
2915 use crate::live_collections::stream::NoOrder;
2916 #[cfg(any(feature = "deploy", feature = "sim"))]
2917 use crate::live_collections::stream::TotalOrder;
2918 #[cfg(any(feature = "deploy", feature = "sim"))]
2919 use crate::location::Location;
2920 #[cfg(any(feature = "deploy", feature = "sim"))]
2921 use crate::nondet::nondet;
2922
2923 mod backtrace_chained_ops;
2924
2925 #[cfg(feature = "deploy")]
2926 struct P1 {}
2927 #[cfg(feature = "deploy")]
2928 struct P2 {}
2929
2930 #[cfg(feature = "deploy")]
2931 #[derive(Serialize, Deserialize, Debug)]
2932 struct SendOverNetwork {
2933 n: u32,
2934 }
2935
2936 #[cfg(feature = "deploy")]
2937 #[tokio::test]
2938 async fn first_ten_distributed() {
2939 use crate::networking::TCP;
2940
2941 let mut deployment = Deployment::new();
2942
2943 let mut flow = FlowBuilder::new();
2944 let first_node = flow.process::<P1>();
2945 let second_node = flow.process::<P2>();
2946 let external = flow.external::<P2>();
2947
2948 let numbers = first_node.source_iter(q!(0..10));
2949 let out_port = numbers
2950 .map(q!(|n| SendOverNetwork { n }))
2951 .send(&second_node, TCP.fail_stop().bincode())
2952 .send_bincode_external(&external);
2953
2954 let nodes = flow
2955 .with_process(&first_node, deployment.Localhost())
2956 .with_process(&second_node, deployment.Localhost())
2957 .with_external(&external, deployment.Localhost())
2958 .deploy(&mut deployment);
2959
2960 deployment.deploy().await.unwrap();
2961
2962 let mut external_out = nodes.connect(out_port).await;
2963
2964 deployment.start().await.unwrap();
2965
2966 for i in 0..10 {
2967 assert_eq!(external_out.next().await.unwrap().n, i);
2968 }
2969 }
2970
2971 #[cfg(feature = "deploy")]
2972 #[tokio::test]
2973 async fn first_cardinality() {
2974 let mut deployment = Deployment::new();
2975
2976 let mut flow = FlowBuilder::new();
2977 let node = flow.process::<()>();
2978 let external = flow.external::<()>();
2979
2980 let node_tick = node.tick();
2981 let count = node_tick
2982 .singleton(q!([1, 2, 3]))
2983 .into_stream()
2984 .flatten_ordered()
2985 .first()
2986 .into_stream()
2987 .count()
2988 .all_ticks()
2989 .send_bincode_external(&external);
2990
2991 let nodes = flow
2992 .with_process(&node, deployment.Localhost())
2993 .with_external(&external, deployment.Localhost())
2994 .deploy(&mut deployment);
2995
2996 deployment.deploy().await.unwrap();
2997
2998 let mut external_out = nodes.connect(count).await;
2999
3000 deployment.start().await.unwrap();
3001
3002 assert_eq!(external_out.next().await.unwrap(), 1);
3003 }
3004
3005 #[cfg(feature = "deploy")]
3006 #[tokio::test]
3007 async fn unbounded_reduce_remembers_state() {
3008 let mut deployment = Deployment::new();
3009
3010 let mut flow = FlowBuilder::new();
3011 let node = flow.process::<()>();
3012 let external = flow.external::<()>();
3013
3014 let (input_port, input) = node.source_external_bincode(&external);
3015 let out = input
3016 .reduce(q!(|acc, v| *acc += v))
3017 .sample_eager(nondet!(/** test */))
3018 .send_bincode_external(&external);
3019
3020 let nodes = flow
3021 .with_process(&node, deployment.Localhost())
3022 .with_external(&external, deployment.Localhost())
3023 .deploy(&mut deployment);
3024
3025 deployment.deploy().await.unwrap();
3026
3027 let mut external_in = nodes.connect(input_port).await;
3028 let mut external_out = nodes.connect(out).await;
3029
3030 deployment.start().await.unwrap();
3031
3032 external_in.send(1).await.unwrap();
3033 assert_eq!(external_out.next().await.unwrap(), 1);
3034
3035 external_in.send(2).await.unwrap();
3036 assert_eq!(external_out.next().await.unwrap(), 3);
3037 }
3038
3039 #[cfg(feature = "deploy")]
3040 #[tokio::test]
3041 async fn top_level_bounded_cross_singleton() {
3042 let mut deployment = Deployment::new();
3043
3044 let mut flow = FlowBuilder::new();
3045 let node = flow.process::<()>();
3046 let external = flow.external::<()>();
3047
3048 let (input_port, input) =
3049 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3050
3051 let out = input
3052 .cross_singleton(
3053 node.source_iter(q!(vec![1, 2, 3]))
3054 .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3055 )
3056 .send_bincode_external(&external);
3057
3058 let nodes = flow
3059 .with_process(&node, deployment.Localhost())
3060 .with_external(&external, deployment.Localhost())
3061 .deploy(&mut deployment);
3062
3063 deployment.deploy().await.unwrap();
3064
3065 let mut external_in = nodes.connect(input_port).await;
3066 let mut external_out = nodes.connect(out).await;
3067
3068 deployment.start().await.unwrap();
3069
3070 external_in.send(1).await.unwrap();
3071 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3072
3073 external_in.send(2).await.unwrap();
3074 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3075 }
3076
3077 #[cfg(feature = "deploy")]
3078 #[tokio::test]
3079 async fn top_level_bounded_reduce_cardinality() {
3080 let mut deployment = Deployment::new();
3081
3082 let mut flow = FlowBuilder::new();
3083 let node = flow.process::<()>();
3084 let external = flow.external::<()>();
3085
3086 let (input_port, input) =
3087 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3088
3089 let out = sliced! {
3090 let input = use(input, nondet!(/** test */));
3091 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3092 input.cross_singleton(v.into_stream().count())
3093 }
3094 .send_bincode_external(&external);
3095
3096 let nodes = flow
3097 .with_process(&node, deployment.Localhost())
3098 .with_external(&external, deployment.Localhost())
3099 .deploy(&mut deployment);
3100
3101 deployment.deploy().await.unwrap();
3102
3103 let mut external_in = nodes.connect(input_port).await;
3104 let mut external_out = nodes.connect(out).await;
3105
3106 deployment.start().await.unwrap();
3107
3108 external_in.send(1).await.unwrap();
3109 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3110
3111 external_in.send(2).await.unwrap();
3112 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3113 }
3114
3115 #[cfg(feature = "deploy")]
3116 #[tokio::test]
3117 async fn top_level_bounded_into_singleton_cardinality() {
3118 let mut deployment = Deployment::new();
3119
3120 let mut flow = FlowBuilder::new();
3121 let node = flow.process::<()>();
3122 let external = flow.external::<()>();
3123
3124 let (input_port, input) =
3125 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3126
3127 let out = sliced! {
3128 let input = use(input, nondet!(/** test */));
3129 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3130 input.cross_singleton(v.into_stream().count())
3131 }
3132 .send_bincode_external(&external);
3133
3134 let nodes = flow
3135 .with_process(&node, deployment.Localhost())
3136 .with_external(&external, deployment.Localhost())
3137 .deploy(&mut deployment);
3138
3139 deployment.deploy().await.unwrap();
3140
3141 let mut external_in = nodes.connect(input_port).await;
3142 let mut external_out = nodes.connect(out).await;
3143
3144 deployment.start().await.unwrap();
3145
3146 external_in.send(1).await.unwrap();
3147 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3148
3149 external_in.send(2).await.unwrap();
3150 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3151 }
3152
3153 #[cfg(feature = "deploy")]
3154 #[tokio::test]
3155 async fn atomic_fold_replays_each_tick() {
3156 let mut deployment = Deployment::new();
3157
3158 let mut flow = FlowBuilder::new();
3159 let node = flow.process::<()>();
3160 let external = flow.external::<()>();
3161
3162 let (input_port, input) =
3163 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3164 let tick = node.tick();
3165
3166 let out = input
3167 .batch(&tick, nondet!(/** test */))
3168 .cross_singleton(
3169 node.source_iter(q!(vec![1, 2, 3]))
3170 .atomic()
3171 .fold(q!(|| 0), q!(|acc, v| *acc += v))
3172 .snapshot_atomic(&tick, nondet!(/** test */)),
3173 )
3174 .all_ticks()
3175 .send_bincode_external(&external);
3176
3177 let nodes = flow
3178 .with_process(&node, deployment.Localhost())
3179 .with_external(&external, deployment.Localhost())
3180 .deploy(&mut deployment);
3181
3182 deployment.deploy().await.unwrap();
3183
3184 let mut external_in = nodes.connect(input_port).await;
3185 let mut external_out = nodes.connect(out).await;
3186
3187 deployment.start().await.unwrap();
3188
3189 external_in.send(1).await.unwrap();
3190 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3191
3192 external_in.send(2).await.unwrap();
3193 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3194 }
3195
3196 #[cfg(feature = "deploy")]
3197 #[tokio::test]
3198 async fn unbounded_scan_remembers_state() {
3199 let mut deployment = Deployment::new();
3200
3201 let mut flow = FlowBuilder::new();
3202 let node = flow.process::<()>();
3203 let external = flow.external::<()>();
3204
3205 let (input_port, input) = node.source_external_bincode(&external);
3206 let out = input
3207 .scan(
3208 q!(|| 0),
3209 q!(|acc, v| {
3210 *acc += v;
3211 Some(*acc)
3212 }),
3213 )
3214 .send_bincode_external(&external);
3215
3216 let nodes = flow
3217 .with_process(&node, deployment.Localhost())
3218 .with_external(&external, deployment.Localhost())
3219 .deploy(&mut deployment);
3220
3221 deployment.deploy().await.unwrap();
3222
3223 let mut external_in = nodes.connect(input_port).await;
3224 let mut external_out = nodes.connect(out).await;
3225
3226 deployment.start().await.unwrap();
3227
3228 external_in.send(1).await.unwrap();
3229 assert_eq!(external_out.next().await.unwrap(), 1);
3230
3231 external_in.send(2).await.unwrap();
3232 assert_eq!(external_out.next().await.unwrap(), 3);
3233 }
3234
3235 #[cfg(feature = "deploy")]
3236 #[tokio::test]
3237 async fn unbounded_enumerate_remembers_state() {
3238 let mut deployment = Deployment::new();
3239
3240 let mut flow = FlowBuilder::new();
3241 let node = flow.process::<()>();
3242 let external = flow.external::<()>();
3243
3244 let (input_port, input) = node.source_external_bincode(&external);
3245 let out = input.enumerate().send_bincode_external(&external);
3246
3247 let nodes = flow
3248 .with_process(&node, deployment.Localhost())
3249 .with_external(&external, deployment.Localhost())
3250 .deploy(&mut deployment);
3251
3252 deployment.deploy().await.unwrap();
3253
3254 let mut external_in = nodes.connect(input_port).await;
3255 let mut external_out = nodes.connect(out).await;
3256
3257 deployment.start().await.unwrap();
3258
3259 external_in.send(1).await.unwrap();
3260 assert_eq!(external_out.next().await.unwrap(), (0, 1));
3261
3262 external_in.send(2).await.unwrap();
3263 assert_eq!(external_out.next().await.unwrap(), (1, 2));
3264 }
3265
3266 #[cfg(feature = "deploy")]
3267 #[tokio::test]
3268 async fn unbounded_unique_remembers_state() {
3269 let mut deployment = Deployment::new();
3270
3271 let mut flow = FlowBuilder::new();
3272 let node = flow.process::<()>();
3273 let external = flow.external::<()>();
3274
3275 let (input_port, input) =
3276 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3277 let out = input.unique().send_bincode_external(&external);
3278
3279 let nodes = flow
3280 .with_process(&node, deployment.Localhost())
3281 .with_external(&external, deployment.Localhost())
3282 .deploy(&mut deployment);
3283
3284 deployment.deploy().await.unwrap();
3285
3286 let mut external_in = nodes.connect(input_port).await;
3287 let mut external_out = nodes.connect(out).await;
3288
3289 deployment.start().await.unwrap();
3290
3291 external_in.send(1).await.unwrap();
3292 assert_eq!(external_out.next().await.unwrap(), 1);
3293
3294 external_in.send(2).await.unwrap();
3295 assert_eq!(external_out.next().await.unwrap(), 2);
3296
3297 external_in.send(1).await.unwrap();
3298 external_in.send(3).await.unwrap();
3299 assert_eq!(external_out.next().await.unwrap(), 3);
3300 }
3301
3302 #[cfg(feature = "sim")]
3303 #[test]
3304 #[should_panic]
3305 fn sim_batch_nondet_size() {
3306 let mut flow = FlowBuilder::new();
3307 let node = flow.process::<()>();
3308
3309 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3310
3311 let tick = node.tick();
3312 let out_recv = input
3313 .batch(&tick, nondet!(/** test */))
3314 .count()
3315 .all_ticks()
3316 .sim_output();
3317
3318 flow.sim().exhaustive(async || {
3319 in_send.send(());
3320 in_send.send(());
3321 in_send.send(());
3322
3323 assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3324 });
3325 }
3326
3327 #[cfg(feature = "sim")]
3328 #[test]
3329 fn sim_batch_preserves_order() {
3330 let mut flow = FlowBuilder::new();
3331 let node = flow.process::<()>();
3332
3333 let (in_send, input) = node.sim_input();
3334
3335 let tick = node.tick();
3336 let out_recv = input
3337 .batch(&tick, nondet!(/** test */))
3338 .all_ticks()
3339 .sim_output();
3340
3341 flow.sim().exhaustive(async || {
3342 in_send.send(1);
3343 in_send.send(2);
3344 in_send.send(3);
3345
3346 out_recv.assert_yields_only([1, 2, 3]).await;
3347 });
3348 }
3349
3350 #[cfg(feature = "sim")]
3351 #[test]
3352 #[should_panic]
3353 fn sim_batch_unordered_shuffles() {
3354 let mut flow = FlowBuilder::new();
3355 let node = flow.process::<()>();
3356
3357 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3358
3359 let tick = node.tick();
3360 let batch = input.batch(&tick, nondet!(/** test */));
3361 let out_recv = batch
3362 .clone()
3363 .min()
3364 .zip(batch.max())
3365 .all_ticks()
3366 .sim_output();
3367
3368 flow.sim().exhaustive(async || {
3369 in_send.send_many_unordered([1, 2, 3]);
3370
3371 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3372 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3373 }
3374 });
3375 }
3376
3377 #[cfg(feature = "sim")]
3378 #[test]
3379 fn sim_batch_unordered_shuffles_count() {
3380 let mut flow = FlowBuilder::new();
3381 let node = flow.process::<()>();
3382
3383 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3384
3385 let tick = node.tick();
3386 let batch = input.batch(&tick, nondet!(/** test */));
3387 let out_recv = batch.all_ticks().sim_output();
3388
3389 let instance_count = flow.sim().exhaustive(async || {
3390 in_send.send_many_unordered([1, 2, 3, 4]);
3391 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3392 });
3393
3394 assert_eq!(
3395 instance_count,
3396 75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3397 )
3398 }
3399
3400 #[cfg(feature = "sim")]
3401 #[test]
3402 #[should_panic]
3403 fn sim_observe_order_batched() {
3404 let mut flow = FlowBuilder::new();
3405 let node = flow.process::<()>();
3406
3407 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3408
3409 let tick = node.tick();
3410 let batch = input.batch(&tick, nondet!(/** test */));
3411 let out_recv = batch
3412 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3413 .all_ticks()
3414 .sim_output();
3415
3416 flow.sim().exhaustive(async || {
3417 in_send.send_many_unordered([1, 2, 3, 4]);
3418 out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3419 });
3420 }
3421
3422 #[cfg(feature = "sim")]
3423 #[test]
3424 fn sim_observe_order_batched_count() {
3425 let mut flow = FlowBuilder::new();
3426 let node = flow.process::<()>();
3427
3428 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3429
3430 let tick = node.tick();
3431 let batch = input.batch(&tick, nondet!(/** test */));
3432 let out_recv = batch
3433 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3434 .all_ticks()
3435 .sim_output();
3436
3437 let instance_count = flow.sim().exhaustive(async || {
3438 in_send.send_many_unordered([1, 2, 3, 4]);
3439 let _ = out_recv.collect::<Vec<_>>().await;
3440 });
3441
3442 assert_eq!(
3443 instance_count,
3444 192 // 4! * 2^{4 - 1}
3445 )
3446 }
3447
3448 #[cfg(feature = "sim")]
3449 #[test]
3450 fn sim_unordered_count_instance_count() {
3451 let mut flow = FlowBuilder::new();
3452 let node = flow.process::<()>();
3453
3454 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3455
3456 let tick = node.tick();
3457 let out_recv = input
3458 .count()
3459 .snapshot(&tick, nondet!(/** test */))
3460 .all_ticks()
3461 .sim_output();
3462
3463 let instance_count = flow.sim().exhaustive(async || {
3464 in_send.send_many_unordered([1, 2, 3, 4]);
3465 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3466 });
3467
3468 assert_eq!(
3469 instance_count,
3470 16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3471 )
3472 }
3473
3474 #[cfg(feature = "sim")]
3475 #[test]
3476 fn sim_top_level_assume_ordering() {
3477 let mut flow = FlowBuilder::new();
3478 let node = flow.process::<()>();
3479
3480 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3481
3482 let out_recv = input
3483 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3484 .sim_output();
3485
3486 let instance_count = flow.sim().exhaustive(async || {
3487 in_send.send_many_unordered([1, 2, 3]);
3488 let mut out = out_recv.collect::<Vec<_>>().await;
3489 out.sort();
3490 assert_eq!(out, vec![1, 2, 3]);
3491 });
3492
3493 assert_eq!(instance_count, 6)
3494 }
3495
3496 #[cfg(feature = "sim")]
3497 #[test]
3498 fn sim_top_level_assume_ordering_cycle_back() {
3499 let mut flow = FlowBuilder::new();
3500 let node = flow.process::<()>();
3501
3502 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3503
3504 let (complete_cycle_back, cycle_back) =
3505 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3506 let ordered = input
3507 .merge_unordered(cycle_back)
3508 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3509 complete_cycle_back.complete(
3510 ordered
3511 .clone()
3512 .map(q!(|v| v + 1))
3513 .filter(q!(|v| v % 2 == 1)),
3514 );
3515
3516 let out_recv = ordered.sim_output();
3517
3518 let mut saw = false;
3519 let instance_count = flow.sim().exhaustive(async || {
3520 in_send.send_many_unordered([0, 2]);
3521 let out = out_recv.collect::<Vec<_>>().await;
3522
3523 if out.starts_with(&[0, 1, 2]) {
3524 saw = true;
3525 }
3526 });
3527
3528 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3529 assert_eq!(instance_count, 6)
3530 }
3531
3532 #[cfg(feature = "sim")]
3533 #[test]
3534 fn sim_top_level_assume_ordering_cycle_back_tick() {
3535 let mut flow = FlowBuilder::new();
3536 let node = flow.process::<()>();
3537
3538 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3539
3540 let (complete_cycle_back, cycle_back) =
3541 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3542 let ordered = input
3543 .merge_unordered(cycle_back)
3544 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3545 complete_cycle_back.complete(
3546 ordered
3547 .clone()
3548 .batch(&node.tick(), nondet!(/** test */))
3549 .all_ticks()
3550 .map(q!(|v| v + 1))
3551 .filter(q!(|v| v % 2 == 1)),
3552 );
3553
3554 let out_recv = ordered.sim_output();
3555
3556 let mut saw = false;
3557 let instance_count = flow.sim().exhaustive(async || {
3558 in_send.send_many_unordered([0, 2]);
3559 let out = out_recv.collect::<Vec<_>>().await;
3560
3561 if out.starts_with(&[0, 1, 2]) {
3562 saw = true;
3563 }
3564 });
3565
3566 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3567 assert_eq!(instance_count, 58)
3568 }
3569
3570 #[cfg(feature = "sim")]
3571 #[test]
3572 fn sim_top_level_assume_ordering_multiple() {
3573 let mut flow = FlowBuilder::new();
3574 let node = flow.process::<()>();
3575
3576 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3577 let (_, input2) = node.sim_input::<_, NoOrder, _>();
3578
3579 let (complete_cycle_back, cycle_back) =
3580 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3581 let input1_ordered = input
3582 .clone()
3583 .merge_unordered(cycle_back)
3584 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3585 let foo = input1_ordered
3586 .clone()
3587 .map(q!(|v| v + 3))
3588 .weaken_ordering::<NoOrder>()
3589 .merge_unordered(input2)
3590 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3591
3592 complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));
3593
3594 let out_recv = input1_ordered.sim_output();
3595
3596 let mut saw = false;
3597 let instance_count = flow.sim().exhaustive(async || {
3598 in_send.send_many_unordered([0, 1]);
3599 let out = out_recv.collect::<Vec<_>>().await;
3600
3601 if out.starts_with(&[0, 3, 1]) {
3602 saw = true;
3603 }
3604 });
3605
3606 assert!(saw, "did not see an instance with 0, 3, 1 in order");
3607 assert_eq!(instance_count, 24)
3608 }
3609
3610 #[cfg(feature = "sim")]
3611 #[test]
3612 fn sim_atomic_assume_ordering_cycle_back() {
3613 let mut flow = FlowBuilder::new();
3614 let node = flow.process::<()>();
3615
3616 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3617
3618 let (complete_cycle_back, cycle_back) =
3619 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3620 let ordered = input
3621 .merge_unordered(cycle_back)
3622 .atomic()
3623 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3624 .end_atomic();
3625 complete_cycle_back.complete(
3626 ordered
3627 .clone()
3628 .map(q!(|v| v + 1))
3629 .filter(q!(|v| v % 2 == 1)),
3630 );
3631
3632 let out_recv = ordered.sim_output();
3633
3634 let instance_count = flow.sim().exhaustive(async || {
3635 in_send.send_many_unordered([0, 2]);
3636 let out = out_recv.collect::<Vec<_>>().await;
3637 assert_eq!(out.len(), 4);
3638 });
3639
3640 assert_eq!(instance_count, 22)
3641 }
3642
3643 #[cfg(feature = "deploy")]
3644 #[tokio::test]
3645 async fn partition_evens_odds() {
3646 let mut deployment = Deployment::new();
3647
3648 let mut flow = FlowBuilder::new();
3649 let node = flow.process::<()>();
3650 let external = flow.external::<()>();
3651
3652 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3653 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3654 let evens_port = evens.send_bincode_external(&external);
3655 let odds_port = odds.send_bincode_external(&external);
3656
3657 let nodes = flow
3658 .with_process(&node, deployment.Localhost())
3659 .with_external(&external, deployment.Localhost())
3660 .deploy(&mut deployment);
3661
3662 deployment.deploy().await.unwrap();
3663
3664 let mut evens_out = nodes.connect(evens_port).await;
3665 let mut odds_out = nodes.connect(odds_port).await;
3666
3667 deployment.start().await.unwrap();
3668
3669 let mut even_results = Vec::new();
3670 for _ in 0..3 {
3671 even_results.push(evens_out.next().await.unwrap());
3672 }
3673 even_results.sort();
3674 assert_eq!(even_results, vec![2, 4, 6]);
3675
3676 let mut odd_results = Vec::new();
3677 for _ in 0..3 {
3678 odd_results.push(odds_out.next().await.unwrap());
3679 }
3680 odd_results.sort();
3681 assert_eq!(odd_results, vec![1, 3, 5]);
3682 }
3683
3684 #[cfg(feature = "deploy")]
3685 #[tokio::test]
3686 async fn unconsumed_inspect_still_runs() {
3687 use crate::deploy::DeployCrateWrapper;
3688
3689 let mut deployment = Deployment::new();
3690
3691 let mut flow = FlowBuilder::new();
3692 let node = flow.process::<()>();
3693
3694 // The return value of .inspect() is intentionally dropped.
3695 // Before the Null-root fix, this would silently do nothing.
3696 node.source_iter(q!(0..5))
3697 .inspect(q!(|x| println!("inspect: {}", x)));
3698
3699 let nodes = flow
3700 .with_process(&node, deployment.Localhost())
3701 .deploy(&mut deployment);
3702
3703 deployment.deploy().await.unwrap();
3704
3705 let mut stdout = nodes.get_process(&node).stdout();
3706
3707 deployment.start().await.unwrap();
3708
3709 let mut lines = Vec::new();
3710 for _ in 0..5 {
3711 lines.push(stdout.recv().await.unwrap());
3712 }
3713 lines.sort();
3714 assert_eq!(
3715 lines,
3716 vec![
3717 "inspect: 0",
3718 "inspect: 1",
3719 "inspect: 2",
3720 "inspect: 3",
3721 "inspect: 4",
3722 ]
3723 );
3724 }
3725}