網易傳媒基於Arctic的低成本準實時計算實踐
本站傳媒大數據實際業務中,存在着大量的準實時計算需求場景,業務方對於數據的實效性要求一般是分鐘級;這種場景下,用傳統的離線數倉方案不能滿足用戶在實效性方面的要求,而使用全鏈路的實時計算方案又會帶來較高的資源佔用。
基於對開源數據湖方案的調研,我們注意到了本站數帆開源的基於 Apache Iceberg 構建的 Arctic 數據湖解決方案。Arctic 能相對較好地支持與服務於流批混用的場景,其開放的疊加式架構,可以幫助我們非常平滑地過渡與實現 Hive 到數據湖的升級改造,且由於傳媒離線數倉已接入有數,通過 Arctic 來改造現有業務的成本較低,於是我們準備通過引入 Arctic ,嘗試解決 push 業務場景下的痛點。
1
項目背景
以傳媒 push 實時數倉爲例,新聞推送在地域、時間、頻次等因素上有較高的不確定性,非常容易出現偶發的流量洪峰,尤其是在出現突發性社會熱點新聞的時候。如果採用全鏈路的實時計算方案來處理,則需要預留出較多的資源 buffer 來應對。
由於推送時機的不確定性,push 業務的數據指標一般不是增量型的,而是以當天截止到當前的各種累計型指標爲主,計算窗口通常爲十五分鐘到半小時不等,統計維度區分發送類型、內容分類、發送票數、發送廠商、首啓方式、用戶活躍度、AB 實驗等,具有流量波動大和數據口徑繁多等特點。
此前採用的全鏈路 Flink 實時計算方案中,主要遇到以下問題:
(1)資源佔用成本高
爲應對流量洪峰,需要爲實時任務分配預留出較高的資源,且多個聚合任務需要消費同一個上游數據,存在讀放大問題。push 相關的實時計算流程佔到了實時任務總量的 18+%,而資源使用量佔到了實時資源總使用量的近 25%。
(2)大狀態帶來的任務穩定性下降
push 業務場景下進行窗口計算時,大流量會帶來大狀態的問題,而大狀態的維護在造成資源開支的同時比較容易影響任務的穩定性。
(3)任務異常時難以及時的進行數據修復
實時任務出現異常時,以實時方式來回溯數據時效慢且流程複雜;而以離線流程來修正,則會帶來雙倍的人力和存儲成本。
2
項目思路和方案
2.1 項目思路
我們通過對數據湖的調研,期望利用數據實時入湖的特點,同時使用 Spark 等離線資源完成計算,用較低的成本滿足業務上對準實時計算場景的需求。我們以 push 業務場景作爲試點進行方案的探索落地,再逐漸將方案推廣至更多類似業務場景。
基於對開源數據湖方案的調研,我們注意到了本站數帆開源的基於 Apache Iceberg 構建的 Arctic 數據湖解決方案。Arctic 能相對較好地支持與服務於流批混用的場景,其開放的疊加式架構,可以幫助我們非常平滑地過渡與實現 Hive 到數據湖的升級改造,且由於傳媒離線數倉已接入有數,通過 Arctic 來改造現有業務的成本較低,於是我們準備通過引入 Arctic ,嘗試解決 push 業務場景下的痛點。
Arctic 是由本站數帆開源的流式湖倉系統,在 Iceberg 和 Hive 之上添加了更多實時場景的能力。通過 Arctic,用戶可以在 Flink、Spark、Trino、Impala 等引擎上實現更加優化的 CDC、流式更新、OLAP 等功能。
實現 push 業務場景下的數據湖改造,只需要使用 Arctic 提供的 Flink Connector,便可快速地實現 push 明細數據的實時入湖。
此時需要我們關注的重點是,數據產出需要滿足分鐘級業務需求。數據產出延遲由兩部分組成:
數據就緒延遲,取決於 Flink 實時任務的 Commit 間隔,一般爲分鐘級別;
數據計算耗時,取決於計算引擎和業務邏輯:數據產出延遲 = 數據就緒延遲 + 數據計算耗時
2.2 解決方案
2.2.1 數據實時入湖
Arctic 能夠兼容已有的存儲介質(如 HDFS)和表結構(如 Hive、Iceberg),並在之上提供透明的流批一體表服務。存儲結構上主要爲 Basestore 和 Changestore 兩部分:
(1)Basestore 中存儲了表的存量數據。它通常由 Spark/Flink 等引擎完成第一次寫入,再之後則通過自動的結構優化過程將 Changestore 中的數據轉化之後寫入。
(2)Changestore 中存儲了表上最近的變更數據。Changestore 中存儲了表上最近的變更數據。它通常由 Apache Flink 任務實時寫入,並用於下游 Flink 任務進行準實時的流式消費。同時也可以對它直接進行批量計算或聯合 Basestore 裡的數據一起通過 Merge-On-Read(以下簡稱爲MOR) 的查詢方式提供分鐘級延遲的批量查詢能力。
Arctic 表支持實時數據的流式寫入,數據寫入過程中爲了保證數據的實效性,寫入側需要頻繁的進行數據提交,但因此會產生大量的小文件,積壓的小文件一方面會影響數據的查詢性能,另一方面也會對文件系統帶來壓力。這方面,Arctic 支持基於主鍵的行級更新,提供了 Optimizer 來進行數據 Update 和自動的結構優化,以幫助用戶解決數據湖常見的小文件、讀放大、寫放大等問題。
以傳媒 push 數倉場景爲例,push 發送、送達、點擊、展示等明細數據需要通過 Flink 作業實時寫入到 Arctic 中。由於上游已經做了 ETL 清洗,此階段只需要通過 FlinkSQL 即可方便地將上游數據寫入 Changestore。Changestore 內包含了存儲插入數據的 insert 文件和存儲刪除數據的 equality delete 文件,更新數據會被拆分爲更新前項和更新後項分別存儲在 delete 文件與 insert 文件中。
具體的,對於有主鍵場景,insert/update_after 消息會寫入 Changestore 的 insert 文件,delete/update_before 會寫入 Arctic 的 delete 文件。當進行 Optimize 的時候,會先把 delete 文件讀到內存中形成一個 delete map, map 的 key 是記錄的主鍵,value 是 record_lsn。然後 再讀取 Basestore 和 Changestore 中的 insert 文件, 對主鍵相同的 row 進行 record_lsn 的對比,如果 insert 記錄中 record_lsn 比 deletemap 中相同主鍵的 record_lsn 小,則認爲這條記錄已經被刪除了,不會再追加到 base 裡;否則把數據寫入到新文件裡,最終實現了行級的更新。
2.2.2 湖水位感知
傳統的離線計算在調度方面需要有一個觸發機制,一般由作業調度系統按照任務之間的依賴關係來處理,當上遊任務全部成功後自動調起下游的任務。但在實時入湖的場景下,下游任務缺乏一個感知數據是否就緒的途徑。以 push 場景爲例,需要產出的指標主要爲按照指定的時間粒度來計算一次當天累計的各種統計值,此時下游如果沒法感知當前湖表水位的話,要麼需要留出一個較冗餘的緩衝時間來保證數據就緒,要麼則有漏數據的可能,畢竟 push 場景的流量變化是非常起伏不定的。
傳媒大數據團隊和 Arctic 團隊借鑑了 Flink Watermark 的處理機制和 Iceberg 社區討論的方案,將 Watermark 信息寫入到 Iceberg 表的 metadata 文件裡,然後由 Arctic 通過消息隊列或者 API 暴露出來,從而做到下游任務的主動感知,儘可能地降低了啓動延遲。具體方案如下:
(1)Arctic 表水位感知
當前只考慮 Flink 寫入的場景,業務在 Flink 的 source 定義事件時間和 Watermark。ArcticSinkConnector 包含兩個算子,一個是負責寫文件的多併發的 ArcticWriter, 一個是負責提交文件的的單併發的 ArcticFileCommitter。當執行 checkpoint 時,ArcticFileCommitter 算子會進行 Watermark 對齊之後取最小的 Watermark。會新建一個類似於 Iceberg 事務的 AMS Transaction,在這個事務裡除了 AppendFiles 到 Iceberg,同時把 TransactionID,以及 Watermark 通過 AMS 的 thrift 接口上報給 AMS。
(2)Hive 表水位感知
Hive表裡可見的數據是經過 Optimize 過後的數據,Optimize 由 AMS 來調度,Flink 任務異常執行文件的讀寫合併,並且把 Metric 上報給 AMS, 由 AMS 來把這一次 Optimize 執行的結果 Commit,AMS 天然知道這一次 Optimize 推進到了哪次 Transaction, 並且 AMS 本身也存儲了 Transaction 對應的 Watermark,也就知道 Hive 表水位推進到了哪裡。
2.2.3 數據湖查詢
Arctic 提供了 Spark/Flink/Trino/Impala 等計算引擎的 Connector 支持。通過使用Arctic數據源,各計算引擎都可以實時讀取到已經 Commit 的文件,Commit 的間隔按照業務的需求一般爲分鐘級別。下面以 push 業務爲例介紹幾種場景下的查詢方案和相應成本:
(1)Arctic + Trino/Impala 滿足秒級 OLAP 查詢
OLAP 場景下,用戶一般更關注計算上的耗時,對數據就緒的敏感度相對不高。針對中小規模數據量的 Arctic 表或較簡單的查詢,通過 Trino/Impala 進行 OLAP 查詢是一個相對高效的方案,基本上可以做到秒級 MOR 查詢耗時。成本上,需要搭建 Trino/Impala 集羣,如果團隊中已有在使用的話,則可以根據負載情況考慮複用。
Arctic 在開源發佈會上發佈了自己的 benchmark 數據,在數據庫 CDC 持續流式攝取的場景下,對比各個數據湖 Format 的 OLAP benchmark 性能, 整體上帶 Optimize 的 Arctic 的性能優於 Hudi,這主要得益於 Arctic 內部有一套高效的文件索引 Arctic Tree,在 MOR 場景下可以做到更細粒度、精確地 merge。詳細的對比報告可以參考:https://arctic.netease.com/ch/benchmark/。
(2)Arctic + Spark 滿足分鐘級預聚合查詢
針對提供下游數據報表展示的場景,一般需要走預計算的流程將結果持久化下來,對數據就緒和計算耗時的敏感度都較高,而且查詢邏輯相對複雜,Trino/Impala 集羣規模相對較小,執行容易失敗,導致穩定性欠佳。這個場景下我們使用了集羣部署規模最大的 Spark 引擎來處理,在不引入新的資源成本的情況下,做到了離線計算資源的複用。
數據就緒方面,通過 Arctic 表水位感知方案,可以做到較低的分鐘級就緒延遲。
計算方面,Arctic 對 Spark Connector 提供了一些讀取優化,用戶可以通過配置 Arctic 表的 read.split.planning-parallelism 和 read.split.planning-parallelism-factor 這兩個參數值,來調整 Arctic Combine Task 的數量,進而控制計算任務的併發度。由於 Spark 離線計算的資源相對靈活和充足,我們可以通過上述調整併發度的方式來保證在 2~3 分鐘內完成業務的計算需求。
(3)Hive + Spark 滿足傳統離線數倉生產鏈路的調度
Arctic 支持將 Hive 表作爲 Basestore,Full Optimize 時會將文件寫入到 Hive 數據目錄下,以達到更新 Hive 原生讀取內容的目的,通過存儲架構上的流批一體來降低成本。因此傳統的離線數倉生產鏈路,可以直接使用對應的 Hive 表來作爲離線數倉鏈路的一部分,時效性上相較於 Arctic 表雖缺少了 MOR,但通過 Hive 表水位感知方案,可以做到業務能接受的就緒延遲,從而滿足傳統離線數倉生產鏈路的調度。
3
項目影響力與產出價值
3.1 項目影響力
通過 Arctic + X 方案在傳媒的探索和落地,爲傳媒準實時計算場景提供了一個新的解決思路。該思路不但減輕了全鏈路 Flink 實時計算方案所帶來的實時資源壓力和開發運維負擔,而且還能較好地複用現有的 HDFS 和 Spark 等存儲計算資源,做到了降本增效。
此外 Arctic 在音樂、有道等多個 BU 也有落地,比如在音樂公技,用於 ES 冷數據的存儲,降低了用戶 ES 的存儲成本;而有道精品課研發團隊也在積極探索和使用 Arctic 作爲其部分業務場景下的解決方案。
目前 Arctic 已經在 github 上開源,受到了開源社區與外部用戶的持續關注,在 Arctic 的建設與發展中,也收到了不少外部用戶提交的高質量 PR 。
3.2 項目產出價值
通過上述方案我們將 push ETL 明細數據通過 Flink 實時入湖到 Arctic,然後在調度平臺上配置分鐘級的調度任務,按照不同交叉維度進行計算後將累計型指標後寫入關係數據庫,最後通過有數直連進行數據展示,做到了業務方要求的分鐘級時效數據產出。改造後的方案,同原來的全鏈路 Flink 實時計算方案相比:
(1)充分複用離線空閒算力,降低了實時計算資源開支
方案利用了空閒狀態下的離線計算資源,且基本不會帶來新的資源開支。離線計算業務場景註定了資源使用的高峰在凌晨,而新聞 push 推送及熱點新聞產生的場景大多爲非凌晨時段,在滿足準實時計算時效的前提下,通過複用提升了離線計算集羣的綜合利用率。另外,該方案能幫我們釋放大約 2.4T 左右的實時計算內存資源。
(2)降低任務維護成本,提升任務穩定性
Arctic + Spark 水位感知觸發調度的方案可減少 17+ 實時任務的維護成本,減少了 Flink 實時計算任務大狀態所帶來的穩定性問題。通過 Spark 離線調度任務可充分利用離線資源池調整計算並行度,有效提升了應對突發熱點新聞流量洪峰時的健壯性。
(3)提升數據異常時的修復能力,降低數據修復時間開支
通過流批一體的 Arctic 數據湖存儲架構,當數據出現異常需要修正時,可靈活地對異常數據進行修復,降低修正成本;而如果通過實時計算鏈路回溯數據或通過額外的離線流程來修正,則需要重新進行狀態累計或複雜的 ETL 流程。
4
項目未來規劃和展望
當前還有一些場景 Arctic 不能做到較好的支持,傳媒大數據團隊將和 Arctic 團隊繼續對以下場景下的解決方案進行探索和落地:
(1)當前入湖前的 push 明細數據是通過上游多條數據流 join 生成的,也同樣會存在大狀態的問題。而 Arctic 當前只能支持行級的更新能力,如果能落地有主鍵表的部分列更新能力,則可以幫助業務在入湖的時候,以較低的成本直接實現多流 join。
(2)進一步完善 Arctic 表和 Hive 表的水位定義和感知方案,提升時效,並推廣到更多的業務場景中。當前的方案只支持單 Spark/Flink 任務寫入的場景,對於多個任務併發寫表的場景,還需要再完善。
有獎問卷