// hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::manual_expr::ManualExpr;
31use crate::nondet::{NonDet, nondet};
32use crate::prelude::manual_proof;
33use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
34
35pub mod networking;
36
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds encode the "minimum" relation between orderings:
/// combining an ordering with itself or with [`TotalOrder`] preserves it,
/// while combining with [`NoOrder`] always degrades to [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
45
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited marker type; it is only ever used at the type level.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
55
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an uninhabited marker type; it is only ever used at the type level.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
67
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `O` is weaker than `O2` exactly when taking the minimum of the
// two orderings yields `O` itself.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}

/// Helper trait for determining the weakest of two orderings.
///
/// NOTE(review): unlike [`MinRetries`], the `Min` associated type here does
/// not carry `WeakerOrderingThan` bounds — presumably equivalent in practice,
/// but worth confirming the asymmetry is intentional.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// `TotalOrder` is the strongest ordering, so the minimum with any other
// ordering is that other ordering.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// `NoOrder` is the weakest ordering, so it absorbs everything.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
92
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds encode the "minimum" relation between retry
/// guarantees: combining with [`ExactlyOnce`] preserves the guarantee, while
/// combining with [`AtLeastOnce`] always degrades to [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}

/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited marker type; it is only ever used at the type level.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}

/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited marker type; it is only ever used at the type level.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
121
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `R` is weaker than `R2` exactly when taking the minimum of the
// two retry guarantees yields `R` itself.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}

/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the strongest retry guarantee, so the minimum with any
// other guarantee is that other guarantee.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the weakest retry guarantee, so it absorbs everything.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
146
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
pub trait IsOrdered: Ordering {}

// `do_not_recommend` keeps this impl out of compiler "help" suggestions so
// that the custom `on_unimplemented` message above is what users see.
#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}

#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
pub trait IsExactlyOnce: Retries {}

#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
172
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // Where this stream is materialized.
    pub(crate) location: Loc,
    // The IR node backing this stream; swapped for `HydroNode::Placeholder`
    // when the stream is consumed by an operator.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Shared handle to the flow graph being built.
    pub(crate) flow_state: FlowState,

    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
205
206impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
207    fn drop(&mut self) {
208        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
209        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
210            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
211                input: Box::new(ir_node),
212                op_metadata: HydroIrOpMetadata::new(),
213            });
214        }
215    }
216}
217
218impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
219    for Stream<T, L, Unbounded, O, R>
220where
221    L: Location<'a>,
222{
223    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
224        let new_meta = stream
225            .location
226            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
227
228        Stream {
229            location: stream.location.clone(),
230            flow_state: stream.flow_state.clone(),
231            ir_node: RefCell::new(HydroNode::Cast {
232                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
233                metadata: new_meta,
234            }),
235            _phantom: PhantomData,
236        }
237    }
238}
239
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    /// Relaxes the ordering guarantee from [`TotalOrder`] to [`NoOrder`];
    /// always safe because it only forgets a guarantee.
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}
249
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    /// Relaxes the retries guarantee from [`ExactlyOnce`] to [`AtLeastOnce`];
    /// always safe because it only forgets a guarantee.
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
259
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Delays this stream's elements to the next tick.
    fn defer_tick(self) -> Self {
        // Delegates to `Stream::defer_tick` — presumably the inherent method
        // (it must be, or this call would recurse); defined elsewhere in this file.
        Stream::defer_tick(self)
    }
}
268
269impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
270    for Stream<T, Tick<L>, Bounded, O, R>
271where
272    L: Location<'a>,
273{
274    type Location = Tick<L>;
275
276    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
277        Stream::new(
278            location.clone(),
279            HydroNode::CycleSource {
280                cycle_id,
281                metadata: location.new_node_metadata(Self::collection_kind()),
282            },
283        )
284    }
285}
286
impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    /// Materializes the receiving end of a tick cycle whose very first tick
    /// observes `initial` instead of a value fed back from a previous tick.
    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
        // Values arriving through the cycle are delayed by one tick, so tick
        // `n` observes what was written into the cycle at tick `n - 1`.
        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
            location.clone(),
            HydroNode::DeferTick {
                input: Box::new(HydroNode::CycleSource {
                    cycle_id,
                    metadata: location.new_node_metadata(Self::collection_kind()),
                }),
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        );

        // On the first tick the cycle has no previous value, so splice in
        // `initial`, gated by a signal that is only present on tick one.
        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
    }
}
309
310impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
311    for Stream<T, Tick<L>, Bounded, O, R>
312where
313    L: Location<'a>,
314{
315    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
316        assert_eq!(
317            Location::id(&self.location),
318            expected_location,
319            "locations do not match"
320        );
321        self.location
322            .flow_state()
323            .borrow_mut()
324            .push_root(HydroRoot::CycleSink {
325                cycle_id,
326                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
327                op_metadata: HydroIrOpMetadata::new(),
328            });
329    }
330}
331
332impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
333    for Stream<T, L, B, O, R>
334where
335    L: Location<'a> + NoTick,
336{
337    type Location = L;
338
339    fn create_source(cycle_id: CycleId, location: L) -> Self {
340        Stream::new(
341            location.clone(),
342            HydroNode::CycleSource {
343                cycle_id,
344                metadata: location.new_node_metadata(Self::collection_kind()),
345            },
346        )
347    }
348}
349
350impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
351    for Stream<T, L, B, O, R>
352where
353    L: Location<'a> + NoTick,
354{
355    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
356        assert_eq!(
357            Location::id(&self.location),
358            expected_location,
359            "locations do not match"
360        );
361        self.location
362            .flow_state()
363            .borrow_mut()
364            .push_root(HydroRoot::CycleSink {
365                cycle_id,
366                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
367                op_metadata: HydroIrOpMetadata::new(),
368            });
369    }
370}
371
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        // On the first clone, convert this stream's node into a shared `Tee`
        // in place, so the original and all clones read from the same
        // underlying node.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // At this point `self.ir_node` is guaranteed to be a `Tee`; hand out
        // another handle that shares the same inner node via the `Rc`.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: HydroNode::Tee {
                    inner: SharedNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
402
403impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
404where
405    L: Location<'a>,
406{
407    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
408        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
409        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
410
411        let flow_state = location.flow_state().clone();
412        Stream {
413            location,
414            flow_state,
415            ir_node: RefCell::new(ir_node),
416            _phantom: PhantomData,
417        }
418    }
419
    /// Returns the [`Location`] where this stream is being materialized.
    ///
    /// Useful for deriving other collections at the same location without
    /// consuming the stream itself.
    pub fn location(&self) -> &L {
        &self.location
    }
424
    /// The [`CollectionKind`] tag describing this exact stream type, covering
    /// its boundedness, ordering, retries, and element type.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }
433
434    /// Produces a stream based on invoking `f` on each element.
435    /// If you do not want to modify the stream and instead only want to view
436    /// each item use [`Stream::inspect`] instead.
437    ///
438    /// # Example
439    /// ```rust
440    /// # #[cfg(feature = "deploy")] {
441    /// # use hydro_lang::prelude::*;
442    /// # use futures::StreamExt;
443    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
444    /// let words = process.source_iter(q!(vec!["hello", "world"]));
445    /// words.map(q!(|x| x.to_uppercase()))
446    /// # }, |mut stream| async move {
447    /// # for w in vec!["HELLO", "WORLD"] {
448    /// #     assert_eq!(stream.next().await.unwrap(), w);
449    /// # }
450    /// # }));
451    /// # }
452    /// ```
453    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
454    where
455        F: Fn(T) -> U + 'a,
456    {
457        let f = f.splice_fn1_ctx(&self.location).into();
458        Stream::new(
459            self.location.clone(),
460            HydroNode::Map {
461                f,
462                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
463                metadata: self
464                    .location
465                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
466            },
467        )
468    }
469
470    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
471    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
472    /// for the output type `U` must produce items in a **deterministic** order.
473    ///
474    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
475    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
476    ///
477    /// # Example
478    /// ```rust
479    /// # #[cfg(feature = "deploy")] {
480    /// # use hydro_lang::prelude::*;
481    /// # use futures::StreamExt;
482    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
483    /// process
484    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
485    ///     .flat_map_ordered(q!(|x| x))
486    /// # }, |mut stream| async move {
487    /// // 1, 2, 3, 4
488    /// # for w in (1..5) {
489    /// #     assert_eq!(stream.next().await.unwrap(), w);
490    /// # }
491    /// # }));
492    /// # }
493    /// ```
494    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
495    where
496        I: IntoIterator<Item = U>,
497        F: Fn(T) -> I + 'a,
498    {
499        let f = f.splice_fn1_ctx(&self.location).into();
500        Stream::new(
501            self.location.clone(),
502            HydroNode::FlatMap {
503                f,
504                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
505                metadata: self
506                    .location
507                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
508            },
509        )
510    }
511
512    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
513    /// for the output type `U` to produce items in any order.
514    ///
515    /// # Example
516    /// ```rust
517    /// # #[cfg(feature = "deploy")] {
518    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
519    /// # use futures::StreamExt;
520    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
521    /// process
522    ///     .source_iter(q!(vec![
523    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
524    ///         std::collections::HashSet::from_iter(vec![3, 4]),
525    ///     ]))
526    ///     .flat_map_unordered(q!(|x| x))
527    /// # }, |mut stream| async move {
528    /// // 1, 2, 3, 4, but in no particular order
529    /// # let mut results = Vec::new();
530    /// # for w in (1..5) {
531    /// #     results.push(stream.next().await.unwrap());
532    /// # }
533    /// # results.sort();
534    /// # assert_eq!(results, vec![1, 2, 3, 4]);
535    /// # }));
536    /// # }
537    /// ```
538    pub fn flat_map_unordered<U, I, F>(
539        self,
540        f: impl IntoQuotedMut<'a, F, L>,
541    ) -> Stream<U, L, B, NoOrder, R>
542    where
543        I: IntoIterator<Item = U>,
544        F: Fn(T) -> I + 'a,
545    {
546        let f = f.splice_fn1_ctx(&self.location).into();
547        Stream::new(
548            self.location.clone(),
549            HydroNode::FlatMap {
550                f,
551                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
552                metadata: self
553                    .location
554                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
555            },
556        )
557    }
558
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity closure: each element is itself the iterator to flatten.
        self.flat_map_ordered(q!(|d| d))
    }
587
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Identity closure: each element is itself the iterator to flatten.
        self.flat_map_unordered(q!(|d| d))
    }
620
621    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
622    /// then emit the elements of that stream one by one. When the inner stream yields
623    /// `Pending`, this operator yields as well.
624    pub fn flat_map_stream_blocking<U, S, F>(
625        self,
626        f: impl IntoQuotedMut<'a, F, L>,
627    ) -> Stream<U, L, B, O, R>
628    where
629        S: futures::Stream<Item = U>,
630        F: Fn(T) -> S + 'a,
631    {
632        let f = f.splice_fn1_ctx(&self.location).into();
633        Stream::new(
634            self.location.clone(),
635            HydroNode::FlatMapStreamBlocking {
636                f,
637                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
638                metadata: self
639                    .location
640                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
641            },
642        )
643    }
644
    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
    /// yields as well.
    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
    where
        T: futures::Stream<Item = U>,
    {
        // Identity closure: each element is itself the stream to flatten.
        self.flat_map_stream_blocking(q!(|d| d))
    }
654
655    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
656    /// `f`, preserving the order of the elements.
657    ///
658    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
659    /// not modify or take ownership of the values. If you need to modify the values while filtering
660    /// use [`Stream::filter_map`] instead.
661    ///
662    /// # Example
663    /// ```rust
664    /// # #[cfg(feature = "deploy")] {
665    /// # use hydro_lang::prelude::*;
666    /// # use futures::StreamExt;
667    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
668    /// process
669    ///     .source_iter(q!(vec![1, 2, 3, 4]))
670    ///     .filter(q!(|&x| x > 2))
671    /// # }, |mut stream| async move {
672    /// // 3, 4
673    /// # for w in (3..5) {
674    /// #     assert_eq!(stream.next().await.unwrap(), w);
675    /// # }
676    /// # }));
677    /// # }
678    /// ```
679    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
680    where
681        F: Fn(&T) -> bool + 'a,
682    {
683        let f = f.splice_fn1_borrow_ctx(&self.location).into();
684        Stream::new(
685            self.location.clone(),
686            HydroNode::Filter {
687                f,
688                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
689                metadata: self.location.new_node_metadata(Self::collection_kind()),
690            },
691        )
692    }
693
694    /// Splits the stream into two streams based on a predicate, without cloning elements.
695    ///
696    /// Elements for which `f` returns `true` are sent to the first output stream,
697    /// and elements for which `f` returns `false` are sent to the second output stream.
698    ///
699    /// Unlike using `filter` twice, this only evaluates the predicate once per element
700    /// and does not require `T: Clone`.
701    ///
702    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
703    /// the predicate is only used for routing; the element itself is moved to the
704    /// appropriate output stream.
705    ///
706    /// # Example
707    /// ```rust
708    /// # #[cfg(feature = "deploy")] {
709    /// # use hydro_lang::prelude::*;
710    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
711    /// # use futures::StreamExt;
712    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
713    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
714    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
715    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
716    /// evens.map(q!(|x| (x, true)))
717    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
718    /// # }, |mut stream| async move {
719    /// # let mut results = Vec::new();
720    /// # for _ in 0..6 {
721    /// #     results.push(stream.next().await.unwrap());
722    /// # }
723    /// # results.sort();
724    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
725    /// # }));
726    /// # }
727    /// ```
728    #[expect(
729        clippy::type_complexity,
730        reason = "return type mirrors the input stream type"
731    )]
732    pub fn partition<F>(
733        self,
734        f: impl IntoQuotedMut<'a, F, L>,
735    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
736    where
737        F: Fn(&T) -> bool + 'a,
738    {
739        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
740        let shared = SharedNode(Rc::new(RefCell::new(
741            self.ir_node.replace(HydroNode::Placeholder),
742        )));
743
744        let true_stream = Stream::new(
745            self.location.clone(),
746            HydroNode::Partition {
747                inner: SharedNode(shared.0.clone()),
748                f: f.clone(),
749                is_true: true,
750                metadata: self.location.new_node_metadata(Self::collection_kind()),
751            },
752        );
753
754        let false_stream = Stream::new(
755            self.location.clone(),
756            HydroNode::Partition {
757                inner: SharedNode(shared.0),
758                f,
759                is_true: false,
760                metadata: self.location.new_node_metadata(Self::collection_kind()),
761            },
762        );
763
764        (true_stream, false_stream)
765    }
766
767    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
768    ///
769    /// # Example
770    /// ```rust
771    /// # #[cfg(feature = "deploy")] {
772    /// # use hydro_lang::prelude::*;
773    /// # use futures::StreamExt;
774    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
775    /// process
776    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
777    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
778    /// # }, |mut stream| async move {
779    /// // 1, 2
780    /// # for w in (1..3) {
781    /// #     assert_eq!(stream.next().await.unwrap(), w);
782    /// # }
783    /// # }));
784    /// # }
785    /// ```
786    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
787    where
788        F: Fn(T) -> Option<U> + 'a,
789    {
790        let f = f.splice_fn1_ctx(&self.location).into();
791        Stream::new(
792            self.location.clone(),
793            HydroNode::FilterMap {
794                f,
795                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
796                metadata: self
797                    .location
798                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
799            },
800        )
801    }
802
    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
    /// If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 4), (2, 4), (3, 4), (4, 4)
    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> Stream<(T, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        // Both operands must live at the same location (checked here).
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                // Move both IR nodes out of their consumed collections.
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
            },
        )
    }
848
849    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
850    ///
851    /// # Example
852    /// ```rust
853    /// # #[cfg(feature = "deploy")] {
854    /// # use hydro_lang::prelude::*;
855    /// # use futures::StreamExt;
856    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
857    /// let tick = process.tick();
858    /// // ticks are lazy by default, forces the second tick to run
859    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
860    ///
861    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
862    /// let batch_first_tick = process
863    ///   .source_iter(q!(vec![1, 2, 3, 4]))
864    ///   .batch(&tick, nondet!(/** test */));
865    /// let batch_second_tick = process
866    ///   .source_iter(q!(vec![5, 6, 7, 8]))
867    ///   .batch(&tick, nondet!(/** test */))
868    ///   .defer_tick();
869    /// batch_first_tick.chain(batch_second_tick)
870    ///   .filter_if(signal)
871    ///   .all_ticks()
872    /// # }, |mut stream| async move {
873    /// // [1, 2, 3, 4]
874    /// # for w in vec![1, 2, 3, 4] {
875    /// #     assert_eq!(stream.next().await.unwrap(), w);
876    /// # }
877    /// # }));
878    /// # }
879    /// ```
880    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
881        self.cross_singleton(signal.filter(q!(|b| *b)))
882            .map(q!(|(d, _)| d))
883    }
884
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-empty, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
    /// leader of a cluster.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![5, 6, 7, 8]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///   .filter_if_some(some_on_first_tick)
    ///   .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Deprecated thin wrapper: reduce the optional to a boolean and delegate.
        self.filter_if(signal.is_some())
    }
923
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is empty, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
    /// some local state.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![5, 6, 7, 8]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///   .filter_if_none(some_on_first_tick)
    ///   .all_ticks()
    /// # }, |mut stream| async move {
    /// // [5, 6, 7, 8]
    /// # for w in vec![5, 6, 7, 8] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Deprecated thin wrapper: invert the optional's presence and delegate.
        self.filter_if(other.is_none())
    }
962
    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
    /// tupled pairs in a non-deterministic order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
    /// stream1.cross_product(stream2)
    /// # }, |mut stream| async move {
    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # // NOTE(review): the `map` above is lazy and the resulting stream is never
    /// # // driven, so these assertions do not actually execute — consider consuming
    /// # // a fixed number of elements instead.
    /// # }));
    /// # }
    /// ```
    pub fn cross_product<T2, O2: Ordering>(
        self,
        other: Stream<T2, L, B, O2, R>,
    ) -> Stream<(T, T2), L, B, NoOrder, R>
    where
        T: Clone,
        T2: Clone,
    {
        // Both operands must live at the same location (checked here).
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossProduct {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                // Output order is `NoOrder`: interleaving of the two sides is
                // non-deterministic.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
1004
    /// Takes one stream as input and filters out any duplicate occurrences. The output
    /// contains all unique values from the input.
    ///
    /// Because duplicates are eliminated, the output's retry marker is [`ExactlyOnce`]
    /// regardless of the input's retry guarantee `R`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
    where
        T: Eq + Hash,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Unique {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
            },
        )
    }
1037
    /// Outputs everything in this stream that is *not* contained in the `other` stream.
    ///
    /// The `other` stream must be [`Bounded`], since this function will wait until
    /// all its elements are available before producing any output.
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = process
    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch = process
    ///   .source_iter(q!(vec![1, 2]))
    ///   .batch(&tick, nondet!(/** test */));
    /// stream.filter_not_in(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
    where
        T: Eq + Hash,
        B2: IsBounded,
    {
        // Both operands must live at the same location (checked here).
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Difference {
                // `pos` minus `neg`: keep elements of `self` not present in `other`.
                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                // NOTE(review): the return type is `Self` (boundedness `B`), but the
                // metadata is built with `Bounded` — confirm this mismatch is intentional.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
            },
        )
    }
1081
    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each item. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let nums = process.source_iter(q!(vec![1, 2]));
    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
    /// # }, |mut stream| async move {
    /// # for w in vec![1, 2] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
    where
        F: Fn(&T) + 'a,
    {
        // Splice the closure as a by-reference (`&T`) function in this location's
        // context.
        let f = f.splice_fn1_borrow_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Output has the exact same collection kind as the input.
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1117
    /// Executes the provided closure for every element in this stream.
    ///
    /// Because the closure may have side effects, the stream must have deterministic order
    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
    /// [`Stream::assume_retries`] with an explanation for why this is the case.
    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        // This is a terminal operator: instead of producing a new node, it
        // registers a root (sink) in the flow graph.
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::ForEach {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                f,
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
1139
    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
    /// TCP socket to some other server. You should _not_ use this API for interacting with
    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
    /// interaction with asynchronous sinks.
    ///
    /// Like [`Stream::for_each`], this requires [`TotalOrder`] and [`ExactlyOnce`] since
    /// writing to a sink is a side effect.
    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        S: 'a + futures::Sink<T> + Unpin,
    {
        // Terminal operator: registers a sink root in the flow graph rather than
        // producing a downstream node.
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::DestSink {
                sink: sink.splice_typed_ctx(&self.location).into(),
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
1160
    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// numbers.enumerate()
    /// # }, |mut stream| async move {
    /// // (0, 1), (1, 2), (2, 3), (3, 4)
    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Enumerate {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // NOTE(review): metadata is built with the concrete `TotalOrder` /
                // `ExactlyOnce` markers while the return type keeps `O` / `R`;
                // under the `IsOrdered` / `IsExactlyOnce` bounds these presumably
                // denote the same collection kind — TODO confirm.
                metadata: self.location.new_node_metadata(Stream::<
                    (usize, T),
                    L,
                    B,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        )
    }
1199
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
    /// words
    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
    ///     .into_stream()
    /// # }, |mut stream| async move {
    /// // "HELLOWORLD"
    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
    /// # }));
    /// # }
    /// ```
    pub fn fold<A, I, F, C, Idemp>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Singleton<A, L, B>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        // Splice the combinator together with its algebra properties
        // (commutativity/idempotence), and record the proof against it.
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // The `C` / `Idemp` bounds above guarantee the combinator tolerates the
        // input's ordering/retry guarantees, justifying these local assumptions.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Fold {
            init,
            acc: comb.into(),
            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
            metadata: ordered_etc
                .location
                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
        };

        Singleton::new(ordered_etc.location.clone(), core)
    }
1252
    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
    /// reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let bools = process.source_iter(q!(vec![false, true, false]));
    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
    /// # }, |mut stream| async move {
    /// // true
    /// # assert_eq!(stream.next().await.unwrap(), true);
    /// # }));
    /// # }
    /// ```
    pub fn reduce<F, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Optional<T, L, B>
    where
        F: Fn(&mut T, T) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // Splice the combinator with its algebra properties and record the proof.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // The `C` / `Idemp` bounds above justify locally assuming exactly-once
        // delivery and total order before building the reduce node.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Reduce {
            f: f.into(),
            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
            metadata: ordered_etc
                .location
                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
        };

        Optional::new(ordered_etc.location.clone(), core)
    }
1300
    /// Computes the maximum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.max().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn max(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // The final maximum is unaffected by duplicates or input order, so it is
        // safe to locally assume exactly-once delivery and a total order here.
        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
            .assume_ordering_trusted_bounded::<TotalOrder>(
                nondet!(/** max is commutative, but order affects intermediates */),
            )
            .reduce(q!(|curr, new| {
                // Keep the larger of the running maximum and the new element.
                if new > *curr {
                    *curr = new;
                }
            }))
    }
1334
1335    /// Computes the minimum element in the stream as an [`Optional`], which
1336    /// will be empty until the first element in the input arrives.
1337    ///
1338    /// # Example
1339    /// ```rust
1340    /// # #[cfg(feature = "deploy")] {
1341    /// # use hydro_lang::prelude::*;
1342    /// # use futures::StreamExt;
1343    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1344    /// let tick = process.tick();
1345    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1346    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1347    /// batch.min().all_ticks()
1348    /// # }, |mut stream| async move {
1349    /// // 1
1350    /// # assert_eq!(stream.next().await.unwrap(), 1);
1351    /// # }));
1352    /// # }
1353    /// ```
1354    pub fn min(self) -> Optional<T, L, B>
1355    where
1356        T: Ord,
1357    {
1358        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1359            .assume_ordering_trusted_bounded::<TotalOrder>(
1360                nondet!(/** max is commutative, but order affects intermediates */),
1361            )
1362            .reduce(q!(|curr, new| {
1363                if new < *curr {
1364                    *curr = new;
1365                }
1366            }))
1367    }
1368
    /// Computes the first element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
    /// re-ordering of elements may cause the first element to change.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.first().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// # }
    /// ```
    pub fn first(self) -> Optional<T, L, B>
    where
        O: IsOrdered,
    {
        self.make_totally_ordered()
            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
            // A no-op combinator leaves the accumulator (the first element) untouched.
            .reduce(q!(|_, _| {}))
    }
1399
    /// Computes the last element in the stream as an [`Optional`], which
    /// will be empty until an element in the input arrives.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
    /// re-ordering of elements may cause the last element to change.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.last().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn last(self) -> Optional<T, L, B>
    where
        O: IsOrdered,
    {
        self.make_totally_ordered()
            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
            // Overwriting the accumulator with every element leaves the last one.
            .reduce(q!(|curr, new| *curr = new))
    }
1430
    /// Collects all the elements of this stream into a single [`Vec`] element.
    ///
    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
    /// the vector at an arbitrary point in time.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
    /// # }, |mut stream| async move {
    /// // [ vec![1, 2, 3, 4] ]
    /// # for w in vec![vec![1, 2, 3, 4]] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        // Ordered, exactly-once input means a plain push-accumulating fold is
        // deterministic.
        self.make_totally_ordered().make_exactly_once().fold(
            q!(|| vec![]),
            q!(|acc, v| {
                acc.push(v);
            }),
        )
    }
1468
    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
    /// and emitting each intermediate result.
    ///
    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
    /// containing all intermediate accumulated values. The scan operation can also terminate early
    /// by returning `None`.
    ///
    /// The function takes a mutable reference to the accumulator and the current element, and returns
    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
    /// If the function returns `None`, the stream is terminated and no more elements are processed.
    ///
    /// # Examples
    ///
    /// Basic usage - running sum:
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         Some(*acc)
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: 1, 3, 6, 10
    /// # for w in vec![1, 3, 6, 10] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    ///
    /// Early termination example:
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
    ///     q!(|| 1),
    ///     q!(|state, x| {
    ///         *state = *state * x;
    ///         if *state > 6 {
    ///             None // Terminate the stream
    ///         } else {
    ///             Some(-*state)
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: -1, -2, -6
    /// # for w in vec![-1, -2, -6] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Option<U> + 'a,
    {
        // Stage the initializer and the stateful step function in this
        // location's context.
        let init = init.splice_fn0_ctx(&self.location).into();
        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::Scan {
                init,
                acc: f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(
                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
                ),
            },
        )
    }
1555
    /// Iteratively processes the elements of the stream using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
    /// state. The second argument defines the processing logic, taking in a mutable reference
    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
    /// variants define what is emitted and whether further inputs should be processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         if *acc > 100 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
    ///         } else if *acc % 2 == 0 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
    ///         } else {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: "even", "done!"
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Capture the user-provided closures as context-aware expressions so they can be
        // spliced into the generated scan closure below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        // `O: IsOrdered` / `R: IsExactlyOnce` make these no-op strengthenings.
        let this = self.make_totally_ordered().make_exactly_once();

        // State is Option<Option<A>>:
        //   None = not yet initialized
        //   Some(Some(a)) = active with state a
        //   Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        // The scan closure produces Option<Option<U>>, flattened in two stages:
        // an outer `None` terminates the scan itself, `Some(None)` keeps scanning
        // but emits nothing, and `Some(Some(out))` emits `out` downstream.
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            if state.is_none() {
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    Generate::Return(out) => {
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, we can terminate the scan directly on
                    // Break/Return because there is only one state (no other keys
                    // that still need processing).
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten Option<U> -> U, dropping the `Some(None)` markers emitted on Continue.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1668
1669    /// Given a time interval, returns a stream corresponding to samples taken from the
1670    /// stream roughly at that interval. The output will have elements in the same order
1671    /// as the input, but with arbitrary elements skipped between samples. There is also
1672    /// no guarantee on the exact timing of the samples.
1673    ///
1674    /// # Non-Determinism
1675    /// The output stream is non-deterministic in which elements are sampled, since this
1676    /// is controlled by a clock.
1677    pub fn sample_every(
1678        self,
1679        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1680        nondet: NonDet,
1681    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1682    where
1683        L: NoTick + NoAtomic,
1684    {
1685        let samples = self.location.source_interval(interval, nondet);
1686
1687        let tick = self.location.tick();
1688        self.batch(&tick, nondet)
1689            .filter_if(samples.batch(&tick, nondet).first().is_some())
1690            .all_ticks()
1691            .weaken_retries()
1692    }
1693
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Non-Determinism
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded>
    where
        L: NoTick + NoAtomic,
    {
        let tick = self.location.tick();

        // Fold every arrival into the wall-clock time of the most recent element.
        // The element value itself is ignored; only arrival time matters.
        // NOTE(review): `assume_retries::<ExactlyOnce>` asserts no retries here; a
        // retried element would only refresh the timestamp — confirm this is the
        // intended justification for the assumption.
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!(/** TODO */)
            ),
        );

        // Each tick, snapshot the latest arrival time and emit Some(()) when either
        // nothing has ever arrived, or the last arrival is older than `duration`.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    Some(())
                }
            }))
            .latest()
    }
1738
1739    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1740    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1741    ///
1742    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1743    /// processed before an acknowledgement is emitted.
1744    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1745        let id = self.location.flow_state().borrow_mut().next_clock_id();
1746        let out_location = Atomic {
1747            tick: Tick {
1748                id,
1749                l: self.location.clone(),
1750            },
1751        };
1752        Stream::new(
1753            out_location.clone(),
1754            HydroNode::BeginAtomic {
1755                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1756                metadata: out_location
1757                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1758            },
1759        )
1760    }
1761
1762    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1763    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1764    /// the order of the input. The output stream will execute in the [`Tick`] that was
1765    /// used to create the atomic section.
1766    ///
1767    /// # Non-Determinism
1768    /// The batch boundaries are non-deterministic and may change across executions.
1769    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1770        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1771        Stream::new(
1772            tick.clone(),
1773            HydroNode::Batch {
1774                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1775                metadata: tick
1776                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1777            },
1778        )
1779    }
1780
1781    /// An operator which allows you to "name" a `HydroNode`.
1782    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1783    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1784        {
1785            let mut node = self.ir_node.borrow_mut();
1786            let metadata = node.metadata_mut();
1787            metadata.tag = Some(name.to_owned());
1788        }
1789        self
1790    }
1791
1792    /// Explicitly "casts" the stream to a type with a different ordering
1793    /// guarantee. Useful in unsafe code where the ordering cannot be proven
1794    /// by the type-system.
1795    ///
1796    /// # Non-Determinism
1797    /// This function is used as an escape hatch, and any mistakes in the
1798    /// provided ordering guarantee will propagate into the guarantees
1799    /// for the rest of the program.
1800    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1801        if O::ORDERING_KIND == O2::ORDERING_KIND {
1802            Stream::new(
1803                self.location.clone(),
1804                self.ir_node.replace(HydroNode::Placeholder),
1805            )
1806        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1807            // We can always weaken the ordering guarantee
1808            Stream::new(
1809                self.location.clone(),
1810                HydroNode::Cast {
1811                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1812                    metadata: self
1813                        .location
1814                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1815                },
1816            )
1817        } else {
1818            Stream::new(
1819                self.location.clone(),
1820                HydroNode::ObserveNonDet {
1821                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1822                    trusted: false,
1823                    metadata: self
1824                        .location
1825                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1826                },
1827            )
1828        }
1829    }
1830
1831    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1832    // intermediate states will not be revealed
1833    fn assume_ordering_trusted_bounded<O2: Ordering>(
1834        self,
1835        nondet: NonDet,
1836    ) -> Stream<T, L, B, O2, R> {
1837        if B::BOUNDED {
1838            self.assume_ordering_trusted(nondet)
1839        } else {
1840            self.assume_ordering(nondet)
1841        }
1842    }
1843
1844    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1845    // is not observable
1846    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1847        self,
1848        _nondet: NonDet,
1849    ) -> Stream<T, L, B, O2, R> {
1850        if O::ORDERING_KIND == O2::ORDERING_KIND {
1851            Stream::new(
1852                self.location.clone(),
1853                self.ir_node.replace(HydroNode::Placeholder),
1854            )
1855        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1856            // We can always weaken the ordering guarantee
1857            Stream::new(
1858                self.location.clone(),
1859                HydroNode::Cast {
1860                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1861                    metadata: self
1862                        .location
1863                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1864                },
1865            )
1866        } else {
1867            Stream::new(
1868                self.location.clone(),
1869                HydroNode::ObserveNonDet {
1870                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1871                    trusted: true,
1872                    metadata: self
1873                        .location
1874                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1875                },
1876            )
1877        }
1878    }
1879
1880    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1881    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1882    /// which is always safe because that is the weakest possible guarantee.
1883    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1884        self.weaken_ordering::<NoOrder>()
1885    }
1886
1887    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1888    /// enforcing that `O2` is weaker than the input ordering guarantee.
1889    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1890        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1891        self.assume_ordering::<O2>(nondet)
1892    }
1893
1894    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1895    /// implies that `O == TotalOrder`.
1896    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1897    where
1898        O: IsOrdered,
1899    {
1900        self.assume_ordering(nondet!(/** no-op */))
1901    }
1902
1903    /// Explicitly "casts" the stream to a type with a different retries
1904    /// guarantee. Useful in unsafe code where the lack of retries cannot
1905    /// be proven by the type-system.
1906    ///
1907    /// # Non-Determinism
1908    /// This function is used as an escape hatch, and any mistakes in the
1909    /// provided retries guarantee will propagate into the guarantees
1910    /// for the rest of the program.
1911    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1912        if R::RETRIES_KIND == R2::RETRIES_KIND {
1913            Stream::new(
1914                self.location.clone(),
1915                self.ir_node.replace(HydroNode::Placeholder),
1916            )
1917        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1918            // We can always weaken the retries guarantee
1919            Stream::new(
1920                self.location.clone(),
1921                HydroNode::Cast {
1922                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1923                    metadata: self
1924                        .location
1925                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1926                },
1927            )
1928        } else {
1929            Stream::new(
1930                self.location.clone(),
1931                HydroNode::ObserveNonDet {
1932                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1933                    trusted: false,
1934                    metadata: self
1935                        .location
1936                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1937                },
1938            )
1939        }
1940    }
1941
1942    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1943    // is not observable
1944    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1945        if R::RETRIES_KIND == R2::RETRIES_KIND {
1946            Stream::new(
1947                self.location.clone(),
1948                self.ir_node.replace(HydroNode::Placeholder),
1949            )
1950        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1951            // We can always weaken the retries guarantee
1952            Stream::new(
1953                self.location.clone(),
1954                HydroNode::Cast {
1955                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1956                    metadata: self
1957                        .location
1958                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1959                },
1960            )
1961        } else {
1962            Stream::new(
1963                self.location.clone(),
1964                HydroNode::ObserveNonDet {
1965                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1966                    trusted: true,
1967                    metadata: self
1968                        .location
1969                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1970                },
1971            )
1972        }
1973    }
1974
1975    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1976    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1977    /// which is always safe because that is the weakest possible guarantee.
1978    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1979        self.weaken_retries::<AtLeastOnce>()
1980    }
1981
1982    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1983    /// enforcing that `R2` is weaker than the input retries guarantee.
1984    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1985        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1986        self.assume_retries::<R2>(nondet)
1987    }
1988
1989    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1990    /// implies that `R == ExactlyOnce`.
1991    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
1992    where
1993        R: IsExactlyOnce,
1994    {
1995        self.assume_retries(nondet!(/** no-op */))
1996    }
1997
1998    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1999    /// implies that `B == Bounded`.
2000    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2001    where
2002        B: IsBounded,
2003    {
2004        Stream::new(
2005            self.location.clone(),
2006            self.ir_node.replace(HydroNode::Placeholder),
2007        )
2008    }
2009}
2010
2011impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2012where
2013    L: Location<'a>,
2014{
2015    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2016    ///
2017    /// # Example
2018    /// ```rust
2019    /// # #[cfg(feature = "deploy")] {
2020    /// # use hydro_lang::prelude::*;
2021    /// # use futures::StreamExt;
2022    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2023    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2024    /// # }, |mut stream| async move {
2025    /// // 1, 2, 3
2026    /// # for w in vec![1, 2, 3] {
2027    /// #     assert_eq!(stream.next().await.unwrap(), w);
2028    /// # }
2029    /// # }));
2030    /// # }
2031    /// ```
2032    pub fn cloned(self) -> Stream<T, L, B, O, R>
2033    where
2034        T: Clone,
2035    {
2036        self.map(q!(|d| d.clone()))
2037    }
2038}
2039
2040impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2041where
2042    L: Location<'a>,
2043{
2044    /// Computes the number of elements in the stream as a [`Singleton`].
2045    ///
2046    /// # Example
2047    /// ```rust
2048    /// # #[cfg(feature = "deploy")] {
2049    /// # use hydro_lang::prelude::*;
2050    /// # use futures::StreamExt;
2051    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2052    /// let tick = process.tick();
2053    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2054    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2055    /// batch.count().all_ticks()
2056    /// # }, |mut stream| async move {
2057    /// // 4
2058    /// # assert_eq!(stream.next().await.unwrap(), 4);
2059    /// # }));
2060    /// # }
2061    /// ```
2062    pub fn count(self) -> Singleton<usize, L, B> {
2063        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2064            /// Order does not affect eventual count, and also does not affect intermediate states.
2065        ))
2066        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
2067    }
2068}
2069
2070impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2071    /// Produces a new stream that merges the elements of the two input streams.
2072    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2073    ///
2074    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2075    /// [`Bounded`], you can use [`Stream::chain`] instead.
2076    ///
2077    /// # Example
2078    /// ```rust
2079    /// # #[cfg(feature = "deploy")] {
2080    /// # use hydro_lang::prelude::*;
2081    /// # use futures::StreamExt;
2082    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2083    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2084    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2085    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2086    /// # }, |mut stream| async move {
2087    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2088    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2089    /// #     assert_eq!(stream.next().await.unwrap(), w);
2090    /// # }
2091    /// # }));
2092    /// # }
2093    /// ```
2094    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2095        self,
2096        other: Stream<T, L, Unbounded, O2, R2>,
2097    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2098    where
2099        R: MinRetries<R2>,
2100    {
2101        Stream::new(
2102            self.location.clone(),
2103            HydroNode::Chain {
2104                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2105                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2106                metadata: self.location.new_node_metadata(Stream::<
2107                    T,
2108                    L,
2109                    Unbounded,
2110                    NoOrder,
2111                    <R as MinRetries<R2>>::Min,
2112                >::collection_kind()),
2113            },
2114        )
2115    }
2116
2117    /// Deprecated: use [`Stream::merge_unordered`] instead.
2118    #[deprecated(note = "use `merge_unordered` instead")]
2119    pub fn interleave<O2: Ordering, R2: Retries>(
2120        self,
2121        other: Stream<T, L, Unbounded, O2, R2>,
2122    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2123    where
2124        R: MinRetries<R2>,
2125    {
2126        self.merge_unordered(other)
2127    }
2128}
2129
2130impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
2131    /// Produces a new stream that combines the elements of the two input streams,
2132    /// preserving the relative order of elements within each input.
2133    ///
2134    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2135    /// [`Bounded`], you can use [`Stream::chain`] instead.
2136    ///
2137    /// # Non-Determinism
2138    /// The order in which elements *across* the two streams will be interleaved is
2139    /// non-deterministic, so the order of elements will vary across runs. If the output order
2140    /// is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic but emits an
2141    /// unordered stream.
2142    ///
2143    /// # Example
2144    /// ```rust
2145    /// # #[cfg(feature = "deploy")] {
2146    /// # use hydro_lang::prelude::*;
2147    /// # use futures::StreamExt;
2148    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2149    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2150    /// # process.source_iter(q!(vec![1, 3])).into();
2151    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2152    /// # }, |mut stream| async move {
2153    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2154    /// # for w in vec![1, 3, 2, 4] {
2155    /// #     assert_eq!(stream.next().await.unwrap(), w);
2156    /// # }
2157    /// # }));
2158    /// # }
2159    /// ```
2160    pub fn merge_ordered<R2: Retries>(
2161        self,
2162        other: Stream<T, L, Unbounded, TotalOrder, R2>,
2163        _nondet: NonDet,
2164    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
2165    where
2166        R: MinRetries<R2>,
2167    {
2168        Stream::new(
2169            self.location.clone(),
2170            HydroNode::Chain {
2171                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2172                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2173                metadata: self.location.new_node_metadata(Stream::<
2174                    T,
2175                    L,
2176                    Unbounded,
2177                    TotalOrder,
2178                    <R as MinRetries<R2>>::Min,
2179                >::collection_kind()),
2180            },
2181        )
2182    }
2183}
2184
2185impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2186where
2187    L: Location<'a>,
2188{
2189    /// Produces a new stream that emits the input elements in sorted order.
2190    ///
2191    /// The input stream can have any ordering guarantee, but the output stream
2192    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2193    /// elements in the input stream are available, so it requires the input stream
2194    /// to be [`Bounded`].
2195    ///
2196    /// # Example
2197    /// ```rust
2198    /// # #[cfg(feature = "deploy")] {
2199    /// # use hydro_lang::prelude::*;
2200    /// # use futures::StreamExt;
2201    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2202    /// let tick = process.tick();
2203    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2204    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2205    /// batch.sort().all_ticks()
2206    /// # }, |mut stream| async move {
2207    /// // 1, 2, 3, 4
2208    /// # for w in (1..5) {
2209    /// #     assert_eq!(stream.next().await.unwrap(), w);
2210    /// # }
2211    /// # }));
2212    /// # }
2213    /// ```
2214    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2215    where
2216        B: IsBounded,
2217        T: Ord,
2218    {
2219        let this = self.make_bounded();
2220        Stream::new(
2221            this.location.clone(),
2222            HydroNode::Sort {
2223                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2224                metadata: this
2225                    .location
2226                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2227            },
2228        )
2229    }
2230
2231    /// Produces a new stream that first emits the elements of the `self` stream,
2232    /// and then emits the elements of the `other` stream. The output stream has
2233    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2234    /// [`TotalOrder`] guarantee.
2235    ///
2236    /// Currently, both input streams must be [`Bounded`]. This operator will block
2237    /// on the first stream until all its elements are available. In a future version,
2238    /// we will relax the requirement on the `other` stream.
2239    ///
2240    /// # Example
2241    /// ```rust
2242    /// # #[cfg(feature = "deploy")] {
2243    /// # use hydro_lang::prelude::*;
2244    /// # use futures::StreamExt;
2245    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2246    /// let tick = process.tick();
2247    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2248    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2249    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2250    /// # }, |mut stream| async move {
2251    /// // 2, 3, 4, 5, 1, 2, 3, 4
2252    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2253    /// #     assert_eq!(stream.next().await.unwrap(), w);
2254    /// # }
2255    /// # }));
2256    /// # }
2257    /// ```
2258    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2259        self,
2260        other: Stream<T, L, B2, O2, R2>,
2261    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2262    where
2263        B: IsBounded,
2264        O: MinOrder<O2>,
2265        R: MinRetries<R2>,
2266    {
2267        check_matching_location(&self.location, &other.location);
2268
2269        Stream::new(
2270            self.location.clone(),
2271            HydroNode::Chain {
2272                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2273                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2274                metadata: self.location.new_node_metadata(Stream::<
2275                    T,
2276                    L,
2277                    B2,
2278                    <O as MinOrder<O2>>::Min,
2279                    <R as MinRetries<R2>>::Min,
2280                >::collection_kind()),
2281            },
2282        )
2283    }
2284
2285    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2286    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2287    /// because this is compiled into a nested loop.
2288    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2289        self,
2290        other: Stream<T2, L, Bounded, O2, R>,
2291    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2292    where
2293        B: IsBounded,
2294        T: Clone,
2295        T2: Clone,
2296    {
2297        let this = self.make_bounded();
2298        check_matching_location(&this.location, &other.location);
2299
2300        Stream::new(
2301            this.location.clone(),
2302            HydroNode::CrossProduct {
2303                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2304                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2305                metadata: this.location.new_node_metadata(Stream::<
2306                    (T, T2),
2307                    L,
2308                    Bounded,
2309                    <O2 as MinOrder<O>>::Min,
2310                    R,
2311                >::collection_kind()),
2312            },
2313        )
2314    }
2315
2316    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2317    /// `self` used as the values for *each* key.
2318    ///
2319    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2320    /// values. For example, it can be used to send the same set of elements to several cluster
2321    /// members, if the membership information is available as a [`KeyedSingleton`].
2322    ///
2323    /// # Example
2324    /// ```rust
2325    /// # #[cfg(feature = "deploy")] {
2326    /// # use hydro_lang::prelude::*;
2327    /// # use futures::StreamExt;
2328    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2329    /// # let tick = process.tick();
2330    /// let keyed_singleton = // { 1: (), 2: () }
2331    /// # process
2332    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2333    /// #     .into_keyed()
2334    /// #     .batch(&tick, nondet!(/** test */))
2335    /// #     .first();
2336    /// let stream = // [ "a", "b" ]
2337    /// # process
2338    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2339    /// #     .batch(&tick, nondet!(/** test */));
2340    /// stream.repeat_with_keys(keyed_singleton)
2341    /// # .entries().all_ticks()
2342    /// # }, |mut stream| async move {
2343    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2344    /// # let mut results = Vec::new();
2345    /// # for _ in 0..4 {
2346    /// #     results.push(stream.next().await.unwrap());
2347    /// # }
2348    /// # results.sort();
2349    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2350    /// # }));
2351    /// # }
2352    /// ```
2353    pub fn repeat_with_keys<K, V2>(
2354        self,
2355        keys: KeyedSingleton<K, V2, L, Bounded>,
2356    ) -> KeyedStream<K, T, L, Bounded, O, R>
2357    where
2358        B: IsBounded,
2359        K: Clone,
2360        T: Clone,
2361    {
2362        keys.keys()
2363            .weaken_retries()
2364            .assume_ordering_trusted::<TotalOrder>(
2365                nondet!(/** keyed stream does not depend on ordering of keys */),
2366            )
2367            .cross_product_nested_loop(self.make_bounded())
2368            .into_keyed()
2369    }
2370
2371    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2372    /// execution until all results are available. The output order is based on when futures
2373    /// complete, and may be different than the input order.
2374    ///
2375    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2376    /// while futures are pending, this variant blocks until the futures resolve.
2377    ///
2378    /// # Example
2379    /// ```rust
2380    /// # #[cfg(feature = "deploy")] {
2381    /// # use std::collections::HashSet;
2382    /// # use futures::StreamExt;
2383    /// # use hydro_lang::prelude::*;
2384    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2385    /// process
2386    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2387    ///     .map(q!(|x| async move {
2388    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2389    ///         x
2390    ///     }))
2391    ///     .resolve_futures_blocking()
2392    /// #   },
2393    /// #   |mut stream| async move {
2394    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2395    /// #       let mut output = HashSet::new();
2396    /// #       for _ in 1..10 {
2397    /// #           output.insert(stream.next().await.unwrap());
2398    /// #       }
2399    /// #       assert_eq!(
2400    /// #           output,
2401    /// #           HashSet::<i32>::from_iter(1..10)
2402    /// #       );
2403    /// #   },
2404    /// # ));
2405    /// # }
2406    /// ```
2407    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2408    where
2409        T: Future,
2410    {
2411        Stream::new(
2412            self.location.clone(),
2413            HydroNode::ResolveFuturesBlocking {
2414                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2415                metadata: self
2416                    .location
2417                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2418            },
2419        )
2420    }
2421
2422    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2423    ///
2424    /// # Example
2425    /// ```rust
2426    /// # #[cfg(feature = "deploy")] {
2427    /// # use hydro_lang::prelude::*;
2428    /// # use futures::StreamExt;
2429    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2430    /// let tick = process.tick();
2431    /// let empty: Stream<i32, _, Bounded> = process
2432    ///   .source_iter(q!(Vec::<i32>::new()))
2433    ///   .batch(&tick, nondet!(/** test */));
2434    /// empty.is_empty().all_ticks()
2435    /// # }, |mut stream| async move {
2436    /// // true
2437    /// # assert_eq!(stream.next().await.unwrap(), true);
2438    /// # }));
2439    /// # }
2440    /// ```
2441    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2442    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2443    where
2444        B: IsBounded,
2445    {
2446        self.make_bounded()
2447            .assume_ordering_trusted::<TotalOrder>(
2448                nondet!(/** is_empty intermediates unaffected by order */),
2449            )
2450            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** is_empty is idempotent */))
2451            .fold(q!(|| true), q!(|empty, _| { *empty = false },))
2452    }
2453}
2454
impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
where
    L: Location<'a>,
{
    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// The output is marked [`NoOrder`], and its retries marker is the weaker
    /// (`<R as MinRetries<R2>>::Min`) of the two inputs' markers.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn join<V2, O2: Ordering, R2: Retries>(
        self,
        n: Stream<(K, V2), L, B, O2, R2>,
    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
    where
        K: Eq + Hash,
        R: MinRetries<R2>,
    {
        // Joining streams at different locations is a construction-time error.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Join {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    (K, (V1, V2)),
                    L,
                    B,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = process
    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch = process
    ///   .source_iter(q!(vec![1, 2]))
    ///   .batch(&tick, nondet!(/** test */));
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn anti_join<O2: Ordering, R2: Retries>(
        self,
        n: Stream<K, L, Bounded, O2, R2>,
    ) -> Stream<(K, V1), L, B, O, R>
    where
        K: Eq + Hash,
    {
        // Both sides of the anti-join must live at the same location.
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
            },
        )
    }
}
2552
2553impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2554    Stream<(K, V), L, B, O, R>
2555{
2556    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2557    /// is used as the key and the second element is added to the entries associated with that key.
2558    ///
2559    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2560    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2561    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2562    /// total ordering _within_ each group but no ordering _across_ groups.
2563    ///
2564    /// # Example
2565    /// ```rust
2566    /// # #[cfg(feature = "deploy")] {
2567    /// # use hydro_lang::prelude::*;
2568    /// # use futures::StreamExt;
2569    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2570    /// process
2571    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2572    ///     .into_keyed()
2573    /// #   .entries()
2574    /// # }, |mut stream| async move {
2575    /// // { 1: [2, 3], 2: [4] }
2576    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2577    /// #     assert_eq!(stream.next().await.unwrap(), w);
2578    /// # }
2579    /// # }));
2580    /// # }
2581    /// ```
2582    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2583        KeyedStream::new(
2584            self.location.clone(),
2585            HydroNode::Cast {
2586                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2587                metadata: self
2588                    .location
2589                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2590            },
2591        )
2592    }
2593}
2594
2595impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2596where
2597    K: Eq + Hash,
2598    L: Location<'a>,
2599{
2600    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2601    /// # Example
2602    /// ```rust
2603    /// # #[cfg(feature = "deploy")] {
2604    /// # use hydro_lang::prelude::*;
2605    /// # use futures::StreamExt;
2606    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2607    /// let tick = process.tick();
2608    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2609    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2610    /// batch.keys().all_ticks()
2611    /// # }, |mut stream| async move {
2612    /// // 1, 2
2613    /// # assert_eq!(stream.next().await.unwrap(), 1);
2614    /// # assert_eq!(stream.next().await.unwrap(), 2);
2615    /// # }));
2616    /// # }
2617    /// ```
2618    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2619        self.into_keyed()
2620            .fold(
2621                q!(|| ()),
2622                q!(
2623                    |_, _| {},
2624                    commutative = manual_proof!(/** values are ignored */),
2625                    idempotent = manual_proof!(/** values are ignored */)
2626                ),
2627            )
2628            .keys()
2629    }
2630}
2631
2632impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2633where
2634    L: Location<'a> + NoTick,
2635{
2636    /// Returns a stream corresponding to the latest batch of elements being atomically
2637    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2638    /// the order of the input.
2639    ///
2640    /// # Non-Determinism
2641    /// The batch boundaries are non-deterministic and may change across executions.
2642    pub fn batch_atomic(
2643        self,
2644        tick: &Tick<L>,
2645        _nondet: NonDet,
2646    ) -> Stream<T, Tick<L>, Bounded, O, R> {
2647        Stream::new(
2648            tick.clone(),
2649            HydroNode::Batch {
2650                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2651                metadata: tick
2652                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2653            },
2654        )
2655    }
2656
2657    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2658    /// See [`Stream::atomic`] for more details.
2659    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2660        Stream::new(
2661            self.location.tick.l.clone(),
2662            HydroNode::EndAtomic {
2663                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2664                metadata: self
2665                    .location
2666                    .tick
2667                    .l
2668                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2669            },
2670        )
2671    }
2672}
2673
impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
where
    L: Location<'a> + NoTick + NoAtomic,
    F: Future<Output = T>,
{
    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced as available, regardless of input arrival order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures()
    /// #   },
    /// #   |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// #       let mut output = HashSet::new();
    /// #       for _ in 1..10 {
    /// #           output.insert(stream.next().await.unwrap());
    /// #       }
    /// #       assert_eq!(
    /// #           output,
    /// #           HashSet::<i32>::from_iter(1..10)
    /// #       );
    /// #   },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFutures {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced in the same order as the input stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_ordered()
    /// #   },
    /// #   |mut stream| async move {
    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
    /// #       let mut output = Vec::new();
    /// #       for _ in 1..10 {
    /// #           output.push(stream.next().await.unwrap());
    /// #       }
    /// #       assert_eq!(
    /// #           output,
    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
    /// #       );
    /// #   },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesOrdered {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }
}
2763
impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
        Stream::new(
            self.location.outer().clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .outer()
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    ///
    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// stream's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
        let out_location = Atomic {
            tick: self.location.clone(),
        };

        Stream::new(
            out_location.clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: out_location
                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory across ticks rather than resetting across batches of
    /// input.
    ///
    /// This API is particularly useful for stateful computation on batches of data, such as
    /// maintaining an accumulated state that is up to date with the current batch.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
    /// #  .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #   .source_iter(q!(vec![5, 6, 7]))
    /// #   .batch(&tick, nondet!(/** test */))
    /// #   .defer_tick(); // appears on the second tick
    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
    ///
    /// input.batch(&tick, nondet!(/** test */))
    ///     .across_ticks(|s| s.count()).all_ticks()
    /// # }, |mut stream| async move {
    /// // [4, 7]
    /// assert_eq!(stream.next().await.unwrap(), 4);
    /// assert_eq!(stream.next().await.unwrap(), 7);
    /// # }));
    /// # }
    /// ```
    pub fn across_ticks<Out: BatchAtomic>(
        self,
        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
    ) -> Out::Batched {
        // Lift into an atomic unbounded stream so `thunk`'s stateful operators persist
        // across ticks, then re-batch the result back into the tick.
        thunk(self.all_ticks_atomic()).batched_atomic()
    }

    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
    /// always has the elements of `self` at tick `T - 1`.
    ///
    /// At tick `0`, the output stream is empty, since there is no previous tick.
    ///
    /// This operator enables stateful iterative processing with ticks, by sending data from one
    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![1, 2, 3, 4]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
    ///
    /// changes_across_ticks.clone().filter_not_in(
    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
    /// ).all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
            },
        )
    }
}
2896
2897#[cfg(test)]
2898mod tests {
2899    #[cfg(feature = "deploy")]
2900    use futures::{SinkExt, StreamExt};
2901    #[cfg(feature = "deploy")]
2902    use hydro_deploy::Deployment;
2903    #[cfg(feature = "deploy")]
2904    use serde::{Deserialize, Serialize};
2905    #[cfg(any(feature = "deploy", feature = "sim"))]
2906    use stageleft::q;
2907
2908    #[cfg(any(feature = "deploy", feature = "sim"))]
2909    use crate::compile::builder::FlowBuilder;
2910    #[cfg(feature = "deploy")]
2911    use crate::live_collections::sliced::sliced;
2912    #[cfg(feature = "deploy")]
2913    use crate::live_collections::stream::ExactlyOnce;
2914    #[cfg(feature = "sim")]
2915    use crate::live_collections::stream::NoOrder;
2916    #[cfg(any(feature = "deploy", feature = "sim"))]
2917    use crate::live_collections::stream::TotalOrder;
2918    #[cfg(any(feature = "deploy", feature = "sim"))]
2919    use crate::location::Location;
2920    #[cfg(any(feature = "deploy", feature = "sim"))]
2921    use crate::nondet::nondet;
2922
2923    mod backtrace_chained_ops;
2924
    // Marker types standing in for two distinct processes in the deployment tests below.
    #[cfg(feature = "deploy")]
    struct P1 {}
    #[cfg(feature = "deploy")]
    struct P2 {}

    // Payload type used to exercise bincode-serialized network sends.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }
2935
    // End-to-end deployment test: numbers 0..10 generated on one process are sent over
    // TCP (bincode-serialized) to a second process, then forwarded to an external
    // client, which must receive them in the original order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.fail_stop().bincode())
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
2970
    // A singleton flattened into a stream, reduced to its first element, then counted
    // within a tick should report a cardinality of exactly 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
3004
    // A `reduce` over an unbounded stream must retain its accumulator across inputs:
    // after sending 1 the output is 1, and after sending 2 it is 1 + 2 = 3.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3038
    // Crossing an external input stream with a top-level bounded fold (1 + 2 + 3 = 6)
    // should pair every incoming element with the folded singleton value.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3076
    // Inside a `sliced!` block, a top-level bounded `reduce` converted to a stream must
    // have cardinality 1, so every external input pairs with a count of 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3114
    // Same as `top_level_bounded_reduce_cardinality`, but the reduce output is first
    // lifted via `into_singleton()`; the resulting stream must still have cardinality 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3152
    // A fold over an atomic bounded source accumulates 1+2+3 = 6 once, and the
    // `snapshot_atomic` of that singleton is replayed on every tick — so each
    // batched external input is crossed with 6, not with a reset accumulator.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic()
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(&tick, nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Both ticks observe the same folded value 6.
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3195
3196    #[cfg(feature = "deploy")]
3197    #[tokio::test]
3198    async fn unbounded_scan_remembers_state() {
3199        let mut deployment = Deployment::new();
3200
3201        let mut flow = FlowBuilder::new();
3202        let node = flow.process::<()>();
3203        let external = flow.external::<()>();
3204
3205        let (input_port, input) = node.source_external_bincode(&external);
3206        let out = input
3207            .scan(
3208                q!(|| 0),
3209                q!(|acc, v| {
3210                    *acc += v;
3211                    Some(*acc)
3212                }),
3213            )
3214            .send_bincode_external(&external);
3215
3216        let nodes = flow
3217            .with_process(&node, deployment.Localhost())
3218            .with_external(&external, deployment.Localhost())
3219            .deploy(&mut deployment);
3220
3221        deployment.deploy().await.unwrap();
3222
3223        let mut external_in = nodes.connect(input_port).await;
3224        let mut external_out = nodes.connect(out).await;
3225
3226        deployment.start().await.unwrap();
3227
3228        external_in.send(1).await.unwrap();
3229        assert_eq!(external_out.next().await.unwrap(), 1);
3230
3231        external_in.send(2).await.unwrap();
3232        assert_eq!(external_out.next().await.unwrap(), 3);
3233    }
3234
3235    #[cfg(feature = "deploy")]
3236    #[tokio::test]
3237    async fn unbounded_enumerate_remembers_state() {
3238        let mut deployment = Deployment::new();
3239
3240        let mut flow = FlowBuilder::new();
3241        let node = flow.process::<()>();
3242        let external = flow.external::<()>();
3243
3244        let (input_port, input) = node.source_external_bincode(&external);
3245        let out = input.enumerate().send_bincode_external(&external);
3246
3247        let nodes = flow
3248            .with_process(&node, deployment.Localhost())
3249            .with_external(&external, deployment.Localhost())
3250            .deploy(&mut deployment);
3251
3252        deployment.deploy().await.unwrap();
3253
3254        let mut external_in = nodes.connect(input_port).await;
3255        let mut external_out = nodes.connect(out).await;
3256
3257        deployment.start().await.unwrap();
3258
3259        external_in.send(1).await.unwrap();
3260        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3261
3262        external_in.send(2).await.unwrap();
3263        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3264    }
3265
3266    #[cfg(feature = "deploy")]
3267    #[tokio::test]
3268    async fn unbounded_unique_remembers_state() {
3269        let mut deployment = Deployment::new();
3270
3271        let mut flow = FlowBuilder::new();
3272        let node = flow.process::<()>();
3273        let external = flow.external::<()>();
3274
3275        let (input_port, input) =
3276            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3277        let out = input.unique().send_bincode_external(&external);
3278
3279        let nodes = flow
3280            .with_process(&node, deployment.Localhost())
3281            .with_external(&external, deployment.Localhost())
3282            .deploy(&mut deployment);
3283
3284        deployment.deploy().await.unwrap();
3285
3286        let mut external_in = nodes.connect(input_port).await;
3287        let mut external_out = nodes.connect(out).await;
3288
3289        deployment.start().await.unwrap();
3290
3291        external_in.send(1).await.unwrap();
3292        assert_eq!(external_out.next().await.unwrap(), 1);
3293
3294        external_in.send(2).await.unwrap();
3295        assert_eq!(external_out.next().await.unwrap(), 2);
3296
3297        external_in.send(1).await.unwrap();
3298        external_in.send(3).await.unwrap();
3299        assert_eq!(external_out.next().await.unwrap(), 3);
3300    }
3301
    // Batch boundaries on an unbounded stream are nondeterministic: the
    // exhaustive simulator must explore at least one schedule where the three
    // sends are split across multiple batches, so asserting a single batch of
    // size 3 panics on some explored instance (hence `should_panic`).
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }
3326
    // Batching a totally-ordered stream may split elements across ticks, but
    // must never reorder them — every explored schedule yields 1, 2, 3.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send.send(3);

            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }
3349
    // With a `NoOrder` input the simulator explores shuffled batchings. Some
    // instance must batch {1, 3} then {2}, producing (min, max) pairs
    // (1, 3) then (2, 2) — we panic deliberately when we observe it, and
    // `should_panic` asserts that such a shuffle was indeed explored.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }
3376
    // Counts the schedules explored when batching 4 unordered elements: every
    // ordered set partition of the 4 elements is a distinct instance, giving
    // ∑_{k=1..4} S(4,k)·k! = 75 (S = Stirling numbers of the second kind).
    // Any change to the dataflow structure would alter this exact count.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
        )
    }
3399
    // `assume_ordering` on an unordered batch does not make the order real:
    // the simulator still explores shuffles, so asserting the original send
    // order fails on some instance (hence `should_panic`).
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
        });
    }
3421
    // Counts the schedules explored when an unordered batch has its order
    // observed via `assume_ordering`: all 4! arrival orders times the 2^{4-1}
    // ways to place batch boundaries between consecutive elements = 192.
    // The exact count is sensitive to the precise dataflow structure.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! * 2^{4 - 1}
        )
    }
3447
    // Snapshotting a running `count` of 4 unordered elements: each of the 4
    // intermediate counts {0, 1, 2, 3} may or may not be captured by a
    // snapshot, but the final count 4 always is — 2^4 = 16 instances, and the
    // last observed value is always 4.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unordered_count_instance_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .count()
            .snapshot(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
        });

        assert_eq!(
            instance_count,
            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
        )
    }
3473
    // A top-level `assume_ordering` on an unordered input makes the simulator
    // explore every permutation of the 3 sends: 3! = 6 instances, each yielding
    // the same multiset {1, 2, 3}.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);
            let mut out = out_recv.collect::<Vec<_>>().await;
            out.sort();
            assert_eq!(out, vec![1, 2, 3]);
        });

        assert_eq!(instance_count, 6)
    }
3495
    // `assume_ordering` on a stream merged with its own cycled-back output:
    // the fed-back values (v + 1 when odd) can interleave with the external
    // inputs. At least one of the 6 explored instances must observe the
    // interleaving 0, 1, 2 — where 1 is the feedback from 0 arriving before 2.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 6)
    }
3531
    // Same cycle-back scenario as above, but the feedback path goes through a
    // tick batch (`batch` / `all_ticks`), adding nondeterministic batch
    // boundaries to the schedule space — 58 instances instead of 6. The
    // 0, 1, 2 interleaving must still be reachable.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 58)
    }
3569
    // Two chained `assume_ordering` points: input1's ordered view feeds (via
    // +3 / weaken / merge with input2 / re-assume) a cycle that feeds the
    // value 3 back into input1's merge. With 24 explored instances, at least
    // one must see 0, 3, 1 — the feedback (0 + 3) landing between 0 and 1.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_multiple() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        let (_, input2) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let input1_ordered = input
            .clone()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        let foo = input1_ordered
            .clone()
            .map(q!(|v| v + 3))
            .weaken_ordering::<NoOrder>()
            .merge_unordered(input2)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));

        let out_recv = input1_ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 1]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 3, 1]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 3, 1 in order");
        assert_eq!(instance_count, 24)
    }
3609
    // Cycle-back through an atomic section: `assume_ordering` applied inside
    // `atomic()`/`end_atomic()` constrains the schedules the simulator can
    // explore (22 instances here vs. 6 for the non-atomic variant). Every
    // instance still yields exactly 4 elements: 0, 2 and their odd feedbacks.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .atomic()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .end_atomic();
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;
            assert_eq!(out.len(), 4);
        });

        assert_eq!(instance_count, 22)
    }
3642
3643    #[cfg(feature = "deploy")]
3644    #[tokio::test]
3645    async fn partition_evens_odds() {
3646        let mut deployment = Deployment::new();
3647
3648        let mut flow = FlowBuilder::new();
3649        let node = flow.process::<()>();
3650        let external = flow.external::<()>();
3651
3652        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3653        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3654        let evens_port = evens.send_bincode_external(&external);
3655        let odds_port = odds.send_bincode_external(&external);
3656
3657        let nodes = flow
3658            .with_process(&node, deployment.Localhost())
3659            .with_external(&external, deployment.Localhost())
3660            .deploy(&mut deployment);
3661
3662        deployment.deploy().await.unwrap();
3663
3664        let mut evens_out = nodes.connect(evens_port).await;
3665        let mut odds_out = nodes.connect(odds_port).await;
3666
3667        deployment.start().await.unwrap();
3668
3669        let mut even_results = Vec::new();
3670        for _ in 0..3 {
3671            even_results.push(evens_out.next().await.unwrap());
3672        }
3673        even_results.sort();
3674        assert_eq!(even_results, vec![2, 4, 6]);
3675
3676        let mut odd_results = Vec::new();
3677        for _ in 0..3 {
3678            odd_results.push(odds_out.next().await.unwrap());
3679        }
3680        odd_results.sort();
3681        assert_eq!(odd_results, vec![1, 3, 5]);
3682    }
3683
3684    #[cfg(feature = "deploy")]
3685    #[tokio::test]
3686    async fn unconsumed_inspect_still_runs() {
3687        use crate::deploy::DeployCrateWrapper;
3688
3689        let mut deployment = Deployment::new();
3690
3691        let mut flow = FlowBuilder::new();
3692        let node = flow.process::<()>();
3693
3694        // The return value of .inspect() is intentionally dropped.
3695        // Before the Null-root fix, this would silently do nothing.
3696        node.source_iter(q!(0..5))
3697            .inspect(q!(|x| println!("inspect: {}", x)));
3698
3699        let nodes = flow
3700            .with_process(&node, deployment.Localhost())
3701            .deploy(&mut deployment);
3702
3703        deployment.deploy().await.unwrap();
3704
3705        let mut stdout = nodes.get_process(&node).stdout();
3706
3707        deployment.start().await.unwrap();
3708
3709        let mut lines = Vec::new();
3710        for _ in 0..5 {
3711            lines.push(stdout.recv().await.unwrap());
3712        }
3713        lines.sort();
3714        assert_eq!(
3715            lines,
3716            vec![
3717                "inspect: 0",
3718                "inspect: 1",
3719                "inspect: 2",
3720                "inspect: 3",
3721                "inspect: 4",
3722            ]
3723        );
3724    }
3725}