FlinkSQL聚合函数(Aggregate Function)详解
发布网友
发布时间:2024-09-04 22:40
我来回答
共1个回答
热心网友
时间:2024-11-19 11:31
使用场景:聚合函数,也就是 UDAF,通常用于将多条数据输入并输出一条数据的场景。
图中展示了一个聚合函数的示例以及聚合函数包含的重要方法。
案例场景:
关于饮料的表格,有三个字段:id、name、price。表格中有5行数据,要找到所有饮料中最贵的饮料的价格,即执行一个max()聚合函数得到结果,遍历所有5行数据,最终结果只有一个数值。
开发流程:
实现AggregateFunction接口,其中所有的方法必须是public的、非static的。
必须实现以下方法:
某些场景下必须实现:
关于输入、输出参数数据类型的方法:
默认情况下,用户的Input输入参数(accumulate(Acc accumulator, Input输入参数)的输入参数Input输入参数)、accumulator(Acc聚合中间结果createAccumulator()的返回结果)、Output输出参数数据类型(Output输出参数getValue(Acc accumulator)的Output输出参数)会被Flink使用反射获取到。
对于accumulator和Output输出参数类型,Flink SQL的类型推导在遇到复杂类型时会推导出错误的结果(注意:Input输入参数因为是上游算子传入的,类型信息是确定的,不会出现推导错误),比如非基本类型POJO的复杂类型。
与ScalarFunction和TableFunction类似,AggregateFunction提供了AggregateFunction#getResultType()和AggregateFunction#getAccumulatorType()指定最终返回值类型和accumulator的类型,两个函数的返回值类型是TypeInformation。
案例:加权平均值
实现思路:
为了计算加权平均值,accumulator需要存储加权总和以及数据的条数,定义了类WeightedAvgAccumulator作为accumulator,Flink的checkpoint机制会自动保存accumulator,在失败时进行恢复,保证精确一次的语义。
WeightedAvg(聚合函数)的accumulate方法有三个输入参数,第一个是WeightedAvgAccum accumulator,另外两个是用户自定义的输入:输入的值ivalue和输入的权重iweight。尽管retract()、merge()、resetAccumulator()方法在大多数聚合类型中都不是必须实现的,但在示例中提供了它们的实现,并且定义了getResultType()和getAccumulatorType()。
代码案例:
测试结果: