利用有狀態(tài)流處理來(lái)維護(hù)增量更新的物化視圖
杜尼特-達(dá)努什卡 8分鐘閱讀
照片: on
在本系列的第一部分,我們了解了物化視圖的基本原理,以及它們的缺點(diǎn)。然后,我向你介紹了流處理,作為維護(hù)自我更新的物化視圖的一種可擴(kuò)展方式。
這篇文章探討了有狀態(tài)流處理中的兩個(gè)基本概念:流和表;以及流如何變成表,形成物化視圖。在文章的最后,我們將學(xué)習(xí)這些物化視圖如何被擴(kuò)展和從失敗中恢復(fù)。
有狀態(tài)的流處理
流處理有兩大類(lèi);無(wú)狀態(tài)和有狀態(tài)處理。
在無(wú)狀態(tài)模式下,你孤立地處理每個(gè)事件。最基本的模式是將不必要的事件從流中過(guò)濾掉,或?qū)蝹€(gè)事件進(jìn)行轉(zhuǎn)換。過(guò)去的事件對(duì)當(dāng)前事件的處理沒(méi)有影響。
無(wú)狀態(tài)流處理處理單個(gè)事件
相比之下,有狀態(tài)的流處理是復(fù)雜的,因?yàn)樗幚淼氖?"狀態(tài)"。" 大多數(shù)流處理應(yīng)用都關(guān)注聚合、連接和時(shí)間窗口操作。例如,我們可以通過(guò)商店的ID來(lái)聚合零售交易,以查看每個(gè)商店的銷(xiāo)售業(yè)績(jī)。
這些聚合需要為流維護(hù)一個(gè)狀態(tài)。在前面的情況下可更新的物化視圖,每個(gè)存儲(chǔ)的運(yùn)行總量必須在其他地方維護(hù)。例如,在一個(gè)鍵/值存儲(chǔ)中。下次有交易事件發(fā)生時(shí),我們可以查找該事件的商店ID的當(dāng)前總數(shù),然后進(jìn)行遞增。
聚合、連接和窗口操作都需要保持一個(gè)狀態(tài)。
本地狀態(tài)與外部狀態(tài)
通常情況下,流處理器會(huì)將這種狀態(tài)保存在本地,以便更快地訪問(wèn)。它首先被寫(xiě)入內(nèi)存,然后最終被刷新到磁盤(pán)上的鍵/值存儲(chǔ),如。
但在某些情況下,狀態(tài)被存儲(chǔ)在一個(gè)外部地方,如數(shù)據(jù)庫(kù)。雖然它引入了額外的延遲,但對(duì)于簡(jiǎn)單的工作負(fù)載來(lái)說(shuō)效果很好,并為你提供良好的可擴(kuò)展性。
有狀態(tài)的處理引入了許多挑戰(zhàn),尤其是在狀態(tài)管理方面。你必須在狀態(tài)的擴(kuò)展和容錯(cuò)方面花費(fèi)大量的心思。我們將在接下來(lái)的章節(jié)中詳細(xì)討論這些問(wèn)題。
用有狀態(tài)流處理維護(hù)物化視圖
傳統(tǒng)的數(shù)據(jù)庫(kù)支持的物化視圖有一個(gè)主要問(wèn)題,那就是它不能增量地更新視圖的內(nèi)容。整個(gè)視圖必須不時(shí)地被重建,這是很昂貴的。
但是,一個(gè)有狀態(tài)的流處理器可以解決這個(gè)問(wèn)題,它將事件流具體化為一個(gè)持久的視圖,然后在新的數(shù)據(jù)進(jìn)來(lái)時(shí)更新它。流處理器負(fù)責(zé)視圖的維護(hù),這是自動(dòng)和增量的。一旦有新的事件到來(lái),視圖就會(huì)被更新,并以盡可能小的方式基于增量進(jìn)行調(diào)整,而不是從頭開(kāi)始重新計(jì)算。因此,避免了視圖的完全重建。
理解這一點(diǎn)需要你首先熟悉一些概念和行話。讓我們?cè)诮酉聛?lái)的章節(jié)中慢慢解讀這些概念。
流和表
在流處理中,有兩個(gè)基本概念需要我們?nèi)ダ斫?-流和表。
一個(gè)流是一個(gè)不可改變的、只附加的事件序列,代表了變化的歷史。一個(gè)表包含世界的當(dāng)前狀態(tài),它是許多變化的結(jié)果。
在我們上面的零售商店的例子中,一系列的商店交易代表一個(gè)流,而商店銷(xiāo)售的匯總代表一個(gè)表。管理層感興趣的是當(dāng)前的銷(xiāo)售報(bào)告可更新的物化視圖,而不是單個(gè)銷(xiāo)售。
有時(shí),我們想要的是當(dāng)前的狀態(tài)而不是狀態(tài)的變化
將一個(gè)數(shù)據(jù)流物化為一個(gè)表
為了將一個(gè)流轉(zhuǎn)換為一個(gè)表,我們需要應(yīng)用流中包含的所有變化。這也被稱(chēng)為流的物化。
為了具體化一個(gè)流,我們從頭到尾看一遍流中的所有事件,一邊看一邊改變狀態(tài)。當(dāng)我們完成后,我們有一個(gè)代表特定時(shí)間的狀態(tài)的表,我們可以使用。這個(gè)表可以在內(nèi)存中,在本地狀態(tài)中,或者在一個(gè)外部數(shù)據(jù)庫(kù)中。
實(shí)踐中的物化視圖
為了更好地理解這一點(diǎn),讓我向你展示如何用Kafka原生流處理框架構(gòu)建我們的零售實(shí)例。
一個(gè)典型的零售交易有以下格式。
{
"order_id":123456767,
"customer_id":1232。
"store_id":2123。
"created_at": "2021-09-23"
}
然后我們可以定義一個(gè)流來(lái)代表一系列的交易。
CREATE STREAM transactions (
order_id INT KEY,
customer_id INT,
store_id INT,
total DOUBLE,
created_at VARCHAR

) WITH (
kafka_topic = 'trasactions',
partitions = 2,
value_format = 'json'
);
你可能想檢查一下當(dāng)前每個(gè)商店的總銷(xiāo)售額明細(xì)。你可以通過(guò)物化一個(gè)流的視圖來(lái)做到這一點(diǎn)。
CREATE TABLE sales_by_store AS
SELECT store_id, SUM(total) as total
FROM TRANSACTIONS
GROUP BY store_id
EMIT CHANGES;
當(dāng)你在上運(yùn)行這個(gè)語(yǔ)句時(shí)會(huì)發(fā)生什么?
服務(wù)器創(chuàng)建了一個(gè)新的持久性查詢,永遠(yuǎn)運(yùn)行,在數(shù)據(jù)到達(dá)時(shí)進(jìn)行處理。當(dāng)每條記錄從事務(wù)流中讀取時(shí),持久化查詢會(huì)做兩件事。
你可以把主題看作是對(duì)物化視圖的所有更新的審計(jì)跟蹤。這在我們討論容錯(cuò)問(wèn)題時(shí)將會(huì)很有用。所以我們現(xiàn)在先跳過(guò)這一點(diǎn)。
查詢物化視圖
為客戶程序提供了兩種將物化視圖數(shù)據(jù)引入應(yīng)用程序的方式。首先,你可以使用拉動(dòng)查詢,在某個(gè)時(shí)間點(diǎn)檢索結(jié)果。
如果你運(yùn)行下面的查詢,當(dāng)它執(zhí)行時(shí),結(jié)果將是物化視圖中的任何內(nèi)容。
SELECT * FROM sales_by_store WHERE store_id=2000;
相反,我們有推送查詢,當(dāng)查詢結(jié)果發(fā)生變化時(shí),流向你的應(yīng)用程序。例如,下面的查詢連續(xù)地將變化日志中的每一行以=2000的方式進(jìn)行流轉(zhuǎn)。
SELECT * FROM sales_by_store WHERE store_id=2000 EMIT CHANGES;
物化視圖是如何被持久化的?
在幕后,使用來(lái)存儲(chǔ)物化視圖的內(nèi)容。是一個(gè)嵌入式的鍵/值存儲(chǔ),它在每個(gè)服務(wù)器的進(jìn)程中運(yùn)行--你不需要啟動(dòng)、管理或與它交互。
美中不足的是,抽象出了在磁盤(pán)上以高性能存儲(chǔ)和索引關(guān)聯(lián)數(shù)據(jù)結(jié)構(gòu)的復(fù)雜性。作為一個(gè)開(kāi)發(fā)者,這讓你專(zhuān)注于流處理邏輯,而不是與狀態(tài)管理作斗爭(zhēng)。
Flink如何維護(hù)本地狀態(tài)
縮小本地狀態(tài)的規(guī)模
現(xiàn)在你已經(jīng)看到了像這樣的流處理器是如何將一個(gè)流具體化到本地狀態(tài)的。
想象一下,我們的流處理器正在從一個(gè)有四個(gè)分區(qū)的Kafka主題中讀取事件。
流處理器通常先將傳入的數(shù)據(jù)寫(xiě)到內(nèi)存中。然后定期將其刷出到磁盤(pán)(本地狀態(tài))。如果我們只有一個(gè)流處理器的實(shí)例,很快它就會(huì)耗盡分配的內(nèi)存和磁盤(pán)空間。
為了擴(kuò)大處理規(guī)模,我們可以部署多個(gè)流處理器的實(shí)例。你可以通過(guò)添加更多的流處理器來(lái)提高處理的吞吐量。
但是,我們?nèi)绾未_保一個(gè)事件只被一個(gè)流處理器所處理?這就是Kafka的消費(fèi)者組協(xié)議的作用。
不同的流處理框架,如 Flink、和Kafka 都在下面實(shí)現(xiàn)了Kafka消費(fèi)者協(xié)議。分區(qū)分布在流處理器中,在一個(gè)消費(fèi)者組中只有一個(gè)處理器消耗一個(gè)分區(qū)。
當(dāng)有多個(gè)流處理器時(shí),本地狀態(tài)被分散到每個(gè)處理器實(shí)例中。每個(gè)處理器只擁有整個(gè)事件流的一個(gè)子集,這通常由事件的分區(qū)鍵驅(qū)動(dòng)。
這在內(nèi)存和磁盤(pán)空間方面提供了無(wú)限的可擴(kuò)展性。
容錯(cuò)--從故障中恢復(fù)。
我們剛剛解決了擴(kuò)展的問(wèn)題。接下來(lái)我們來(lái)談?wù)勅蒎e(cuò)問(wèn)題。
在我們的零售例子中,如果一個(gè)流處理器在處理流的時(shí)候發(fā)生故障,會(huì)發(fā)生什么?我們能恢復(fù)崩潰的實(shí)例的物化本地狀態(tài)嗎?
是的,由于Kafka的變化日志架構(gòu),這是有可能的。
早些時(shí)候,我們學(xué)習(xí)了如何將一個(gè)流變成一個(gè)表。同時(shí),也可以把一個(gè)表變成一個(gè)流。要做到這一點(diǎn),我們需要捕獲修改表的變化。把所有這些插入、更新和刪除的事件,存儲(chǔ)在一個(gè)流中。這就是我們?cè)诶又锌吹降牧鳌?/p>
在我們的例子中,我們有四個(gè)處理器將四個(gè)表物化。這些表在Kafka中創(chuàng)建了四個(gè)變化日志流。讓我們假設(shè)流處理器D發(fā)生故障。那么Kafka將觸發(fā)一個(gè)分區(qū)的重新平衡,這樣分區(qū)P4就可能被分配給C。
最初,C并沒(méi)有D所維護(hù)的本地表。但是,C可以通過(guò)重放D的變化日志流來(lái)重建它。
C通過(guò)重放D的變化日志流趕上D的最后已知良好狀態(tài)。
這種類(lèi)型的故障恢復(fù)已經(jīng)內(nèi)置于今天的許多流處理器中,將你從解決分布式系統(tǒng)中最復(fù)雜的問(wèn)題中解放出來(lái)。
經(jīng)驗(yàn)之談
下一部分探討了利用變化數(shù)據(jù)捕獲(CDC)來(lái)連接一個(gè)流和一個(gè)表。