最初,BIGO 的消息流平臺主要采用開源 Kafka 作為數(shù)據(jù)支撐。隨著數(shù)據(jù)規(guī)模日益增長,產(chǎn)品不斷迭代,BIGO 消息流平臺承載的數(shù)據(jù)規(guī)模出現(xiàn)了成倍增長,下游的在線模型訓(xùn)練、在線推薦、實時數(shù)據(jù)分析、實時數(shù)倉等業(yè)務(wù)對消息流平臺的實時性和穩(wěn)定性提出了更高的要求。開源的 Kafka 集群難以支撐海量數(shù)據(jù)處理場景,我們需要投入更多的人力去維護(hù)多個 Kafka 集群,這樣成本會越來越高,主要體現(xiàn)在以下幾個方面:
1、數(shù)據(jù)存儲和消息隊列服務(wù)綁定,集群擴(kuò)縮容/分區(qū)均衡需要大量拷貝數(shù)據(jù),造成集群性能下降。
2、當(dāng)分區(qū)副本不處于 ISR(同步)狀態(tài)時,一旦有 broker 發(fā)生故障,可能會造成數(shù)據(jù)丟失或該分區(qū)無法提供讀寫服務(wù)。
3、當(dāng) Kafka broker 磁盤故障/空間占用率過高時,需要進(jìn)行人工干預(yù)。
4、集群跨區(qū)域同步使用 KMM(Kafka Mirror Maker),性能和穩(wěn)定性難以達(dá)到預(yù)期。
5、在 catch-up 讀場景下,容易出現(xiàn) PageCache 污染,造成讀寫性能下降。
6、Kafka broker 上存儲的 topic 分區(qū)數(shù)量有限,分區(qū)數(shù)越多,磁盤讀寫順序性越差,讀寫性能越低。
7、Kafka 集群規(guī)模增長導(dǎo)致運維成本急劇增長,需要投入大量的人力進(jìn)行日常運維;在 BIGO,擴(kuò)容一臺機(jī)器到 Kafka 集群并進(jìn)行分區(qū)均衡,需要 0.5 人/天;縮容一臺機(jī)器需要 1 人/天。
如果繼續(xù)使用 Kafka,成本會不斷上升:擴(kuò)縮容機(jī)器、增加運維人力。同時,隨著業(yè)務(wù)規(guī)模增長,我們對消息系統(tǒng)有了更高的要求:系統(tǒng)要更穩(wěn)定可靠、便于水平擴(kuò)展、延遲低。為了提高消息隊列的實時性、穩(wěn)定性和可靠性,降低運維成本,我們開始考慮是否要基于開源 Kafka 做本地化二次開發(fā),或者看看社區(qū)中有沒有更好的解決方案,來解決我們在維護(hù) Kafka 集群時遇到的問題。
2019 年 11 月,我們開始調(diào)研消息隊列,對比當(dāng)前主流消息流平臺的優(yōu)缺點,并跟我們的需求對接。在調(diào)研過程中,我們發(fā)現(xiàn) Apache Pulsar 是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數(shù)式計算為一體。Pulsar 能夠無縫擴(kuò)容、延遲低、吞吐高,支持多租戶和跨地域復(fù)制。最重要的是,Pulsar 存儲、計算分離的架構(gòu)能夠完美解決 Kafka 擴(kuò)縮容的問題。Pulsar producer 把消息發(fā)送給 broker,broker 通過 bookie client 寫到第二層的存儲 BookKeeper 上。
Pulsar 采用存儲、計算分離的分層架構(gòu)設(shè)計,支持多租戶、持久化存儲、多機(jī)房跨區(qū)域數(shù)據(jù)復(fù)制,具有強(qiáng)一致性、高吞吐以及低延時的高可擴(kuò)展流數(shù)據(jù)存儲特性。
1、水平擴(kuò)容:能夠無縫擴(kuò)容到成百上千個節(jié)點。
2、高吞吐:已經(jīng)在 Yahoo! 的生產(chǎn)環(huán)境中經(jīng)受了考驗,支持每秒數(shù)百萬條消息的發(fā)布-訂閱(Pub-Sub)。
3、低延遲:在大規(guī)模的消息量下依然能夠保持低延遲(小于 5 ms)。
4、持久化機(jī)制:Pulsar 的持久化機(jī)制構(gòu)建在 Apache BookKeeper 上,實現(xiàn)了讀寫分離。
5、讀寫分離:BookKeeper 的讀寫分離 IO 模型極大發(fā)揮了磁盤順序?qū)懶阅?,對機(jī)械硬盤相對比較友好,單臺 bookie 節(jié)點支撐的 topic 數(shù)不受限制。
為了進(jìn)一步加深對 Apache Pulsar 的理解,衡量 Pulsar 能否真正滿足我們生產(chǎn)環(huán)境大規(guī)模消息 Pub-Sub 的需求,我們從 2019 年 12 月開始進(jìn)行了一系列壓測工作。由于我們使用的是機(jī)械硬盤,沒有 SSD,在壓測過程中遇到了一些性能問題,在 StreamNative 的協(xié)助下,我們分別和 進(jìn)行了一系列的調(diào)優(yōu),Pulsar 的吞吐和穩(wěn)定性均有所提高。
經(jīng)過 3~4 個月的壓測和調(diào)優(yōu),我們認(rèn)為 Pulsar 完全能夠解決我們使用 Kafka 時遇到的各種問題,并于 2020 年 4 月在測試環(huán)境上線 Pulsar。
2020 年 5 月,我們正式在生產(chǎn)環(huán)境中使用 Pulsar 集群。Pulsar 在 BIGO 的場景主要是 Pub-Sub 的經(jīng)典生產(chǎn)消費模式,前端有 Baina 服務(wù)(用 C++ 實現(xiàn)的數(shù)據(jù)接收服務(wù)),Kafka 的 Mirror Maker 和 Flink,以及其他語言如 Java、Python、C++ 等客戶端的 producer 向 topic 寫入數(shù)據(jù)。后端由 Flink 和 Flink SQL,以及其他語言的客戶端的 consumer 消費數(shù)據(jù)。
在下游,我們對接的業(yè)務(wù)場景有實時數(shù)倉、實時 ETL(Extract-Transform-Load,將數(shù)據(jù)從來源端經(jīng)過抽?。╡xtract)、轉(zhuǎn)換(transform)、加載(load)至目的端的過程)、實時數(shù)據(jù)分析和實時推薦。大部分業(yè)務(wù)場景使用 Flink 消費 Pulsar topic 中的數(shù)據(jù),并進(jìn)行業(yè)務(wù)邏輯處理;其他業(yè)務(wù)場景消費使用的客戶端語言主要分布在 C++、Go、Python 等。數(shù)據(jù)經(jīng)過各自業(yè)務(wù)邏輯處理后,最終會寫入 Hive、Pulsar topic 以及 ClickHouse、HDFS、Redis 等第三方存儲服務(wù)。
在 BIGO,我們借助 Flink 和 Pulsar 打造了實時流平臺。在介紹這個平臺之前,我們先了解下 Pulsar Flink Connector 的內(nèi)部運行機(jī)理。在 Pulsar Flink Source/Sink API 中,上游有一個 Pulsar topic,中間是 Flink job,下游有一個 Pulsar topic。我們怎么消費這個 topic,又怎樣處理數(shù)據(jù)并寫入 Pulsar topic 呢?
按照上圖左側(cè)代碼示例,初始化一個 StreamExecutionEnvironment,進(jìn)行相關(guān)配置,比如修改 property、topic 值。然后創(chuàng)建一個 FlinkPulsarSource 對象,這個 Source 里面填上 serviceUrl(brokerlist)、adminUrl(admin 地址)以及 topic 數(shù)據(jù)的序列化方式,最終會把 property 傳進(jìn)去,這樣就能夠讀取 Pulsar topic 中的數(shù)據(jù)。Sink 的使用方法非常簡單,首先創(chuàng)建一個 FlinkPulsarSink,Sink 里面指定 target topic,再指定 TopicKeyExtractor 作為 key,并調(diào)用 addsink,把數(shù)據(jù)寫入 Sink。這個生產(chǎn)消費模型很簡單,和 Kafka 很像。
Pulsar topic 和 Flink 的消費如何聯(lián)動呢?如下圖所示,新建 FlinkPulsarSource 時,會為 topic 的每一個分區(qū)新創(chuàng)建一個 reader 對象。要注意的是 Pulsar Flink Connector 底層使用 reader API 消費,會先創(chuàng)建一個 reader,這個 reader 使用 Pulsar Non-Durable Cursor。Reader 消費的特點是讀取一條數(shù)據(jù)后馬上提交(commit),所以在監(jiān)控上可能會看到 reader 對應(yīng)的 subscription 沒有 backlog 信息。
在 Pulsar 2.4.2 版本中,由 Non-Durable Cursor 訂閱的 topic,在接收到 producer 寫入的數(shù)據(jù)時,不會將數(shù)據(jù)保存在 broker 的 cache 中,導(dǎo)致大量數(shù)據(jù)讀取請求落到 BookKeeper 中,降低數(shù)據(jù)讀取效率。BIGO 在 Pulsar 2.5.1 版本中修正了這個問題。
Reader 訂閱 Pulsar topic 后,消費 Pulsar topic 中的數(shù)據(jù),F(xiàn)link 如何保證 exactly-once 呢?Pulsar Flink Connector 使用另外一個獨立的 subscription,這個 subscription 使用的是 Durable Cursor。當(dāng) Flink 觸發(fā) checkpoint,Pulsar Flink Connector 會把 reader 的狀態(tài)(包括每個 Pulsar Topic Partition 的消費位置) checkpoint 到文件、內(nèi)存或 RocksDB 中,當(dāng) checkpoint 完成后,會發(fā)布一次 Notify Checkpoint Complete 通知。Pulsar Flink Connector 收到 checkpoint 完成通知后,把當(dāng)前所有 reader 的消費 Offset,即 message id 以獨立的 SubscriptionName 提交給 Pulsar broker,此時才會把消費 Offset 信息真正記錄下來。
Offset Commit 完成后,Pulsar broker 會將 Offset 信息(在 Pulsar 中以 Cursor 表示)存儲到底層的分布式存儲系統(tǒng) BookKeeper 中,這樣做的好處是當(dāng) Flink 任務(wù)重啟后,會有兩層恢復(fù)保障。第一種情況是從 checkpoint 恢復(fù):可以直接從 checkpoint 里獲得上一次消費的 message id,通過這個 message id 獲取數(shù)據(jù),這個數(shù)據(jù)流就能繼續(xù)消費。如果沒有從 checkpoint 恢復(fù),F(xiàn)link 任務(wù)重啟后,會根據(jù) SubscriptionName 從 Pulsar 中獲取上一次 Commit 對應(yīng)的 Offset 位置開始消費。這樣就能有效防止 checkpoint 損壞導(dǎo)致整個 Flink 任務(wù)無法成功啟動的問題。
Checkpoint 流程如下圖所示。
先做 checkpoint N,完成后發(fā)布一次 notify Checkpoint Complete,等待一定時間間隔后,接下來做 checkpoint N+1,完成后也會進(jìn)行一次 notify Checkpoint Complete 操作,此時把 Durable Cursor 進(jìn)行一次 Commit,最終 Commit 到 Pulsar topic 的服務(wù)端上,這樣能確保 checkpoint 的 exactly-once,也能根據(jù)自己設(shè)定的 subscription 保證 message “keep alive”。
Topic/Partition Discovery 要解決什么問題呢?當(dāng) Flink 任務(wù)消費 topic 時,如果 Topic 增加分區(qū),F(xiàn)link 任務(wù)需要能夠自動發(fā)現(xiàn)分區(qū)。Pulsar Flink Connector 如何實現(xiàn)這一點呢?訂閱 topic 分區(qū)的 reader 之間相互獨立,每個 task manager 包含多個 reader thread,根據(jù)哈希函數(shù)把單個 task manager 中包含的 topic 分區(qū)映射過來,topic 中新增分區(qū)時,新加入的分區(qū)會映射到某個 task manager 上,task manager 發(fā)現(xiàn)新增分區(qū)后,會創(chuàng)建一個 reader,消費掉新數(shù)據(jù)。用戶可以通過設(shè)置 `partition.discovery.interval-millis` 參數(shù),調(diào)配檢測頻率。
為了降低 Flink 消費 Pulsar topic 的門檻,讓 Pulsar Flink Connector 支持更加豐富的 Flink 新特性,BIGO 消息隊列團(tuán)隊為 Pulsar Flink Connector 增加了 Pulsar Flink SQL DDL(Data Definition Language,數(shù)據(jù)定義語言) 和 Flink 1.11 支持。此前官方提供的 Pulsar Flink SQL 只支持 Catalog,要想通過 DDL 形式消費、處理 Pulsar topic 中的數(shù)據(jù)不太方便。在 BIGO 場景中,大部分 topic 數(shù)據(jù)都以 JSON 格式存儲,而 JSON 的 schema 沒有提前注冊,所以只能在 Flink SQL 中指定 topic 的 DDL 后才可以消費。針對這種場景,BIGO 基于 Pulsar Flink Connector 做了二次開發(fā),提供了通過 Pulsar Flink SQL DDL 形式消費、解析、處理 Pulsar topic 數(shù)據(jù)的代碼框架(如下圖所示)。
左邊的代碼中,第一步是配置 Pulsar topic 的消費,首先指定 topic 的 DDL 形式,比如 rip、rtime、uid 等,下面是消費 Pulsar topic 的基礎(chǔ)配置,比如 topic 名稱、service-url、admin-url 等。底層 reader 讀到消息后,會根據(jù) DDL 解出消息,將數(shù)據(jù)存儲在 test_flink_sql 表中。第二步是常規(guī)邏輯處理(如對表進(jìn)行字段抽取、做 join 等),得出相關(guān)統(tǒng)計信息或其他相關(guān)結(jié)果后,返回這些結(jié)果,寫到 HDFS 或其他系統(tǒng)上等。第三步,提取相應(yīng)字段,將其插入一張 hive 表。由于 Flink 1.11 對 hive 的寫入支持比 1.9.1 更加優(yōu)秀,所以 BIGO 又做了一次 API 兼容和版本升級,使 Pulsar Flink Connector 支持 Flink 1.11。
BIGO 基于 Pulsar 和 Flink 構(gòu)建的實時流平臺主要用于實時 ETL 處理場景和 AB-test 場景。
實時 ETL 處理場景主要運用 Pulsar Flink Source 及 Pulsar Flink Sink。這個場景中,Pulsar topic 實現(xiàn)幾百甚至上千個 topic,每個 topic 都有獨立的 schema。我們需要對成百上千個 topic 進(jìn)行常規(guī)處理,如字段轉(zhuǎn)換、容錯處理、寫入 HDFS 等。每個 topic 都對應(yīng) HDFS 上的一張表,成百上千個 topic 會在 HDFS 上映射成百上千張表,每張表的字段都不一樣,這就是我們遇到的實時 ETL 場景。
這種場景的難點在于 topic 數(shù)量多。如果每個 topic 維護(hù)一個 Flink 任務(wù),維護(hù)成本太高。之前我們想通過 HDFS Sink Connector 把 Pulsar topic 中的數(shù)據(jù)直接 sink 到 HDFS 上,但處理里面的邏輯卻很麻煩。最終我們決定使用一個或多個 Flink 任務(wù)去消費成百上千個 topic,每個 topic 配自己的 schema,直接用 reader 來訂閱所有 topic,進(jìn)行 schema 解析后處理,將處理后的數(shù)據(jù)寫到 HDFS 上。
隨著程序運行,我們發(fā)現(xiàn)這種方案也存在問題:算子之間壓力不均衡。因為有些 topic 流量大,有些流量小,如果完全通過隨機(jī)哈希的方式映射到對應(yīng)的 task manager 上去,有些 task manager 處理的流量會很高,而有些 task manager 處理的流量很低,導(dǎo)致有些 task 機(jī)器上積塞非常嚴(yán)重,拖慢 Flink 流的處理。所以我們引入了 slot group 概念,根據(jù)每個 topic 的流量情況進(jìn)行分組,流量會映射到 topic 的分區(qū)數(shù),在創(chuàng)建 topic 分區(qū)時也以流量為依據(jù),如果流量很高,就多為 topic 創(chuàng)建分區(qū),反之少一些。分組時,把流量小的 topic 分到一個 group 中,把流量大的 topic 單獨放在一個 group 中,很好地隔離了資源,保證 task manager 總體上流量均衡。
實時數(shù)倉需要提供小時表或天表為數(shù)據(jù)分析師及推薦算法工程師提供數(shù)據(jù)查詢服務(wù),簡單來講就是 app 應(yīng)用中會有很多打點,各種類型的打點會上報到服務(wù)端。如果直接暴露原始打點給業(yè)務(wù)方,不同的業(yè)務(wù)使用方就需要訪問各種不同的原始表從不同維度進(jìn)行數(shù)據(jù)抽取,并在表之間進(jìn)行關(guān)聯(lián)計算。頻繁對底層基礎(chǔ)表進(jìn)行數(shù)據(jù)抽取和關(guān)聯(lián)操作會嚴(yán)重浪費計算資源,所以我們提前從基礎(chǔ)表中抽取用戶關(guān)心的維度,將多個打點合并在一起,構(gòu)成一張或多張寬表,覆蓋上面推薦相關(guān)的或數(shù)據(jù)分析相關(guān)的 80% ~ 90% 場景任務(wù)。
在實時數(shù)倉場景下還需實時中間表,我們的解決方案是,針對 topic A 到 topic K ,我們使用 Pulsar Flink SQL 將消費到的數(shù)據(jù)解析成相應(yīng)的表。通常情況下,將多張表聚合成一張表的常用做法是使用 join,如把表 A 到 K 按照 uid 進(jìn)行 join 操作,形成非常寬的寬表;但在 Flink SQL 中 join 多張寬表效率較低。所以 BIGO 使用 union 來替代 join,做成很寬的視圖,以小時為單位返回視圖,寫入 ClickHouse,提供給下游的業(yè)務(wù)方實時查詢。使用 union 來替代 join 加速表的聚合,能夠把小時級別的中間表產(chǎn)出控制在分鐘級別。
輸出天表可能還需要 join 存放在 hive 上的表或其他存儲介質(zhì)上的離線表,即流表和離線表之間 join 的問題。如果直接 join,checkpoint 中需要存儲的中間狀態(tài)會比較大,所以我們在另外一個維度上做了優(yōu)化。
左側(cè)部分類似于小時表,每個 topic 使用 Pulsar Flink SQL 消費并轉(zhuǎn)換成對應(yīng)的表,表之間進(jìn)行 union 操作,將 union 得到的表以天為單位輸入到 HBase(此處引入 HBase 是為了做替代它的 join)。
右側(cè)需要 join 離線數(shù)據(jù),使用 Spark 聚合離線的 Hive 表(如表 a1、a2、a3),聚合后的數(shù)據(jù)會通過精心設(shè)計的 row-key 寫入 HBase 中。數(shù)據(jù)聚合后狀態(tài)如下:假設(shè)左邊數(shù)據(jù)的 key 填了寬表的前 80 列,后面 Spark 任務(wù)算出的數(shù)據(jù)對應(yīng)同樣一個 key,填上寬表的后 20 列,在 HBase 中組成一張很大的寬表,把最終數(shù)據(jù)再次從 HBase 抽出,寫入 ClickHouse,供上層用戶查詢,這就是 AB-test 的主體架構(gòu)。
從 2020 年 5 月上線至今,Pulsar 運行穩(wěn)定,日均處理消息數(shù)百億,字節(jié)入流量為 2~3 GB/s。Apache Pulsar 提供的高吞吐、低延遲、高可靠性等特性極大提高了 BIGO 消息處理能力,降低了消息隊列運維成本,節(jié)約了近 50% 的硬件成本。目前,我們在幾十臺物理主機(jī)上部署了上百個 Pulsar broker 和 bookie 進(jìn)程,采用 bookie 和 broker 在同一個節(jié)點的混部模式,已經(jīng)把 ETL 從 Kafka 遷移到 Pulsar,并逐步將生產(chǎn)環(huán)境中消費 Kafka 集群的業(yè)務(wù)(比如 Flink、Flink SQL、ClickHouse 等)遷移到 Pulsar 上。隨著更多業(yè)務(wù)的遷移,Pulsar 上的流量會持續(xù)上漲。
我們的 ETL 任務(wù)有一萬多個 topic,每個 topic 平均有 3 個分區(qū),使用 3 副本的存儲策略。之前使用 Kafka,隨著分區(qū)數(shù)增加,磁盤由順序讀寫逐漸退化為隨機(jī)讀寫,讀寫性能退化嚴(yán)重。Apache Pulsar 的存儲分層設(shè)計能夠輕松支持百萬 topic,為我們的 ETL 場景提供了優(yōu)雅支持。
BIGO 在 Pulsar broker 負(fù)載均衡、broker cache 命中率優(yōu)化、broker 相關(guān)監(jiān)控、BookKeeper 讀寫性能優(yōu)、BookKeeper 磁盤 IO 性能優(yōu)化、Pulsar 與 Flink、Pulsar 與 Flink SQL 結(jié)合等方面做了大量工作,提升了 Pulsar 的穩(wěn)定性和吞吐,也降低了 Flink 與 Pulsar 結(jié)合的門檻,為 Pulsar 的推廣打下了堅實基礎(chǔ)。
未來,我們會增加 Pulsar 在 BIGO 的場景應(yīng)用,幫助社區(qū)進(jìn)一步優(yōu)化、完善 Pulsar 功能,具體如下:
1、為 Apache Pulsar 研發(fā)新特性,比如支持 topic policy 相關(guān)特性。
2、遷移更多任務(wù)到 Pulsar。這項工作涉及兩方面,一是遷移之前使用 Kafka 的任務(wù)到 Pulsar。二是新業(yè)務(wù)直接接入 Pulsar。
3、BIGO 準(zhǔn)備使用 KoP 來保證數(shù)據(jù)遷移平滑過渡。因為 BIGO 有大量消費 Kafka 集群的 Flink 任務(wù),我們希望能夠直接在 Pulsar 中做一層 KoP,簡化遷移流程。
4、對 Pulsar 及 BookKeeper 持續(xù)進(jìn)行性能優(yōu)化。由于生產(chǎn)環(huán)境中流量較高,BIGO 對系統(tǒng)的可靠性和穩(wěn)定性要求較高。
5、持續(xù)優(yōu)化 BookKeeper 的 IO 協(xié)議棧。Pulsar 的底層存儲本身是 IO 密集型系統(tǒng),保證底層 IO 高吞吐,才能夠提升上層吞吐,保證性能穩(wěn)定。