1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
37#[derive(Clone, Hash)]
41pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl From<syn::Expr> for DebugExpr {
44 fn from(expr: syn::Expr) -> Self {
45 Self(Box::new(expr))
46 }
47}
48
49impl Deref for DebugExpr {
50 type Target = syn::Expr;
51
52 fn deref(&self) -> &Self::Target {
53 &self.0
54 }
55}
56
57impl ToTokens for DebugExpr {
58 fn to_tokens(&self, tokens: &mut TokenStream) {
59 self.0.to_tokens(tokens);
60 }
61}
62
63impl Debug for DebugExpr {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 write!(f, "{}", self.0.to_token_stream())
66 }
67}
68
69impl Display for DebugExpr {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 let original = self.0.as_ref().clone();
72 let simplified = simplify_q_macro(original);
73
74 write!(f, "q!({})", quote::quote!(#simplified))
77 }
78}
79
80fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
82 let mut simplifier = QMacroSimplifier::new();
85 simplifier.visit_expr_mut(&mut expr);
86
87 if let Some(simplified) = simplifier.simplified_result {
89 simplified
90 } else {
91 expr
92 }
93}
94
95#[derive(Default)]
97pub struct QMacroSimplifier {
98 pub simplified_result: Option<syn::Expr>,
99}
100
101impl QMacroSimplifier {
102 pub fn new() -> Self {
103 Self::default()
104 }
105}
106
107impl VisitMut for QMacroSimplifier {
108 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
109 if self.simplified_result.is_some() {
111 return;
112 }
113
114 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
115 && self.is_stageleft_runtime_support_call(&path_expr.path)
117 && let Some(closure) = self.extract_closure_from_args(&call.args)
119 {
120 self.simplified_result = Some(closure);
121 return;
122 }
123
124 syn::visit_mut::visit_expr_mut(self, expr);
127 }
128}
129
130impl QMacroSimplifier {
131 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
132 if let Some(last_segment) = path.segments.last() {
134 let fn_name = last_segment.ident.to_string();
135 fn_name.contains("_type_hint")
137 && path.segments.len() > 2
138 && path.segments[0].ident == "stageleft"
139 && path.segments[1].ident == "runtime_support"
140 } else {
141 false
142 }
143 }
144
145 fn extract_closure_from_args(
146 &self,
147 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
148 ) -> Option<syn::Expr> {
149 for arg in args {
151 if let syn::Expr::Closure(_) = arg {
152 return Some(arg.clone());
153 }
154 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
156 return Some(closure_expr);
157 }
158 }
159 None
160 }
161
162 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
163 let mut visitor = ClosureFinder {
164 found_closure: None,
165 prefer_inner_blocks: true,
166 };
167 visitor.visit_expr(expr);
168 visitor.found_closure
169 }
170}
171
172struct ClosureFinder {
174 found_closure: Option<syn::Expr>,
175 prefer_inner_blocks: bool,
176}
177
178impl<'ast> Visit<'ast> for ClosureFinder {
179 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
180 if self.found_closure.is_some() {
182 return;
183 }
184
185 match expr {
186 syn::Expr::Closure(_) => {
187 self.found_closure = Some(expr.clone());
188 }
189 syn::Expr::Block(block) if self.prefer_inner_blocks => {
190 for stmt in &block.block.stmts {
192 if let syn::Stmt::Expr(stmt_expr, _) = stmt
193 && let syn::Expr::Block(_) = stmt_expr
194 {
195 let mut inner_visitor = ClosureFinder {
197 found_closure: None,
198 prefer_inner_blocks: false, };
200 inner_visitor.visit_expr(stmt_expr);
201 if inner_visitor.found_closure.is_some() {
202 self.found_closure = Some(stmt_expr.clone());
204 return;
205 }
206 }
207 }
208
209 visit::visit_expr(self, expr);
211
212 if self.found_closure.is_some() {
215 }
217 }
218 _ => {
219 visit::visit_expr(self, expr);
221 }
222 }
223 }
224}
225
226#[derive(Clone, PartialEq, Eq, Hash)]
230pub struct DebugType(pub Box<syn::Type>);
231
232impl From<syn::Type> for DebugType {
233 fn from(t: syn::Type) -> Self {
234 Self(Box::new(t))
235 }
236}
237
238impl Deref for DebugType {
239 type Target = syn::Type;
240
241 fn deref(&self) -> &Self::Target {
242 &self.0
243 }
244}
245
246impl ToTokens for DebugType {
247 fn to_tokens(&self, tokens: &mut TokenStream) {
248 self.0.to_tokens(tokens);
249 }
250}
251
252impl Debug for DebugType {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 write!(f, "{}", self.0.to_token_stream())
255 }
256}
257
258pub enum DebugInstantiate {
259 Building,
260 Finalized(Box<DebugInstantiateFinalized>),
261}
262
263#[cfg_attr(
264 not(feature = "build"),
265 expect(
266 dead_code,
267 reason = "sink, source unused without `feature = \"build\"`."
268 )
269)]
270pub struct DebugInstantiateFinalized {
271 sink: syn::Expr,
272 source: syn::Expr,
273 connect_fn: Option<Box<dyn FnOnce()>>,
274}
275
276impl From<DebugInstantiateFinalized> for DebugInstantiate {
277 fn from(f: DebugInstantiateFinalized) -> Self {
278 Self::Finalized(Box::new(f))
279 }
280}
281
282impl Debug for DebugInstantiate {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 write!(f, "<network instantiate>")
285 }
286}
287
288impl Hash for DebugInstantiate {
289 fn hash<H: Hasher>(&self, _state: &mut H) {
290 }
292}
293
294impl Clone for DebugInstantiate {
295 fn clone(&self) -> Self {
296 match self {
297 DebugInstantiate::Building => DebugInstantiate::Building,
298 DebugInstantiate::Finalized(_) => {
299 panic!("DebugInstantiate::Finalized should not be cloned")
300 }
301 }
302 }
303}
304
305#[derive(Debug, Hash, Clone)]
314pub enum ClusterMembersState {
315 Uninit,
317 Stream(DebugExpr),
320 Tee(LocationId, LocationId),
324}
325
326#[derive(Debug, Hash, Clone)]
328pub enum HydroSource {
329 Stream(DebugExpr),
330 ExternalNetwork(),
331 Iter(DebugExpr),
332 Spin(),
333 ClusterMembers(LocationId, ClusterMembersState),
334 Embedded(syn::Ident),
335 EmbeddedSingleton(syn::Ident),
336}
337
338#[cfg(feature = "build")]
339pub trait DfirBuilder {
345 fn singleton_intermediates(&self) -> bool;
347
348 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
350
351 fn batch(
352 &mut self,
353 in_ident: syn::Ident,
354 in_location: &LocationId,
355 in_kind: &CollectionKind,
356 out_ident: &syn::Ident,
357 out_location: &LocationId,
358 op_meta: &HydroIrOpMetadata,
359 );
360 fn yield_from_tick(
361 &mut self,
362 in_ident: syn::Ident,
363 in_location: &LocationId,
364 in_kind: &CollectionKind,
365 out_ident: &syn::Ident,
366 out_location: &LocationId,
367 );
368
369 fn begin_atomic(
370 &mut self,
371 in_ident: syn::Ident,
372 in_location: &LocationId,
373 in_kind: &CollectionKind,
374 out_ident: &syn::Ident,
375 out_location: &LocationId,
376 op_meta: &HydroIrOpMetadata,
377 );
378 fn end_atomic(
379 &mut self,
380 in_ident: syn::Ident,
381 in_location: &LocationId,
382 in_kind: &CollectionKind,
383 out_ident: &syn::Ident,
384 );
385
386 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
387 fn observe_nondet(
388 &mut self,
389 trusted: bool,
390 location: &LocationId,
391 in_ident: syn::Ident,
392 in_kind: &CollectionKind,
393 out_ident: &syn::Ident,
394 out_kind: &CollectionKind,
395 op_meta: &HydroIrOpMetadata,
396 );
397
398 #[expect(clippy::too_many_arguments, reason = "TODO")]
399 fn create_network(
400 &mut self,
401 from: &LocationId,
402 to: &LocationId,
403 input_ident: syn::Ident,
404 out_ident: &syn::Ident,
405 serialize: Option<&DebugExpr>,
406 sink: syn::Expr,
407 source: syn::Expr,
408 deserialize: Option<&DebugExpr>,
409 tag_id: usize,
410 networking_info: &crate::networking::NetworkingInfo,
411 );
412
413 fn create_external_source(
414 &mut self,
415 on: &LocationId,
416 source_expr: syn::Expr,
417 out_ident: &syn::Ident,
418 deserialize: Option<&DebugExpr>,
419 tag_id: usize,
420 );
421
422 fn create_external_output(
423 &mut self,
424 on: &LocationId,
425 sink_expr: syn::Expr,
426 input_ident: &syn::Ident,
427 serialize: Option<&DebugExpr>,
428 tag_id: usize,
429 );
430}
431
432#[cfg(feature = "build")]
433impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
434 fn singleton_intermediates(&self) -> bool {
435 false
436 }
437
438 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
439 self.entry(location.root().key())
440 .expect("location was removed")
441 .or_default()
442 }
443
444 fn batch(
445 &mut self,
446 in_ident: syn::Ident,
447 in_location: &LocationId,
448 in_kind: &CollectionKind,
449 out_ident: &syn::Ident,
450 _out_location: &LocationId,
451 _op_meta: &HydroIrOpMetadata,
452 ) {
453 let builder = self.get_dfir_mut(in_location.root());
454 if in_kind.is_bounded()
455 && matches!(
456 in_kind,
457 CollectionKind::Singleton { .. }
458 | CollectionKind::Optional { .. }
459 | CollectionKind::KeyedSingleton { .. }
460 )
461 {
462 assert!(in_location.is_top_level());
463 builder.add_dfir(
464 parse_quote! {
465 #out_ident = #in_ident -> persist::<'static>();
466 },
467 None,
468 None,
469 );
470 } else {
471 builder.add_dfir(
472 parse_quote! {
473 #out_ident = #in_ident;
474 },
475 None,
476 None,
477 );
478 }
479 }
480
481 fn yield_from_tick(
482 &mut self,
483 in_ident: syn::Ident,
484 in_location: &LocationId,
485 _in_kind: &CollectionKind,
486 out_ident: &syn::Ident,
487 _out_location: &LocationId,
488 ) {
489 let builder = self.get_dfir_mut(in_location.root());
490 builder.add_dfir(
491 parse_quote! {
492 #out_ident = #in_ident;
493 },
494 None,
495 None,
496 );
497 }
498
499 fn begin_atomic(
500 &mut self,
501 in_ident: syn::Ident,
502 in_location: &LocationId,
503 _in_kind: &CollectionKind,
504 out_ident: &syn::Ident,
505 _out_location: &LocationId,
506 _op_meta: &HydroIrOpMetadata,
507 ) {
508 let builder = self.get_dfir_mut(in_location.root());
509 builder.add_dfir(
510 parse_quote! {
511 #out_ident = #in_ident;
512 },
513 None,
514 None,
515 );
516 }
517
518 fn end_atomic(
519 &mut self,
520 in_ident: syn::Ident,
521 in_location: &LocationId,
522 _in_kind: &CollectionKind,
523 out_ident: &syn::Ident,
524 ) {
525 let builder = self.get_dfir_mut(in_location.root());
526 builder.add_dfir(
527 parse_quote! {
528 #out_ident = #in_ident;
529 },
530 None,
531 None,
532 );
533 }
534
535 fn observe_nondet(
536 &mut self,
537 _trusted: bool,
538 location: &LocationId,
539 in_ident: syn::Ident,
540 _in_kind: &CollectionKind,
541 out_ident: &syn::Ident,
542 _out_kind: &CollectionKind,
543 _op_meta: &HydroIrOpMetadata,
544 ) {
545 let builder = self.get_dfir_mut(location);
546 builder.add_dfir(
547 parse_quote! {
548 #out_ident = #in_ident;
549 },
550 None,
551 None,
552 );
553 }
554
555 fn create_network(
556 &mut self,
557 from: &LocationId,
558 to: &LocationId,
559 input_ident: syn::Ident,
560 out_ident: &syn::Ident,
561 serialize: Option<&DebugExpr>,
562 sink: syn::Expr,
563 source: syn::Expr,
564 deserialize: Option<&DebugExpr>,
565 tag_id: usize,
566 _networking_info: &crate::networking::NetworkingInfo,
567 ) {
568 let sender_builder = self.get_dfir_mut(from);
569 if let Some(serialize_pipeline) = serialize {
570 sender_builder.add_dfir(
571 parse_quote! {
572 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
573 },
574 None,
575 Some(&format!("send{}", tag_id)),
577 );
578 } else {
579 sender_builder.add_dfir(
580 parse_quote! {
581 #input_ident -> dest_sink(#sink);
582 },
583 None,
584 Some(&format!("send{}", tag_id)),
585 );
586 }
587
588 let receiver_builder = self.get_dfir_mut(to);
589 if let Some(deserialize_pipeline) = deserialize {
590 receiver_builder.add_dfir(
591 parse_quote! {
592 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
593 },
594 None,
595 Some(&format!("recv{}", tag_id)),
596 );
597 } else {
598 receiver_builder.add_dfir(
599 parse_quote! {
600 #out_ident = source_stream(#source);
601 },
602 None,
603 Some(&format!("recv{}", tag_id)),
604 );
605 }
606 }
607
608 fn create_external_source(
609 &mut self,
610 on: &LocationId,
611 source_expr: syn::Expr,
612 out_ident: &syn::Ident,
613 deserialize: Option<&DebugExpr>,
614 tag_id: usize,
615 ) {
616 let receiver_builder = self.get_dfir_mut(on);
617 if let Some(deserialize_pipeline) = deserialize {
618 receiver_builder.add_dfir(
619 parse_quote! {
620 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
621 },
622 None,
623 Some(&format!("recv{}", tag_id)),
624 );
625 } else {
626 receiver_builder.add_dfir(
627 parse_quote! {
628 #out_ident = source_stream(#source_expr);
629 },
630 None,
631 Some(&format!("recv{}", tag_id)),
632 );
633 }
634 }
635
636 fn create_external_output(
637 &mut self,
638 on: &LocationId,
639 sink_expr: syn::Expr,
640 input_ident: &syn::Ident,
641 serialize: Option<&DebugExpr>,
642 tag_id: usize,
643 ) {
644 let sender_builder = self.get_dfir_mut(on);
645 if let Some(serialize_fn) = serialize {
646 sender_builder.add_dfir(
647 parse_quote! {
648 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
649 },
650 None,
651 Some(&format!("send{}", tag_id)),
653 );
654 } else {
655 sender_builder.add_dfir(
656 parse_quote! {
657 #input_ident -> dest_sink(#sink_expr);
658 },
659 None,
660 Some(&format!("send{}", tag_id)),
661 );
662 }
663 }
664}
665
666#[cfg(feature = "build")]
667pub enum BuildersOrCallback<'a, L, N>
668where
669 L: FnMut(&mut HydroRoot, &mut usize),
670 N: FnMut(&mut HydroNode, &mut usize),
671{
672 Builders(&'a mut dyn DfirBuilder),
673 Callback(L, N),
674}
675
676#[derive(Debug, Hash)]
680pub enum HydroRoot {
681 ForEach {
682 f: DebugExpr,
683 input: Box<HydroNode>,
684 op_metadata: HydroIrOpMetadata,
685 },
686 SendExternal {
687 to_external_key: LocationKey,
688 to_port_id: ExternalPortId,
689 to_many: bool,
690 unpaired: bool,
691 serialize_fn: Option<DebugExpr>,
692 instantiate_fn: DebugInstantiate,
693 input: Box<HydroNode>,
694 op_metadata: HydroIrOpMetadata,
695 },
696 DestSink {
697 sink: DebugExpr,
698 input: Box<HydroNode>,
699 op_metadata: HydroIrOpMetadata,
700 },
701 CycleSink {
702 cycle_id: CycleId,
703 input: Box<HydroNode>,
704 op_metadata: HydroIrOpMetadata,
705 },
706 EmbeddedOutput {
707 ident: syn::Ident,
708 input: Box<HydroNode>,
709 op_metadata: HydroIrOpMetadata,
710 },
711 Null {
712 input: Box<HydroNode>,
713 op_metadata: HydroIrOpMetadata,
714 },
715}
716
717impl HydroRoot {
718 #[cfg(feature = "build")]
719 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
720 pub fn compile_network<'a, D>(
721 &mut self,
722 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
723 seen_tees: &mut SeenSharedNodes,
724 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
725 processes: &SparseSecondaryMap<LocationKey, D::Process>,
726 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
727 externals: &SparseSecondaryMap<LocationKey, D::External>,
728 env: &mut D::InstantiateEnv,
729 ) where
730 D: Deploy<'a>,
731 {
732 let refcell_extra_stmts = RefCell::new(extra_stmts);
733 let refcell_env = RefCell::new(env);
734 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
735 self.transform_bottom_up(
736 &mut |l| {
737 if let HydroRoot::SendExternal {
738 input,
739 to_external_key,
740 to_port_id,
741 to_many,
742 unpaired,
743 instantiate_fn,
744 ..
745 } = l
746 {
747 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
748 DebugInstantiate::Building => {
749 let to_node = externals
750 .get(*to_external_key)
751 .unwrap_or_else(|| {
752 panic!("A external used in the graph was not instantiated: {}", to_external_key)
753 })
754 .clone();
755
756 match input.metadata().location_id.root() {
757 &LocationId::Process(process_key) => {
758 if *to_many {
759 (
760 (
761 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
762 parse_quote!(DUMMY),
763 ),
764 Box::new(|| {}) as Box<dyn FnOnce()>,
765 )
766 } else {
767 let from_node = processes
768 .get(process_key)
769 .unwrap_or_else(|| {
770 panic!("A process used in the graph was not instantiated: {}", process_key)
771 })
772 .clone();
773
774 let sink_port = from_node.next_port();
775 let source_port = to_node.next_port();
776
777 if *unpaired {
778 use stageleft::quote_type;
779 use tokio_util::codec::LengthDelimitedCodec;
780
781 to_node.register(*to_port_id, source_port.clone());
782
783 let _ = D::e2o_source(
784 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
785 &to_node, &source_port,
786 &from_node, &sink_port,
787 "e_type::<LengthDelimitedCodec>(),
788 format!("{}_{}", *to_external_key, *to_port_id)
789 );
790 }
791
792 (
793 (
794 D::o2e_sink(
795 &from_node,
796 &sink_port,
797 &to_node,
798 &source_port,
799 format!("{}_{}", *to_external_key, *to_port_id)
800 ),
801 parse_quote!(DUMMY),
802 ),
803 if *unpaired {
804 D::e2o_connect(
805 &to_node,
806 &source_port,
807 &from_node,
808 &sink_port,
809 *to_many,
810 NetworkHint::Auto,
811 )
812 } else {
813 Box::new(|| {}) as Box<dyn FnOnce()>
814 },
815 )
816 }
817 }
818 LocationId::Cluster(cluster_key) => {
819 let from_node = clusters
820 .get(*cluster_key)
821 .unwrap_or_else(|| {
822 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
823 })
824 .clone();
825
826 let sink_port = from_node.next_port();
827 let source_port = to_node.next_port();
828
829 if *unpaired {
830 to_node.register(*to_port_id, source_port.clone());
831 }
832
833 (
834 (
835 D::m2e_sink(
836 &from_node,
837 &sink_port,
838 &to_node,
839 &source_port,
840 format!("{}_{}", *to_external_key, *to_port_id)
841 ),
842 parse_quote!(DUMMY),
843 ),
844 Box::new(|| {}) as Box<dyn FnOnce()>,
845 )
846 }
847 _ => panic!()
848 }
849 },
850
851 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
852 };
853
854 *instantiate_fn = DebugInstantiateFinalized {
855 sink: sink_expr,
856 source: source_expr,
857 connect_fn: Some(connect_fn),
858 }
859 .into();
860 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
861 let element_type = match &input.metadata().collection_kind {
862 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
863 _ => panic!("Embedded output must have Stream collection kind"),
864 };
865 let location_key = match input.metadata().location_id.root() {
866 LocationId::Process(key) | LocationId::Cluster(key) => *key,
867 _ => panic!("Embedded output must be on a process or cluster"),
868 };
869 D::register_embedded_output(
870 &mut refcell_env.borrow_mut(),
871 location_key,
872 ident,
873 &element_type,
874 );
875 }
876 },
877 &mut |n| {
878 if let HydroNode::Network {
879 name,
880 networking_info,
881 input,
882 instantiate_fn,
883 metadata,
884 ..
885 } = n
886 {
887 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
888 DebugInstantiate::Building => instantiate_network::<D>(
889 &mut refcell_env.borrow_mut(),
890 input.metadata().location_id.root(),
891 metadata.location_id.root(),
892 processes,
893 clusters,
894 name.as_deref(),
895 networking_info,
896 ),
897
898 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
899 };
900
901 *instantiate_fn = DebugInstantiateFinalized {
902 sink: sink_expr,
903 source: source_expr,
904 connect_fn: Some(connect_fn),
905 }
906 .into();
907 } else if let HydroNode::ExternalInput {
908 from_external_key,
909 from_port_id,
910 from_many,
911 codec_type,
912 port_hint,
913 instantiate_fn,
914 metadata,
915 ..
916 } = n
917 {
918 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
919 DebugInstantiate::Building => {
920 let from_node = externals
921 .get(*from_external_key)
922 .unwrap_or_else(|| {
923 panic!(
924 "A external used in the graph was not instantiated: {}",
925 from_external_key,
926 )
927 })
928 .clone();
929
930 match metadata.location_id.root() {
931 &LocationId::Process(process_key) => {
932 let to_node = processes
933 .get(process_key)
934 .unwrap_or_else(|| {
935 panic!("A process used in the graph was not instantiated: {}", process_key)
936 })
937 .clone();
938
939 let sink_port = from_node.next_port();
940 let source_port = to_node.next_port();
941
942 from_node.register(*from_port_id, sink_port.clone());
943
944 (
945 (
946 parse_quote!(DUMMY),
947 if *from_many {
948 D::e2o_many_source(
949 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
950 &to_node, &source_port,
951 codec_type.0.as_ref(),
952 format!("{}_{}", *from_external_key, *from_port_id)
953 )
954 } else {
955 D::e2o_source(
956 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
957 &from_node, &sink_port,
958 &to_node, &source_port,
959 codec_type.0.as_ref(),
960 format!("{}_{}", *from_external_key, *from_port_id)
961 )
962 },
963 ),
964 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
965 )
966 }
967 LocationId::Cluster(cluster_key) => {
968 let to_node = clusters
969 .get(*cluster_key)
970 .unwrap_or_else(|| {
971 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
972 })
973 .clone();
974
975 let sink_port = from_node.next_port();
976 let source_port = to_node.next_port();
977
978 from_node.register(*from_port_id, sink_port.clone());
979
980 (
981 (
982 parse_quote!(DUMMY),
983 D::e2m_source(
984 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
985 &from_node, &sink_port,
986 &to_node, &source_port,
987 codec_type.0.as_ref(),
988 format!("{}_{}", *from_external_key, *from_port_id)
989 ),
990 ),
991 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
992 )
993 }
994 _ => panic!()
995 }
996 },
997
998 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
999 };
1000
1001 *instantiate_fn = DebugInstantiateFinalized {
1002 sink: sink_expr,
1003 source: source_expr,
1004 connect_fn: Some(connect_fn),
1005 }
1006 .into();
1007 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1008 let element_type = match &metadata.collection_kind {
1009 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1010 _ => panic!("Embedded source must have Stream collection kind"),
1011 };
1012 let location_key = match metadata.location_id.root() {
1013 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1014 _ => panic!("Embedded source must be on a process or cluster"),
1015 };
1016 D::register_embedded_stream_input(
1017 &mut refcell_env.borrow_mut(),
1018 location_key,
1019 ident,
1020 &element_type,
1021 );
1022 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1023 let element_type = match &metadata.collection_kind {
1024 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1025 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1026 };
1027 let location_key = match metadata.location_id.root() {
1028 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1029 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1030 };
1031 D::register_embedded_singleton_input(
1032 &mut refcell_env.borrow_mut(),
1033 location_key,
1034 ident,
1035 &element_type,
1036 );
1037 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1038 match state {
1039 ClusterMembersState::Uninit => {
1040 let at_location = metadata.location_id.root().clone();
1041 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1042 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1043 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1045 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1046 &(),
1047 );
1048 *state = ClusterMembersState::Stream(expr.into());
1049 } else {
1050 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1052 }
1053 }
1054 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1055 panic!("cluster members already finalized");
1056 }
1057 }
1058 }
1059 },
1060 seen_tees,
1061 false,
1062 );
1063 }
1064
1065 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1066 self.transform_bottom_up(
1067 &mut |l| {
1068 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1069 match instantiate_fn {
1070 DebugInstantiate::Building => panic!("network not built"),
1071
1072 DebugInstantiate::Finalized(finalized) => {
1073 (finalized.connect_fn.take().unwrap())();
1074 }
1075 }
1076 }
1077 },
1078 &mut |n| {
1079 if let HydroNode::Network { instantiate_fn, .. }
1080 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1081 {
1082 match instantiate_fn {
1083 DebugInstantiate::Building => panic!("network not built"),
1084
1085 DebugInstantiate::Finalized(finalized) => {
1086 (finalized.connect_fn.take().unwrap())();
1087 }
1088 }
1089 }
1090 },
1091 seen_tees,
1092 false,
1093 );
1094 }
1095
1096 pub fn transform_bottom_up(
1097 &mut self,
1098 transform_root: &mut impl FnMut(&mut HydroRoot),
1099 transform_node: &mut impl FnMut(&mut HydroNode),
1100 seen_tees: &mut SeenSharedNodes,
1101 check_well_formed: bool,
1102 ) {
1103 self.transform_children(
1104 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1105 seen_tees,
1106 );
1107
1108 transform_root(self);
1109 }
1110
1111 pub fn transform_children(
1112 &mut self,
1113 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1114 seen_tees: &mut SeenSharedNodes,
1115 ) {
1116 match self {
1117 HydroRoot::ForEach { input, .. }
1118 | HydroRoot::SendExternal { input, .. }
1119 | HydroRoot::DestSink { input, .. }
1120 | HydroRoot::CycleSink { input, .. }
1121 | HydroRoot::EmbeddedOutput { input, .. }
1122 | HydroRoot::Null { input, .. } => {
1123 transform(input, seen_tees);
1124 }
1125 }
1126 }
1127
1128 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1129 match self {
1130 HydroRoot::ForEach {
1131 f,
1132 input,
1133 op_metadata,
1134 } => HydroRoot::ForEach {
1135 f: f.clone(),
1136 input: Box::new(input.deep_clone(seen_tees)),
1137 op_metadata: op_metadata.clone(),
1138 },
1139 HydroRoot::SendExternal {
1140 to_external_key,
1141 to_port_id,
1142 to_many,
1143 unpaired,
1144 serialize_fn,
1145 instantiate_fn,
1146 input,
1147 op_metadata,
1148 } => HydroRoot::SendExternal {
1149 to_external_key: *to_external_key,
1150 to_port_id: *to_port_id,
1151 to_many: *to_many,
1152 unpaired: *unpaired,
1153 serialize_fn: serialize_fn.clone(),
1154 instantiate_fn: instantiate_fn.clone(),
1155 input: Box::new(input.deep_clone(seen_tees)),
1156 op_metadata: op_metadata.clone(),
1157 },
1158 HydroRoot::DestSink {
1159 sink,
1160 input,
1161 op_metadata,
1162 } => HydroRoot::DestSink {
1163 sink: sink.clone(),
1164 input: Box::new(input.deep_clone(seen_tees)),
1165 op_metadata: op_metadata.clone(),
1166 },
1167 HydroRoot::CycleSink {
1168 cycle_id,
1169 input,
1170 op_metadata,
1171 } => HydroRoot::CycleSink {
1172 cycle_id: *cycle_id,
1173 input: Box::new(input.deep_clone(seen_tees)),
1174 op_metadata: op_metadata.clone(),
1175 },
1176 HydroRoot::EmbeddedOutput {
1177 ident,
1178 input,
1179 op_metadata,
1180 } => HydroRoot::EmbeddedOutput {
1181 ident: ident.clone(),
1182 input: Box::new(input.deep_clone(seen_tees)),
1183 op_metadata: op_metadata.clone(),
1184 },
1185 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1186 input: Box::new(input.deep_clone(seen_tees)),
1187 op_metadata: op_metadata.clone(),
1188 },
1189 }
1190 }
1191
1192 #[cfg(feature = "build")]
1193 pub fn emit(
1194 &mut self,
1195 graph_builders: &mut dyn DfirBuilder,
1196 seen_tees: &mut SeenSharedNodes,
1197 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1198 next_stmt_id: &mut usize,
1199 ) {
1200 self.emit_core(
1201 &mut BuildersOrCallback::<
1202 fn(&mut HydroRoot, &mut usize),
1203 fn(&mut HydroNode, &mut usize),
1204 >::Builders(graph_builders),
1205 seen_tees,
1206 built_tees,
1207 next_stmt_id,
1208 );
1209 }
1210
1211 #[cfg(feature = "build")]
1212 pub fn emit_core(
1213 &mut self,
1214 builders_or_callback: &mut BuildersOrCallback<
1215 impl FnMut(&mut HydroRoot, &mut usize),
1216 impl FnMut(&mut HydroNode, &mut usize),
1217 >,
1218 seen_tees: &mut SeenSharedNodes,
1219 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1220 next_stmt_id: &mut usize,
1221 ) {
1222 match self {
1223 HydroRoot::ForEach { f, input, .. } => {
1224 let input_ident =
1225 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1226
1227 match builders_or_callback {
1228 BuildersOrCallback::Builders(graph_builders) => {
1229 graph_builders
1230 .get_dfir_mut(&input.metadata().location_id)
1231 .add_dfir(
1232 parse_quote! {
1233 #input_ident -> for_each(#f);
1234 },
1235 None,
1236 Some(&next_stmt_id.to_string()),
1237 );
1238 }
1239 BuildersOrCallback::Callback(leaf_callback, _) => {
1240 leaf_callback(self, next_stmt_id);
1241 }
1242 }
1243
1244 *next_stmt_id += 1;
1245 }
1246
1247 HydroRoot::SendExternal {
1248 serialize_fn,
1249 instantiate_fn,
1250 input,
1251 ..
1252 } => {
1253 let input_ident =
1254 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1255
1256 match builders_or_callback {
1257 BuildersOrCallback::Builders(graph_builders) => {
1258 let (sink_expr, _) = match instantiate_fn {
1259 DebugInstantiate::Building => (
1260 syn::parse_quote!(DUMMY_SINK),
1261 syn::parse_quote!(DUMMY_SOURCE),
1262 ),
1263
1264 DebugInstantiate::Finalized(finalized) => {
1265 (finalized.sink.clone(), finalized.source.clone())
1266 }
1267 };
1268
1269 graph_builders.create_external_output(
1270 &input.metadata().location_id,
1271 sink_expr,
1272 &input_ident,
1273 serialize_fn.as_ref(),
1274 *next_stmt_id,
1275 );
1276 }
1277 BuildersOrCallback::Callback(leaf_callback, _) => {
1278 leaf_callback(self, next_stmt_id);
1279 }
1280 }
1281
1282 *next_stmt_id += 1;
1283 }
1284
1285 HydroRoot::DestSink { sink, input, .. } => {
1286 let input_ident =
1287 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1288
1289 match builders_or_callback {
1290 BuildersOrCallback::Builders(graph_builders) => {
1291 graph_builders
1292 .get_dfir_mut(&input.metadata().location_id)
1293 .add_dfir(
1294 parse_quote! {
1295 #input_ident -> dest_sink(#sink);
1296 },
1297 None,
1298 Some(&next_stmt_id.to_string()),
1299 );
1300 }
1301 BuildersOrCallback::Callback(leaf_callback, _) => {
1302 leaf_callback(self, next_stmt_id);
1303 }
1304 }
1305
1306 *next_stmt_id += 1;
1307 }
1308
1309 HydroRoot::CycleSink {
1310 cycle_id, input, ..
1311 } => {
1312 let input_ident =
1313 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1314
1315 match builders_or_callback {
1316 BuildersOrCallback::Builders(graph_builders) => {
1317 let elem_type: syn::Type = match &input.metadata().collection_kind {
1318 CollectionKind::KeyedSingleton {
1319 key_type,
1320 value_type,
1321 ..
1322 }
1323 | CollectionKind::KeyedStream {
1324 key_type,
1325 value_type,
1326 ..
1327 } => {
1328 parse_quote!((#key_type, #value_type))
1329 }
1330 CollectionKind::Stream { element_type, .. }
1331 | CollectionKind::Singleton { element_type, .. }
1332 | CollectionKind::Optional { element_type, .. } => {
1333 parse_quote!(#element_type)
1334 }
1335 };
1336
1337 let cycle_id_ident = cycle_id.as_ident();
1338 graph_builders
1339 .get_dfir_mut(&input.metadata().location_id)
1340 .add_dfir(
1341 parse_quote! {
1342 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1343 },
1344 None,
1345 None,
1346 );
1347 }
1348 BuildersOrCallback::Callback(_, _) => {}
1350 }
1351 }
1352
1353 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1354 let input_ident =
1355 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1356
1357 match builders_or_callback {
1358 BuildersOrCallback::Builders(graph_builders) => {
1359 graph_builders
1360 .get_dfir_mut(&input.metadata().location_id)
1361 .add_dfir(
1362 parse_quote! {
1363 #input_ident -> for_each(&mut #ident);
1364 },
1365 None,
1366 Some(&next_stmt_id.to_string()),
1367 );
1368 }
1369 BuildersOrCallback::Callback(leaf_callback, _) => {
1370 leaf_callback(self, next_stmt_id);
1371 }
1372 }
1373
1374 *next_stmt_id += 1;
1375 }
1376
1377 HydroRoot::Null { input, .. } => {
1378 let input_ident =
1379 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1380
1381 match builders_or_callback {
1382 BuildersOrCallback::Builders(graph_builders) => {
1383 graph_builders
1384 .get_dfir_mut(&input.metadata().location_id)
1385 .add_dfir(
1386 parse_quote! {
1387 #input_ident -> for_each(|_| {});
1388 },
1389 None,
1390 Some(&next_stmt_id.to_string()),
1391 );
1392 }
1393 BuildersOrCallback::Callback(leaf_callback, _) => {
1394 leaf_callback(self, next_stmt_id);
1395 }
1396 }
1397
1398 *next_stmt_id += 1;
1399 }
1400 }
1401 }
1402
1403 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1404 match self {
1405 HydroRoot::ForEach { op_metadata, .. }
1406 | HydroRoot::SendExternal { op_metadata, .. }
1407 | HydroRoot::DestSink { op_metadata, .. }
1408 | HydroRoot::CycleSink { op_metadata, .. }
1409 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1410 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1411 }
1412 }
1413
1414 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1415 match self {
1416 HydroRoot::ForEach { op_metadata, .. }
1417 | HydroRoot::SendExternal { op_metadata, .. }
1418 | HydroRoot::DestSink { op_metadata, .. }
1419 | HydroRoot::CycleSink { op_metadata, .. }
1420 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1421 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1422 }
1423 }
1424
1425 pub fn input(&self) -> &HydroNode {
1426 match self {
1427 HydroRoot::ForEach { input, .. }
1428 | HydroRoot::SendExternal { input, .. }
1429 | HydroRoot::DestSink { input, .. }
1430 | HydroRoot::CycleSink { input, .. }
1431 | HydroRoot::EmbeddedOutput { input, .. }
1432 | HydroRoot::Null { input, .. } => input,
1433 }
1434 }
1435
1436 pub fn input_metadata(&self) -> &HydroIrMetadata {
1437 self.input().metadata()
1438 }
1439
1440 pub fn print_root(&self) -> String {
1441 match self {
1442 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1443 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1444 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1445 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1446 HydroRoot::EmbeddedOutput { ident, .. } => {
1447 format!("EmbeddedOutput({})", ident)
1448 }
1449 HydroRoot::Null { .. } => "Null".to_owned(),
1450 }
1451 }
1452
1453 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1454 match self {
1455 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1456 transform(f);
1457 }
1458 HydroRoot::SendExternal { .. }
1459 | HydroRoot::CycleSink { .. }
1460 | HydroRoot::EmbeddedOutput { .. }
1461 | HydroRoot::Null { .. } => {}
1462 }
1463 }
1464}
1465
1466#[cfg(feature = "build")]
1467fn tick_of(loc: &LocationId) -> Option<ClockId> {
1468 match loc {
1469 LocationId::Tick(id, _) => Some(*id),
1470 LocationId::Atomic(inner) => tick_of(inner),
1471 _ => None,
1472 }
1473}
1474
1475#[cfg(feature = "build")]
1476fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1477 match loc {
1478 LocationId::Tick(id, inner) => {
1479 *id = uf_find(uf, *id);
1480 remap_location(inner, uf);
1481 }
1482 LocationId::Atomic(inner) => {
1483 remap_location(inner, uf);
1484 }
1485 LocationId::Process(_) | LocationId::Cluster(_) => {}
1486 }
1487}
1488
1489#[cfg(feature = "build")]
1490fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1491 let p = *parent.get(&x).unwrap_or(&x);
1492 if p == x {
1493 return x;
1494 }
1495 let root = uf_find(parent, p);
1496 parent.insert(x, root);
1497 root
1498}
1499
1500#[cfg(feature = "build")]
1501fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1502 let ra = uf_find(parent, a);
1503 let rb = uf_find(parent, b);
1504 if ra != rb {
1505 parent.insert(ra, rb);
1506 }
1507}
1508
1509#[cfg(feature = "build")]
1513pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1514 let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1515
1516 transform_bottom_up(
1518 ir,
1519 &mut |_| {},
1520 &mut |node: &mut HydroNode| {
1521 if let HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } =
1522 node
1523 && let (Some(a), Some(b)) = (
1524 tick_of(&inner.metadata().location_id),
1525 tick_of(&metadata.location_id),
1526 )
1527 {
1528 uf_union(&mut uf, a, b);
1529 }
1530 },
1531 false,
1532 );
1533
1534 transform_bottom_up(
1536 ir,
1537 &mut |_| {},
1538 &mut |node: &mut HydroNode| {
1539 remap_location(&mut node.metadata_mut().location_id, &mut uf);
1540 },
1541 false,
1542 );
1543}
1544
1545#[cfg(feature = "build")]
1546pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1547 let mut builders = SecondaryMap::new();
1548 let mut seen_tees = HashMap::new();
1549 let mut built_tees = HashMap::new();
1550 let mut next_stmt_id = 0;
1551 for leaf in ir {
1552 leaf.emit(
1553 &mut builders,
1554 &mut seen_tees,
1555 &mut built_tees,
1556 &mut next_stmt_id,
1557 );
1558 }
1559 builders
1560}
1561
1562#[cfg(feature = "build")]
1563pub fn traverse_dfir(
1564 ir: &mut [HydroRoot],
1565 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1566 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1567) {
1568 let mut seen_tees = HashMap::new();
1569 let mut built_tees = HashMap::new();
1570 let mut next_stmt_id = 0;
1571 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1572 ir.iter_mut().for_each(|leaf| {
1573 leaf.emit_core(
1574 &mut callback,
1575 &mut seen_tees,
1576 &mut built_tees,
1577 &mut next_stmt_id,
1578 );
1579 });
1580}
1581
1582pub fn transform_bottom_up(
1583 ir: &mut [HydroRoot],
1584 transform_root: &mut impl FnMut(&mut HydroRoot),
1585 transform_node: &mut impl FnMut(&mut HydroNode),
1586 check_well_formed: bool,
1587) {
1588 let mut seen_tees = HashMap::new();
1589 ir.iter_mut().for_each(|leaf| {
1590 leaf.transform_bottom_up(
1591 transform_root,
1592 transform_node,
1593 &mut seen_tees,
1594 check_well_formed,
1595 );
1596 });
1597}
1598
1599pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1600 let mut seen_tees = HashMap::new();
1601 ir.iter()
1602 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1603 .collect()
1604}
1605
1606type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1607thread_local! {
1608 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1609}
1610
1611pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1612 PRINTED_TEES.with(|printed_tees| {
1613 let mut printed_tees_mut = printed_tees.borrow_mut();
1614 *printed_tees_mut = Some((0, HashMap::new()));
1615 drop(printed_tees_mut);
1616
1617 let ret = f();
1618
1619 let mut printed_tees_mut = printed_tees.borrow_mut();
1620 *printed_tees_mut = None;
1621
1622 ret
1623 })
1624}
1625
1626pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1627
1628impl SharedNode {
1629 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1630 Rc::as_ptr(&self.0)
1631 }
1632}
1633
1634impl Debug for SharedNode {
1635 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1636 PRINTED_TEES.with(|printed_tees| {
1637 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1638 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1639
1640 if let Some(printed_tees_mut) = printed_tees_mut {
1641 if let Some(existing) = printed_tees_mut
1642 .1
1643 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1644 {
1645 write!(f, "<shared {}>", existing)
1646 } else {
1647 let next_id = printed_tees_mut.0;
1648 printed_tees_mut.0 += 1;
1649 printed_tees_mut
1650 .1
1651 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1652 drop(printed_tees_mut_borrow);
1653 write!(f, "<shared {}>: ", next_id)?;
1654 Debug::fmt(&self.0.borrow(), f)
1655 }
1656 } else {
1657 drop(printed_tees_mut_borrow);
1658 write!(f, "<shared>: ")?;
1659 Debug::fmt(&self.0.borrow(), f)
1660 }
1661 })
1662 }
1663}
1664
1665impl Hash for SharedNode {
1666 fn hash<H: Hasher>(&self, state: &mut H) {
1667 self.0.borrow_mut().hash(state);
1668 }
1669}
1670
1671#[derive(Clone, PartialEq, Eq, Debug)]
1672pub enum BoundKind {
1673 Unbounded,
1674 Bounded,
1675}
1676
1677#[derive(Clone, PartialEq, Eq, Debug)]
1678pub enum StreamOrder {
1679 NoOrder,
1680 TotalOrder,
1681}
1682
1683#[derive(Clone, PartialEq, Eq, Debug)]
1684pub enum StreamRetry {
1685 AtLeastOnce,
1686 ExactlyOnce,
1687}
1688
1689#[derive(Clone, PartialEq, Eq, Debug)]
1690pub enum KeyedSingletonBoundKind {
1691 Unbounded,
1692 BoundedValue,
1693 Bounded,
1694}
1695
1696#[derive(Clone, PartialEq, Eq, Debug)]
1697pub enum CollectionKind {
1698 Stream {
1699 bound: BoundKind,
1700 order: StreamOrder,
1701 retry: StreamRetry,
1702 element_type: DebugType,
1703 },
1704 Singleton {
1705 bound: BoundKind,
1706 element_type: DebugType,
1707 },
1708 Optional {
1709 bound: BoundKind,
1710 element_type: DebugType,
1711 },
1712 KeyedStream {
1713 bound: BoundKind,
1714 value_order: StreamOrder,
1715 value_retry: StreamRetry,
1716 key_type: DebugType,
1717 value_type: DebugType,
1718 },
1719 KeyedSingleton {
1720 bound: KeyedSingletonBoundKind,
1721 key_type: DebugType,
1722 value_type: DebugType,
1723 },
1724}
1725
1726impl CollectionKind {
1727 pub fn is_bounded(&self) -> bool {
1728 matches!(
1729 self,
1730 CollectionKind::Stream {
1731 bound: BoundKind::Bounded,
1732 ..
1733 } | CollectionKind::Singleton {
1734 bound: BoundKind::Bounded,
1735 ..
1736 } | CollectionKind::Optional {
1737 bound: BoundKind::Bounded,
1738 ..
1739 } | CollectionKind::KeyedStream {
1740 bound: BoundKind::Bounded,
1741 ..
1742 } | CollectionKind::KeyedSingleton {
1743 bound: KeyedSingletonBoundKind::Bounded,
1744 ..
1745 }
1746 )
1747 }
1748}
1749
1750#[derive(Clone)]
1751pub struct HydroIrMetadata {
1752 pub location_id: LocationId,
1753 pub collection_kind: CollectionKind,
1754 pub cardinality: Option<usize>,
1755 pub tag: Option<String>,
1756 pub op: HydroIrOpMetadata,
1757}
1758
1759impl Hash for HydroIrMetadata {
1761 fn hash<H: Hasher>(&self, _: &mut H) {}
1762}
1763
1764impl PartialEq for HydroIrMetadata {
1765 fn eq(&self, _: &Self) -> bool {
1766 true
1767 }
1768}
1769
1770impl Eq for HydroIrMetadata {}
1771
1772impl Debug for HydroIrMetadata {
1773 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1774 f.debug_struct("HydroIrMetadata")
1775 .field("location_id", &self.location_id)
1776 .field("collection_kind", &self.collection_kind)
1777 .finish()
1778 }
1779}
1780
1781#[derive(Clone)]
1784pub struct HydroIrOpMetadata {
1785 pub backtrace: Backtrace,
1786 pub cpu_usage: Option<f64>,
1787 pub network_recv_cpu_usage: Option<f64>,
1788 pub id: Option<usize>,
1789}
1790
1791impl HydroIrOpMetadata {
1792 #[expect(
1793 clippy::new_without_default,
1794 reason = "explicit calls to new ensure correct backtrace bounds"
1795 )]
1796 pub fn new() -> HydroIrOpMetadata {
1797 Self::new_with_skip(1)
1798 }
1799
1800 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1801 HydroIrOpMetadata {
1802 backtrace: Backtrace::get_backtrace(2 + skip_count),
1803 cpu_usage: None,
1804 network_recv_cpu_usage: None,
1805 id: None,
1806 }
1807 }
1808}
1809
1810impl Debug for HydroIrOpMetadata {
1811 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1812 f.debug_struct("HydroIrOpMetadata").finish()
1813 }
1814}
1815
1816impl Hash for HydroIrOpMetadata {
1817 fn hash<H: Hasher>(&self, _: &mut H) {}
1818}
1819
1820#[derive(Debug, Hash)]
1823pub enum HydroNode {
1824 Placeholder,
1825
1826 Cast {
1834 inner: Box<HydroNode>,
1835 metadata: HydroIrMetadata,
1836 },
1837
1838 ObserveNonDet {
1844 inner: Box<HydroNode>,
1845 trusted: bool, metadata: HydroIrMetadata,
1847 },
1848
1849 Source {
1850 source: HydroSource,
1851 metadata: HydroIrMetadata,
1852 },
1853
1854 SingletonSource {
1855 value: DebugExpr,
1856 first_tick_only: bool,
1857 metadata: HydroIrMetadata,
1858 },
1859
1860 CycleSource {
1861 cycle_id: CycleId,
1862 metadata: HydroIrMetadata,
1863 },
1864
1865 Tee {
1866 inner: SharedNode,
1867 metadata: HydroIrMetadata,
1868 },
1869
1870 Partition {
1871 inner: SharedNode,
1872 f: DebugExpr,
1873 is_true: bool,
1874 metadata: HydroIrMetadata,
1875 },
1876
1877 BeginAtomic {
1878 inner: Box<HydroNode>,
1879 metadata: HydroIrMetadata,
1880 },
1881
1882 EndAtomic {
1883 inner: Box<HydroNode>,
1884 metadata: HydroIrMetadata,
1885 },
1886
1887 Batch {
1888 inner: Box<HydroNode>,
1889 metadata: HydroIrMetadata,
1890 },
1891
1892 YieldConcat {
1893 inner: Box<HydroNode>,
1894 metadata: HydroIrMetadata,
1895 },
1896
1897 Chain {
1898 first: Box<HydroNode>,
1899 second: Box<HydroNode>,
1900 metadata: HydroIrMetadata,
1901 },
1902
1903 ChainFirst {
1904 first: Box<HydroNode>,
1905 second: Box<HydroNode>,
1906 metadata: HydroIrMetadata,
1907 },
1908
1909 CrossProduct {
1910 left: Box<HydroNode>,
1911 right: Box<HydroNode>,
1912 metadata: HydroIrMetadata,
1913 },
1914
1915 CrossSingleton {
1916 left: Box<HydroNode>,
1917 right: Box<HydroNode>,
1918 metadata: HydroIrMetadata,
1919 },
1920
1921 Join {
1922 left: Box<HydroNode>,
1923 right: Box<HydroNode>,
1924 metadata: HydroIrMetadata,
1925 },
1926
1927 Difference {
1928 pos: Box<HydroNode>,
1929 neg: Box<HydroNode>,
1930 metadata: HydroIrMetadata,
1931 },
1932
1933 AntiJoin {
1934 pos: Box<HydroNode>,
1935 neg: Box<HydroNode>,
1936 metadata: HydroIrMetadata,
1937 },
1938
1939 ResolveFutures {
1940 input: Box<HydroNode>,
1941 metadata: HydroIrMetadata,
1942 },
1943 ResolveFuturesBlocking {
1944 input: Box<HydroNode>,
1945 metadata: HydroIrMetadata,
1946 },
1947 ResolveFuturesOrdered {
1948 input: Box<HydroNode>,
1949 metadata: HydroIrMetadata,
1950 },
1951
1952 Map {
1953 f: DebugExpr,
1954 input: Box<HydroNode>,
1955 metadata: HydroIrMetadata,
1956 },
1957 FlatMap {
1958 f: DebugExpr,
1959 input: Box<HydroNode>,
1960 metadata: HydroIrMetadata,
1961 },
1962 FlatMapStreamBlocking {
1963 f: DebugExpr,
1964 input: Box<HydroNode>,
1965 metadata: HydroIrMetadata,
1966 },
1967 Filter {
1968 f: DebugExpr,
1969 input: Box<HydroNode>,
1970 metadata: HydroIrMetadata,
1971 },
1972 FilterMap {
1973 f: DebugExpr,
1974 input: Box<HydroNode>,
1975 metadata: HydroIrMetadata,
1976 },
1977
1978 DeferTick {
1979 input: Box<HydroNode>,
1980 metadata: HydroIrMetadata,
1981 },
1982 Enumerate {
1983 input: Box<HydroNode>,
1984 metadata: HydroIrMetadata,
1985 },
1986 Inspect {
1987 f: DebugExpr,
1988 input: Box<HydroNode>,
1989 metadata: HydroIrMetadata,
1990 },
1991
1992 Unique {
1993 input: Box<HydroNode>,
1994 metadata: HydroIrMetadata,
1995 },
1996
1997 Sort {
1998 input: Box<HydroNode>,
1999 metadata: HydroIrMetadata,
2000 },
2001 Fold {
2002 init: DebugExpr,
2003 acc: DebugExpr,
2004 input: Box<HydroNode>,
2005 metadata: HydroIrMetadata,
2006 },
2007
2008 Scan {
2009 init: DebugExpr,
2010 acc: DebugExpr,
2011 input: Box<HydroNode>,
2012 metadata: HydroIrMetadata,
2013 },
2014 FoldKeyed {
2015 init: DebugExpr,
2016 acc: DebugExpr,
2017 input: Box<HydroNode>,
2018 metadata: HydroIrMetadata,
2019 },
2020
2021 Reduce {
2022 f: DebugExpr,
2023 input: Box<HydroNode>,
2024 metadata: HydroIrMetadata,
2025 },
2026 ReduceKeyed {
2027 f: DebugExpr,
2028 input: Box<HydroNode>,
2029 metadata: HydroIrMetadata,
2030 },
2031 ReduceKeyedWatermark {
2032 f: DebugExpr,
2033 input: Box<HydroNode>,
2034 watermark: Box<HydroNode>,
2035 metadata: HydroIrMetadata,
2036 },
2037
2038 Network {
2039 name: Option<String>,
2040 networking_info: crate::networking::NetworkingInfo,
2041 serialize_fn: Option<DebugExpr>,
2042 instantiate_fn: DebugInstantiate,
2043 deserialize_fn: Option<DebugExpr>,
2044 input: Box<HydroNode>,
2045 metadata: HydroIrMetadata,
2046 },
2047
2048 ExternalInput {
2049 from_external_key: LocationKey,
2050 from_port_id: ExternalPortId,
2051 from_many: bool,
2052 codec_type: DebugType,
2053 port_hint: NetworkHint,
2054 instantiate_fn: DebugInstantiate,
2055 deserialize_fn: Option<DebugExpr>,
2056 metadata: HydroIrMetadata,
2057 },
2058
2059 Counter {
2060 tag: String,
2061 duration: DebugExpr,
2062 prefix: String,
2063 input: Box<HydroNode>,
2064 metadata: HydroIrMetadata,
2065 },
2066}
2067
2068pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2069pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2070
2071impl HydroNode {
2072 pub fn transform_bottom_up(
2073 &mut self,
2074 transform: &mut impl FnMut(&mut HydroNode),
2075 seen_tees: &mut SeenSharedNodes,
2076 check_well_formed: bool,
2077 ) {
2078 self.transform_children(
2079 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2080 seen_tees,
2081 );
2082
2083 transform(self);
2084
2085 let self_location = self.metadata().location_id.root();
2086
2087 if check_well_formed {
2088 match &*self {
2089 HydroNode::Network { .. } => {}
2090 _ => {
2091 self.input_metadata().iter().for_each(|i| {
2092 if i.location_id.root() != self_location {
2093 panic!(
2094 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2095 i,
2096 i.location_id.root(),
2097 self,
2098 self_location
2099 )
2100 }
2101 });
2102 }
2103 }
2104 }
2105 }
2106
2107 #[inline(always)]
2108 pub fn transform_children(
2109 &mut self,
2110 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2111 seen_tees: &mut SeenSharedNodes,
2112 ) {
2113 match self {
2114 HydroNode::Placeholder => {
2115 panic!();
2116 }
2117
2118 HydroNode::Source { .. }
2119 | HydroNode::SingletonSource { .. }
2120 | HydroNode::CycleSource { .. }
2121 | HydroNode::ExternalInput { .. } => {}
2122
2123 HydroNode::Tee { inner, .. } => {
2124 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2125 *inner = SharedNode(transformed.clone());
2126 } else {
2127 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2128 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2129 let mut orig = inner.0.replace(HydroNode::Placeholder);
2130 transform(&mut orig, seen_tees);
2131 *transformed_cell.borrow_mut() = orig;
2132 *inner = SharedNode(transformed_cell);
2133 }
2134 }
2135
2136 HydroNode::Partition { inner, .. } => {
2137 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2138 *inner = SharedNode(transformed.clone());
2139 } else {
2140 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2141 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2142 let mut orig = inner.0.replace(HydroNode::Placeholder);
2143 transform(&mut orig, seen_tees);
2144 *transformed_cell.borrow_mut() = orig;
2145 *inner = SharedNode(transformed_cell);
2146 }
2147 }
2148
2149 HydroNode::Cast { inner, .. }
2150 | HydroNode::ObserveNonDet { inner, .. }
2151 | HydroNode::BeginAtomic { inner, .. }
2152 | HydroNode::EndAtomic { inner, .. }
2153 | HydroNode::Batch { inner, .. }
2154 | HydroNode::YieldConcat { inner, .. } => {
2155 transform(inner.as_mut(), seen_tees);
2156 }
2157
2158 HydroNode::Chain { first, second, .. } => {
2159 transform(first.as_mut(), seen_tees);
2160 transform(second.as_mut(), seen_tees);
2161 }
2162
2163 HydroNode::ChainFirst { first, second, .. } => {
2164 transform(first.as_mut(), seen_tees);
2165 transform(second.as_mut(), seen_tees);
2166 }
2167
2168 HydroNode::CrossSingleton { left, right, .. }
2169 | HydroNode::CrossProduct { left, right, .. }
2170 | HydroNode::Join { left, right, .. } => {
2171 transform(left.as_mut(), seen_tees);
2172 transform(right.as_mut(), seen_tees);
2173 }
2174
2175 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2176 transform(pos.as_mut(), seen_tees);
2177 transform(neg.as_mut(), seen_tees);
2178 }
2179
2180 HydroNode::ReduceKeyedWatermark {
2181 input, watermark, ..
2182 } => {
2183 transform(input.as_mut(), seen_tees);
2184 transform(watermark.as_mut(), seen_tees);
2185 }
2186
2187 HydroNode::Map { input, .. }
2188 | HydroNode::ResolveFutures { input, .. }
2189 | HydroNode::ResolveFuturesBlocking { input, .. }
2190 | HydroNode::ResolveFuturesOrdered { input, .. }
2191 | HydroNode::FlatMap { input, .. }
2192 | HydroNode::FlatMapStreamBlocking { input, .. }
2193 | HydroNode::Filter { input, .. }
2194 | HydroNode::FilterMap { input, .. }
2195 | HydroNode::Sort { input, .. }
2196 | HydroNode::DeferTick { input, .. }
2197 | HydroNode::Enumerate { input, .. }
2198 | HydroNode::Inspect { input, .. }
2199 | HydroNode::Unique { input, .. }
2200 | HydroNode::Network { input, .. }
2201 | HydroNode::Fold { input, .. }
2202 | HydroNode::Scan { input, .. }
2203 | HydroNode::FoldKeyed { input, .. }
2204 | HydroNode::Reduce { input, .. }
2205 | HydroNode::ReduceKeyed { input, .. }
2206 | HydroNode::Counter { input, .. } => {
2207 transform(input.as_mut(), seen_tees);
2208 }
2209 }
2210 }
2211
2212 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2213 match self {
2214 HydroNode::Placeholder => HydroNode::Placeholder,
2215 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2216 inner: Box::new(inner.deep_clone(seen_tees)),
2217 metadata: metadata.clone(),
2218 },
2219 HydroNode::ObserveNonDet {
2220 inner,
2221 trusted,
2222 metadata,
2223 } => HydroNode::ObserveNonDet {
2224 inner: Box::new(inner.deep_clone(seen_tees)),
2225 trusted: *trusted,
2226 metadata: metadata.clone(),
2227 },
2228 HydroNode::Source { source, metadata } => HydroNode::Source {
2229 source: source.clone(),
2230 metadata: metadata.clone(),
2231 },
2232 HydroNode::SingletonSource {
2233 value,
2234 first_tick_only,
2235 metadata,
2236 } => HydroNode::SingletonSource {
2237 value: value.clone(),
2238 first_tick_only: *first_tick_only,
2239 metadata: metadata.clone(),
2240 },
2241 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2242 cycle_id: *cycle_id,
2243 metadata: metadata.clone(),
2244 },
2245 HydroNode::Tee { inner, metadata } => {
2246 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2247 HydroNode::Tee {
2248 inner: SharedNode(transformed.clone()),
2249 metadata: metadata.clone(),
2250 }
2251 } else {
2252 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2253 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2254 let cloned = inner.0.borrow().deep_clone(seen_tees);
2255 *new_rc.borrow_mut() = cloned;
2256 HydroNode::Tee {
2257 inner: SharedNode(new_rc),
2258 metadata: metadata.clone(),
2259 }
2260 }
2261 }
2262 HydroNode::Partition {
2263 inner,
2264 f,
2265 is_true,
2266 metadata,
2267 } => {
2268 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2269 HydroNode::Partition {
2270 inner: SharedNode(transformed.clone()),
2271 f: f.clone(),
2272 is_true: *is_true,
2273 metadata: metadata.clone(),
2274 }
2275 } else {
2276 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2277 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2278 let cloned = inner.0.borrow().deep_clone(seen_tees);
2279 *new_rc.borrow_mut() = cloned;
2280 HydroNode::Partition {
2281 inner: SharedNode(new_rc),
2282 f: f.clone(),
2283 is_true: *is_true,
2284 metadata: metadata.clone(),
2285 }
2286 }
2287 }
2288 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2289 inner: Box::new(inner.deep_clone(seen_tees)),
2290 metadata: metadata.clone(),
2291 },
2292 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2293 inner: Box::new(inner.deep_clone(seen_tees)),
2294 metadata: metadata.clone(),
2295 },
2296 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2297 inner: Box::new(inner.deep_clone(seen_tees)),
2298 metadata: metadata.clone(),
2299 },
2300 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2301 inner: Box::new(inner.deep_clone(seen_tees)),
2302 metadata: metadata.clone(),
2303 },
2304 HydroNode::Chain {
2305 first,
2306 second,
2307 metadata,
2308 } => HydroNode::Chain {
2309 first: Box::new(first.deep_clone(seen_tees)),
2310 second: Box::new(second.deep_clone(seen_tees)),
2311 metadata: metadata.clone(),
2312 },
2313 HydroNode::ChainFirst {
2314 first,
2315 second,
2316 metadata,
2317 } => HydroNode::ChainFirst {
2318 first: Box::new(first.deep_clone(seen_tees)),
2319 second: Box::new(second.deep_clone(seen_tees)),
2320 metadata: metadata.clone(),
2321 },
2322 HydroNode::CrossProduct {
2323 left,
2324 right,
2325 metadata,
2326 } => HydroNode::CrossProduct {
2327 left: Box::new(left.deep_clone(seen_tees)),
2328 right: Box::new(right.deep_clone(seen_tees)),
2329 metadata: metadata.clone(),
2330 },
2331 HydroNode::CrossSingleton {
2332 left,
2333 right,
2334 metadata,
2335 } => HydroNode::CrossSingleton {
2336 left: Box::new(left.deep_clone(seen_tees)),
2337 right: Box::new(right.deep_clone(seen_tees)),
2338 metadata: metadata.clone(),
2339 },
2340 HydroNode::Join {
2341 left,
2342 right,
2343 metadata,
2344 } => HydroNode::Join {
2345 left: Box::new(left.deep_clone(seen_tees)),
2346 right: Box::new(right.deep_clone(seen_tees)),
2347 metadata: metadata.clone(),
2348 },
2349 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2350 pos: Box::new(pos.deep_clone(seen_tees)),
2351 neg: Box::new(neg.deep_clone(seen_tees)),
2352 metadata: metadata.clone(),
2353 },
2354 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2355 pos: Box::new(pos.deep_clone(seen_tees)),
2356 neg: Box::new(neg.deep_clone(seen_tees)),
2357 metadata: metadata.clone(),
2358 },
2359 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2360 input: Box::new(input.deep_clone(seen_tees)),
2361 metadata: metadata.clone(),
2362 },
2363 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2364 HydroNode::ResolveFuturesBlocking {
2365 input: Box::new(input.deep_clone(seen_tees)),
2366 metadata: metadata.clone(),
2367 }
2368 }
2369 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2370 HydroNode::ResolveFuturesOrdered {
2371 input: Box::new(input.deep_clone(seen_tees)),
2372 metadata: metadata.clone(),
2373 }
2374 }
2375 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2376 f: f.clone(),
2377 input: Box::new(input.deep_clone(seen_tees)),
2378 metadata: metadata.clone(),
2379 },
2380 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2381 f: f.clone(),
2382 input: Box::new(input.deep_clone(seen_tees)),
2383 metadata: metadata.clone(),
2384 },
2385 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2386 HydroNode::FlatMapStreamBlocking {
2387 f: f.clone(),
2388 input: Box::new(input.deep_clone(seen_tees)),
2389 metadata: metadata.clone(),
2390 }
2391 }
2392 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2393 f: f.clone(),
2394 input: Box::new(input.deep_clone(seen_tees)),
2395 metadata: metadata.clone(),
2396 },
2397 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2398 f: f.clone(),
2399 input: Box::new(input.deep_clone(seen_tees)),
2400 metadata: metadata.clone(),
2401 },
2402 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2403 input: Box::new(input.deep_clone(seen_tees)),
2404 metadata: metadata.clone(),
2405 },
2406 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2407 input: Box::new(input.deep_clone(seen_tees)),
2408 metadata: metadata.clone(),
2409 },
2410 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2411 f: f.clone(),
2412 input: Box::new(input.deep_clone(seen_tees)),
2413 metadata: metadata.clone(),
2414 },
2415 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2416 input: Box::new(input.deep_clone(seen_tees)),
2417 metadata: metadata.clone(),
2418 },
2419 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2420 input: Box::new(input.deep_clone(seen_tees)),
2421 metadata: metadata.clone(),
2422 },
2423 HydroNode::Fold {
2424 init,
2425 acc,
2426 input,
2427 metadata,
2428 } => HydroNode::Fold {
2429 init: init.clone(),
2430 acc: acc.clone(),
2431 input: Box::new(input.deep_clone(seen_tees)),
2432 metadata: metadata.clone(),
2433 },
2434 HydroNode::Scan {
2435 init,
2436 acc,
2437 input,
2438 metadata,
2439 } => HydroNode::Scan {
2440 init: init.clone(),
2441 acc: acc.clone(),
2442 input: Box::new(input.deep_clone(seen_tees)),
2443 metadata: metadata.clone(),
2444 },
2445 HydroNode::FoldKeyed {
2446 init,
2447 acc,
2448 input,
2449 metadata,
2450 } => HydroNode::FoldKeyed {
2451 init: init.clone(),
2452 acc: acc.clone(),
2453 input: Box::new(input.deep_clone(seen_tees)),
2454 metadata: metadata.clone(),
2455 },
2456 HydroNode::ReduceKeyedWatermark {
2457 f,
2458 input,
2459 watermark,
2460 metadata,
2461 } => HydroNode::ReduceKeyedWatermark {
2462 f: f.clone(),
2463 input: Box::new(input.deep_clone(seen_tees)),
2464 watermark: Box::new(watermark.deep_clone(seen_tees)),
2465 metadata: metadata.clone(),
2466 },
2467 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2468 f: f.clone(),
2469 input: Box::new(input.deep_clone(seen_tees)),
2470 metadata: metadata.clone(),
2471 },
2472 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2473 f: f.clone(),
2474 input: Box::new(input.deep_clone(seen_tees)),
2475 metadata: metadata.clone(),
2476 },
2477 HydroNode::Network {
2478 name,
2479 networking_info,
2480 serialize_fn,
2481 instantiate_fn,
2482 deserialize_fn,
2483 input,
2484 metadata,
2485 } => HydroNode::Network {
2486 name: name.clone(),
2487 networking_info: networking_info.clone(),
2488 serialize_fn: serialize_fn.clone(),
2489 instantiate_fn: instantiate_fn.clone(),
2490 deserialize_fn: deserialize_fn.clone(),
2491 input: Box::new(input.deep_clone(seen_tees)),
2492 metadata: metadata.clone(),
2493 },
2494 HydroNode::ExternalInput {
2495 from_external_key,
2496 from_port_id,
2497 from_many,
2498 codec_type,
2499 port_hint,
2500 instantiate_fn,
2501 deserialize_fn,
2502 metadata,
2503 } => HydroNode::ExternalInput {
2504 from_external_key: *from_external_key,
2505 from_port_id: *from_port_id,
2506 from_many: *from_many,
2507 codec_type: codec_type.clone(),
2508 port_hint: *port_hint,
2509 instantiate_fn: instantiate_fn.clone(),
2510 deserialize_fn: deserialize_fn.clone(),
2511 metadata: metadata.clone(),
2512 },
2513 HydroNode::Counter {
2514 tag,
2515 duration,
2516 prefix,
2517 input,
2518 metadata,
2519 } => HydroNode::Counter {
2520 tag: tag.clone(),
2521 duration: duration.clone(),
2522 prefix: prefix.clone(),
2523 input: Box::new(input.deep_clone(seen_tees)),
2524 metadata: metadata.clone(),
2525 },
2526 }
2527 }
2528
2529 #[cfg(feature = "build")]
2530 pub fn emit_core(
2531 &mut self,
2532 builders_or_callback: &mut BuildersOrCallback<
2533 impl FnMut(&mut HydroRoot, &mut usize),
2534 impl FnMut(&mut HydroNode, &mut usize),
2535 >,
2536 seen_tees: &mut SeenSharedNodes,
2537 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2538 next_stmt_id: &mut usize,
2539 ) -> syn::Ident {
2540 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2541
2542 self.transform_bottom_up(
2543 &mut |node: &mut HydroNode| {
2544 let out_location = node.metadata().location_id.clone();
2545 match node {
2546 HydroNode::Placeholder => {
2547 panic!()
2548 }
2549
2550 HydroNode::Cast { .. } => {
2551 match builders_or_callback {
2554 BuildersOrCallback::Builders(_) => {}
2555 BuildersOrCallback::Callback(_, node_callback) => {
2556 node_callback(node, next_stmt_id);
2557 }
2558 }
2559
2560 *next_stmt_id += 1;
2561 }
2563
2564 HydroNode::ObserveNonDet {
2565 inner,
2566 trusted,
2567 metadata,
2568 ..
2569 } => {
2570 let inner_ident = ident_stack.pop().unwrap();
2571
2572 let observe_ident =
2573 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2574
2575 match builders_or_callback {
2576 BuildersOrCallback::Builders(graph_builders) => {
2577 graph_builders.observe_nondet(
2578 *trusted,
2579 &inner.metadata().location_id,
2580 inner_ident,
2581 &inner.metadata().collection_kind,
2582 &observe_ident,
2583 &metadata.collection_kind,
2584 &metadata.op,
2585 );
2586 }
2587 BuildersOrCallback::Callback(_, node_callback) => {
2588 node_callback(node, next_stmt_id);
2589 }
2590 }
2591
2592 *next_stmt_id += 1;
2593
2594 ident_stack.push(observe_ident);
2595 }
2596
2597 HydroNode::Batch {
2598 inner, metadata, ..
2599 } => {
2600 let inner_ident = ident_stack.pop().unwrap();
2601
2602 let batch_ident =
2603 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2604
2605 match builders_or_callback {
2606 BuildersOrCallback::Builders(graph_builders) => {
2607 graph_builders.batch(
2608 inner_ident,
2609 &inner.metadata().location_id,
2610 &inner.metadata().collection_kind,
2611 &batch_ident,
2612 &out_location,
2613 &metadata.op,
2614 );
2615 }
2616 BuildersOrCallback::Callback(_, node_callback) => {
2617 node_callback(node, next_stmt_id);
2618 }
2619 }
2620
2621 *next_stmt_id += 1;
2622
2623 ident_stack.push(batch_ident);
2624 }
2625
2626 HydroNode::YieldConcat { inner, .. } => {
2627 let inner_ident = ident_stack.pop().unwrap();
2628
2629 let yield_ident =
2630 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2631
2632 match builders_or_callback {
2633 BuildersOrCallback::Builders(graph_builders) => {
2634 graph_builders.yield_from_tick(
2635 inner_ident,
2636 &inner.metadata().location_id,
2637 &inner.metadata().collection_kind,
2638 &yield_ident,
2639 &out_location,
2640 );
2641 }
2642 BuildersOrCallback::Callback(_, node_callback) => {
2643 node_callback(node, next_stmt_id);
2644 }
2645 }
2646
2647 *next_stmt_id += 1;
2648
2649 ident_stack.push(yield_ident);
2650 }
2651
2652 HydroNode::BeginAtomic { inner, metadata } => {
2653 let inner_ident = ident_stack.pop().unwrap();
2654
2655 let begin_ident =
2656 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2657
2658 match builders_or_callback {
2659 BuildersOrCallback::Builders(graph_builders) => {
2660 graph_builders.begin_atomic(
2661 inner_ident,
2662 &inner.metadata().location_id,
2663 &inner.metadata().collection_kind,
2664 &begin_ident,
2665 &out_location,
2666 &metadata.op,
2667 );
2668 }
2669 BuildersOrCallback::Callback(_, node_callback) => {
2670 node_callback(node, next_stmt_id);
2671 }
2672 }
2673
2674 *next_stmt_id += 1;
2675
2676 ident_stack.push(begin_ident);
2677 }
2678
2679 HydroNode::EndAtomic { inner, .. } => {
2680 let inner_ident = ident_stack.pop().unwrap();
2681
2682 let end_ident =
2683 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2684
2685 match builders_or_callback {
2686 BuildersOrCallback::Builders(graph_builders) => {
2687 graph_builders.end_atomic(
2688 inner_ident,
2689 &inner.metadata().location_id,
2690 &inner.metadata().collection_kind,
2691 &end_ident,
2692 );
2693 }
2694 BuildersOrCallback::Callback(_, node_callback) => {
2695 node_callback(node, next_stmt_id);
2696 }
2697 }
2698
2699 *next_stmt_id += 1;
2700
2701 ident_stack.push(end_ident);
2702 }
2703
2704 HydroNode::Source {
2705 source, metadata, ..
2706 } => {
2707 if let HydroSource::ExternalNetwork() = source {
2708 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2709 } else {
2710 let source_ident =
2711 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2712
2713 let source_stmt = match source {
2714 HydroSource::Stream(expr) => {
2715 debug_assert!(metadata.location_id.is_top_level());
2716 parse_quote! {
2717 #source_ident = source_stream(#expr);
2718 }
2719 }
2720
2721 HydroSource::ExternalNetwork() => {
2722 unreachable!()
2723 }
2724
2725 HydroSource::Iter(expr) => {
2726 if metadata.location_id.is_top_level() {
2727 parse_quote! {
2728 #source_ident = source_iter(#expr);
2729 }
2730 } else {
2731 parse_quote! {
2733 #source_ident = source_iter(#expr) -> persist::<'static>();
2734 }
2735 }
2736 }
2737
2738 HydroSource::Spin() => {
2739 debug_assert!(metadata.location_id.is_top_level());
2740 parse_quote! {
2741 #source_ident = spin();
2742 }
2743 }
2744
2745 HydroSource::ClusterMembers(target_loc, state) => {
2746 debug_assert!(metadata.location_id.is_top_level());
2747
2748 let members_tee_ident = syn::Ident::new(
2749 &format!(
2750 "__cluster_members_tee_{}_{}",
2751 metadata.location_id.root().key(),
2752 target_loc.key(),
2753 ),
2754 Span::call_site(),
2755 );
2756
2757 match state {
2758 ClusterMembersState::Stream(d) => {
2759 parse_quote! {
2760 #members_tee_ident = source_stream(#d) -> tee();
2761 #source_ident = #members_tee_ident;
2762 }
2763 },
2764 ClusterMembersState::Uninit => syn::parse_quote! {
2765 #source_ident = source_stream(DUMMY);
2766 },
2767 ClusterMembersState::Tee(..) => parse_quote! {
2768 #source_ident = #members_tee_ident;
2769 },
2770 }
2771 }
2772
2773 HydroSource::Embedded(ident) => {
2774 parse_quote! {
2775 #source_ident = source_stream(#ident);
2776 }
2777 }
2778
2779 HydroSource::EmbeddedSingleton(ident) => {
2780 parse_quote! {
2781 #source_ident = source_iter([#ident]);
2782 }
2783 }
2784 };
2785
2786 match builders_or_callback {
2787 BuildersOrCallback::Builders(graph_builders) => {
2788 let builder = graph_builders.get_dfir_mut(&out_location);
2789 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2790 }
2791 BuildersOrCallback::Callback(_, node_callback) => {
2792 node_callback(node, next_stmt_id);
2793 }
2794 }
2795
2796 *next_stmt_id += 1;
2797
2798 ident_stack.push(source_ident);
2799 }
2800 }
2801
2802 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2803 let source_ident =
2804 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2805
2806 match builders_or_callback {
2807 BuildersOrCallback::Builders(graph_builders) => {
2808 let builder = graph_builders.get_dfir_mut(&out_location);
2809
2810 if *first_tick_only {
2811 assert!(
2812 !metadata.location_id.is_top_level(),
2813 "first_tick_only SingletonSource must be inside a tick"
2814 );
2815 }
2816
2817 if *first_tick_only
2818 || (metadata.location_id.is_top_level()
2819 && metadata.collection_kind.is_bounded())
2820 {
2821 builder.add_dfir(
2822 parse_quote! {
2823 #source_ident = source_iter([#value]);
2824 },
2825 None,
2826 Some(&next_stmt_id.to_string()),
2827 );
2828 } else {
2829 builder.add_dfir(
2830 parse_quote! {
2831 #source_ident = source_iter([#value]) -> persist::<'static>();
2832 },
2833 None,
2834 Some(&next_stmt_id.to_string()),
2835 );
2836 }
2837 }
2838 BuildersOrCallback::Callback(_, node_callback) => {
2839 node_callback(node, next_stmt_id);
2840 }
2841 }
2842
2843 *next_stmt_id += 1;
2844
2845 ident_stack.push(source_ident);
2846 }
2847
2848 HydroNode::CycleSource { cycle_id, .. } => {
2849 let ident = cycle_id.as_ident();
2850
2851 match builders_or_callback {
2852 BuildersOrCallback::Builders(_) => {}
2853 BuildersOrCallback::Callback(_, node_callback) => {
2854 node_callback(node, next_stmt_id);
2855 }
2856 }
2857
2858 *next_stmt_id += 1;
2860
2861 ident_stack.push(ident);
2862 }
2863
2864 HydroNode::Tee { inner, .. } => {
2865 let ret_ident = if let Some(built_idents) =
2866 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2867 {
2868 match builders_or_callback {
2869 BuildersOrCallback::Builders(_) => {}
2870 BuildersOrCallback::Callback(_, node_callback) => {
2871 node_callback(node, next_stmt_id);
2872 }
2873 }
2874
2875 built_idents[0].clone()
2876 } else {
2877 let inner_ident = ident_stack.pop().unwrap();
2880
2881 let tee_ident =
2882 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2883
2884 built_tees.insert(
2885 inner.0.as_ref() as *const RefCell<HydroNode>,
2886 vec![tee_ident.clone()],
2887 );
2888
2889 match builders_or_callback {
2890 BuildersOrCallback::Builders(graph_builders) => {
2891 let builder = graph_builders.get_dfir_mut(&out_location);
2892 builder.add_dfir(
2893 parse_quote! {
2894 #tee_ident = #inner_ident -> tee();
2895 },
2896 None,
2897 Some(&next_stmt_id.to_string()),
2898 );
2899 }
2900 BuildersOrCallback::Callback(_, node_callback) => {
2901 node_callback(node, next_stmt_id);
2902 }
2903 }
2904
2905 tee_ident
2906 };
2907
2908 *next_stmt_id += 1;
2912 ident_stack.push(ret_ident);
2913 }
2914
2915 HydroNode::Partition {
2916 inner, f, is_true, ..
2917 } => {
2918 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
2920 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
2921 match builders_or_callback {
2922 BuildersOrCallback::Builders(_) => {}
2923 BuildersOrCallback::Callback(_, node_callback) => {
2924 node_callback(node, next_stmt_id);
2925 }
2926 }
2927
2928 let idx = if is_true { 0 } else { 1 };
2929 built_idents[idx].clone()
2930 } else {
2931 let inner_ident = ident_stack.pop().unwrap();
2934
2935 let partition_ident = syn::Ident::new(
2936 &format!("stream_{}_partition", *next_stmt_id),
2937 Span::call_site(),
2938 );
2939 let true_ident = syn::Ident::new(
2940 &format!("stream_{}_true", *next_stmt_id),
2941 Span::call_site(),
2942 );
2943 let false_ident = syn::Ident::new(
2944 &format!("stream_{}_false", *next_stmt_id),
2945 Span::call_site(),
2946 );
2947
2948 built_tees.insert(
2949 ptr,
2950 vec![true_ident.clone(), false_ident.clone()],
2951 );
2952
2953 match builders_or_callback {
2954 BuildersOrCallback::Builders(graph_builders) => {
2955 let builder = graph_builders.get_dfir_mut(&out_location);
2956 builder.add_dfir(
2957 parse_quote! {
2958 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
2959 #true_ident = #partition_ident[0];
2960 #false_ident = #partition_ident[1];
2961 },
2962 None,
2963 Some(&next_stmt_id.to_string()),
2964 );
2965 }
2966 BuildersOrCallback::Callback(_, node_callback) => {
2967 node_callback(node, next_stmt_id);
2968 }
2969 }
2970
2971 if is_true { true_ident } else { false_ident }
2972 };
2973
2974 *next_stmt_id += 1;
2975 ident_stack.push(ret_ident);
2976 }
2977
2978 HydroNode::Chain { .. } => {
2979 let second_ident = ident_stack.pop().unwrap();
2981 let first_ident = ident_stack.pop().unwrap();
2982
2983 let chain_ident =
2984 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2985
2986 match builders_or_callback {
2987 BuildersOrCallback::Builders(graph_builders) => {
2988 let builder = graph_builders.get_dfir_mut(&out_location);
2989 builder.add_dfir(
2990 parse_quote! {
2991 #chain_ident = chain();
2992 #first_ident -> [0]#chain_ident;
2993 #second_ident -> [1]#chain_ident;
2994 },
2995 None,
2996 Some(&next_stmt_id.to_string()),
2997 );
2998 }
2999 BuildersOrCallback::Callback(_, node_callback) => {
3000 node_callback(node, next_stmt_id);
3001 }
3002 }
3003
3004 *next_stmt_id += 1;
3005
3006 ident_stack.push(chain_ident);
3007 }
3008
3009 HydroNode::ChainFirst { .. } => {
3010 let second_ident = ident_stack.pop().unwrap();
3011 let first_ident = ident_stack.pop().unwrap();
3012
3013 let chain_ident =
3014 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3015
3016 match builders_or_callback {
3017 BuildersOrCallback::Builders(graph_builders) => {
3018 let builder = graph_builders.get_dfir_mut(&out_location);
3019 builder.add_dfir(
3020 parse_quote! {
3021 #chain_ident = chain_first_n(1);
3022 #first_ident -> [0]#chain_ident;
3023 #second_ident -> [1]#chain_ident;
3024 },
3025 None,
3026 Some(&next_stmt_id.to_string()),
3027 );
3028 }
3029 BuildersOrCallback::Callback(_, node_callback) => {
3030 node_callback(node, next_stmt_id);
3031 }
3032 }
3033
3034 *next_stmt_id += 1;
3035
3036 ident_stack.push(chain_ident);
3037 }
3038
3039 HydroNode::CrossSingleton { right, .. } => {
3040 let right_ident = ident_stack.pop().unwrap();
3041 let left_ident = ident_stack.pop().unwrap();
3042
3043 let cross_ident =
3044 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3045
3046 match builders_or_callback {
3047 BuildersOrCallback::Builders(graph_builders) => {
3048 let builder = graph_builders.get_dfir_mut(&out_location);
3049
3050 if right.metadata().location_id.is_top_level()
3051 && right.metadata().collection_kind.is_bounded()
3052 {
3053 builder.add_dfir(
3054 parse_quote! {
3055 #cross_ident = cross_singleton();
3056 #left_ident -> [input]#cross_ident;
3057 #right_ident -> persist::<'static>() -> [single]#cross_ident;
3058 },
3059 None,
3060 Some(&next_stmt_id.to_string()),
3061 );
3062 } else {
3063 builder.add_dfir(
3064 parse_quote! {
3065 #cross_ident = cross_singleton();
3066 #left_ident -> [input]#cross_ident;
3067 #right_ident -> [single]#cross_ident;
3068 },
3069 None,
3070 Some(&next_stmt_id.to_string()),
3071 );
3072 }
3073 }
3074 BuildersOrCallback::Callback(_, node_callback) => {
3075 node_callback(node, next_stmt_id);
3076 }
3077 }
3078
3079 *next_stmt_id += 1;
3080
3081 ident_stack.push(cross_ident);
3082 }
3083
3084 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3085 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3086 parse_quote!(cross_join_multiset)
3087 } else {
3088 parse_quote!(join_multiset)
3089 };
3090
3091 let (HydroNode::CrossProduct { left, right, .. }
3092 | HydroNode::Join { left, right, .. }) = node
3093 else {
3094 unreachable!()
3095 };
3096
3097 let is_top_level = left.metadata().location_id.is_top_level()
3098 && right.metadata().location_id.is_top_level();
3099 let left_lifetime = if left.metadata().location_id.is_top_level() {
3100 quote!('static)
3101 } else {
3102 quote!('tick)
3103 };
3104
3105 let right_lifetime = if right.metadata().location_id.is_top_level() {
3106 quote!('static)
3107 } else {
3108 quote!('tick)
3109 };
3110
3111 let right_ident = ident_stack.pop().unwrap();
3112 let left_ident = ident_stack.pop().unwrap();
3113
3114 let stream_ident =
3115 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3116
3117 match builders_or_callback {
3118 BuildersOrCallback::Builders(graph_builders) => {
3119 let builder = graph_builders.get_dfir_mut(&out_location);
3120 builder.add_dfir(
3121 if is_top_level {
3122 parse_quote! {
3125 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3126 #left_ident -> [0]#stream_ident;
3127 #right_ident -> [1]#stream_ident;
3128 }
3129 } else {
3130 parse_quote! {
3131 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3132 #left_ident -> [0]#stream_ident;
3133 #right_ident -> [1]#stream_ident;
3134 }
3135 }
3136 ,
3137 None,
3138 Some(&next_stmt_id.to_string()),
3139 );
3140 }
3141 BuildersOrCallback::Callback(_, node_callback) => {
3142 node_callback(node, next_stmt_id);
3143 }
3144 }
3145
3146 *next_stmt_id += 1;
3147
3148 ident_stack.push(stream_ident);
3149 }
3150
3151 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3152 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3153 parse_quote!(difference)
3154 } else {
3155 parse_quote!(anti_join)
3156 };
3157
3158 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3159 node
3160 else {
3161 unreachable!()
3162 };
3163
3164 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3165 quote!('static)
3166 } else {
3167 quote!('tick)
3168 };
3169
3170 let neg_ident = ident_stack.pop().unwrap();
3171 let pos_ident = ident_stack.pop().unwrap();
3172
3173 let stream_ident =
3174 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3175
3176 match builders_or_callback {
3177 BuildersOrCallback::Builders(graph_builders) => {
3178 let builder = graph_builders.get_dfir_mut(&out_location);
3179 builder.add_dfir(
3180 parse_quote! {
3181 #stream_ident = #operator::<'tick, #neg_lifetime>();
3182 #pos_ident -> [pos]#stream_ident;
3183 #neg_ident -> [neg]#stream_ident;
3184 },
3185 None,
3186 Some(&next_stmt_id.to_string()),
3187 );
3188 }
3189 BuildersOrCallback::Callback(_, node_callback) => {
3190 node_callback(node, next_stmt_id);
3191 }
3192 }
3193
3194 *next_stmt_id += 1;
3195
3196 ident_stack.push(stream_ident);
3197 }
3198
3199 HydroNode::ResolveFutures { .. } => {
3200 let input_ident = ident_stack.pop().unwrap();
3201
3202 let futures_ident =
3203 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3204
3205 match builders_or_callback {
3206 BuildersOrCallback::Builders(graph_builders) => {
3207 let builder = graph_builders.get_dfir_mut(&out_location);
3208 builder.add_dfir(
3209 parse_quote! {
3210 #futures_ident = #input_ident -> resolve_futures();
3211 },
3212 None,
3213 Some(&next_stmt_id.to_string()),
3214 );
3215 }
3216 BuildersOrCallback::Callback(_, node_callback) => {
3217 node_callback(node, next_stmt_id);
3218 }
3219 }
3220
3221 *next_stmt_id += 1;
3222
3223 ident_stack.push(futures_ident);
3224 }
3225
3226 HydroNode::ResolveFuturesBlocking { .. } => {
3227 let input_ident = ident_stack.pop().unwrap();
3228
3229 let futures_ident =
3230 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3231
3232 match builders_or_callback {
3233 BuildersOrCallback::Builders(graph_builders) => {
3234 let builder = graph_builders.get_dfir_mut(&out_location);
3235 builder.add_dfir(
3236 parse_quote! {
3237 #futures_ident = #input_ident -> resolve_futures_blocking();
3238 },
3239 None,
3240 Some(&next_stmt_id.to_string()),
3241 );
3242 }
3243 BuildersOrCallback::Callback(_, node_callback) => {
3244 node_callback(node, next_stmt_id);
3245 }
3246 }
3247
3248 *next_stmt_id += 1;
3249
3250 ident_stack.push(futures_ident);
3251 }
3252
3253 HydroNode::ResolveFuturesOrdered { .. } => {
3254 let input_ident = ident_stack.pop().unwrap();
3255
3256 let futures_ident =
3257 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3258
3259 match builders_or_callback {
3260 BuildersOrCallback::Builders(graph_builders) => {
3261 let builder = graph_builders.get_dfir_mut(&out_location);
3262 builder.add_dfir(
3263 parse_quote! {
3264 #futures_ident = #input_ident -> resolve_futures_ordered();
3265 },
3266 None,
3267 Some(&next_stmt_id.to_string()),
3268 );
3269 }
3270 BuildersOrCallback::Callback(_, node_callback) => {
3271 node_callback(node, next_stmt_id);
3272 }
3273 }
3274
3275 *next_stmt_id += 1;
3276
3277 ident_stack.push(futures_ident);
3278 }
3279
3280 HydroNode::Map { f, .. } => {
3281 let input_ident = ident_stack.pop().unwrap();
3282
3283 let map_ident =
3284 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3285
3286 match builders_or_callback {
3287 BuildersOrCallback::Builders(graph_builders) => {
3288 let builder = graph_builders.get_dfir_mut(&out_location);
3289 builder.add_dfir(
3290 parse_quote! {
3291 #map_ident = #input_ident -> map(#f);
3292 },
3293 None,
3294 Some(&next_stmt_id.to_string()),
3295 );
3296 }
3297 BuildersOrCallback::Callback(_, node_callback) => {
3298 node_callback(node, next_stmt_id);
3299 }
3300 }
3301
3302 *next_stmt_id += 1;
3303
3304 ident_stack.push(map_ident);
3305 }
3306
3307 HydroNode::FlatMap { f, .. } => {
3308 let input_ident = ident_stack.pop().unwrap();
3309
3310 let flat_map_ident =
3311 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3312
3313 match builders_or_callback {
3314 BuildersOrCallback::Builders(graph_builders) => {
3315 let builder = graph_builders.get_dfir_mut(&out_location);
3316 builder.add_dfir(
3317 parse_quote! {
3318 #flat_map_ident = #input_ident -> flat_map(#f);
3319 },
3320 None,
3321 Some(&next_stmt_id.to_string()),
3322 );
3323 }
3324 BuildersOrCallback::Callback(_, node_callback) => {
3325 node_callback(node, next_stmt_id);
3326 }
3327 }
3328
3329 *next_stmt_id += 1;
3330
3331 ident_stack.push(flat_map_ident);
3332 }
3333
3334 HydroNode::FlatMapStreamBlocking { f, .. } => {
3335 let input_ident = ident_stack.pop().unwrap();
3336
3337 let flat_map_stream_blocking_ident =
3338 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3339
3340 match builders_or_callback {
3341 BuildersOrCallback::Builders(graph_builders) => {
3342 let builder = graph_builders.get_dfir_mut(&out_location);
3343 builder.add_dfir(
3344 parse_quote! {
3345 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3346 },
3347 None,
3348 Some(&next_stmt_id.to_string()),
3349 );
3350 }
3351 BuildersOrCallback::Callback(_, node_callback) => {
3352 node_callback(node, next_stmt_id);
3353 }
3354 }
3355
3356 *next_stmt_id += 1;
3357
3358 ident_stack.push(flat_map_stream_blocking_ident);
3359 }
3360
3361 HydroNode::Filter { f, .. } => {
3362 let input_ident = ident_stack.pop().unwrap();
3363
3364 let filter_ident =
3365 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3366
3367 match builders_or_callback {
3368 BuildersOrCallback::Builders(graph_builders) => {
3369 let builder = graph_builders.get_dfir_mut(&out_location);
3370 builder.add_dfir(
3371 parse_quote! {
3372 #filter_ident = #input_ident -> filter(#f);
3373 },
3374 None,
3375 Some(&next_stmt_id.to_string()),
3376 );
3377 }
3378 BuildersOrCallback::Callback(_, node_callback) => {
3379 node_callback(node, next_stmt_id);
3380 }
3381 }
3382
3383 *next_stmt_id += 1;
3384
3385 ident_stack.push(filter_ident);
3386 }
3387
3388 HydroNode::FilterMap { f, .. } => {
3389 let input_ident = ident_stack.pop().unwrap();
3390
3391 let filter_map_ident =
3392 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3393
3394 match builders_or_callback {
3395 BuildersOrCallback::Builders(graph_builders) => {
3396 let builder = graph_builders.get_dfir_mut(&out_location);
3397 builder.add_dfir(
3398 parse_quote! {
3399 #filter_map_ident = #input_ident -> filter_map(#f);
3400 },
3401 None,
3402 Some(&next_stmt_id.to_string()),
3403 );
3404 }
3405 BuildersOrCallback::Callback(_, node_callback) => {
3406 node_callback(node, next_stmt_id);
3407 }
3408 }
3409
3410 *next_stmt_id += 1;
3411
3412 ident_stack.push(filter_map_ident);
3413 }
3414
3415 HydroNode::Sort { .. } => {
3416 let input_ident = ident_stack.pop().unwrap();
3417
3418 let sort_ident =
3419 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3420
3421 match builders_or_callback {
3422 BuildersOrCallback::Builders(graph_builders) => {
3423 let builder = graph_builders.get_dfir_mut(&out_location);
3424 builder.add_dfir(
3425 parse_quote! {
3426 #sort_ident = #input_ident -> sort();
3427 },
3428 None,
3429 Some(&next_stmt_id.to_string()),
3430 );
3431 }
3432 BuildersOrCallback::Callback(_, node_callback) => {
3433 node_callback(node, next_stmt_id);
3434 }
3435 }
3436
3437 *next_stmt_id += 1;
3438
3439 ident_stack.push(sort_ident);
3440 }
3441
3442 HydroNode::DeferTick { .. } => {
3443 let input_ident = ident_stack.pop().unwrap();
3444
3445 let defer_tick_ident =
3446 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3447
3448 match builders_or_callback {
3449 BuildersOrCallback::Builders(graph_builders) => {
3450 let builder = graph_builders.get_dfir_mut(&out_location);
3451 builder.add_dfir(
3452 parse_quote! {
3453 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3454 },
3455 None,
3456 Some(&next_stmt_id.to_string()),
3457 );
3458 }
3459 BuildersOrCallback::Callback(_, node_callback) => {
3460 node_callback(node, next_stmt_id);
3461 }
3462 }
3463
3464 *next_stmt_id += 1;
3465
3466 ident_stack.push(defer_tick_ident);
3467 }
3468
3469 HydroNode::Enumerate { input, .. } => {
3470 let input_ident = ident_stack.pop().unwrap();
3471
3472 let enumerate_ident =
3473 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3474
3475 match builders_or_callback {
3476 BuildersOrCallback::Builders(graph_builders) => {
3477 let builder = graph_builders.get_dfir_mut(&out_location);
3478 let lifetime = if input.metadata().location_id.is_top_level() {
3479 quote!('static)
3480 } else {
3481 quote!('tick)
3482 };
3483 builder.add_dfir(
3484 parse_quote! {
3485 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3486 },
3487 None,
3488 Some(&next_stmt_id.to_string()),
3489 );
3490 }
3491 BuildersOrCallback::Callback(_, node_callback) => {
3492 node_callback(node, next_stmt_id);
3493 }
3494 }
3495
3496 *next_stmt_id += 1;
3497
3498 ident_stack.push(enumerate_ident);
3499 }
3500
3501 HydroNode::Inspect { f, .. } => {
3502 let input_ident = ident_stack.pop().unwrap();
3503
3504 let inspect_ident =
3505 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3506
3507 match builders_or_callback {
3508 BuildersOrCallback::Builders(graph_builders) => {
3509 let builder = graph_builders.get_dfir_mut(&out_location);
3510 builder.add_dfir(
3511 parse_quote! {
3512 #inspect_ident = #input_ident -> inspect(#f);
3513 },
3514 None,
3515 Some(&next_stmt_id.to_string()),
3516 );
3517 }
3518 BuildersOrCallback::Callback(_, node_callback) => {
3519 node_callback(node, next_stmt_id);
3520 }
3521 }
3522
3523 *next_stmt_id += 1;
3524
3525 ident_stack.push(inspect_ident);
3526 }
3527
3528 HydroNode::Unique { input, .. } => {
3529 let input_ident = ident_stack.pop().unwrap();
3530
3531 let unique_ident =
3532 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3533
3534 match builders_or_callback {
3535 BuildersOrCallback::Builders(graph_builders) => {
3536 let builder = graph_builders.get_dfir_mut(&out_location);
3537 let lifetime = if input.metadata().location_id.is_top_level() {
3538 quote!('static)
3539 } else {
3540 quote!('tick)
3541 };
3542
3543 builder.add_dfir(
3544 parse_quote! {
3545 #unique_ident = #input_ident -> unique::<#lifetime>();
3546 },
3547 None,
3548 Some(&next_stmt_id.to_string()),
3549 );
3550 }
3551 BuildersOrCallback::Callback(_, node_callback) => {
3552 node_callback(node, next_stmt_id);
3553 }
3554 }
3555
3556 *next_stmt_id += 1;
3557
3558 ident_stack.push(unique_ident);
3559 }
3560
3561 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3562 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3563 if input.metadata().location_id.is_top_level()
3564 && input.metadata().collection_kind.is_bounded()
3565 {
3566 parse_quote!(fold_no_replay)
3567 } else {
3568 parse_quote!(fold)
3569 }
3570 } else if matches!(node, HydroNode::Scan { .. }) {
3571 parse_quote!(scan)
3572 } else if let HydroNode::FoldKeyed { input, .. } = node {
3573 if input.metadata().location_id.is_top_level()
3574 && input.metadata().collection_kind.is_bounded()
3575 {
3576 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3577 } else {
3578 parse_quote!(fold_keyed)
3579 }
3580 } else {
3581 unreachable!()
3582 };
3583
3584 let (HydroNode::Fold { input, .. }
3585 | HydroNode::FoldKeyed { input, .. }
3586 | HydroNode::Scan { input, .. }) = node
3587 else {
3588 unreachable!()
3589 };
3590
3591 let lifetime = if input.metadata().location_id.is_top_level() {
3592 quote!('static)
3593 } else {
3594 quote!('tick)
3595 };
3596
3597 let input_ident = ident_stack.pop().unwrap();
3598
3599 let (HydroNode::Fold { init, acc, .. }
3600 | HydroNode::FoldKeyed { init, acc, .. }
3601 | HydroNode::Scan { init, acc, .. }) = &*node
3602 else {
3603 unreachable!()
3604 };
3605
3606 let fold_ident =
3607 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3608
3609 match builders_or_callback {
3610 BuildersOrCallback::Builders(graph_builders) => {
3611 if matches!(node, HydroNode::Fold { .. })
3612 && node.metadata().location_id.is_top_level()
3613 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3614 && graph_builders.singleton_intermediates()
3615 && !node.metadata().collection_kind.is_bounded()
3616 {
3617 let builder = graph_builders.get_dfir_mut(&out_location);
3618
3619 let acc: syn::Expr = parse_quote!({
3620 let mut __inner = #acc;
3621 move |__state, __value| {
3622 __inner(__state, __value);
3623 Some(__state.clone())
3624 }
3625 });
3626
3627 builder.add_dfir(
3628 parse_quote! {
3629 source_iter([(#init)()]) -> [0]#fold_ident;
3630 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3631 #fold_ident = chain();
3632 },
3633 None,
3634 Some(&next_stmt_id.to_string()),
3635 );
3636 } else if matches!(node, HydroNode::FoldKeyed { .. })
3637 && node.metadata().location_id.is_top_level()
3638 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3639 && graph_builders.singleton_intermediates()
3640 && !node.metadata().collection_kind.is_bounded()
3641 {
3642 let builder = graph_builders.get_dfir_mut(&out_location);
3643
3644 let acc: syn::Expr = parse_quote!({
3645 let mut __init = #init;
3646 let mut __inner = #acc;
3647 move |__state, __kv: (_, _)| {
3648 let __state = __state
3650 .entry(::std::clone::Clone::clone(&__kv.0))
3651 .or_insert_with(|| (__init)());
3652 __inner(__state, __kv.1);
3653 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3654 }
3655 });
3656
3657 builder.add_dfir(
3658 parse_quote! {
3659 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3660 },
3661 None,
3662 Some(&next_stmt_id.to_string()),
3663 );
3664 } else {
3665 let builder = graph_builders.get_dfir_mut(&out_location);
3666 builder.add_dfir(
3667 parse_quote! {
3668 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3669 },
3670 None,
3671 Some(&next_stmt_id.to_string()),
3672 );
3673 }
3674 }
3675 BuildersOrCallback::Callback(_, node_callback) => {
3676 node_callback(node, next_stmt_id);
3677 }
3678 }
3679
3680 *next_stmt_id += 1;
3681
3682 ident_stack.push(fold_ident);
3683 }
3684
3685 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3686 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3687 if input.metadata().location_id.is_top_level()
3688 && input.metadata().collection_kind.is_bounded()
3689 {
3690 parse_quote!(reduce_no_replay)
3691 } else {
3692 parse_quote!(reduce)
3693 }
3694 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3695 if input.metadata().location_id.is_top_level()
3696 && input.metadata().collection_kind.is_bounded()
3697 {
3698 todo!(
3699 "Calling keyed reduce on a top-level bounded collection is not supported"
3700 )
3701 } else {
3702 parse_quote!(reduce_keyed)
3703 }
3704 } else {
3705 unreachable!()
3706 };
3707
3708 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3709 else {
3710 unreachable!()
3711 };
3712
3713 let lifetime = if input.metadata().location_id.is_top_level() {
3714 quote!('static)
3715 } else {
3716 quote!('tick)
3717 };
3718
3719 let input_ident = ident_stack.pop().unwrap();
3720
3721 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3722 else {
3723 unreachable!()
3724 };
3725
3726 let reduce_ident =
3727 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3728
3729 match builders_or_callback {
3730 BuildersOrCallback::Builders(graph_builders) => {
3731 if matches!(node, HydroNode::Reduce { .. })
3732 && node.metadata().location_id.is_top_level()
3733 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3734 && graph_builders.singleton_intermediates()
3735 && !node.metadata().collection_kind.is_bounded()
3736 {
3737 todo!(
3738 "Reduce with optional intermediates is not yet supported in simulator"
3739 );
3740 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3741 && node.metadata().location_id.is_top_level()
3742 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3743 && graph_builders.singleton_intermediates()
3744 && !node.metadata().collection_kind.is_bounded()
3745 {
3746 todo!(
3747 "Reduce keyed with optional intermediates is not yet supported in simulator"
3748 );
3749 } else {
3750 let builder = graph_builders.get_dfir_mut(&out_location);
3751 builder.add_dfir(
3752 parse_quote! {
3753 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3754 },
3755 None,
3756 Some(&next_stmt_id.to_string()),
3757 );
3758 }
3759 }
3760 BuildersOrCallback::Callback(_, node_callback) => {
3761 node_callback(node, next_stmt_id);
3762 }
3763 }
3764
3765 *next_stmt_id += 1;
3766
3767 ident_stack.push(reduce_ident);
3768 }
3769
3770 HydroNode::ReduceKeyedWatermark {
3771 f,
3772 input,
3773 metadata,
3774 ..
3775 } => {
3776 let lifetime = if input.metadata().location_id.is_top_level() {
3777 quote!('static)
3778 } else {
3779 quote!('tick)
3780 };
3781
3782 let watermark_ident = ident_stack.pop().unwrap();
3784 let input_ident = ident_stack.pop().unwrap();
3785
3786 let chain_ident = syn::Ident::new(
3787 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3788 Span::call_site(),
3789 );
3790
3791 let fold_ident =
3792 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3793
3794 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3795 && input.metadata().collection_kind.is_bounded()
3796 {
3797 parse_quote!(fold_no_replay)
3798 } else {
3799 parse_quote!(fold)
3800 };
3801
3802 match builders_or_callback {
3803 BuildersOrCallback::Builders(graph_builders) => {
3804 if metadata.location_id.is_top_level()
3805 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3806 && graph_builders.singleton_intermediates()
3807 && !metadata.collection_kind.is_bounded()
3808 {
3809 todo!(
3810 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3811 )
3812 } else {
3813 let builder = graph_builders.get_dfir_mut(&out_location);
3814 builder.add_dfir(
3815 parse_quote! {
3816 #chain_ident = chain();
3817 #input_ident
3818 -> map(|x| (Some(x), None))
3819 -> [0]#chain_ident;
3820 #watermark_ident
3821 -> map(|watermark| (None, Some(watermark)))
3822 -> [1]#chain_ident;
3823
3824 #fold_ident = #chain_ident
3825 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3826 let __reduce_keyed_fn = #f;
3827 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3828 if let Some((k, v)) = opt_payload {
3829 if let Some(curr_watermark) = *opt_curr_watermark {
3830 if k < curr_watermark {
3831 return;
3832 }
3833 }
3834 match map.entry(k) {
3835 ::std::collections::hash_map::Entry::Vacant(e) => {
3836 e.insert(v);
3837 }
3838 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3839 __reduce_keyed_fn(e.get_mut(), v);
3840 }
3841 }
3842 } else {
3843 let watermark = opt_watermark.unwrap();
3844 if let Some(curr_watermark) = *opt_curr_watermark {
3845 if watermark <= curr_watermark {
3846 return;
3847 }
3848 }
3849 *opt_curr_watermark = opt_watermark;
3850 map.retain(|k, _| *k >= watermark);
3851 }
3852 }
3853 })
3854 -> flat_map(|(map, _curr_watermark)| map);
3855 },
3856 None,
3857 Some(&next_stmt_id.to_string()),
3858 );
3859 }
3860 }
3861 BuildersOrCallback::Callback(_, node_callback) => {
3862 node_callback(node, next_stmt_id);
3863 }
3864 }
3865
3866 *next_stmt_id += 1;
3867
3868 ident_stack.push(fold_ident);
3869 }
3870
3871 HydroNode::Network {
3872 networking_info,
3873 serialize_fn: serialize_pipeline,
3874 instantiate_fn,
3875 deserialize_fn: deserialize_pipeline,
3876 input,
3877 ..
3878 } => {
3879 let input_ident = ident_stack.pop().unwrap();
3880
3881 let receiver_stream_ident =
3882 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3883
3884 match builders_or_callback {
3885 BuildersOrCallback::Builders(graph_builders) => {
3886 let (sink_expr, source_expr) = match instantiate_fn {
3887 DebugInstantiate::Building => (
3888 syn::parse_quote!(DUMMY_SINK),
3889 syn::parse_quote!(DUMMY_SOURCE),
3890 ),
3891
3892 DebugInstantiate::Finalized(finalized) => {
3893 (finalized.sink.clone(), finalized.source.clone())
3894 }
3895 };
3896
3897 graph_builders.create_network(
3898 &input.metadata().location_id,
3899 &out_location,
3900 input_ident,
3901 &receiver_stream_ident,
3902 serialize_pipeline.as_ref(),
3903 sink_expr,
3904 source_expr,
3905 deserialize_pipeline.as_ref(),
3906 *next_stmt_id,
3907 networking_info,
3908 );
3909 }
3910 BuildersOrCallback::Callback(_, node_callback) => {
3911 node_callback(node, next_stmt_id);
3912 }
3913 }
3914
3915 *next_stmt_id += 1;
3916
3917 ident_stack.push(receiver_stream_ident);
3918 }
3919
3920 HydroNode::ExternalInput {
3921 instantiate_fn,
3922 deserialize_fn: deserialize_pipeline,
3923 ..
3924 } => {
3925 let receiver_stream_ident =
3926 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3927
3928 match builders_or_callback {
3929 BuildersOrCallback::Builders(graph_builders) => {
3930 let (_, source_expr) = match instantiate_fn {
3931 DebugInstantiate::Building => (
3932 syn::parse_quote!(DUMMY_SINK),
3933 syn::parse_quote!(DUMMY_SOURCE),
3934 ),
3935
3936 DebugInstantiate::Finalized(finalized) => {
3937 (finalized.sink.clone(), finalized.source.clone())
3938 }
3939 };
3940
3941 graph_builders.create_external_source(
3942 &out_location,
3943 source_expr,
3944 &receiver_stream_ident,
3945 deserialize_pipeline.as_ref(),
3946 *next_stmt_id,
3947 );
3948 }
3949 BuildersOrCallback::Callback(_, node_callback) => {
3950 node_callback(node, next_stmt_id);
3951 }
3952 }
3953
3954 *next_stmt_id += 1;
3955
3956 ident_stack.push(receiver_stream_ident);
3957 }
3958
3959 HydroNode::Counter {
3960 tag,
3961 duration,
3962 prefix,
3963 ..
3964 } => {
3965 let input_ident = ident_stack.pop().unwrap();
3966
3967 let counter_ident =
3968 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3969
3970 match builders_or_callback {
3971 BuildersOrCallback::Builders(graph_builders) => {
3972 let arg = format!("{}({})", prefix, tag);
3973 let builder = graph_builders.get_dfir_mut(&out_location);
3974 builder.add_dfir(
3975 parse_quote! {
3976 #counter_ident = #input_ident -> _counter(#arg, #duration);
3977 },
3978 None,
3979 Some(&next_stmt_id.to_string()),
3980 );
3981 }
3982 BuildersOrCallback::Callback(_, node_callback) => {
3983 node_callback(node, next_stmt_id);
3984 }
3985 }
3986
3987 *next_stmt_id += 1;
3988
3989 ident_stack.push(counter_ident);
3990 }
3991 }
3992 },
3993 seen_tees,
3994 false,
3995 );
3996
3997 ident_stack
3998 .pop()
3999 .expect("ident_stack should have exactly one element after traversal")
4000 }
4001
4002 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4003 match self {
4004 HydroNode::Placeholder => {
4005 panic!()
4006 }
4007 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4008 HydroNode::Source { source, .. } => match source {
4009 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4010 HydroSource::ExternalNetwork()
4011 | HydroSource::Spin()
4012 | HydroSource::ClusterMembers(_, _)
4013 | HydroSource::Embedded(_)
4014 | HydroSource::EmbeddedSingleton(_) => {} },
4016 HydroNode::SingletonSource { value, .. } => {
4017 transform(value);
4018 }
4019 HydroNode::CycleSource { .. }
4020 | HydroNode::Tee { .. }
4021 | HydroNode::YieldConcat { .. }
4022 | HydroNode::BeginAtomic { .. }
4023 | HydroNode::EndAtomic { .. }
4024 | HydroNode::Batch { .. }
4025 | HydroNode::Chain { .. }
4026 | HydroNode::ChainFirst { .. }
4027 | HydroNode::CrossProduct { .. }
4028 | HydroNode::CrossSingleton { .. }
4029 | HydroNode::ResolveFutures { .. }
4030 | HydroNode::ResolveFuturesBlocking { .. }
4031 | HydroNode::ResolveFuturesOrdered { .. }
4032 | HydroNode::Join { .. }
4033 | HydroNode::Difference { .. }
4034 | HydroNode::AntiJoin { .. }
4035 | HydroNode::DeferTick { .. }
4036 | HydroNode::Enumerate { .. }
4037 | HydroNode::Unique { .. }
4038 | HydroNode::Sort { .. } => {}
4039 HydroNode::Map { f, .. }
4040 | HydroNode::FlatMap { f, .. }
4041 | HydroNode::FlatMapStreamBlocking { f, .. }
4042 | HydroNode::Filter { f, .. }
4043 | HydroNode::FilterMap { f, .. }
4044 | HydroNode::Inspect { f, .. }
4045 | HydroNode::Partition { f, .. }
4046 | HydroNode::Reduce { f, .. }
4047 | HydroNode::ReduceKeyed { f, .. }
4048 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4049 transform(f);
4050 }
4051 HydroNode::Fold { init, acc, .. }
4052 | HydroNode::Scan { init, acc, .. }
4053 | HydroNode::FoldKeyed { init, acc, .. } => {
4054 transform(init);
4055 transform(acc);
4056 }
4057 HydroNode::Network {
4058 serialize_fn,
4059 deserialize_fn,
4060 ..
4061 } => {
4062 if let Some(serialize_fn) = serialize_fn {
4063 transform(serialize_fn);
4064 }
4065 if let Some(deserialize_fn) = deserialize_fn {
4066 transform(deserialize_fn);
4067 }
4068 }
4069 HydroNode::ExternalInput { deserialize_fn, .. } => {
4070 if let Some(deserialize_fn) = deserialize_fn {
4071 transform(deserialize_fn);
4072 }
4073 }
4074 HydroNode::Counter { duration, .. } => {
4075 transform(duration);
4076 }
4077 }
4078 }
4079
4080 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4081 &self.metadata().op
4082 }
4083
4084 pub fn metadata(&self) -> &HydroIrMetadata {
4085 match self {
4086 HydroNode::Placeholder => {
4087 panic!()
4088 }
4089 HydroNode::Cast { metadata, .. } => metadata,
4090 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4091 HydroNode::Source { metadata, .. } => metadata,
4092 HydroNode::SingletonSource { metadata, .. } => metadata,
4093 HydroNode::CycleSource { metadata, .. } => metadata,
4094 HydroNode::Tee { metadata, .. } => metadata,
4095 HydroNode::Partition { metadata, .. } => metadata,
4096 HydroNode::YieldConcat { metadata, .. } => metadata,
4097 HydroNode::BeginAtomic { metadata, .. } => metadata,
4098 HydroNode::EndAtomic { metadata, .. } => metadata,
4099 HydroNode::Batch { metadata, .. } => metadata,
4100 HydroNode::Chain { metadata, .. } => metadata,
4101 HydroNode::ChainFirst { metadata, .. } => metadata,
4102 HydroNode::CrossProduct { metadata, .. } => metadata,
4103 HydroNode::CrossSingleton { metadata, .. } => metadata,
4104 HydroNode::Join { metadata, .. } => metadata,
4105 HydroNode::Difference { metadata, .. } => metadata,
4106 HydroNode::AntiJoin { metadata, .. } => metadata,
4107 HydroNode::ResolveFutures { metadata, .. } => metadata,
4108 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4109 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4110 HydroNode::Map { metadata, .. } => metadata,
4111 HydroNode::FlatMap { metadata, .. } => metadata,
4112 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4113 HydroNode::Filter { metadata, .. } => metadata,
4114 HydroNode::FilterMap { metadata, .. } => metadata,
4115 HydroNode::DeferTick { metadata, .. } => metadata,
4116 HydroNode::Enumerate { metadata, .. } => metadata,
4117 HydroNode::Inspect { metadata, .. } => metadata,
4118 HydroNode::Unique { metadata, .. } => metadata,
4119 HydroNode::Sort { metadata, .. } => metadata,
4120 HydroNode::Scan { metadata, .. } => metadata,
4121 HydroNode::Fold { metadata, .. } => metadata,
4122 HydroNode::FoldKeyed { metadata, .. } => metadata,
4123 HydroNode::Reduce { metadata, .. } => metadata,
4124 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4125 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4126 HydroNode::ExternalInput { metadata, .. } => metadata,
4127 HydroNode::Network { metadata, .. } => metadata,
4128 HydroNode::Counter { metadata, .. } => metadata,
4129 }
4130 }
4131
4132 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4133 &mut self.metadata_mut().op
4134 }
4135
4136 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4137 match self {
4138 HydroNode::Placeholder => {
4139 panic!()
4140 }
4141 HydroNode::Cast { metadata, .. } => metadata,
4142 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4143 HydroNode::Source { metadata, .. } => metadata,
4144 HydroNode::SingletonSource { metadata, .. } => metadata,
4145 HydroNode::CycleSource { metadata, .. } => metadata,
4146 HydroNode::Tee { metadata, .. } => metadata,
4147 HydroNode::Partition { metadata, .. } => metadata,
4148 HydroNode::YieldConcat { metadata, .. } => metadata,
4149 HydroNode::BeginAtomic { metadata, .. } => metadata,
4150 HydroNode::EndAtomic { metadata, .. } => metadata,
4151 HydroNode::Batch { metadata, .. } => metadata,
4152 HydroNode::Chain { metadata, .. } => metadata,
4153 HydroNode::ChainFirst { metadata, .. } => metadata,
4154 HydroNode::CrossProduct { metadata, .. } => metadata,
4155 HydroNode::CrossSingleton { metadata, .. } => metadata,
4156 HydroNode::Join { metadata, .. } => metadata,
4157 HydroNode::Difference { metadata, .. } => metadata,
4158 HydroNode::AntiJoin { metadata, .. } => metadata,
4159 HydroNode::ResolveFutures { metadata, .. } => metadata,
4160 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4161 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4162 HydroNode::Map { metadata, .. } => metadata,
4163 HydroNode::FlatMap { metadata, .. } => metadata,
4164 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4165 HydroNode::Filter { metadata, .. } => metadata,
4166 HydroNode::FilterMap { metadata, .. } => metadata,
4167 HydroNode::DeferTick { metadata, .. } => metadata,
4168 HydroNode::Enumerate { metadata, .. } => metadata,
4169 HydroNode::Inspect { metadata, .. } => metadata,
4170 HydroNode::Unique { metadata, .. } => metadata,
4171 HydroNode::Sort { metadata, .. } => metadata,
4172 HydroNode::Scan { metadata, .. } => metadata,
4173 HydroNode::Fold { metadata, .. } => metadata,
4174 HydroNode::FoldKeyed { metadata, .. } => metadata,
4175 HydroNode::Reduce { metadata, .. } => metadata,
4176 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4177 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4178 HydroNode::ExternalInput { metadata, .. } => metadata,
4179 HydroNode::Network { metadata, .. } => metadata,
4180 HydroNode::Counter { metadata, .. } => metadata,
4181 }
4182 }
4183
4184 pub fn input(&self) -> Vec<&HydroNode> {
4185 match self {
4186 HydroNode::Placeholder => {
4187 panic!()
4188 }
4189 HydroNode::Source { .. }
4190 | HydroNode::SingletonSource { .. }
4191 | HydroNode::ExternalInput { .. }
4192 | HydroNode::CycleSource { .. }
4193 | HydroNode::Tee { .. }
4194 | HydroNode::Partition { .. } => {
4195 vec![]
4197 }
4198 HydroNode::Cast { inner, .. }
4199 | HydroNode::ObserveNonDet { inner, .. }
4200 | HydroNode::YieldConcat { inner, .. }
4201 | HydroNode::BeginAtomic { inner, .. }
4202 | HydroNode::EndAtomic { inner, .. }
4203 | HydroNode::Batch { inner, .. } => {
4204 vec![inner]
4205 }
4206 HydroNode::Chain { first, second, .. } => {
4207 vec![first, second]
4208 }
4209 HydroNode::ChainFirst { first, second, .. } => {
4210 vec![first, second]
4211 }
4212 HydroNode::CrossProduct { left, right, .. }
4213 | HydroNode::CrossSingleton { left, right, .. }
4214 | HydroNode::Join { left, right, .. } => {
4215 vec![left, right]
4216 }
4217 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4218 vec![pos, neg]
4219 }
4220 HydroNode::Map { input, .. }
4221 | HydroNode::FlatMap { input, .. }
4222 | HydroNode::FlatMapStreamBlocking { input, .. }
4223 | HydroNode::Filter { input, .. }
4224 | HydroNode::FilterMap { input, .. }
4225 | HydroNode::Sort { input, .. }
4226 | HydroNode::DeferTick { input, .. }
4227 | HydroNode::Enumerate { input, .. }
4228 | HydroNode::Inspect { input, .. }
4229 | HydroNode::Unique { input, .. }
4230 | HydroNode::Network { input, .. }
4231 | HydroNode::Counter { input, .. }
4232 | HydroNode::ResolveFutures { input, .. }
4233 | HydroNode::ResolveFuturesBlocking { input, .. }
4234 | HydroNode::ResolveFuturesOrdered { input, .. }
4235 | HydroNode::Fold { input, .. }
4236 | HydroNode::FoldKeyed { input, .. }
4237 | HydroNode::Reduce { input, .. }
4238 | HydroNode::ReduceKeyed { input, .. }
4239 | HydroNode::Scan { input, .. } => {
4240 vec![input]
4241 }
4242 HydroNode::ReduceKeyedWatermark {
4243 input, watermark, ..
4244 } => {
4245 vec![input, watermark]
4246 }
4247 }
4248 }
4249
4250 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4251 self.input()
4252 .iter()
4253 .map(|input_node| input_node.metadata())
4254 .collect()
4255 }
4256
4257 pub fn is_shared_with_others(&self) -> bool {
4261 match self {
4262 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4263 Rc::strong_count(&inner.0) > 1
4264 }
4265 _ => false,
4266 }
4267 }
4268
4269 pub fn print_root(&self) -> String {
4270 match self {
4271 HydroNode::Placeholder => {
4272 panic!()
4273 }
4274 HydroNode::Cast { .. } => "Cast()".to_owned(),
4275 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4276 HydroNode::Source { source, .. } => format!("Source({:?})", source),
4277 HydroNode::SingletonSource {
4278 value,
4279 first_tick_only,
4280 ..
4281 } => format!(
4282 "SingletonSource({:?}, first_tick_only={})",
4283 value, first_tick_only
4284 ),
4285 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4286 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4287 HydroNode::Partition { f, is_true, .. } => {
4288 format!("Partition({:?}, is_true={})", f, is_true)
4289 }
4290 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4291 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4292 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4293 HydroNode::Batch { .. } => "Batch()".to_owned(),
4294 HydroNode::Chain { first, second, .. } => {
4295 format!("Chain({}, {})", first.print_root(), second.print_root())
4296 }
4297 HydroNode::ChainFirst { first, second, .. } => {
4298 format!(
4299 "ChainFirst({}, {})",
4300 first.print_root(),
4301 second.print_root()
4302 )
4303 }
4304 HydroNode::CrossProduct { left, right, .. } => {
4305 format!(
4306 "CrossProduct({}, {})",
4307 left.print_root(),
4308 right.print_root()
4309 )
4310 }
4311 HydroNode::CrossSingleton { left, right, .. } => {
4312 format!(
4313 "CrossSingleton({}, {})",
4314 left.print_root(),
4315 right.print_root()
4316 )
4317 }
4318 HydroNode::Join { left, right, .. } => {
4319 format!("Join({}, {})", left.print_root(), right.print_root())
4320 }
4321 HydroNode::Difference { pos, neg, .. } => {
4322 format!("Difference({}, {})", pos.print_root(), neg.print_root())
4323 }
4324 HydroNode::AntiJoin { pos, neg, .. } => {
4325 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4326 }
4327 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4328 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4329 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4330 HydroNode::Map { f, .. } => format!("Map({:?})", f),
4331 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4332 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4333 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4334 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4335 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4336 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4337 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4338 HydroNode::Unique { .. } => "Unique()".to_owned(),
4339 HydroNode::Sort { .. } => "Sort()".to_owned(),
4340 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4341 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4342 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4343 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4344 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4345 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4346 HydroNode::Network { .. } => "Network()".to_owned(),
4347 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4348 HydroNode::Counter { tag, duration, .. } => {
4349 format!("Counter({:?}, {:?})", tag, duration)
4350 }
4351 }
4352 }
4353}
4354
4355#[cfg(feature = "build")]
4356fn instantiate_network<'a, D>(
4357 env: &mut D::InstantiateEnv,
4358 from_location: &LocationId,
4359 to_location: &LocationId,
4360 processes: &SparseSecondaryMap<LocationKey, D::Process>,
4361 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
4362 name: Option<&str>,
4363 networking_info: &crate::networking::NetworkingInfo,
4364) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
4365where
4366 D: Deploy<'a>,
4367{
4368 let ((sink, source), connect_fn) = match (from_location, to_location) {
4369 (&LocationId::Process(from), &LocationId::Process(to)) => {
4370 let from_node = processes
4371 .get(from)
4372 .unwrap_or_else(|| {
4373 panic!("A process used in the graph was not instantiated: {}", from)
4374 })
4375 .clone();
4376 let to_node = processes
4377 .get(to)
4378 .unwrap_or_else(|| {
4379 panic!("A process used in the graph was not instantiated: {}", to)
4380 })
4381 .clone();
4382
4383 let sink_port = from_node.next_port();
4384 let source_port = to_node.next_port();
4385
4386 (
4387 D::o2o_sink_source(
4388 env,
4389 &from_node,
4390 &sink_port,
4391 &to_node,
4392 &source_port,
4393 name,
4394 networking_info,
4395 ),
4396 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
4397 )
4398 }
4399 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
4400 let from_node = processes
4401 .get(from)
4402 .unwrap_or_else(|| {
4403 panic!("A process used in the graph was not instantiated: {}", from)
4404 })
4405 .clone();
4406 let to_node = clusters
4407 .get(to)
4408 .unwrap_or_else(|| {
4409 panic!("A cluster used in the graph was not instantiated: {}", to)
4410 })
4411 .clone();
4412
4413 let sink_port = from_node.next_port();
4414 let source_port = to_node.next_port();
4415
4416 (
4417 D::o2m_sink_source(
4418 env,
4419 &from_node,
4420 &sink_port,
4421 &to_node,
4422 &source_port,
4423 name,
4424 networking_info,
4425 ),
4426 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
4427 )
4428 }
4429 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
4430 let from_node = clusters
4431 .get(from)
4432 .unwrap_or_else(|| {
4433 panic!("A cluster used in the graph was not instantiated: {}", from)
4434 })
4435 .clone();
4436 let to_node = processes
4437 .get(to)
4438 .unwrap_or_else(|| {
4439 panic!("A process used in the graph was not instantiated: {}", to)
4440 })
4441 .clone();
4442
4443 let sink_port = from_node.next_port();
4444 let source_port = to_node.next_port();
4445
4446 (
4447 D::m2o_sink_source(
4448 env,
4449 &from_node,
4450 &sink_port,
4451 &to_node,
4452 &source_port,
4453 name,
4454 networking_info,
4455 ),
4456 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
4457 )
4458 }
4459 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4460 let from_node = clusters
4461 .get(from)
4462 .unwrap_or_else(|| {
4463 panic!("A cluster used in the graph was not instantiated: {}", from)
4464 })
4465 .clone();
4466 let to_node = clusters
4467 .get(to)
4468 .unwrap_or_else(|| {
4469 panic!("A cluster used in the graph was not instantiated: {}", to)
4470 })
4471 .clone();
4472
4473 let sink_port = from_node.next_port();
4474 let source_port = to_node.next_port();
4475
4476 (
4477 D::m2m_sink_source(
4478 env,
4479 &from_node,
4480 &sink_port,
4481 &to_node,
4482 &source_port,
4483 name,
4484 networking_info,
4485 ),
4486 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
4487 )
4488 }
4489 (LocationId::Tick(_, _), _) => panic!(),
4490 (_, LocationId::Tick(_, _)) => panic!(),
4491 (LocationId::Atomic(_), _) => panic!(),
4492 (_, LocationId::Atomic(_)) => panic!(),
4493 };
4494 (sink, source, connect_fn)
4495}
4496
4497#[cfg(test)]
4498mod test {
4499 use std::mem::size_of;
4500
4501 use stageleft::{QuotedWithContext, q};
4502
4503 use super::*;
4504
4505 #[test]
4506 #[cfg_attr(
4507 not(feature = "build"),
4508 ignore = "expects inclusion of feature-gated fields"
4509 )]
4510 fn hydro_node_size() {
4511 assert_eq!(size_of::<HydroNode>(), 248);
4512 }
4513
4514 #[test]
4515 #[cfg_attr(
4516 not(feature = "build"),
4517 ignore = "expects inclusion of feature-gated fields"
4518 )]
4519 fn hydro_root_size() {
4520 assert_eq!(size_of::<HydroRoot>(), 136);
4521 }
4522
4523 #[test]
4524 fn test_simplify_q_macro_basic() {
4525 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
4527 let result = simplify_q_macro(simple_expr.clone());
4528 assert_eq!(result, simple_expr);
4529 }
4530
4531 #[test]
4532 fn test_simplify_q_macro_actual_stageleft_call() {
4533 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
4535 let result = simplify_q_macro(stageleft_call);
4536 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4539 }
4540
4541 #[test]
4542 fn test_closure_no_pipe_at_start() {
4543 let stageleft_call = q!({
4545 let foo = 123;
4546 move |b: usize| b + foo
4547 })
4548 .splice_fn1_ctx(&());
4549 let result = simplify_q_macro(stageleft_call);
4550 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4551 }
4552}