Streaming Aggregation Optimization

As it is known to all, SQL is the de-facto standard for data analytics. For streaming analytics, SQL would enable a larger pool of people to specify applications on data streams in less time, which benefits from many features of SQL, such as it is declarative and can be optimized effectively. In this page, we will introduce some useful optimizations of streaming aggregation which bring great improvement in the performance of streaming processing.

GroupBy Aggregation Optimization

Generally, the group aggregate function processes input records one by one, i.e., getting accumulator from state, accumulating record to accumulator, and writing accumulator back into state. This process pattern may incur much overhead of state. Besides, it is very common to encounter data skew in production which is annoying because skew has a great impact on the performance of stream processing. Thus, some effective measures are proposed to optimize group aggregation.

MiniBatch Aggregation

The main idea of MiniBatch aggregation is caching a bundle of inputs in a buffer inside the aggregation operator. When the bundle of inputs is triggered to process, only one state operation is needed for the inputs with same key, which can reduce the state overhead significantly. The follow figure provide a visual representation of MiniBatch Aggregation.

MiniBatch optimization is disabled by default. To enable this optimization, the following configuration should be properly set.

Key Default Description
sql.exec.mini-batch.allowLatency.ms
Long.MIN_VALUE The maximum latency allowed for a flink sql job. Value > 0 means MiniBatch enabled, otherwise MiniBatch is disabled.
sql.exec.mini-batch.size
Long.MIN_VALUE The maximum number of inputs that a buffer can accommodate. Currently, the aggregation operator uses Java HashMap as the buffer, thus it is necessary to set this parameter to ensure memory safety and be GC-Friendly.

Local-Global Aggregation

Local-Global is proposed to solve data skew problem by dividing an group aggregation into two stages, that is doing local aggregation in upstream firstly, and followed by global aggregation in downstream, which is similar to Combine + Reduce pattern in MapReduce. For example, considering the following SQL:

SELECT color, sum(id)
FROM T
GROUP BY color

It is possible that the records on stream are skewed, thus some instances of aggregation operator have to process much more records than others, which leads to hotspot. As it is shown, large number of inputs is accumulated into a few accumulators by local aggregation, which can release the burden of global aggregation in downstream. See this figure for better understanding of Local-Global pattern.

Local-Global Aggregation is enabled by default as long as MiniBatch optimization is turned on. The related configuration is shown below.

Key Default Description
sql.optimizer.agg.phase.enforcer
NONE Strategy for agg phase. This parameter is used in both stream and batch mode. Only NONE, ONE_PHASE or TWO_PHASE can be set. In stream mode, ONE_PHASE means Local-Global aggregation is disabled, NONE and TWO_PHASE mean Local-Global aggregation is enabled.

Distinct-agg split

Local-Global optimization is effective to eliminate data skew for normal aggregation, such as SUM, COUNT, MAX, MIN. But its performance is not satisfactory when dealing with distinct aggregation. Distinct-agg split is thus proposed to solve this problem by splitting a distinct group aggregation into two layers of aggregation automatically(Agg1 and Agg2, the splitting results of built-in aggregate functions are shown in the following table). The records under a hot key are breaked up by adding a bucket number of the distinct

Original Agg Agg1 Agg2
COUNT_DISTINCT
COUNT_DISTINCT SUM
COUNT
COUNT SUM
MAX
MAX MAX
MIN
MIN MIN
SUM
SUM SUM
AVG
SUM, COUNT SUM / SUM
FIRST_VALUE
FIRST_VALUE FIRST_VALUE
LAST_VALUE
LAST_VALUE LAST_VALUE

key as the new primary key together in the inner aggregation. The bucket number is calculated as: hash_code(distinct_key) % BUCKET_NUM. See the example below for better understanding.

--- original SQL ---
select color, count(distinct id) from T group by color

--- Distinct-agg split result is Equivalent to the following SQL  ---
select color, sum(cnt)
from (
    select color, count(distinct id) as cnt
    from T
    group by color, mod(hash_code(id), 1024)
)
group by color

The execution graph of the upper SQL with Local-Global or Distinct-agg split enabled is shown as below. It can be seen that data under a hot key are evenly redistributed and accumulated among operator instances of AGG1, thus hotspot is eliminated effectively.

Distinct-Agg Split optimization is disabled by default. It is recommended to be enabled only when there is data skew problem in distinct aggregation, because it may incur extra overheads, such as network shuffle. The related configuration is shown below.

Key Default Description
sql.optimizer.data-skew.distinct-agg
false Tell the optimizer whether there exists data skew in distinct aggregation so as to enable the aggregation split optimization.
sql.optimizer.data-skew.distinct-agg.bucket
1024 Configure the number of buckets when splitting distinct aggregation.

Incremental Aggregation

When both Local-Global and Distinct-agg split are enabled, a distinct aggregation will be optimized into four aggregations, i.e., Local-Agg1, Global-Agg1, Local-Agg2 and Global-Agg2 (Agg1 and Agg2 are results of splitting a distinct Aggregation). As a result, additional resources and state overhead is introduced. Incremental optimization is proposed to merge Global-Agg1 and Local-Agg2 into a equivalent Incremental-Agg to solve this problem.

Considering the following SQL:

SELECT color, count(distinct id), count(id)
FROM T
GROUP BY color

The execution graph with Incremental optimization enabled or disabled is shown as below:

Incremental Aggregation is enabled by default when both Local-Global Aggregation and Distinct-Agg Split optimization are enabled. The following configuration is used to turn on/off this optimization.

Key Default Description
sql.exec.incremental-agg.enabled
true Whether to enable incremental aggregation.

Back to top