Go+Redis實現(xiàn)延遲隊列實操_第1頁
Go+Redis實現(xiàn)延遲隊列實操_第2頁
Go+Redis實現(xiàn)延遲隊列實操_第3頁
Go+Redis實現(xiàn)延遲隊列實操_第4頁
Go+Redis實現(xiàn)延遲隊列實操_第5頁
已閱讀5頁,還剩3頁未讀 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

第Go+Redis實現(xiàn)延遲隊列實操目錄前言簡單的實現(xiàn)定義消息PushConsume存在的問題多消費者實現(xiàn)定義消息PushConsume存在的問題總結

前言

延遲隊列是一種非常使用的數(shù)據(jù)結構,我們經(jīng)常有需要延遲推送處理消息的場景,比如延遲60秒發(fā)送短信,延遲30分鐘關閉訂單,消息消費失敗延遲重試等等。

一般我們實現(xiàn)延遲消息都需要依賴底層的有序結構,比如堆,而Redis剛好提供了zset這種數(shù)據(jù)類型,它的底層實現(xiàn)是哈希表+跳表,也是一種有序的結構,所以這篇文章主要是使用Go+Redis來實現(xiàn)延遲隊列。

當然Redis本身并不支持延遲隊列,所以我們只是實現(xiàn)一個比較簡單的延遲隊列,而且Redis不太適合大量消息堆積,所以只適合比較簡單的場景,如果需要更加強大穩(wěn)定的消息隊列,可以使用RocketMQ等自帶延遲消息的消息隊列。

我們這里先定一下我們要實現(xiàn)的幾個目標:

消息必須至少被消費一次多個生產(chǎn)者多個消費者

然后我們定義一個簡單的接口:

Push(msg)error:添加消息到隊列Consume(topic,batchSize,func(msg)error):消費消息

簡單的實現(xiàn)

每個主題最多可以被一個消費者消費,因為不會對主題進行分區(qū)但是可以多個生產(chǎn)者同時進行生產(chǎn),因為Push操作是原子的同時需要消費操作返回值error為nil才刪除消息,保證消息至少被消費一次

定義消息

這個消息參考了Kafka的消息結構:

Topic可以是某個隊列的名字Key是消息的唯一標識,在一個隊列里面不可以重復Body是消息的內(nèi)容Delay是消息的延遲時間ReadyTime是消息準備好執(zhí)行的時間

//Msg消息

typeMsgstruct{

Topicstring//消息的主題

Keystring//消息的Key

Body[]byte//消息的Body

Delaytime.Duration//延遲時間(秒)

ReadyTimetime.Time//消息準備好執(zhí)行的時間(now+delay)

}

Push

由于我們需要把消息的Body存儲到Hash,把消息的ReadyTime存儲到ZSet,所以我們需要一個簡單的Lua腳本來保證這兩個操作是原子的。

同時我們不會覆蓋已經(jīng)存在的相同Key的消息。

constdelayQueuePushRedisScript=`

--KEYS[1]:topicZSet

--KEYS[2]:topicHash

--ARGV[1]:消息的Key

--ARGV[2]:消息的Body

--ARGV[3]:消息準備好執(zhí)行的時間

localtopicZSet=KEYS[1]

localtopicHash=KEYS[2]

localkey=ARGV[1]

localbody=ARGV[2]

localreadyTime=tonumber(ARGV[3])

--添加readyTime到zset

localcount=redis.call("zadd",topicZSet,readyTime,key)

--消息已經(jīng)存在

ifcount==0then

return0

--添加body到hash

redis.call("hsetnx",topicHash,key,body)

return1

`

func(q*SimpleRedisDelayQueue)Push(ctxcontext.Context,msg*Msg)error{

//如果設置了ReadyTime,就使用RedisTime

varreadyTimeint64

if!msg.ReadyTime.IsZero(){

readyTime=msg.ReadyTime.Unix()

}else{

//否則使用Delay

readyTime=time.Now().Add(msg.Delay).Unix()

success,err:=q.pushScript.Run(ctx,q.client,[]string{q.topicZSet(msg.Topic),q.topicHash(msg.Topic)},

msg.Key,msg.Body,readyTime).Bool()

iferr!=nil{

returnerr

if!success{

returnErrDuplicateMessage

returnnil

}

Consume

其中第二個參數(shù)batchSize表示用于批量獲取已經(jīng)準備好執(zhí)行的消息,減少網(wǎng)絡請求。

fn是對消息進行處理的函數(shù),它有一個返回值error,如果是nil才表示消息消費成功,然后調(diào)用刪除腳本把成功消費的消息給刪除(需要原子的刪除ZSet和Hash里面的內(nèi)容)。

constdelayQueueDelRedisScript=`

--KEYS[1]:topicZSet

--KEYS[2]:topicHash

--ARGV[1]:消息的Key

localtopicZSet=KEYS[1]

localtopicHash=KEYS[2]

localkey=ARGV[1]

--刪除zset和hash關于這條消息的內(nèi)容

redis.call("zrem",topicZSet,key)

redis.call("hdel",topicHash,key)

return1

`

func(q*SimpleRedisDelayQueue)Consume(topicstring,batchSizeint,fnfunc(msg*Msg)error){

for{

//批量獲取已經(jīng)準備好執(zhí)行的消息

now:=time.Now().Unix()

zs,err:=q.client.ZRangeByScoreWithScores(context.Background(),q.topicZSet(topic),redis.ZRangeBy{

Min:"-inf",

Max:strconv.Itoa(int(now)),

Count:int64(batchSize),

}).Result()

//如果獲取出錯或者獲取不到消息,則休眠一秒

iferr!=nil||len(zs)==0{

time.Sleep(time.Second)

continue

//遍歷每個消息

for_,z:=rangezs{

key:=z.Member.(string)

//獲取消息的body

body,err:=q.client.HGet(context.Background(),q.topicHash(topic),key).Bytes()

iferr!=nil{

continue

//處理消息

err=fn(Msg{

Topic:topic,

Key:key,

Body:body,

ReadyTime:time.Unix(int64(z.Score),0),

iferr!=nil{

continue

//如果消息處理成功,刪除消息

q.delScript.Run(context.Background(),q.client,[]string{q.topicZSet(topic),q.topicHash(topic)},key)

}

存在的問題

如果多個線程同時調(diào)用Consume函數(shù),那么多個線程會拉取相同的可執(zhí)行的消息,造成消息重復的被消費。

多消費者實現(xiàn)

每個主題最多可以被分區(qū)個數(shù)個消費者消費,會對主題進行分區(qū)

我們添加了一個Partition字段表示消息的分區(qū)號

//Msg消息

typeMsgstruct{

Topicstring//消息的主題

Keystring//消息的Key

Body[]byte//消息的Body

Partitionint//分區(qū)號

Delaytime.Duration//延遲時間(秒)

ReadyTimetime.Time//消息準備好執(zhí)行的時間

}

代碼與SimpleRedisDelayQueue的Push相同,只是我們會使用Msg里面的Partition字段對主題進行分區(qū)。

func(q*PartitionRedisDelayQueue)topicZSet(topicstring,partitionint)string{

returnfmt.Sprintf("%s:%d:z",topic,partition)

func(q*PartitionRedisDelayQueue)topicHash(topicstring,partitionint)string{

returnfmt.Sprintf("%s:%d:h",topic,partition)

}

Consume

代碼與SimpleRedisDelayQueue的Consume相同,我們也只是對Consume多加了一個partition參數(shù)用于指定消費的分區(qū)。

func(q*PartitionRedisDelayQueue)Consume(topicstring,batchSize,partitionint,fnfunc(msg*Msg)error){

//...

}

存在的問題

一個比較大的問題就是我們需要手動指定分區(qū)而不是自動分配分區(qū),這個問題對于Push操作解決起來比較容易,可以通過哈希算法對Key進行哈希取模進行分區(qū),比如murmur3。但是對于

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論