1use std::collections::{HashMap, HashSet};
2use std::error::Error;
3use std::fmt::{Display, Write};
4use std::num::ParseIntError;
5use std::sync::OnceLock;
6
7use auto_impl::auto_impl;
8use slotmap::{Key, SecondaryMap, SlotMap};
9
10pub use super::graphviz::{HydroDot, escape_dot};
11pub use super::json::HydroJson;
12pub use super::mermaid::{HydroMermaid, escape_mermaid};
14use crate::compile::ir::backtrace::Backtrace;
15use crate::compile::ir::{DebugExpr, HydroIrMetadata, HydroNode, HydroRoot, HydroSource};
16use crate::location::dynamic::LocationId;
17use crate::location::{LocationKey, LocationType};
18
/// Label attached to a graph node: either a fixed string or an operator name
/// paired with the expressions it was invoked with.
#[derive(Debug, Clone)]
pub enum NodeLabel {
    /// A fixed label that is displayed verbatim.
    Static(String),
    /// An operator name plus argument expressions, displayed as
    /// `op_name(expr, expr, ...)`.
    WithExprs {
        /// Name of the operator.
        op_name: String,
        /// Expressions shown, comma-separated, inside the parentheses.
        exprs: Vec<DebugExpr>,
    },
}
30
31impl NodeLabel {
32 pub fn static_label(s: String) -> Self {
34 Self::Static(s)
35 }
36
37 pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
39 Self::WithExprs { op_name, exprs }
40 }
41}
42
43impl Display for NodeLabel {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 match self {
46 Self::Static(s) => write!(f, "{}", s),
47 Self::WithExprs { op_name, exprs } => {
48 if exprs.is_empty() {
49 write!(f, "{}()", op_name)
50 } else {
51 let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
52 write!(f, "{}({})", op_name, expr_strs.join(", "))
53 }
54 }
55 }
56 }
57}
58
/// State shared by text-based graph writers that emit indented output.
pub struct IndentedGraphWriter<'a, W> {
    /// Underlying sink that the output is written to.
    pub write: W,
    /// Current indentation, in spaces, applied by `writeln_indented`.
    pub indent: usize,
    /// Rendering options in effect for this writer.
    pub config: HydroWriteConfig<'a>,
}
66
67impl<'a, W> IndentedGraphWriter<'a, W> {
68 pub fn new(write: W) -> Self {
70 Self {
71 write,
72 indent: 0,
73 config: HydroWriteConfig::default(),
74 }
75 }
76
77 pub fn new_with_config(write: W, config: HydroWriteConfig<'a>) -> Self {
79 Self {
80 write,
81 indent: 0,
82 config,
83 }
84 }
85}
86
87impl<W: Write> IndentedGraphWriter<'_, W> {
88 pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
90 writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
91 }
92}
93
/// Error type for graph writers; an alias for [`std::fmt::Error`].
pub type GraphWriteError = std::fmt::Error;
96
/// Sink for a textual rendering of a Hydro graph.
///
/// `write_graph_structure` drives an implementation in this order: the
/// prologue, one definition per node, the location groups (when enabled in the
/// config), every edge, and finally the epilogue.
#[auto_impl(&mut, Box)]
pub trait HydroGraphWrite {
    /// Error produced when writing fails.
    type Err: Error;

    /// Begins the graph; called before anything else is written.
    fn write_prologue(&mut self) -> Result<(), Self::Err>;

    /// Writes the definition of a single node.
    ///
    /// `location_key` / `location_type` describe the location the node was
    /// assigned to, if any, and `backtrace` is the node's recorded backtrace,
    /// if any.
    fn write_node_definition(
        &mut self,
        node_id: VizNodeKey,
        node_label: &NodeLabel,
        node_type: HydroNodeType,
        location_key: Option<LocationKey>,
        location_type: Option<LocationType>,
        backtrace: Option<&Backtrace>,
    ) -> Result<(), Self::Err>;

    /// Writes an edge from `src_id` to `dst_id` carrying the given property
    /// set and optional text label.
    fn write_edge(
        &mut self,
        src_id: VizNodeKey,
        dst_id: VizNodeKey,
        edge_properties: &HashSet<HydroEdgeProp>,
        label: Option<&str>,
    ) -> Result<(), Self::Err>;

    /// Begins a group for the nodes that belong to one location.
    fn write_location_start(
        &mut self,
        location_key: LocationKey,
        location_type: LocationType,
    ) -> Result<(), Self::Err>;

    /// Writes a node as a member of the currently open location group.
    fn write_node(&mut self, node_id: VizNodeKey) -> Result<(), Self::Err>;

    /// Ends the currently open location group.
    fn write_location_end(&mut self) -> Result<(), Self::Err>;

    /// Ends the graph; called after everything else has been written.
    fn write_epilogue(&mut self) -> Result<(), Self::Err>;
}
142
143pub mod node_type_utils {
145 use super::HydroNodeType;
146
147 const NODE_TYPE_DATA: &[(HydroNodeType, &str)] = &[
149 (HydroNodeType::Source, "Source"),
150 (HydroNodeType::Transform, "Transform"),
151 (HydroNodeType::Join, "Join"),
152 (HydroNodeType::Aggregation, "Aggregation"),
153 (HydroNodeType::Network, "Network"),
154 (HydroNodeType::Sink, "Sink"),
155 (HydroNodeType::Tee, "Tee"),
156 (HydroNodeType::NonDeterministic, "NonDeterministic"),
157 ];
158
159 pub fn to_string(node_type: HydroNodeType) -> &'static str {
161 NODE_TYPE_DATA
162 .iter()
163 .find(|(nt, _)| *nt == node_type)
164 .map(|(_, name)| *name)
165 .unwrap_or("Unknown")
166 }
167
168 pub fn all_types_with_strings() -> Vec<(HydroNodeType, &'static str)> {
170 NODE_TYPE_DATA.to_vec()
171 }
172}
173
/// Coarse category of a graph node, passed to writers alongside the node's
/// label.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HydroNodeType {
    /// A node that introduces data (sources, external inputs, cycle sources).
    Source,
    /// A per-element operator such as map or filter.
    Transform,
    /// An operator combining two inputs (join, cross product, difference, ...).
    Join,
    /// An operator such as fold, reduce, scan or sort.
    Aggregation,
    /// A network operator.
    Network,
    /// A terminal node built from a `HydroRoot`.
    Sink,
    /// A tee or partition node.
    Tee,
    /// A node that observes non-determinism.
    NonDeterministic,
}
186
/// A single property of a graph edge; each edge carries a set of these.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HydroEdgeProp {
    /// The collection on this edge is bounded.
    Bounded,
    /// The collection on this edge is unbounded.
    Unbounded,
    /// Elements are totally ordered.
    TotalOrder,
    /// Elements carry no ordering guarantee.
    NoOrder,
    /// The collection is keyed.
    Keyed,
    /// The edge carries a stream.
    Stream,
    /// The edge carries a keyed singleton.
    KeyedSingleton,
    /// The edge carries a keyed stream.
    KeyedStream,
    /// The edge carries a singleton.
    Singleton,
    /// The edge carries an optional.
    Optional,
    /// The edge connects nodes at different root locations.
    Network,
    /// NOTE(review): presumably marks a cycle back-edge; not set anywhere in
    /// the visible code — confirm against callers.
    Cycle,
}
204
/// Renderer-independent description of how an edge should be drawn, computed
/// from its property set by `get_unified_edge_style`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnifiedEdgeStyle {
    /// Stroke pattern (dashed for network edges, solid otherwise).
    pub line_pattern: LinePattern,
    /// Stroke width; the default is `1`.
    pub line_width: u8,
    /// Arrowhead shape, chosen from the collection type on the edge.
    pub arrowhead: ArrowheadStyle,
    /// Plain line or line with hash marks (used for keyed collections).
    pub line_style: LineStyle,
    /// Halo drawn around the edge (light blue for unbounded collections).
    pub halo: HaloStyle,
    /// Whether the line is drawn wavy (used for unordered collections).
    pub waviness: WavinessStyle,
    /// Whether the edge is animated (network edges are).
    pub animation: AnimationStyle,
    /// Stroke colour as a `#rrggbb` string.
    pub color: &'static str,
}
226
/// Stroke pattern of an edge line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LinePattern {
    /// Continuous line.
    Solid,
    /// Dotted line.
    Dotted,
    /// Dashed line.
    Dashed,
}
233
/// Shape of the arrowhead at the destination end of an edge.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArrowheadStyle {
    /// Filled triangle.
    TriangleFilled,
    /// Filled circle.
    CircleFilled,
    /// Open (unfilled) diamond.
    DiamondOpen,
    /// Whatever the renderer draws by default.
    Default,
}
241
/// Decoration applied along an edge line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineStyle {
    /// Plain single line.
    Single,
    /// Line decorated with hash marks.
    HashMarks,
}
249
/// Halo drawn around an edge.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HaloStyle {
    /// No halo.
    None,
    /// Light blue halo.
    LightBlue,
}
255
/// Whether an edge line is drawn straight or wavy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WavinessStyle {
    /// Not wavy.
    None,
    /// Wavy line.
    Wavy,
}
261
/// Whether an edge is animated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AnimationStyle {
    /// Not animated.
    Static,
    /// Animated.
    Animated,
}
267
268impl Default for UnifiedEdgeStyle {
269 fn default() -> Self {
270 Self {
271 line_pattern: LinePattern::Solid,
272 line_width: 1,
273 arrowhead: ArrowheadStyle::Default,
274 line_style: LineStyle::Single,
275 halo: HaloStyle::None,
276 waviness: WavinessStyle::None,
277 animation: AnimationStyle::Static,
278 color: "#666666",
279 }
280 }
281}
282
283pub fn get_unified_edge_style(
296 edge_properties: &HashSet<HydroEdgeProp>,
297 src_location: Option<usize>,
298 dst_location: Option<usize>,
299) -> UnifiedEdgeStyle {
300 let mut style = UnifiedEdgeStyle::default();
301
302 let is_network = edge_properties.contains(&HydroEdgeProp::Network)
304 || (src_location.is_some() && dst_location.is_some() && src_location != dst_location);
305
306 if is_network {
307 style.line_pattern = LinePattern::Dashed;
308 style.animation = AnimationStyle::Animated;
309 } else {
310 style.line_pattern = LinePattern::Solid;
311 style.animation = AnimationStyle::Static;
312 }
313
314 if edge_properties.contains(&HydroEdgeProp::Unbounded) {
316 style.halo = HaloStyle::LightBlue;
317 } else {
318 style.halo = HaloStyle::None;
319 }
320
321 if edge_properties.contains(&HydroEdgeProp::Stream) {
323 style.arrowhead = ArrowheadStyle::TriangleFilled;
324 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedStream) {
326 style.arrowhead = ArrowheadStyle::TriangleFilled;
327 style.color = "#2563eb"; } else if edge_properties.contains(&HydroEdgeProp::KeyedSingleton) {
329 style.arrowhead = ArrowheadStyle::TriangleFilled;
330 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Singleton) {
332 style.arrowhead = ArrowheadStyle::CircleFilled;
333 style.color = "#000000"; } else if edge_properties.contains(&HydroEdgeProp::Optional) {
335 style.arrowhead = ArrowheadStyle::DiamondOpen;
336 style.color = "#6b7280"; }
338
339 if edge_properties.contains(&HydroEdgeProp::Keyed) {
341 style.line_style = LineStyle::HashMarks; } else {
343 style.line_style = LineStyle::Single;
344 }
345
346 if edge_properties.contains(&HydroEdgeProp::NoOrder) {
348 style.waviness = WavinessStyle::Wavy;
349 } else if edge_properties.contains(&HydroEdgeProp::TotalOrder) {
350 style.waviness = WavinessStyle::None;
351 }
352
353 style
354}
355
356pub fn extract_edge_properties_from_collection_kind(
360 collection_kind: &crate::compile::ir::CollectionKind,
361) -> HashSet<HydroEdgeProp> {
362 use crate::compile::ir::CollectionKind;
363
364 let mut properties = HashSet::new();
365
366 match collection_kind {
367 CollectionKind::Stream { bound, order, .. } => {
368 properties.insert(HydroEdgeProp::Stream);
369 add_bound_property(&mut properties, bound);
370 add_order_property(&mut properties, order);
371 }
372 CollectionKind::KeyedStream {
373 bound, value_order, ..
374 } => {
375 properties.insert(HydroEdgeProp::KeyedStream);
376 properties.insert(HydroEdgeProp::Keyed);
377 add_bound_property(&mut properties, bound);
378 add_order_property(&mut properties, value_order);
379 }
380 CollectionKind::Singleton { bound, .. } => {
381 properties.insert(HydroEdgeProp::Singleton);
382 add_bound_property(&mut properties, bound);
383 properties.insert(HydroEdgeProp::TotalOrder);
385 }
386 CollectionKind::Optional { bound, .. } => {
387 properties.insert(HydroEdgeProp::Optional);
388 add_bound_property(&mut properties, bound);
389 properties.insert(HydroEdgeProp::TotalOrder);
391 }
392 CollectionKind::KeyedSingleton { bound, .. } => {
393 properties.insert(HydroEdgeProp::Singleton);
394 properties.insert(HydroEdgeProp::Keyed);
395 add_keyed_singleton_bound_property(&mut properties, bound);
397 properties.insert(HydroEdgeProp::TotalOrder);
398 }
399 }
400
401 properties
402}
403
404fn add_bound_property(
406 properties: &mut HashSet<HydroEdgeProp>,
407 bound: &crate::compile::ir::BoundKind,
408) {
409 use crate::compile::ir::BoundKind;
410
411 match bound {
412 BoundKind::Bounded => {
413 properties.insert(HydroEdgeProp::Bounded);
414 }
415 BoundKind::Unbounded => {
416 properties.insert(HydroEdgeProp::Unbounded);
417 }
418 }
419}
420
421fn add_keyed_singleton_bound_property(
423 properties: &mut HashSet<HydroEdgeProp>,
424 bound: &crate::compile::ir::KeyedSingletonBoundKind,
425) {
426 use crate::compile::ir::KeyedSingletonBoundKind;
427
428 match bound {
429 KeyedSingletonBoundKind::Bounded | KeyedSingletonBoundKind::BoundedValue => {
430 properties.insert(HydroEdgeProp::Bounded);
431 }
432 KeyedSingletonBoundKind::Unbounded => {
433 properties.insert(HydroEdgeProp::Unbounded);
434 }
435 }
436}
437
438fn add_order_property(
440 properties: &mut HashSet<HydroEdgeProp>,
441 order: &crate::compile::ir::StreamOrder,
442) {
443 use crate::compile::ir::StreamOrder;
444
445 match order {
446 StreamOrder::TotalOrder => {
447 properties.insert(HydroEdgeProp::TotalOrder);
448 }
449 StreamOrder::NoOrder => {
450 properties.insert(HydroEdgeProp::NoOrder);
451 }
452 }
453}
454
455pub fn is_network_edge(src_location: &LocationId, dst_location: &LocationId) -> bool {
458 src_location.root() != dst_location.root()
460}
461
462pub fn add_network_edge_tag(
464 properties: &mut HashSet<HydroEdgeProp>,
465 src_location: &LocationId,
466 dst_location: &LocationId,
467) {
468 if is_network_edge(src_location, dst_location) {
469 properties.insert(HydroEdgeProp::Network);
470 }
471}
472
/// Options controlling how a graph is written.
#[derive(Debug, Clone, Copy)]
pub struct HydroWriteConfig<'a> {
    /// NOTE(review): presumably toggles extra metadata in the output; not read
    /// in the visible code — confirm against the writers.
    pub show_metadata: bool,
    /// When `true`, nodes are additionally emitted grouped by location.
    pub show_location_groups: bool,
    /// NOTE(review): presumably selects shortened node labels; not read in the
    /// visible code — confirm against the writers.
    pub use_short_labels: bool,
    /// Names associated with location keys.
    pub location_names: &'a SecondaryMap<LocationKey, String>,
}
481
482impl Default for HydroWriteConfig<'_> {
483 fn default() -> Self {
484 static EMPTY: OnceLock<SecondaryMap<LocationKey, String>> = OnceLock::new();
485 Self {
486 show_metadata: false,
487 show_location_groups: true,
488 use_short_labels: true, location_names: EMPTY.get_or_init(SecondaryMap::new),
490 }
491 }
492}
493
/// A node of the visualization graph.
#[derive(Clone)]
pub struct HydroGraphNode {
    /// Label shown for the node.
    pub label: NodeLabel,
    /// Coarse category of the node.
    pub node_type: HydroNodeType,
    /// Key of the location the node belongs to, if any.
    pub location_key: Option<LocationKey>,
    /// Backtrace recorded for the operator that produced the node, if any.
    pub backtrace: Option<Backtrace>,
}
502
slotmap::new_key_type! {
    /// Key identifying a node in `HydroGraphStructure::nodes`.
    pub struct VizNodeKey;
}
509
510impl Display for VizNodeKey {
511 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512 write!(f, "viz{:?}", self.data()) }
514}
515
516impl std::str::FromStr for VizNodeKey {
519 type Err = Option<ParseIntError>;
520
521 fn from_str(s: &str) -> Result<Self, Self::Err> {
522 let nvn = s.strip_prefix("viz").ok_or(None)?;
523 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
524 let idx: u64 = idx.parse()?;
525 let ver: u64 = ver.parse()?;
526 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
527 }
528}
529
impl VizNodeKey {
    /// Fixed key for tests, built from the raw value `0x0000008f00000001`
    /// (low 32 bits `1`, high 32 bits `0x8f`).
    #[cfg(test)]
    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x0000008f00000001));

    /// Fixed key for tests, built from the raw value `0x0000008f00000002`
    /// (low 32 bits `2`, high 32 bits `0x8f`).
    #[cfg(test)]
    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x0000008f00000002));
}
539
/// A directed edge of the visualization graph.
#[derive(Debug, Clone)]
pub struct HydroGraphEdge {
    /// Node the edge starts at.
    pub src: VizNodeKey,
    /// Node the edge points to.
    pub dst: VizNodeKey,
    /// Properties describing the data flowing along the edge.
    pub edge_properties: HashSet<HydroEdgeProp>,
    /// Optional text label (for example `"left"` / `"right"` on join inputs).
    pub label: Option<String>,
}
548
/// In-memory graph assembled from the IR before it is handed to a writer.
#[derive(Default)]
pub struct HydroGraphStructure {
    /// All nodes, keyed by [`VizNodeKey`].
    pub nodes: SlotMap<VizNodeKey, HydroGraphNode>,
    /// All edges, in insertion order.
    pub edges: Vec<HydroGraphEdge>,
    /// Type of every location that has been registered.
    pub locations: SecondaryMap<LocationKey, LocationType>,
}
556
557impl HydroGraphStructure {
558 pub fn new() -> Self {
559 Self::default()
560 }
561
562 pub fn add_node(
563 &mut self,
564 label: NodeLabel,
565 node_type: HydroNodeType,
566 location_key: Option<LocationKey>,
567 ) -> VizNodeKey {
568 self.add_node_with_backtrace(label, node_type, location_key, None)
569 }
570
571 pub fn add_node_with_backtrace(
572 &mut self,
573 label: NodeLabel,
574 node_type: HydroNodeType,
575 location_key: Option<LocationKey>,
576 backtrace: Option<Backtrace>,
577 ) -> VizNodeKey {
578 self.nodes.insert(HydroGraphNode {
579 label,
580 node_type,
581 location_key,
582 backtrace,
583 })
584 }
585
586 pub fn add_node_with_metadata(
588 &mut self,
589 label: NodeLabel,
590 node_type: HydroNodeType,
591 metadata: &HydroIrMetadata,
592 ) -> VizNodeKey {
593 let location_key = Some(setup_location(self, metadata));
594 let backtrace = Some(metadata.op.backtrace.clone());
595 self.add_node_with_backtrace(label, node_type, location_key, backtrace)
596 }
597
598 pub fn add_edge(
599 &mut self,
600 src: VizNodeKey,
601 dst: VizNodeKey,
602 edge_properties: HashSet<HydroEdgeProp>,
603 label: Option<String>,
604 ) {
605 self.edges.push(HydroGraphEdge {
606 src,
607 dst,
608 edge_properties,
609 label,
610 });
611 }
612
613 pub fn add_edge_single(
615 &mut self,
616 src: VizNodeKey,
617 dst: VizNodeKey,
618 edge_type: HydroEdgeProp,
619 label: Option<String>,
620 ) {
621 let mut properties = HashSet::new();
622 properties.insert(edge_type);
623 self.edges.push(HydroGraphEdge {
624 src,
625 dst,
626 edge_properties: properties,
627 label,
628 });
629 }
630
631 pub fn add_location(&mut self, location_key: LocationKey, location_type: LocationType) {
632 self.locations.insert(location_key, location_type);
633 }
634}
635
/// Returns the lowercased operator name of a label, i.e. everything before
/// the first `(` (or the whole label when it contains no `(`).
pub fn extract_op_name(full_label: String) -> String {
    match full_label.split_once('(') {
        Some((op_name, _args)) => op_name.to_lowercase(),
        None => full_label.to_lowercase(),
    }
}
644
/// Derives a short display label from a full operator label.
///
/// The operator name is the lowercased text before the first `(` (or the
/// whole label when it contains no `(`). Two operators get special handling
/// based on what else the full label contains:
/// - `source` becomes `source_iter`, `source_stream`, `external_network` or
///   `spin` when the label mentions `Iter`, `Stream`, `ExternalNetwork` or
///   `Spin` (checked in that order), and stays `source` otherwise;
/// - `network` becomes `network(recv)` when the label mentions `deser`,
///   `network(send)` when it mentions `ser`, and stays `network` otherwise.
///
/// Every other operator is returned as its lowercased name.
pub fn extract_short_label(full_label: &str) -> String {
    // `split` always yields at least one piece, so no truncating fallback is
    // needed; `unwrap_or` only satisfies the type checker.
    let base_name = full_label
        .split('(')
        .next()
        .unwrap_or(full_label)
        .to_lowercase();

    let short = match base_name.as_str() {
        "source" => {
            if full_label.contains("Iter") {
                "source_iter"
            } else if full_label.contains("Stream") {
                "source_stream"
            } else if full_label.contains("ExternalNetwork") {
                "external_network"
            } else if full_label.contains("Spin") {
                "spin"
            } else {
                "source"
            }
        }
        "network" => {
            // "deser" must be tested first because it also contains "ser".
            if full_label.contains("deser") {
                "network(recv)"
            } else if full_label.contains("ser") {
                "network(send)"
            } else {
                "network"
            }
        }
        _ => return base_name,
    };

    short.to_owned()
}
686
687fn setup_location(structure: &mut HydroGraphStructure, metadata: &HydroIrMetadata) -> LocationKey {
689 let root = metadata.location_id.root();
690 let location_key = root.key();
691 let location_type = root.location_type().unwrap();
692 structure.add_location(location_key, location_type);
693 location_key
694}
695
696fn add_edge_with_metadata(
699 structure: &mut HydroGraphStructure,
700 src_id: VizNodeKey,
701 dst_id: VizNodeKey,
702 src_metadata: Option<&HydroIrMetadata>,
703 dst_metadata: Option<&HydroIrMetadata>,
704 label: Option<String>,
705) {
706 let mut properties = HashSet::new();
707
708 if let Some(metadata) = src_metadata {
710 properties.extend(extract_edge_properties_from_collection_kind(
711 &metadata.collection_kind,
712 ));
713 }
714
715 if let (Some(src_meta), Some(dst_meta)) = (src_metadata, dst_metadata) {
717 add_network_edge_tag(
718 &mut properties,
719 &src_meta.location_id,
720 &dst_meta.location_id,
721 );
722 }
723
724 if properties.is_empty() {
726 properties.insert(HydroEdgeProp::Stream);
727 }
728
729 structure.add_edge(src_id, dst_id, properties, label);
730}
731
732fn write_graph_structure<W>(
734 structure: &HydroGraphStructure,
735 graph_write: W,
736 config: HydroWriteConfig<'_>,
737) -> Result<(), W::Err>
738where
739 W: HydroGraphWrite,
740{
741 let mut graph_write = graph_write;
742 graph_write.write_prologue()?;
744
745 for (node_id, node) in structure.nodes.iter() {
747 let location_type = node
748 .location_key
749 .and_then(|loc_key| structure.locations.get(loc_key))
750 .copied();
751
752 graph_write.write_node_definition(
753 node_id,
754 &node.label,
755 node.node_type,
756 node.location_key,
757 location_type,
758 node.backtrace.as_ref(),
759 )?;
760 }
761
762 if config.show_location_groups {
764 let mut nodes_by_location = SecondaryMap::<LocationKey, Vec<VizNodeKey>>::new();
765 for (node_id, node) in structure.nodes.iter() {
766 if let Some(location_key) = node.location_key {
767 nodes_by_location
768 .entry(location_key)
769 .expect("location was removed")
770 .or_default()
771 .push(node_id);
772 }
773 }
774
775 for (location_key, node_ids) in nodes_by_location.iter() {
776 if let Some(&location_type) = structure.locations.get(location_key) {
777 graph_write.write_location_start(location_key, location_type)?;
778 for &node_id in node_ids.iter() {
779 graph_write.write_node(node_id)?;
780 }
781 graph_write.write_location_end()?;
782 }
783 }
784 }
785
786 for edge in structure.edges.iter() {
788 graph_write.write_edge(
789 edge.src,
790 edge.dst,
791 &edge.edge_properties,
792 edge.label.as_deref(),
793 )?;
794 }
795
796 graph_write.write_epilogue()?;
797 Ok(())
798}
799
800impl HydroRoot {
801 pub fn build_graph_structure(
803 &self,
804 structure: &mut HydroGraphStructure,
805 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
806 config: HydroWriteConfig<'_>,
807 ) -> VizNodeKey {
808 fn build_sink_node(
810 structure: &mut HydroGraphStructure,
811 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
812 config: HydroWriteConfig<'_>,
813 input: &HydroNode,
814 sink_metadata: Option<&HydroIrMetadata>,
815 label: NodeLabel,
816 ) -> VizNodeKey {
817 let input_id = input.build_graph_structure(structure, seen_tees, config);
818
819 let effective_metadata = if let Some(meta) = sink_metadata {
821 Some(meta)
822 } else {
823 match input {
824 HydroNode::Placeholder => None,
825 _ => Some(input.metadata()),
827 }
828 };
829
830 let location_key = effective_metadata.map(|m| setup_location(structure, m));
831 let sink_id = structure.add_node_with_backtrace(
832 label,
833 HydroNodeType::Sink,
834 location_key,
835 effective_metadata.map(|m| m.op.backtrace.clone()),
836 );
837
838 let input_metadata = input.metadata();
840 add_edge_with_metadata(
841 structure,
842 input_id,
843 sink_id,
844 Some(input_metadata),
845 sink_metadata,
846 None,
847 );
848
849 sink_id
850 }
851
852 match self {
853 HydroRoot::ForEach { f, input, .. } => build_sink_node(
855 structure,
856 seen_tees,
857 config,
858 input,
859 None,
860 NodeLabel::with_exprs("for_each".to_owned(), vec![f.clone()]),
861 ),
862
863 HydroRoot::SendExternal {
864 to_external_key,
865 to_port_id,
866 input,
867 ..
868 } => build_sink_node(
869 structure,
870 seen_tees,
871 config,
872 input,
873 None,
874 NodeLabel::with_exprs(
875 format!("send_external({}:{})", to_external_key, to_port_id),
876 vec![],
877 ),
878 ),
879
880 HydroRoot::DestSink { sink, input, .. } => build_sink_node(
881 structure,
882 seen_tees,
883 config,
884 input,
885 None,
886 NodeLabel::with_exprs("dest_sink".to_owned(), vec![sink.clone()]),
887 ),
888
889 HydroRoot::CycleSink {
890 cycle_id, input, ..
891 } => build_sink_node(
892 structure,
893 seen_tees,
894 config,
895 input,
896 None,
897 NodeLabel::static_label(format!("cycle_sink({})", cycle_id)),
898 ),
899
900 HydroRoot::EmbeddedOutput { ident, input, .. } => build_sink_node(
901 structure,
902 seen_tees,
903 config,
904 input,
905 None,
906 NodeLabel::static_label(format!("embedded_output({})", ident)),
907 ),
908
909 HydroRoot::Null { input, .. } => build_sink_node(
910 structure,
911 seen_tees,
912 config,
913 input,
914 None,
915 NodeLabel::static_label("null".to_owned()),
916 ),
917 }
918 }
919}
920
921impl HydroNode {
922 pub fn build_graph_structure(
924 &self,
925 structure: &mut HydroGraphStructure,
926 seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
927 config: HydroWriteConfig<'_>,
928 ) -> VizNodeKey {
929 struct TransformParams<'a> {
933 structure: &'a mut HydroGraphStructure,
934 seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, VizNodeKey>,
935 config: HydroWriteConfig<'a>,
936 input: &'a HydroNode,
937 metadata: &'a HydroIrMetadata,
938 op_name: String,
939 node_type: HydroNodeType,
940 }
941
942 fn build_simple_transform(params: TransformParams) -> VizNodeKey {
944 let input_id = params.input.build_graph_structure(
945 params.structure,
946 params.seen_tees,
947 params.config,
948 );
949 let node_id = params.structure.add_node_with_metadata(
950 NodeLabel::Static(params.op_name.to_string()),
951 params.node_type,
952 params.metadata,
953 );
954
955 let input_metadata = params.input.metadata();
957 add_edge_with_metadata(
958 params.structure,
959 input_id,
960 node_id,
961 Some(input_metadata),
962 Some(params.metadata),
963 None,
964 );
965
966 node_id
967 }
968
969 fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> VizNodeKey {
971 let input_id = params.input.build_graph_structure(
972 params.structure,
973 params.seen_tees,
974 params.config,
975 );
976 let node_id = params.structure.add_node_with_metadata(
977 NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
978 params.node_type,
979 params.metadata,
980 );
981
982 let input_metadata = params.input.metadata();
984 add_edge_with_metadata(
985 params.structure,
986 input_id,
987 node_id,
988 Some(input_metadata),
989 Some(params.metadata),
990 None,
991 );
992
993 node_id
994 }
995
996 fn build_dual_expr_transform(
998 params: TransformParams,
999 expr1: &DebugExpr,
1000 expr2: &DebugExpr,
1001 ) -> VizNodeKey {
1002 let input_id = params.input.build_graph_structure(
1003 params.structure,
1004 params.seen_tees,
1005 params.config,
1006 );
1007 let node_id = params.structure.add_node_with_metadata(
1008 NodeLabel::with_exprs(
1009 params.op_name.to_string(),
1010 vec![expr1.clone(), expr2.clone()],
1011 ),
1012 params.node_type,
1013 params.metadata,
1014 );
1015
1016 let input_metadata = params.input.metadata();
1018 add_edge_with_metadata(
1019 params.structure,
1020 input_id,
1021 node_id,
1022 Some(input_metadata),
1023 Some(params.metadata),
1024 None,
1025 );
1026
1027 node_id
1028 }
1029
1030 fn build_source_node(
1032 structure: &mut HydroGraphStructure,
1033 metadata: &HydroIrMetadata,
1034 label: String,
1035 ) -> VizNodeKey {
1036 structure.add_node_with_metadata(
1037 NodeLabel::Static(label),
1038 HydroNodeType::Source,
1039 metadata,
1040 )
1041 }
1042
1043 match self {
1044 HydroNode::Placeholder => structure.add_node(
1045 NodeLabel::Static("PLACEHOLDER".to_owned()),
1046 HydroNodeType::Transform,
1047 None,
1048 ),
1049
1050 HydroNode::Source {
1051 source, metadata, ..
1052 } => {
1053 let label = match source {
1054 HydroSource::Stream(expr) => format!("source_stream({})", expr),
1055 HydroSource::ExternalNetwork() => "external_network()".to_owned(),
1056 HydroSource::Iter(expr) => format!("source_iter({})", expr),
1057 HydroSource::Spin() => "spin()".to_owned(),
1058 HydroSource::ClusterMembers(location_id, _) => {
1059 format!(
1060 "source_stream(cluster_membership_stream({:?}))",
1061 location_id
1062 )
1063 }
1064 HydroSource::Embedded(ident) => {
1065 format!("embedded_input({})", ident)
1066 }
1067 HydroSource::EmbeddedSingleton(ident) => {
1068 format!("embedded_singleton_input({})", ident)
1069 }
1070 };
1071 build_source_node(structure, metadata, label)
1072 }
1073
1074 HydroNode::SingletonSource {
1075 value,
1076 first_tick_only,
1077 metadata,
1078 } => {
1079 let label = if *first_tick_only {
1080 format!("singleton_first_tick({})", value)
1081 } else {
1082 format!("singleton({})", value)
1083 };
1084 build_source_node(structure, metadata, label)
1085 }
1086
1087 HydroNode::ExternalInput {
1088 from_external_key,
1089 from_port_id,
1090 metadata,
1091 ..
1092 } => build_source_node(
1093 structure,
1094 metadata,
1095 format!("external_input({}:{})", from_external_key, from_port_id),
1096 ),
1097
1098 HydroNode::CycleSource {
1099 cycle_id, metadata, ..
1100 } => build_source_node(structure, metadata, format!("cycle_source({})", cycle_id)),
1101
1102 HydroNode::Tee { inner, metadata } => {
1103 let ptr = inner.as_ptr();
1104 if let Some(&existing_id) = seen_tees.get(&ptr) {
1105 return existing_id;
1106 }
1107
1108 let input_id = inner
1109 .0
1110 .borrow()
1111 .build_graph_structure(structure, seen_tees, config);
1112 let tee_id = structure.add_node_with_metadata(
1113 NodeLabel::Static(extract_op_name(self.print_root())),
1114 HydroNodeType::Tee,
1115 metadata,
1116 );
1117
1118 seen_tees.insert(ptr, tee_id);
1119
1120 let inner_borrow = inner.0.borrow();
1122 let input_metadata = inner_borrow.metadata();
1123 add_edge_with_metadata(
1124 structure,
1125 input_id,
1126 tee_id,
1127 Some(input_metadata),
1128 Some(metadata),
1129 None,
1130 );
1131 drop(inner_borrow);
1132
1133 tee_id
1134 }
1135
1136 HydroNode::Partition {
1137 inner, metadata, ..
1138 } => {
1139 let ptr = inner.as_ptr();
1140 if let Some(&existing_id) = seen_tees.get(&ptr) {
1141 return existing_id;
1142 }
1143
1144 let input_id = inner
1145 .0
1146 .borrow()
1147 .build_graph_structure(structure, seen_tees, config);
1148 let partition_id = structure.add_node_with_metadata(
1149 NodeLabel::Static(extract_op_name(self.print_root())),
1150 HydroNodeType::Tee,
1151 metadata,
1152 );
1153
1154 seen_tees.insert(ptr, partition_id);
1155
1156 let inner_borrow = inner.0.borrow();
1158 let input_metadata = inner_borrow.metadata();
1159 add_edge_with_metadata(
1160 structure,
1161 input_id,
1162 partition_id,
1163 Some(input_metadata),
1164 Some(metadata),
1165 None,
1166 );
1167 drop(inner_borrow);
1168
1169 partition_id
1170 }
1171
1172 HydroNode::ObserveNonDet {
1174 inner, metadata, ..
1175 } => build_simple_transform(TransformParams {
1176 structure,
1177 seen_tees,
1178 config,
1179 input: inner,
1180 metadata,
1181 op_name: extract_op_name(self.print_root()),
1182 node_type: HydroNodeType::NonDeterministic,
1183 }),
1184
1185 HydroNode::Cast { inner, metadata }
1187 | HydroNode::DeferTick {
1188 input: inner,
1189 metadata,
1190 }
1191 | HydroNode::Enumerate {
1192 input: inner,
1193 metadata,
1194 ..
1195 }
1196 | HydroNode::Unique {
1197 input: inner,
1198 metadata,
1199 }
1200 | HydroNode::ResolveFutures {
1201 input: inner,
1202 metadata,
1203 }
1204 | HydroNode::ResolveFuturesBlocking {
1205 input: inner,
1206 metadata,
1207 }
1208 | HydroNode::ResolveFuturesOrdered {
1209 input: inner,
1210 metadata,
1211 } => build_simple_transform(TransformParams {
1212 structure,
1213 seen_tees,
1214 config,
1215 input: inner,
1216 metadata,
1217 op_name: extract_op_name(self.print_root()),
1218 node_type: HydroNodeType::Transform,
1219 }),
1220
1221 HydroNode::Sort {
1223 input: inner,
1224 metadata,
1225 } => build_simple_transform(TransformParams {
1226 structure,
1227 seen_tees,
1228 config,
1229 input: inner,
1230 metadata,
1231 op_name: extract_op_name(self.print_root()),
1232 node_type: HydroNodeType::Aggregation,
1233 }),
1234
1235 HydroNode::Map { f, input, metadata }
1237 | HydroNode::Filter { f, input, metadata }
1238 | HydroNode::FlatMap { f, input, metadata }
1239 | HydroNode::FlatMapStreamBlocking { f, input, metadata }
1240 | HydroNode::FilterMap { f, input, metadata }
1241 | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
1242 TransformParams {
1243 structure,
1244 seen_tees,
1245 config,
1246 input,
1247 metadata,
1248 op_name: extract_op_name(self.print_root()),
1249 node_type: HydroNodeType::Transform,
1250 },
1251 f,
1252 ),
1253
1254 HydroNode::Reduce { f, input, metadata }
1256 | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
1257 TransformParams {
1258 structure,
1259 seen_tees,
1260 config,
1261 input,
1262 metadata,
1263 op_name: extract_op_name(self.print_root()),
1264 node_type: HydroNodeType::Aggregation,
1265 },
1266 f,
1267 ),
1268
1269 HydroNode::Join {
1271 left,
1272 right,
1273 metadata,
1274 }
1275 | HydroNode::CrossProduct {
1276 left,
1277 right,
1278 metadata,
1279 }
1280 | HydroNode::CrossSingleton {
1281 left,
1282 right,
1283 metadata,
1284 } => {
1285 let left_id = left.build_graph_structure(structure, seen_tees, config);
1286 let right_id = right.build_graph_structure(structure, seen_tees, config);
1287 let node_id = structure.add_node_with_metadata(
1288 NodeLabel::Static(extract_op_name(self.print_root())),
1289 HydroNodeType::Join,
1290 metadata,
1291 );
1292
1293 let left_metadata = left.metadata();
1295 add_edge_with_metadata(
1296 structure,
1297 left_id,
1298 node_id,
1299 Some(left_metadata),
1300 Some(metadata),
1301 Some("left".to_owned()),
1302 );
1303
1304 let right_metadata = right.metadata();
1306 add_edge_with_metadata(
1307 structure,
1308 right_id,
1309 node_id,
1310 Some(right_metadata),
1311 Some(metadata),
1312 Some("right".to_owned()),
1313 );
1314
1315 node_id
1316 }
1317
1318 HydroNode::Difference {
1320 pos: left,
1321 neg: right,
1322 metadata,
1323 }
1324 | HydroNode::AntiJoin {
1325 pos: left,
1326 neg: right,
1327 metadata,
1328 } => {
1329 let left_id = left.build_graph_structure(structure, seen_tees, config);
1330 let right_id = right.build_graph_structure(structure, seen_tees, config);
1331 let node_id = structure.add_node_with_metadata(
1332 NodeLabel::Static(extract_op_name(self.print_root())),
1333 HydroNodeType::Join,
1334 metadata,
1335 );
1336
1337 let left_metadata = left.metadata();
1339 add_edge_with_metadata(
1340 structure,
1341 left_id,
1342 node_id,
1343 Some(left_metadata),
1344 Some(metadata),
1345 Some("pos".to_owned()),
1346 );
1347
1348 let right_metadata = right.metadata();
1350 add_edge_with_metadata(
1351 structure,
1352 right_id,
1353 node_id,
1354 Some(right_metadata),
1355 Some(metadata),
1356 Some("neg".to_owned()),
1357 );
1358
1359 node_id
1360 }
1361
1362 HydroNode::Fold {
1364 init,
1365 acc,
1366 input,
1367 metadata,
1368 }
1369 | HydroNode::FoldKeyed {
1370 init,
1371 acc,
1372 input,
1373 metadata,
1374 }
1375 | HydroNode::Scan {
1376 init,
1377 acc,
1378 input,
1379 metadata,
1380 } => {
1381 let node_type = HydroNodeType::Aggregation; build_dual_expr_transform(
1384 TransformParams {
1385 structure,
1386 seen_tees,
1387 config,
1388 input,
1389 metadata,
1390 op_name: extract_op_name(self.print_root()),
1391 node_type,
1392 },
1393 init,
1394 acc,
1395 )
1396 }
1397
1398 HydroNode::ReduceKeyedWatermark {
1400 f,
1401 input,
1402 watermark,
1403 metadata,
1404 } => {
1405 let input_id = input.build_graph_structure(structure, seen_tees, config);
1406 let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
1407 let location_key = Some(setup_location(structure, metadata));
1408 let join_node_id = structure.add_node_with_backtrace(
1409 NodeLabel::Static(extract_op_name(self.print_root())),
1410 HydroNodeType::Join,
1411 location_key,
1412 Some(metadata.op.backtrace.clone()),
1413 );
1414
1415 let input_metadata = input.metadata();
1417 add_edge_with_metadata(
1418 structure,
1419 input_id,
1420 join_node_id,
1421 Some(input_metadata),
1422 Some(metadata),
1423 Some("input".to_owned()),
1424 );
1425
1426 let watermark_metadata = watermark.metadata();
1428 add_edge_with_metadata(
1429 structure,
1430 watermark_id,
1431 join_node_id,
1432 Some(watermark_metadata),
1433 Some(metadata),
1434 Some("watermark".to_owned()),
1435 );
1436
1437 let node_id = structure.add_node_with_backtrace(
1438 NodeLabel::with_exprs(extract_op_name(self.print_root()), vec![f.clone()]),
1439 HydroNodeType::Aggregation,
1440 location_key,
1441 Some(metadata.op.backtrace.clone()),
1442 );
1443
1444 let join_metadata = metadata; add_edge_with_metadata(
1447 structure,
1448 join_node_id,
1449 node_id,
1450 Some(join_metadata),
1451 Some(metadata),
1452 None,
1453 );
1454
1455 node_id
1456 }
1457
1458 HydroNode::Network {
1459 serialize_fn,
1460 deserialize_fn,
1461 input,
1462 metadata,
1463 ..
1464 } => {
1465 let input_id = input.build_graph_structure(structure, seen_tees, config);
1466 let _from_location_key = setup_location(structure, metadata);
1467
1468 let root = metadata.location_id.root();
1469 let to_location_key = root.key();
1470 let to_location_type = root.location_type().unwrap();
1471 structure.add_location(to_location_key, to_location_type);
1472
1473 let mut label = "network(".to_owned();
1474 if serialize_fn.is_some() {
1475 label.push_str("send");
1476 }
1477 if deserialize_fn.is_some() {
1478 if serialize_fn.is_some() {
1479 label.push_str(" + ");
1480 }
1481 label.push_str("recv");
1482 }
1483 label.push(')');
1484
1485 let network_id = structure.add_node_with_backtrace(
1486 NodeLabel::Static(label),
1487 HydroNodeType::Network,
1488 Some(to_location_key),
1489 Some(metadata.op.backtrace.clone()),
1490 );
1491
1492 let input_metadata = input.metadata();
1494 add_edge_with_metadata(
1495 structure,
1496 input_id,
1497 network_id,
1498 Some(input_metadata),
1499 Some(metadata),
1500 Some(format!("to {:?}({})", to_location_type, to_location_key)),
1501 );
1502
1503 network_id
1504 }
1505
1506 HydroNode::Batch { inner, metadata } => build_simple_transform(TransformParams {
1508 structure,
1509 seen_tees,
1510 config,
1511 input: inner,
1512 metadata,
1513 op_name: extract_op_name(self.print_root()),
1514 node_type: HydroNodeType::NonDeterministic,
1515 }),
1516
1517 HydroNode::YieldConcat { inner, .. } => {
1518 inner.build_graph_structure(structure, seen_tees, config)
1520 }
1521
1522 HydroNode::BeginAtomic { inner, .. } => {
1523 inner.build_graph_structure(structure, seen_tees, config)
1524 }
1525
1526 HydroNode::EndAtomic { inner, .. } => {
1527 inner.build_graph_structure(structure, seen_tees, config)
1528 }
1529
1530 HydroNode::Chain {
1531 first,
1532 second,
1533 metadata,
1534 } => {
1535 let first_id = first.build_graph_structure(structure, seen_tees, config);
1536 let second_id = second.build_graph_structure(structure, seen_tees, config);
1537 let location_key = Some(setup_location(structure, metadata));
1538 let chain_id = structure.add_node_with_backtrace(
1539 NodeLabel::Static(extract_op_name(self.print_root())),
1540 HydroNodeType::Transform,
1541 location_key,
1542 Some(metadata.op.backtrace.clone()),
1543 );
1544
1545 let first_metadata = first.metadata();
1547 add_edge_with_metadata(
1548 structure,
1549 first_id,
1550 chain_id,
1551 Some(first_metadata),
1552 Some(metadata),
1553 Some("first".to_owned()),
1554 );
1555
1556 let second_metadata = second.metadata();
1558 add_edge_with_metadata(
1559 structure,
1560 second_id,
1561 chain_id,
1562 Some(second_metadata),
1563 Some(metadata),
1564 Some("second".to_owned()),
1565 );
1566
1567 chain_id
1568 }
1569
1570 HydroNode::ChainFirst {
1571 first,
1572 second,
1573 metadata,
1574 } => {
1575 let first_id = first.build_graph_structure(structure, seen_tees, config);
1576 let second_id = second.build_graph_structure(structure, seen_tees, config);
1577 let location_key = Some(setup_location(structure, metadata));
1578 let chain_id = structure.add_node_with_backtrace(
1579 NodeLabel::Static(extract_op_name(self.print_root())),
1580 HydroNodeType::Transform,
1581 location_key,
1582 Some(metadata.op.backtrace.clone()),
1583 );
1584
1585 let first_metadata = first.metadata();
1587 add_edge_with_metadata(
1588 structure,
1589 first_id,
1590 chain_id,
1591 Some(first_metadata),
1592 Some(metadata),
1593 Some("first".to_owned()),
1594 );
1595
1596 let second_metadata = second.metadata();
1598 add_edge_with_metadata(
1599 structure,
1600 second_id,
1601 chain_id,
1602 Some(second_metadata),
1603 Some(metadata),
1604 Some("second".to_owned()),
1605 );
1606
1607 chain_id
1608 }
1609
1610 HydroNode::Counter {
1611 tag: _,
1612 prefix: _,
1613 duration,
1614 input,
1615 metadata,
1616 } => build_single_expr_transform(
1617 TransformParams {
1618 structure,
1619 seen_tees,
1620 config,
1621 input,
1622 metadata,
1623 op_name: extract_op_name(self.print_root()),
1624 node_type: HydroNodeType::Transform,
1625 },
1626 duration,
1627 ),
1628 }
1629 }
1630}
1631
/// Generates a `String`-returning render function on top of an existing write function.
///
/// `render_hydro_ir!(render_fn, writer_fn)` defines
/// `pub fn render_fn(roots: &[HydroRoot], config: HydroWriteConfig<'_>) -> String`.
/// The generated function calls `writer_fn(&mut String, roots, config)` and returns
/// whatever was written into the buffer.
///
/// # Panics
///
/// The generated function unwraps the result of `writer_fn`, so it panics if the
/// writer reports an error.
macro_rules! render_hydro_ir {
    ($render_fn:ident, $writer_fn:ident) => {
        /// Renders `roots` into a freshly allocated `String` using the paired writer.
        pub fn $render_fn(roots: &[HydroRoot], config: HydroWriteConfig<'_>) -> String {
            let mut rendered = String::new();
            $writer_fn(&mut rendered, roots, config).unwrap();
            rendered
        }
    };
}
1643
/// Generates a write function that streams a rendered graph into any `fmt::Write` sink.
///
/// `write_hydro_ir!(write_fn, BackendType, constructor)` defines
/// `pub fn write_fn(output, roots, config) -> std::fmt::Result`.
/// The generated function:
///
/// 1. builds the backend writer by calling `constructor(output, config)`, annotated
///    with `BackendType` (which may contain `_` placeholders for inference), and
/// 2. hands a mutable borrow of that writer, together with `roots` and `config`,
///    to `write_hydro_ir_graph`.
macro_rules! write_hydro_ir {
    ($write_fn:ident, $backend_ty:ty, $make_backend:expr) => {
        /// Writes the graph for `roots` into `output` using this function's backend.
        pub fn $write_fn(
            output: impl std::fmt::Write,
            roots: &[HydroRoot],
            config: HydroWriteConfig<'_>,
        ) -> std::fmt::Result {
            // `config` is used twice: once to configure the backend, once for the traversal.
            let mut backend: $backend_ty = $make_backend(output, config);
            write_hydro_ir_graph(&mut backend, roots, config)
        }
    };
}
1657
1658render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1659write_hydro_ir!(
1660 write_hydro_ir_mermaid,
1661 HydroMermaid<_>,
1662 HydroMermaid::new_with_config
1663);
1664
1665render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1666write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1667
1668render_hydro_ir!(render_hydro_ir_hydroscope, write_hydro_ir_json);
1670
1671render_hydro_ir!(render_hydro_ir_json, write_hydro_ir_json);
1673write_hydro_ir!(write_hydro_ir_json, HydroJson<_>, HydroJson::new);
1674
1675fn write_hydro_ir_graph<W>(
1676 graph_write: W,
1677 roots: &[HydroRoot],
1678 config: HydroWriteConfig<'_>,
1679) -> Result<(), W::Err>
1680where
1681 W: HydroGraphWrite,
1682{
1683 let mut structure = HydroGraphStructure::new();
1684 let mut seen_tees = HashMap::new();
1685
1686 for leaf in roots {
1688 leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1689 }
1690
1691 write_graph_structure(&structure, graph_write, config)
1692}