Apache Pulsar延遲消息投遞解析,apache log4j2Apache Pulsar延遲消息投遞解析導語Apache Pulsar是一個多租戶、高性能的服務間消息傳輸解決方案,支持多租戶、低延時、讀寫分離、跨地域復制、快速擴容、靈活容錯等特性。騰訊數據平臺部MQ團隊對Pulsar做了深入調研以及大量的性能和......
導語Apache Pulsar是一個多租戶、高性能的服務間消息傳輸解決方案,支持多租戶、低延時、讀寫分離、跨地域復制、快速擴容、靈活容錯等特性。騰訊數據平臺部MQ團隊對Pulsar做了深入調研以及大量的性能和穩定性方面優化,目前已經在騰訊云消息隊列TDMQ落地上線。本文主要介紹Pulsar延遲消息投遞的實現,希望與大家一同交流。
一、什么是延遲消息投遞
延遲消息投遞在MQ應用場景中十分普遍,它是指消息在發國際快遞MQ服務端后并不會立馬投遞,而是根據消息中的屬性延遲固定時間后才投遞給消費者,一般分為定時消息和延遲消息兩種:
·定時消息:Producer將消息發國際快遞MQ服務端,但并不期望這條消息立馬投遞,而是推遲到在當前時間點之后的某一個時間投遞到Consumer進行消費。
·延遲消息:Producer將消息發國際快遞MQ服務端,但并不期望這條消息立馬投遞,而是延遲一定時間后才投遞到Consumer進行消費。
目前在業界,騰訊云的CMQ和阿里云的RocketMQ也都支持延遲消息投遞:
·CMQ:將消息延遲期間定義為”飛行狀態“,可通過設置DelaySeconds配置延遲范圍,取值范圍為03600秒,即消息最長不可見時長為1小時。
·RocketMQ:開源版本延遲消息臨時存儲在一個內部主題中,支持特定的level,例如定時5s,10s,1m等,商業版本支持任意時間精度。
開源的NSQ、RabbitMQ、ActiveMQ和Pulsar也都內置了延遲消息的處理能力。雖然每個MQ項目的使用和實現方式不同,但核心實現思路都一樣:Producer將一個延遲消息發國際快遞某個Topic中,Broker將延遲消息放到臨時存儲進行暫存,延遲跟蹤服務(Delayed Tracker Service)會檢查消息是否到期,將到期的消息進行投遞。
二、延遲消息投遞的使用場景
延遲消息投遞是要暫緩對當前消息的處理,在未來的某個時間點再觸發投遞,實際的應用場景非常多,比如異常檢測重試、訂單超時取消、預約提醒等。
·服務請求異常,需要將異常請求放到單獨的隊列,隔5分鐘后進行重試;
·用戶購買商品,但一直處于未支付狀態,需要定期提醒用戶支付,超時則關閉訂單;
·面試或者會議預約,在面試或者會議開始前半小時,發快遞通知再次提醒;
TDMQ最近就有個使用Pulsar延遲消息的Case:業務要對兩套系統的日志消息進行關聯,其中一套系統由于查詢Hbase可能會超時或失敗,需要將失敗的關聯任務在集群空閑的時候再次調度。
三、如何使用Pulsar延遲消息投遞
Pulsar最早是在2.4.0引入了延遲消息投遞的特性,在Pulsar中使用延遲消息,可以精確指定延遲投遞的時間,有deliverAfter和deliverAt兩種方式。其中deliverAt可以指定具體的時間戳;deliverAfter可以指定在當前多長時間后執行。兩種方式的本質是一樣的,Client會計算出時間戳國際快遞Broker。
1.deliverAfter發快遞
producer.newMessage()
.deliverAfter(long time, TimeUnit unit)
.send();
2.deliverAt發快遞
producer.newMessage()
.deliverAt(long timestamp)
.send();
在Pulsar中,可以支持跨度很大的延時消息,比方說一個月、半年;同時在一個Topic里,既支持延時消息,也支持非延時消息。下圖展示了Pulsar中延遲消息的具體過程:
producer發快遞的m1/m3/m4/m5有不同的延遲時間,m2是不需要延遲投遞的正常消息,consumer消費時會根據不同的延遲時間進行ack。
四、Pulsar延遲消息投遞實現原理
從上面的使用方式可以看出,Pulsar支持的是秒級精度的延遲消息投遞,不同于開源RocketMQ支持固定時間level的延遲。
Pulsar實現延遲消息投遞的方式比較簡單,所有延遲投遞的消息會被Delayed Message Tracker記錄對應的index。index是由timestampLedgerIDEntryID三部分組成,其中LedgerIDEntryID用于定位該消息,timestamp除了記錄需要投遞的時間,還用于delayed index優先級隊列排序。
Delayed Message Tracker在堆外內存維護著一個delayed index優先級隊列,根據延遲時間進行堆排序,延遲時間最短的會放在頭上,時間越長越靠后。consumer在消費時,會先去Delayed Message Tracker檢查,是否有到期需要投遞的消息,如果有到期的消息,則從Tracker中拿出對應的index,找到對應的消息進行消費;如果沒有到期的消息,則直接消費正常的消息。
如果集群出現Broker宕機或者topic的ownership轉移,Pulsar會重建delayed index隊列,來保證延遲投遞的消息能夠正常工作。
五、Pulsar延遲消息投遞面臨的挑戰
從Pulsar的延遲消息投遞實現原理可以看出,該方法簡單高效,對Pulsar內核侵入性較小,可以支持到任意時間的延遲消息。但同時發現,Pulsar的實現方案無法支持大規模使用延遲消息,主要有以下兩個原因:
1.delayed index隊列受到內存限制
一條延遲消息的delayed index由三個long組成,對于小規模的延遲消息來說,內存開銷并不大。但由于index隊列是subscription級別,對于topic的同一個partition來說,有多少個subscription就需要維護多少個index隊列;同時,由于延遲消息越多、延遲的時間越長,index隊列內存占用也會更多。
2.delayed index隊列重建時間開銷
上面有提到,如果集群出現Broker宕機或者topic的ownership轉移,Pulsar會重建delayed index隊列。對于跨度時間長的大規模延遲消息,重建時間可能會到小時級別。為了減小delayed index隊列重建時間,雖然可以給topic分更多的partition提高重建的并發度,但沒有徹底解決重建時間開銷問題。
六、Pulsar延遲消息投遞未來工作
Pulsar目前的延遲消息投遞方案簡單高效,但處理大規模延遲消息時仍然存在風險。關于延遲消息投遞,社區和數據平臺部MQ團隊下一步將聚焦在支持大規模延遲消息。目前討論的方案是在delayed index隊列加入時間分區,Broker只加載當前較近的時間片delayed index到內存,其余時間片分區持久化磁盤,示例圖如下圖所示:
上圖中,我們按5分鐘的間隔對delayed index隊列進行分區,m5和m1放在了time partition 1,由于延遲時間最近,放在了內存;m4和m3在time partition 2,延遲時間比較靠后,index存儲在了磁盤。該方案不僅可以減少delayed index隊列重建時間開銷,還可以降低對內存的依賴。
結語
本文為大家介紹了延遲消息投遞的相關概念和使用場景,并詳細拓展了Apache Pulsar的實現原理。Pulsar目前方案簡單高效,支持秒級精度的延遲消息投遞,但在處理大規模延遲消息時還有一些局限。
目前騰訊云消息隊列TDMQ上已上線了對Pulsar延遲消息投遞的支持,Pulsar社區和數據平臺部MQ團隊下一步也將聚焦在支持大規模延遲消息上。
特別聲明:以上文章內容僅代表作者本人觀點,不代表ESG跨境電商觀點或立場。如有關于作品內容、版權或其它問題請于作品發表后的30日內與ESG跨境電商聯系。
二維碼加載中...
使用微信掃一掃登錄
使用賬號密碼登錄
平臺顧問
微信掃一掃
馬上聯系在線顧問
小程序
ESG跨境小程序
手機入駐更便捷
返回頂部