digraph G {
subgraph cluster0 {
isCluster="true";
label="WholeStageCodegen (2)\n \nduration: total (min, med, max (stageId: taskId))\n0 ms (0 ms, 0 ms, 0 ms (stage 461.0: task 3149))";
1 [labelType="html" label="<b>HashAggregate</b><br><br>spill size: 0.0 B<br>time in aggregation build total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 461.0: task 3149))<br>peak memory total (min, med, max (stageId: taskId))<br>6.3 MiB (256.0 KiB, 256.0 KiB, 256.0 KiB (stage 461.0: task 3149))<br>number of output rows: 0<br>number of sort fallback tasks: 0<br>avg hash probes per key: 0"];
}
2 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 0<br>local merged chunks fetched: 0<br>shuffle write time: 0 ms<br>remote merged bytes read: 0.0 B<br>local merged blocks fetched: 0<br>corrupt merged block chunks: 0<br>remote merged reqs duration: 0 ms<br>remote merged blocks fetched: 0<br>records read: 0<br>local bytes read: 0.0 B<br>fetch wait time: 0 ms<br>remote bytes read: 0.0 B<br>merged fetch fallback count: 0<br>local blocks read: 0<br>remote merged chunks fetched: 0<br>remote blocks read: 0<br>data size: 0.0 B<br>local merged bytes read: 0.0 B<br>number of partitions: 25<br>remote reqs duration: 0 ms<br>remote bytes read to disk: 0.0 B<br>shuffle bytes written: 0.0 B"];
subgraph cluster3 {
isCluster="true";
label="WholeStageCodegen (1)\n \nduration: total (min, med, max (stageId: taskId))\n45 ms (0 ms, 1 ms, 7 ms (stage 460.0: task 3121))";
4 [labelType="html" label="<b>HashAggregate</b><br><br>spill size: 0.0 B<br>time in aggregation build total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 460.0: task 3121))<br>peak memory total (min, med, max (stageId: taskId))<br>6.3 MiB (256.0 KiB, 256.0 KiB, 256.0 KiB (stage 460.0: task 3121))<br>number of output rows: 0<br>number of sort fallback tasks: 0<br>avg hash probes per key: 0"];
}
5 [labelType="html" label="<b>InMemoryTableScan</b><br><br>number of output rows: 0"];
6 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 0<br>local merged chunks fetched: 0<br>shuffle write time: 0 ms<br>remote merged bytes read: 0.0 B<br>local merged blocks fetched: 0<br>corrupt merged block chunks: 0<br>remote merged reqs duration: 0 ms<br>remote merged blocks fetched: 0<br>records read: 0<br>local bytes read: 0.0 B<br>fetch wait time: 0 ms<br>remote bytes read: 0.0 B<br>merged fetch fallback count: 0<br>local blocks read: 0<br>remote merged chunks fetched: 0<br>remote blocks read: 0<br>data size: 0.0 B<br>local merged bytes read: 0.0 B<br>number of partitions: 25<br>remote reqs duration: 0 ms<br>remote bytes read to disk: 0.0 B<br>shuffle bytes written: 0.0 B"];
subgraph cluster7 {
isCluster="true";
label="WholeStageCodegen (2)\n \nduration: total (min, med, max (stageId: taskId))\n50 ms (0 ms, 0 ms, 9 ms (stage 459.0: task 3098))";
8 [labelType="html" label="<b>HashAggregate</b><br><br>spill size: 0.0 B<br>time in aggregation build total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 459.0: task 3094))<br>peak memory total (min, med, max (stageId: taskId))<br>6.3 MiB (256.0 KiB, 256.0 KiB, 256.0 KiB (stage 459.0: task 3094))<br>number of output rows: 0<br>number of sort fallback tasks: 0<br>avg hash probes per key: 0"];
}
9 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 0<br>local merged chunks fetched: 0<br>shuffle write time: 0 ms<br>remote merged bytes read: 0.0 B<br>local merged blocks fetched: 0<br>corrupt merged block chunks: 0<br>remote merged reqs duration: 0 ms<br>remote merged blocks fetched: 0<br>records read: 0<br>local bytes read: 0.0 B<br>fetch wait time: 0 ms<br>remote bytes read: 0.0 B<br>merged fetch fallback count: 0<br>local blocks read: 0<br>remote merged chunks fetched: 0<br>remote blocks read: 0<br>data size: 0.0 B<br>local merged bytes read: 0.0 B<br>number of partitions: 25<br>remote reqs duration: 0 ms<br>remote bytes read to disk: 0.0 B<br>shuffle bytes written: 0.0 B"];
10 [labelType="html" label="<b>HashAggregate</b><br><br>spill size: 0.0 B<br>time in aggregation build: 0 ms<br>peak memory: 0.0 B<br>number of output rows: 0<br>number of sort fallback tasks: 0<br>avg hash probes per key: 0"];
subgraph cluster11 {
isCluster="true";
label="WholeStageCodegen (1)\n \nduration: 0 ms";
12 [labelType="html" label="<b>Scan ExistingRDD</b><br><br>number of output rows: 0"];
}
2->1;
4->2;
5->4;
6->5;
8->6;
9->8;
10->9;
12->10;
}
13
HashAggregate(keys=[shardId#40836], functions=[max(version#40838)])
WholeStageCodegen (2)
Exchange hashpartitioning(shardId#40836, 25), ENSURE_REQUIREMENTS, [plan_id=12379]
HashAggregate(keys=[shardId#40836], functions=[partial_max(version#40838)])
WholeStageCodegen (1)
InMemoryTableScan [shardId#40836, version#40838]
Exchange hashpartitioning(worklistShardItemId#40837L, 25), REPARTITION_BY_NUM, [plan_id=12351]
HashAggregate(keys=[demandChannel#40840, shardId#40836, kpis#40842, version#40838, qty#40839, worklistShardItemId#40837L, demandStream#40841], functions=[])
WholeStageCodegen (2)
Exchange hashpartitioning(demandChannel#40840, shardId#40836, kpis#40842, version#40838, qty#40839, worklistShardItemId#40837L, demandStream#40841, 25), ENSURE_REQUIREMENTS, [plan_id=12347]
HashAggregate(keys=[demandChannel#40840, shardId#40836, knownfloatingpointnormalized(transform(kpis#40842, lambdafunction(knownfloatingpointnormalized(if (isnull(lambda arg#41483)) null else named_struct(label, lambda arg#41483.label, dateTime, lambda arg#41483.dateTime, value, knownfloatingpointnormalized(normalizenanandzero(lambda arg#41483.value)))), lambda arg#41483, false))) AS kpis#40842, version#40838, knownfloatingpointnormalized(normalizenanandzero(qty#40839)) AS qty#40839, worklistShardItemId#40837L, demandStream#40841], functions=[])
Scan ExistingRDD[shardId#40836,worklistShardItemId#40837L,version#40838,qty#40839,demandChannel#40840,demandStream#40841,kpis#40842]
WholeStageCodegen (1)
== Physical Plan ==
* HashAggregate (10)
+- Exchange (9)
+- * HashAggregate (8)
+- InMemoryTableScan (1)
+- InMemoryRelation (2)
+- Exchange (7)
+- * HashAggregate (6)
+- Exchange (5)
+- HashAggregate (4)
+- * Scan ExistingRDD (3)
(1) InMemoryTableScan
Output [2]: [shardId#40836, version#40838]
Arguments: [shardId#40836, version#40838]
(2) InMemoryRelation
Arguments: [shardId#40836, worklistShardItemId#40837L, version#40838, qty#40839, demandChannel#40840, demandStream#40841, kpis#40842], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@763c116,StorageLevel(disk, memory, 2 replicas),Exchange hashpartitioning(worklistShardItemId#40837L, 25), REPARTITION_BY_NUM, [plan_id=12351]
+- *(2) HashAggregate(keys=[demandChannel#40840, shardId#40836, kpis#40842, version#40838, qty#40839, worklistShardItemId#40837L, demandStream#40841], functions=[], output=[shardId#40836, worklistShardItemId#40837L, version#40838, qty#40839, demandChannel#40840, demandStream#40841, kpis#40842])
+- Exchange hashpartitioning(demandChannel#40840, shardId#40836, kpis#40842, version#40838, qty#40839, worklistShardItemId#40837L, demandStream#40841, 25), ENSURE_REQUIREMENTS, [plan_id=12347]
+- HashAggregate(keys=[demandChannel#40840, shardId#40836, knownfloatingpointnormalized(transform(kpis#40842, lambdafunction(knownfloatingpointnormalized(if (isnull(lambda arg#41483)) null else named_struct(label, lambda arg#41483.label, dateTime, lambda arg#41483.dateTime, value, knownfloatingpointnormalized(normalizenanandzero(lambda arg#41483.value)))), lambda arg#41483, false))) AS kpis#40842, version#40838, knownfloatingpointnormalized(normalizenanandzero(qty#40839)) AS qty#40839, worklistShardItemId#40837L, demandStream#40841], functions=[], output=[demandChannel#40840, shardId#40836, kpis#40842, version#40838, qty#40839, worklistShardItemId#40837L, demandStream#40841])
+- *(1) Scan ExistingRDD[shardId#40836,worklistShardItemId#40837L,version#40838,qty#40839,demandChannel#40840,demandStream#40841,kpis#40842]
,None)
(3) Scan ExistingRDD [codegen id : 1]
Output [7]: [shardId#40836, worklistShardItemId#40837L, version#40838, qty#40839, demandChannel#40840, demandStream#40841, kpis#40842]
Arguments: [shardId#40836, worklistShardItemId#40837L, version#40838, qty#40839, demandChannel#40840, demandStream#40841, kpis#40842], SQLExecutionRDD[1213] at start at FileStorageAdapterImpl.java:529, ExistingRDD, UnknownPartitioning(0)
(4) HashAggregate
Input [7]: [shardId#40836, worklistShardItemId#40837L, version#40838, qty#40839, demandChannel#40840, demandStream#40841, kpis#40842]
Keys [7]: [demandChannel#40840, shardId#40836, knownfloatingpointnormalized(transform(kpis#40842, lambdafunction(knownfloatingpointnormalized(if (isnull(lambda arg#41483)) null else named_struct(label, lambda arg#41483.label, dateTime, lambda arg#41483.dateTime, value, knownfloatingpointnormalized(normalizenanandzero(lambda arg#41483.value)))), lambda arg#41483, false))) AS kpis#40842, version#40838, knownfloatingpointnormalized(normalizenanandzero(qty#40839)) AS qty#40839, worklistShardItemId#40837L, demandStream#40841]
Functions: []
Aggregate Attributes: []
Results [7]: [demandChannel#40840, shardId#40836, kpis#40842, version#40838, qty#40839, worklistShardItemId#40837L, demandStream#40841]
(5) Exchange
Input [7]: [demandChannel#40840, shardId#40836, kpis#40842, version#40838, qty#40839, worklistShardItemId#40837L, demandStream#40841]
Arguments: hashpartitioning(demandChannel#40840, shardId#40836, kpis#40842, version#40838, qty#40839, worklistShardItemId#40837L, demandStream#40841, 25), ENSURE_REQUIREMENTS, [plan_id=12347]
(6) HashAggregate [codegen id : 2]
Input [7]: [demandChannel#40840, shardId#40836, kpis#40842, version#40838, qty#40839, worklistShardItemId#40837L, demandStream#40841]
Keys [7]: [demandChannel#40840, shardId#40836, kpis#40842, version#40838, qty#40839, worklistShardItemId#40837L, demandStream#40841]
Functions: []
Aggregate Attributes: []
Results [7]: [shardId#40836, worklistShardItemId#40837L, version#40838, qty#40839, demandChannel#40840, demandStream#40841, kpis#40842]
(7) Exchange
Input [7]: [shardId#40836, worklistShardItemId#40837L, version#40838, qty#40839, demandChannel#40840, demandStream#40841, kpis#40842]
Arguments: hashpartitioning(worklistShardItemId#40837L, 25), REPARTITION_BY_NUM, [plan_id=12351]
(8) HashAggregate [codegen id : 1]
Input [2]: [shardId#40836, version#40838]
Keys [1]: [shardId#40836]
Functions [1]: [partial_max(version#40838)]
Aggregate Attributes [1]: [max#41632]
Results [2]: [shardId#40836, max#41633]
(9) Exchange
Input [2]: [shardId#40836, max#41633]
Arguments: hashpartitioning(shardId#40836, 25), ENSURE_REQUIREMENTS, [plan_id=12379]
(10) HashAggregate [codegen id : 2]
Input [2]: [shardId#40836, max#41633]
Keys [1]: [shardId#40836]
Functions [1]: [max(version#40838)]
Aggregate Attributes [1]: [max(version#40838)#41523]
Results [2]: [shardId#40836, max(version#40838)#41523 AS version#41524]