digraph G {
subgraph cluster0 {
isCluster="true";
label="WholeStageCodegen (2)\n \nduration: total (min, med, max (stageId: taskId))\n13 ms (0 ms, 1 ms, 1 ms (stage 161.0: task 906))";
1 [labelType="html" label="<b>HashAggregate</b><br><br>spill size: 0.0 B<br>time in aggregation build total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 161.0: task 906))<br>peak memory total (min, med, max (stageId: taskId))<br>6.3 MiB (256.0 KiB, 256.0 KiB, 256.0 KiB (stage 161.0: task 906))<br>number of output rows: 0<br>number of sort fallback tasks: 0<br>avg hash probes per key: 0"];
}
2 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 0<br>local merged chunks fetched: 0<br>shuffle write time: 0 ms<br>remote merged bytes read: 0.0 B<br>local merged blocks fetched: 0<br>corrupt merged block chunks: 0<br>remote merged reqs duration: 0 ms<br>remote merged blocks fetched: 0<br>records read: 0<br>local bytes read: 0.0 B<br>fetch wait time: 0 ms<br>remote bytes read: 0.0 B<br>merged fetch fallback count: 0<br>local blocks read: 0<br>remote merged chunks fetched: 0<br>remote blocks read: 0<br>data size: 0.0 B<br>local merged bytes read: 0.0 B<br>number of partitions: 25<br>remote reqs duration: 0 ms<br>remote bytes read to disk: 0.0 B<br>shuffle bytes written: 0.0 B"];
subgraph cluster3 {
isCluster="true";
label="WholeStageCodegen (1)\n \nduration: total (min, med, max (stageId: taskId))\n109 ms (1 ms, 2 ms, 9 ms (stage 160.0: task 879))";
4 [labelType="html" label="<b>HashAggregate</b><br><br>spill size: 0.0 B<br>time in aggregation build total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 160.0: task 879))<br>peak memory total (min, med, max (stageId: taskId))<br>6.3 MiB (256.0 KiB, 256.0 KiB, 256.0 KiB (stage 160.0: task 879))<br>number of output rows: 0<br>number of sort fallback tasks: 0<br>avg hash probes per key: 0"];
}
5 [labelType="html" label="<b>InMemoryTableScan</b><br><br>number of output rows: 0"];
6 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 0<br>local merged chunks fetched: 0<br>shuffle write time: 0 ms<br>remote merged bytes read: 0.0 B<br>local merged blocks fetched: 0<br>corrupt merged block chunks: 0<br>remote merged reqs duration: 0 ms<br>remote merged blocks fetched: 0<br>records read: 0<br>local bytes read: 0.0 B<br>fetch wait time: 0 ms<br>remote bytes read: 0.0 B<br>merged fetch fallback count: 0<br>local blocks read: 0<br>remote merged chunks fetched: 0<br>remote blocks read: 0<br>data size: 0.0 B<br>local merged bytes read: 0.0 B<br>number of partitions: 25<br>remote reqs duration: 0 ms<br>remote bytes read to disk: 0.0 B<br>shuffle bytes written: 0.0 B"];
subgraph cluster7 {
isCluster="true";
label="WholeStageCodegen (2)\n \nduration: total (min, med, max (stageId: taskId))\n151 ms (1 ms, 2 ms, 16 ms (stage 159.0: task 858))";
8 [labelType="html" label="<b>HashAggregate</b><br><br>spill size: 0.0 B<br>time in aggregation build total (min, med, max (stageId: taskId))<br>0 ms (0 ms, 0 ms, 0 ms (stage 159.0: task 861))<br>peak memory total (min, med, max (stageId: taskId))<br>6.3 MiB (256.0 KiB, 256.0 KiB, 256.0 KiB (stage 159.0: task 861))<br>number of output rows: 0<br>number of sort fallback tasks: 0<br>avg hash probes per key: 0"];
}
9 [labelType="html" label="<b>Exchange</b><br><br>shuffle records written: 0<br>local merged chunks fetched: 0<br>shuffle write time: 0 ms<br>remote merged bytes read: 0.0 B<br>local merged blocks fetched: 0<br>corrupt merged block chunks: 0<br>remote merged reqs duration: 0 ms<br>remote merged blocks fetched: 0<br>records read: 0<br>local bytes read: 0.0 B<br>fetch wait time: 0 ms<br>remote bytes read: 0.0 B<br>merged fetch fallback count: 0<br>local blocks read: 0<br>remote merged chunks fetched: 0<br>remote blocks read: 0<br>data size: 0.0 B<br>local merged bytes read: 0.0 B<br>number of partitions: 25<br>remote reqs duration: 0 ms<br>remote bytes read to disk: 0.0 B<br>shuffle bytes written: 0.0 B"];
10 [labelType="html" label="<b>HashAggregate</b><br><br>spill size: 0.0 B<br>time in aggregation build: 0 ms<br>peak memory: 0.0 B<br>number of output rows: 0<br>number of sort fallback tasks: 0<br>avg hash probes per key: 0"];
subgraph cluster11 {
isCluster="true";
label="WholeStageCodegen (1)\n \nduration: 0 ms";
12 [labelType="html" label="<b>Scan ExistingRDD</b><br><br>number of output rows: 0"];
}
2->1;
4->2;
5->4;
6->5;
8->6;
9->8;
10->9;
12->10;
}
13
HashAggregate(keys=[shardId#11368], functions=[max(version#11370)])
WholeStageCodegen (2)
Exchange hashpartitioning(shardId#11368, 25), ENSURE_REQUIREMENTS, [plan_id=3803]
HashAggregate(keys=[shardId#11368], functions=[partial_max(version#11370)])
WholeStageCodegen (1)
InMemoryTableScan [shardId#11368, version#11370]
Exchange hashpartitioning(worklistShardItemId#11369L, 25), REPARTITION_BY_NUM, [plan_id=3775]
HashAggregate(keys=[demandChannel#11372, shardId#11368, kpis#11374, version#11370, qty#11371, worklistShardItemId#11369L, demandStream#11373], functions=[])
WholeStageCodegen (2)
Exchange hashpartitioning(demandChannel#11372, shardId#11368, kpis#11374, version#11370, qty#11371, worklistShardItemId#11369L, demandStream#11373, 25), ENSURE_REQUIREMENTS, [plan_id=3771]
HashAggregate(keys=[demandChannel#11372, shardId#11368, knownfloatingpointnormalized(transform(kpis#11374, lambdafunction(knownfloatingpointnormalized(if (isnull(lambda arg#13501)) null else named_struct(label, lambda arg#13501.label, dateTime, lambda arg#13501.dateTime, value, knownfloatingpointnormalized(normalizenanandzero(lambda arg#13501.value)))), lambda arg#13501, false))) AS kpis#11374, version#11370, knownfloatingpointnormalized(normalizenanandzero(qty#11371)) AS qty#11371, worklistShardItemId#11369L, demandStream#11373], functions=[])
Scan ExistingRDD[shardId#11368,worklistShardItemId#11369L,version#11370,qty#11371,demandChannel#11372,demandStream#11373,kpis#11374]
WholeStageCodegen (1)
== Physical Plan ==
* HashAggregate (10)
+- Exchange (9)
+- * HashAggregate (8)
+- InMemoryTableScan (1)
+- InMemoryRelation (2)
+- Exchange (7)
+- * HashAggregate (6)
+- Exchange (5)
+- HashAggregate (4)
+- * Scan ExistingRDD (3)
(1) InMemoryTableScan
Output [2]: [shardId#11368, version#11370]
Arguments: [shardId#11368, version#11370]
(2) InMemoryRelation
Arguments: [shardId#11368, worklistShardItemId#11369L, version#11370, qty#11371, demandChannel#11372, demandStream#11373, kpis#11374], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@763c116,StorageLevel(disk, memory, 2 replicas),Exchange hashpartitioning(worklistShardItemId#11369L, 25), REPARTITION_BY_NUM, [plan_id=3775]
+- *(2) HashAggregate(keys=[demandChannel#11372, shardId#11368, kpis#11374, version#11370, qty#11371, worklistShardItemId#11369L, demandStream#11373], functions=[], output=[shardId#11368, worklistShardItemId#11369L, version#11370, qty#11371, demandChannel#11372, demandStream#11373, kpis#11374])
+- Exchange hashpartitioning(demandChannel#11372, shardId#11368, kpis#11374, version#11370, qty#11371, worklistShardItemId#11369L, demandStream#11373, 25), ENSURE_REQUIREMENTS, [plan_id=3771]
+- HashAggregate(keys=[demandChannel#11372, shardId#11368, knownfloatingpointnormalized(transform(kpis#11374, lambdafunction(knownfloatingpointnormalized(if (isnull(lambda arg#13501)) null else named_struct(label, lambda arg#13501.label, dateTime, lambda arg#13501.dateTime, value, knownfloatingpointnormalized(normalizenanandzero(lambda arg#13501.value)))), lambda arg#13501, false))) AS kpis#11374, version#11370, knownfloatingpointnormalized(normalizenanandzero(qty#11371)) AS qty#11371, worklistShardItemId#11369L, demandStream#11373], functions=[], output=[demandChannel#11372, shardId#11368, kpis#11374, version#11370, qty#11371, worklistShardItemId#11369L, demandStream#11373])
+- *(1) Scan ExistingRDD[shardId#11368,worklistShardItemId#11369L,version#11370,qty#11371,demandChannel#11372,demandStream#11373,kpis#11374]
,None)
(3) Scan ExistingRDD [codegen id : 1]
Output [7]: [shardId#11368, worklistShardItemId#11369L, version#11370, qty#11371, demandChannel#11372, demandStream#11373, kpis#11374]
Arguments: [shardId#11368, worklistShardItemId#11369L, version#11370, qty#11371, demandChannel#11372, demandStream#11373, kpis#11374], SQLExecutionRDD[394] at start at FileStorageAdapterImpl.java:529, ExistingRDD, UnknownPartitioning(0)
(4) HashAggregate
Input [7]: [shardId#11368, worklistShardItemId#11369L, version#11370, qty#11371, demandChannel#11372, demandStream#11373, kpis#11374]
Keys [7]: [demandChannel#11372, shardId#11368, knownfloatingpointnormalized(transform(kpis#11374, lambdafunction(knownfloatingpointnormalized(if (isnull(lambda arg#13501)) null else named_struct(label, lambda arg#13501.label, dateTime, lambda arg#13501.dateTime, value, knownfloatingpointnormalized(normalizenanandzero(lambda arg#13501.value)))), lambda arg#13501, false))) AS kpis#11374, version#11370, knownfloatingpointnormalized(normalizenanandzero(qty#11371)) AS qty#11371, worklistShardItemId#11369L, demandStream#11373]
Functions: []
Aggregate Attributes: []
Results [7]: [demandChannel#11372, shardId#11368, kpis#11374, version#11370, qty#11371, worklistShardItemId#11369L, demandStream#11373]
(5) Exchange
Input [7]: [demandChannel#11372, shardId#11368, kpis#11374, version#11370, qty#11371, worklistShardItemId#11369L, demandStream#11373]
Arguments: hashpartitioning(demandChannel#11372, shardId#11368, kpis#11374, version#11370, qty#11371, worklistShardItemId#11369L, demandStream#11373, 25), ENSURE_REQUIREMENTS, [plan_id=3771]
(6) HashAggregate [codegen id : 2]
Input [7]: [demandChannel#11372, shardId#11368, kpis#11374, version#11370, qty#11371, worklistShardItemId#11369L, demandStream#11373]
Keys [7]: [demandChannel#11372, shardId#11368, kpis#11374, version#11370, qty#11371, worklistShardItemId#11369L, demandStream#11373]
Functions: []
Aggregate Attributes: []
Results [7]: [shardId#11368, worklistShardItemId#11369L, version#11370, qty#11371, demandChannel#11372, demandStream#11373, kpis#11374]
(7) Exchange
Input [7]: [shardId#11368, worklistShardItemId#11369L, version#11370, qty#11371, demandChannel#11372, demandStream#11373, kpis#11374]
Arguments: hashpartitioning(worklistShardItemId#11369L, 25), REPARTITION_BY_NUM, [plan_id=3775]
(8) HashAggregate [codegen id : 1]
Input [2]: [shardId#11368, version#11370]
Keys [1]: [shardId#11368]
Functions [1]: [partial_max(version#11370)]
Aggregate Attributes [1]: [max#13650]
Results [2]: [shardId#11368, max#13651]
(9) Exchange
Input [2]: [shardId#11368, max#13651]
Arguments: hashpartitioning(shardId#11368, 25), ENSURE_REQUIREMENTS, [plan_id=3803]
(10) HashAggregate [codegen id : 2]
Input [2]: [shardId#11368, max#13651]
Keys [1]: [shardId#11368]
Functions [1]: [max(version#11370)]
Aggregate Attributes [1]: [max(version#11370)#13541]
Results [2]: [shardId#11368, max(version#11370)#13541 AS version#13542]