窗口(window)是處理無限流的核心。窗口將流分割成有限大小的“桶”,我們可以在桶上應用計算。本文檔重點介紹如何在Flink中執行窗口操作,以及程序員如何從其提供的功能中獲得最大的好處。
一個有窗口的Flink程序的一般結構如下所示。第一個片段指的是鍵控流,而第二個片段指的是非鍵控流。可以看到,唯一的區別是keyBy(…)調用鍵流,而window(…)調用非鍵流的windowwall(…)。這也將作為頁面其余部分的路標。
Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
一般真實的流都是無界的,怎樣處理無界的數據?
在自然環境中,數據的產生原本就是流式的。無論是來自 Web 服務器的事件數據,證券交易所的交易數據,還是來自工廠車間機器上的傳感器數據,其數據都是流式的。但是當你 分析數據時,可以圍繞 有界流(bounded)或 無界流(unbounded)兩種模型來組織處理數據,當然,選擇不同的模型,程序的執行和處理方式也都會不同。
上面圖片來源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/learn-flink/overview/
官方文檔
翻轉窗口賦值器將每個元素賦值給一個指定窗口大小的窗口。滾動的窗口有固定的尺寸,而且不重疊。例如,如果您指定一個大小為5分鐘的滾動窗口,則當前窗口將被評估,并每5分鐘啟動一個新窗口,如下圖所示:
【特點】
【示例代碼】
TumblingEventTimeWindows:滾動事件時間窗口
TumblingProcessingTimeWindows:滾動處理時間窗口
val input: DataStream[T]=...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
滑動窗口賦值器將元素賦值給固定長度的窗口。類似于滾動窗口賦值器,窗口的大小由窗口大小參數配置。另外一個窗口滑動參數控制滑動窗口啟動的頻率。因此,如果滑動窗口小于窗口大小,則滑動窗口可以重疊。在這種情況下,元素被分配給多個窗口。
例如,您可以將大小為10分鐘的窗口滑動5分鐘。這樣,每隔5分鐘就會出現一個窗口,其中包含在最后10分鐘內到達的事件,如下圖所示:
【特點】
【示例代碼】
SlidingEventTimeWindows:滑動事件時間窗口
SlidingProcessingTimeWindows:滑動處理時間窗口
val input: DataStream[T]=...
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
會話窗口分配器根據活動的會話對元素進行分組。與滑動窗口不同,會話窗口沒有重疊,也沒有固定的開始和結束時間。相反,當會話窗口在一段時間內沒有接收到元素時,即當一個不活動間隙發生時,會話窗口將關閉。會話窗口分配器可以配置一個靜態會話間隙,也可以配置一個會話間隙提取器函數,該函數定義了不活動的時間長度。當這段時間到期時,當前會話關閉,隨后的元素被分配到一個新的會話窗口。
【特點】
【示例代碼】
EventTimeSessionWindows:會話事件時間窗口
SlidingProcessingTimeWindows:會話處理時間窗口
val input: DataStream[T]=...
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long={
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long={
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
窗口分配器 —— window() 方法
TumblingEventTimeWindows:滾動事件時間窗口
TumblingProcessingTimeWindows:滾動處理時間窗口
SlidingEventTimeWindows:滑動事件時間窗口
SlidingProcessingTimeWindows:滑動處理時間窗口
EventTimeSessionWindows:會話事件時間窗口
SlidingProcessingTimeWindows:會話處理時間窗口
window function 定義了要對窗口中收集的數據做的計算操作。可以分為兩類。
val input: DataStream[(String, Long)]=...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2)=> (v1._1, v1._2 + v2._2) }
val input: DataStream[(String, Long)]=...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
一個ProcessWindowFunction可以這樣定義和使用:
val input: DataStream[(String, Long)]=...
input
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String])={
var count=0L
for (in <- input) {
count=count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
官方文檔
Flink 明確支持以下三種時間語義:
上面圖片來源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/time/
我們可以直接在代碼中,對執行環境調用 setStreamTimeCharacteristic
方法,設置流的時間特性,具體的時間,還需要從數據中提取時間戳(timestamp)
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
官方文檔
當 Flink 以 Event Time 模式處理數據流時,它會根據數據里的時間戳來
處理基于時間的算子,由于網絡、分布式等原因,會導致亂序數據的產生,亂序數據會讓窗口計算不準確。Watermark正是處理亂序數據而來的。
遇到一個時間戳達到了窗口關閉時間,不應該立刻觸發窗口計算,而是等
待一段時間,等遲到的數據來了再關閉窗口。
時間戳的分配與 watermark 的生成是齊頭并進的,其可以告訴 Flink 應用程序事件時間的進度。其可以通過指定 WatermarkGenerator 來配置 watermark 的生成方式。
使用 Flink API 時需要設置一個同時包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具類中也提供了許多常用的 watermark 策略,并且用戶也可以在某些必要場景下構建自己的 watermark 策略。WatermarkStrategy 接口如下:
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
/**
* 根據策略實例化一個可分配時間戳的 {@link TimestampAssigner}。
*/
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
/**
* 根據策略實例化一個 watermark 生成器。
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
通常情況下,你不用實現此接口,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個工具類將自定義的 TimestampAssigner 與 WatermarkGenerator 進行綁定。
【例如】你想要要使用有界無序(bounded-out-of-orderness)watermark 生成器和一個 lambda 表達式作為時間戳分配器,那么可以按照如下方式實現:
WatermarkStrategy
.forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
.withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long=element._1
})
【溫馨提示】其中 TimestampAssigner 的設置與否是可選的,大多數情況下,可以不用去特別指定。
WatermarkStrategy 可以在 Flink 應用程序中的兩處使用:
【溫馨提示】第一種方式相比會更好,因為數據源可以利用 watermark 生成邏輯中有關分片/分區(shards/partitions/splits)的信息。使用這種方式,數據源通常可以更精準地跟蹤 watermark,整體 watermark 生成將更精確。
【示例】僅當無法直接在數據源上設置策略時,才應該使用第二種方式(在任意轉換操作之后設置 WatermarkStrategy):
val env=StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[MyEvent]=env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter())
val withTimestampsAndWatermarks: DataStream[MyEvent]=stream
.filter( _.severity==WARNING )
.assignTimestampsAndWatermarks(<watermark strategy>)
withTimestampsAndWatermarks
.keyBy( _.getGroup )
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce( (a, b)=> a.add(b) )
.addSink(...)
【示例】處理空閑數據源
如果數據源中的某一個分區/分片在一段時間內未發送事件數據,則意味著 WatermarkGenerator 也不會獲得任何新數據去生成 watermark。我們稱這類數據源為空閑輸入或空閑源。在這種情況下,當某些其他分區仍然發送事件數據的時候就會出現問題。由于下游算子 watermark 的計算方式是取所有不同的上游并行數據源 watermark 的最小值,則其 watermark 將不會發生變化。
WatermarkStrategy
.forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1))
// 注意時間是毫秒,所以根據時間戳不同,可能需要乘以1000
dataStream.assignAscendingTimestamps(_.timestamp * 1000)
// MyAssigner 可以有兩種類型,都繼承自 TimestampAssigner
dataStream.assignAscendingTimestamps(new MyAssigner())
定義了抽取時間戳,以及生成 watermark 的方法,有兩種類型
可以棄用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 了
在 Flink 新的 WatermarkStrategy,TimestampAssigner 和 WatermarkGenerator 的抽象接口之前,Flink 使用的是 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks。你仍可以在 API 中看到它們,但建議使用新接口,因為其對時間戳和 watermark 等重點的抽象和分離很清晰,并且還統一了周期性和標記形式的 watermark 生成方式。
flink1.11版本后 建議用WatermarkStrategy(Watermark生成策略)生成Watermark,當創建DataStream對象后,使用如下方法指定策略:assignTimestampsAndWatermarks(WatermarkStrategy<T>)
通常情況下,你不用實現此接口,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個工具類將自定義的 TimestampAssigner 與 WatermarkGenerator 進行綁定。
通過調用WatermarkStrategy對象上的forBoundedOutOfOrderness方法來實現,接收一個Duration類型的參數作為最大亂序(out of order)長度。WatermarkStrategy對象上的withTimestampAssigner方法為從事件數據中提取時間戳提供了接口。
【示例】
package com.com.streaming.watermarkstrategy;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.time.LocalDateTime;
//在assignTimestampsAndWatermarks中用WatermarkStrategy.forBoundedOutOfOrderness方法抽取Timestamp和生成周期性水位線示例
public class ForBoundedOutOfOrderness {
public static void main(String[] args) throws Exception{
//創建流處理環境
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
//設置EventTime語義
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置周期生成Watermark間隔(10毫秒)
env.getConfig().setAutoWatermarkInterval(10L);
//并行度1
env.setParallelism(1);
//演示數據
DataStreamSource<ClickEvent> mySource=env.fromElements(
new ClickEvent(LocalDateTime.now(), "user1", 1L, 1),
new ClickEvent(LocalDateTime.now(), "user1", 2L, 2),
new ClickEvent(LocalDateTime.now(), "user1", 3L, 3),
new ClickEvent(LocalDateTime.now(), "user1", 4L, 4),
new ClickEvent(LocalDateTime.now(), "user1", 5L, 5),
new ClickEvent(LocalDateTime.now(), "user1", 6L, 6),
new ClickEvent(LocalDateTime.now(), "user1", 7L, 7),
new ClickEvent(LocalDateTime.now(), "user1", 8L, 8)
);
//WatermarkStrategy.forBoundedOutOfOrderness周期性生成水位線
//可更好處理延遲數據
//BoundedOutOfOrdernessWatermarks<T>實現WatermarkGenerator<T>
SingleOutputStreamOperator<ClickEvent> streamTS=mySource.assignTimestampsAndWatermarks(
//指定Watermark生成策略,最大延遲長度5毫秒
WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(5))
.withTimestampAssigner(
//SerializableTimestampAssigner接口中實現了extractTimestamp方法來指定如何從事件數據中抽取時間戳
new SerializableTimestampAssigner<ClickEvent>() {
@Override
public long extractTimestamp(ClickEvent event, long recordTimestamp) {
return event.getDateTime(event.getEventTime());
}
})
);
//結果打印
streamTS.print();
env.execute();
}
}
package com.com.streaming.watermarkstrategy;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
public class ClickEvent {
private String user;
private long l;
private int i;
private LocalDateTime eventTime;
public ClickEvent(LocalDateTime eventTime, String user, long l, int i) {
this.eventTime=eventTime;
this.user=user;
this.l=l;
this.i=i;
}
public LocalDateTime getEventTime() {
return eventTime;
}
public void setEventTime(LocalDateTime eventTime) {
this.eventTime=eventTime;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user=user;
}
public long getL() {
return l;
}
public void setL(long l) {
this.l=l;
}
public int getI() {
return i;
}
public void setI(int i) {
this.i=i;
}
public long getDateTime(LocalDateTime dt) {
ZoneOffset zoneOffset8=ZoneOffset.of("+8");
return dt.toInstant(zoneOffset8).toEpochMilli();
}
}
通過調用WatermarkStrategy對象上的forMonotonousTimestamps方法來實現,無需任何參數,相當于將forBoundedOutOfOrderness策略的最大亂序長度outOfOrdernessMillis設置為0。
package com.com.streaming.watermarkstrategy;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.time.LocalDateTime;
public class ForMonotonousTimestamps {
public static void main(String[] args) throws Exception{
//創建流處理環境
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
//設置EventTime語義
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置周期生成Watermark間隔(10毫秒)
env.getConfig().setAutoWatermarkInterval(10L);
//并行度1
env.setParallelism(1);
//演示數據
DataStreamSource<ClickEvent> mySource=env.fromElements(
new ClickEvent(LocalDateTime.now(), "user1", 1L, 1),
new ClickEvent(LocalDateTime.now(), "user1", 2L, 2),
new ClickEvent(LocalDateTime.now(), "user1", 3L, 3),
new ClickEvent(LocalDateTime.now(), "user1", 4L, 4),
new ClickEvent(LocalDateTime.now(), "user1", 5L, 5),
new ClickEvent(LocalDateTime.now(), "user1", 6L, 6),
new ClickEvent(LocalDateTime.now(), "user1", 7L, 7),
new ClickEvent(LocalDateTime.now(), "user1", 8L, 8)
);
//WatermarkStrategy.forMonotonousTimestamps周期性生成水位線
//相當于延遲outOfOrdernessMillis=0
//繼承自BoundedOutOfOrdernessWatermarks<T>
SingleOutputStreamOperator<ClickEvent> streamTS=mySource.assignTimestampsAndWatermarks(
WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
.withTimestampAssigner((event, recordTimestamp) -> event.getDateTime(event.getEventTime()))
);
//結果打印
streamTS.print();
env.execute();
}
}
WatermarkStrategy.noWatermarks()
上面代碼設置超時時間5毫秒,超過這個時間,沒有生成Watermark,將流狀態設置空閑,當下次有新的Watermark生成并發送到下游時,重新設置為活躍。
WatermarkStrategy.withIdleness(Duration.ofMillis(5))
未完待續~
Flink 是一個分布式流處理和批處理計算框架,具有高性能、容錯性和靈活性。下面是 Flink 的架構概述:
JobManager:JobManager 是 Flink 集群的主節點,負責接收和處理用戶提交的作業。JobManager 的主要職責包括:
解析和驗證用戶提交的作業。
生成執行計劃,并將作業圖分發給 TaskManager。
協調任務的調度和執行。
管理作業的狀態和元數據信息。
TaskManager:TaskManager 是 Flink 集群的工作節點,負責執行具體的任務。每個 TaskManager 可以運行多個任務(子任務),每個子任務運行在一個單獨的線程中,共享 TaskManager 的資源。TaskManager 的主要職責包括:
接收并執行 JobManager 分配的任務。
負責任務的數據處理、狀態管理、故障恢復等操作。
將處理結果返回給 JobManager。
StateBackend:StateBackend 是 Flink 的狀態管理機制,用于保存和恢復任務的狀態信息,確保任務在失敗后可以進行故障恢復。Flink 提供了多種 StateBackend 實現,包括內存、文件系統、RocksDB 等。
DataStream API / DataSet API:Flink 提供了兩種不同的編程接口,用于流處理和批處理:
DataStream API:面向流式計算,支持實時數據流的處理和分析。它提供了豐富的操作符(例如 map、filter、window、join 等)和窗口函數,以便進行數據轉換和聚合操作。
DataSet API:面向批處理,適用于有界數據集的處理。它提供了類似于 Hadoop MapReduce 的操作符(例如 map、reduce、join 等),用于對數據集進行轉換和計算。
Connectors:Flink 提供了多種連接器,用于與外部系統進行數據交互。常見的連接器包括 Kafka、Hadoop、Elasticsearch、JDBC 等,可以用于讀取和寫入外部數據源。
資源管理器:Flink 可以與各種集群管理工具(如 YARN、Mesos、Kubernetes)集成,以實現資源的動態分配和任務調度。
Flink 的架構使得它能夠實現高性能的流處理和批處理,同時具備良好的容錯性和可伸縮性。它廣泛應用于實時數據處理、數據湖分析、事件驅動應用等場景。
大數據流處理框架 Flink 和 Aflink 的技術架構主要包括以下組件:
JobManager:負責接收 Job 圖,并將其分發給 TaskManager。
TaskManager:負責執行任務,包括數據源、數據計算、數據匯總等操作。
StateBackend:用于保存狀態信息,支持容錯和恢復。
DataStream API:用于定義數據流處理邏輯,包括窗口函數、聚合操作等。
Connector:用于連接外部數據源,如 Kafka。
編輯
JobManager 和 TaskManager 之間的通信方式主要有兩種:心跳機制和RPC(遠程過程調用)。
心跳機制:JobManager 和 TaskManager 通過心跳機制保持連接和通信。具體流程如下:
JobManager 定期向所有的 TaskManager 發送心跳信號,確認 TaskManager 是否存活。
TaskManager 接收到心跳信號后,回復確認信號給 JobManager,表示自己還活著。
如果 JobManager 在一段時間內沒有收到 TaskManager 的心跳信號,就會認為該 TaskManager 失效,并進行相應的處理。
RPC:JobManager 和 TaskManager 使用 RPC 機制進行通信,以傳遞任務和數據等信息。具體流程如下:
JobManager 將任務調度圖發送給 TaskManager。這包括任務的執行計劃、數據源、算子操作等。
TaskManager 接收到任務調度圖后,根據指令執行任務,處理數據流。
TaskManager 在處理過程中將結果返回給 JobManager,以便進行狀態更新和后續處理。
編輯
需要注意的是,JobManager 和 TaskManager 的通信是基于網絡的,它們可以部署在不同的機器上。在一個 Flink 集群中,通常會有一個 JobManager 和多個 TaskManager,它們通過上述的通信方式協同工作,實現數據流的處理和任務調度。
流式計算:Flink 和 Aflink 是流式計算框架,能夠實時處理無界數據流。流式計算基于事件驅動的模型,能夠處理實時數據并支持低延遲計算。
窗口函數:窗口函數用于對數據流進行分組聚合操作,常見的窗口類型包括滾動窗口、滑動窗口和會話窗口。窗口函數允許用戶在有限的數據集上執行計算操作。
窗口類型
Flink 框架提供了多種窗口函數,用于對數據流進行分組聚合操作。以下是一些常見的窗口函數:
滾動窗口(Tumbling Window):將數據流劃分為固定大小的、不重疊的窗口。每個窗口包含相同數量的元素,并且窗口之間沒有重疊。可以通過 window(Tumble.over()) 方法來定義滾動窗口。
滑動窗口(Sliding Window):將數據流劃分為固定大小的、可能重疊的窗口。每個窗口包含指定數量的元素,并且窗口之間可以有重疊。可以通過 window(Slide.over()) 方法來定義滑動窗口。
會話窗口(Session Window):根據事件之間的時間間隔將數據流劃分為不固定長度的會話窗口。如果在指定時間間隔內沒有新事件到達,則會話窗口關閉。可以通過 window(Session.withGap()) 方法來定義會話窗口。
全局窗口(Global Window):將整個數據流視為一個窗口,不進行數據切分。適用于需要計算整個數據流的聚合結果的場景。可以通過 window(Global()) 方法來定義全局窗口。
自定義窗口函數:Flink 還支持自定義窗口函數,以便滿足特定需求。您可以實現 WindowFunction 接口來定義自己的窗口函數,并通過 apply() 方法來處理窗口中的元素。
這些窗口函數可以和其他操作符(例如 groupBy()、reduce()、aggregate() 等)一起使用,以實現各種數據流處理和聚合操作。
不同類型的窗口函數適用于不同的業務場景,具體選擇哪種窗口函數取決于您的需求和數據流的特點。
窗口函數都有其特定的使用場景,下面我會簡要介紹每種窗口函數的典型應用場景,并提供 Java 和 Python 代碼示例。
滾動窗口(Tumbling Window)
使用場景:適用于需要對固定大小的數據范圍進行聚合計算的場景,例如統計每5分鐘內的數據總和。
Java 代碼示例:
DataStream<Tuple2<String, Integer>> input = ... ; // 輸入數據流 DataStream<Tuple2<String, Integer>> result = input .keyBy(0) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .sum(1);
Python 代碼示例:
from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.window import TumblingEventTimeWindows from pyflink.common import WatermarkStrategy env = StreamExecutionEnvironment.get_execution_environment() input_stream = ... # 輸入數據流result_stream = input_stream.key_by(lambda x: x[0]).window(TumblingEventTimeWindows.of('5 minutes')).sum(1)
滑動窗口(Sliding Window)
使用場景:適用于需要對數據流進行連續且重疊的窗口計算的場景,例如統計每5分鐘計算一次數據總和,并且每次計算時包含前一個窗口的部分數據。
Java 代碼示例:
DataStream<Tuple2<String, Integer>> input = ... ; // 輸入數據流 DataStream<Tuple2<String, Integer>> result = input .keyBy(0) .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5))) .sum(1);
Python 代碼示例:
from pyflink.datastream.window import SlidingEventTimeWindows result_stream = input_stream .key_by(lambda x: x[0]).window(SlidingEventTimeWindows.of('10 minutes', '5 minutes')).sum(1)
會話窗口(Session Window)
使用場景:適用于需要基于活動之間的間隔時間來劃分窗口的場景,例如用戶在網站上的一系列操作之間的時間間隔作為窗口的劃分條件。
Java 代碼示例:
DataStream<Tuple2<String, Integer>> input = ... ; // 輸入數據流 DataStream<Tuple2<String, Integer>> result = input .keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(10))) .sum(1);
Python 代碼示例:
from pyflink.datastream.window import EventTimeSessionWindows result_stream = input_stream .key_by(lambda x: x[0]).window(EventTimeSessionWindows.with_gap('10 minutes')).sum(1)
全局窗口(Global Window)
使用場景:適用于對整個數據流進行聚合計算的場景,例如統計全天的數據總和。
Java 代碼示例:
DataStream<Tuple2<String, Integer>> input = ... ; // 輸入數據流 DataStream<Tuple2<String, Integer>> result = input .keyBy(0) .window(GlobalWindows.create()) .trigger(CountTrigger.of(1)) .sum(1);
Python 代碼示例:
from pyflink.datastream.window import GlobalWindows from pyflink.datastream.trigger import CountTrigger result_stream = input_stream .key_by(lambda x: x[0]).window(GlobalWindows.create()) .trigger(CountTrigger(1)).sum(1)
以下是一個簡單的示例代碼,使用 Java 和 Python 分別演示讀取 Kafka 數據并計算指標的過程:
Java 代碼示例:
// 創建 Flink 程序入口 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 從 Kafka 中讀取數據 FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties); DataStream<String> stream = env.addSource(kafkaConsumer); // 對數據流進行處理,計算指標 DataStream<Result> resultStream = stream .flatMap(new UserAccessFlatMapFunction()) .keyBy("userId") .timeWindow(Time.minutes(5)) .apply(new UserAccessWindowFunction()); // 執行任務 env.execute("User Access Analysis");
Python 代碼示例:
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings from pyflink.table.descriptors import Schema, Kafka # 創建 Flink 環境 env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create( env, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()) # 從 Kafka 讀取數據 t_env.connect( Kafka() .version("universal") .topic("topic") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") .start_from_earliest() .finish() ).with_format( Json() ).with_schema( Schema() .field("user_id", DataTypes.STRING()) .field("timestamp", DataTypes.TIMESTAMP(3)) ).create_temporary_table("MySource") # 計算指標 t_env.from_path("MySource") \ .window(Tumble.over("5.minutes").on("timestamp").alias("w")) \ .group_by("user_id, w") \ .select("user_id, w.end as window_end, count(user_id) as pv, count_distinct(user_id) as uv") \ .execute_insert("MySink")
Flink 和 Aflink 可以用于加載數據湖中的大規模數據集,進行 AI 模型訓練。通過流式處理和批處理相結合,可以有效處理圖片、音頻、文本等多媒體數據,用于風控等場景。
當使用 Flink 進行機器學習時,通常會使用 Flink 的批處理和流處理 API 結合機器學習庫(如 Apache Flink ML、Apache Mahout 等)來實現各種機器學習任務。這里我將為您提供一個簡單的示例,演示如何在 Flink 中使用批處理 API 來進行線性回歸的訓練。
首先,讓我們看一下 Java 代碼示例:
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; import org.apache.flink.ml.common.LabeledVector; import org.apache.flink.ml.regression.MultipleLinearRegression; public class LinearRegressionExample { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 創建帶標簽的向量數據集 DataSet<LabeledVector> trainingData = ... ; // 從數據源加載帶標簽的向量數據集 // 初始化線性回歸模型 MultipleLinearRegression mlr = new MultipleLinearRegression(); mlr.setStepsize(0.5); // 設置步長 mlr.setIterations(100); // 設置迭代次數 // 訓練線性回歸模型 mlr.fit(trainingData); // 獲取訓練后的模型參數 double[] weights = mlr.weights(); double intercept = mlr.intercept(); // 打印模型參數 System.out.println("Weights: " + Arrays.toString(weights)); System.out.println("Intercept: " + intercept); } }
Python 代碼示例:
from pyflink.dataset import ExecutionEnvironment from pyflink.datastream import LabeledVector from pyflink.ml.preprocessing import Splitter from pyflink.ml.regression import MultipleLinearRegression env = ExecutionEnvironment.get_execution_environment() # 創建帶標簽的向量數據集 training_data = ... # 從數據源加載帶標簽的向量數據集 # 初始化線性回歸模型 mlr = MultipleLinearRegression() mlr.set_step_size(0.5) # 設置步長 mlr.set_max_iterations(100) # 設置最大迭代次數 # 訓練線性回歸模型mlr.fit(training_data) # 獲取訓練后的模型參數 weights = mlr.weights_ intercept = mlr.intercept_ # 打印模型參數 print("Weights: ", weights) print("Intercept: ", intercept)
在 Flink 中使用批處理 API 進行線性回歸模型的訓練。實際上,在 Flink 中進行更復雜的機器學習任務時,可能需要結合更多的預處理、特征工程、模型評估等步驟,以及更豐富的機器學習算法和模型庫。
Flink Table API 是 Apache Flink 提供的一種用于處理結構化數據的高級 API,它提供了一種類 SQL 的聲明性編程方式,使用戶可以通過類 SQL 的語法來操作流式和批處理數據。使用 Table API,用戶可以方便地進行數據查詢、轉換、聚合等操作,而無需編寫復雜的低級別代碼。
下面是一個簡單的示例,演示如何在 Flink 中使用 Table API 來實現對輸入數據流的簡單轉換和聚合:
Java 示例:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class TableAPIExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 創建輸入數據流 Table inputTable = tableEnv.fromDataStream(inputDataStream, "name, age"); // 查詢和轉換操作 Table resultTable = inputTable .filter("age > 18") .groupBy("name") .select("name, count(1) as count"); // 將結果表轉換為數據流并打印輸出 tableEnv.toRetractStream(resultTable, Row.class).print(); env.execute("Table API Example"); } }
Python 示例:
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) # 創建輸入數據流 input_table = table_env.from_data_stream(input_data_stream, ['name', 'age']) # 查詢和轉換操作 result_table = input_table \ .filter("age > 18") \ .group_by("name") \ .select("name, count(1) as count") # 將結果表轉換為數據流并打印輸出 table_env.to_retract_stream(result_table, Row).print() env.execute("Table API Example")
創建了一個輸入數據流,然后使用 Table API 對數據流進行過濾、分組和聚合操作,最后將結果表轉換為數據流并打印輸出。這展示了 Table API 的簡單用法,更復雜的操作和功能可以根據具體需求進行擴展。
發展歷史:Flink 于 2015 年正式發布,是一個快速發展的流處理引擎,Aflink 是 Flink 在國內的一個分支,也得到了廣泛應用。
市場優勢:Flink 和 Aflink 具有低延遲、高吞吐量等優勢,適用于實時數據處理場景。在大數據領域,它們已成為重要的流式計算框架,廣泛應用于金融、電商、物聯網等行業。