




版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
第PHP+RabbitMQ實(shí)現(xiàn)消息隊(duì)列的完整代碼為什么使用RabbitMq而不是ActiveMq或者RocketMq?
首先,從業(yè)務(wù)上來(lái)講,我并不要求消息的100%接受率,并且,我需要結(jié)合php開發(fā),RabbitMq相較RocketMq,延遲較低(微妙級(jí))。至于ActiveMq,貌似問(wèn)題較多。RabbitMq對(duì)各種語(yǔ)言的支持較好,所以選擇RabbitMq。
先安裝PHP對(duì)應(yīng)的RabbitMQ,這里用的是php_amqp不同的擴(kuò)展實(shí)現(xiàn)方式會(huì)有細(xì)微的差異.
php擴(kuò)展地址:/package/amqp
具體以官網(wǎng)為準(zhǔn)/getstarted.html
介紹
config.php配置信息
BaseMQ.phpMQ基類
ProductMQ.php生產(chǎn)者類
ConsumerMQ.php消費(fèi)者類
Consumer2MQ.php消費(fèi)者2(可有多個(gè))
config.php
return[
//配置
'host'=[
'host'='',
'port'='5672',
'login'='guest',
'password'='guest',
'vhost'='/',
//交換機(jī)
'exchange'='word',
//路由
'routes'=[],
];
BaseMQ.php
*CreatedbyPhpStorm.
*User:pc
*Date:2025/12/13
*Time:14:11
namespaceMyObjSummary\rabbitMQ;
/**Member
*AMQPChannel
*AMQPConnection
*AMQPEnvelope
*AMQPExchange
*AMQPQueue
*ClassBaseMQ
*@packageMyObjSummary\rabbitMQ
classBaseMQ
/**MQChannel
*@var\AMQPChannel
public$AMQPChannel;
/**MQLink
*@var\AMQPConnection
public$AMQPConnection;
/**MQEnvelope
*@var\AMQPEnvelope
public$AMQPEnvelope;
/**MQExchange
*@var\AMQPExchange
public$AMQPExchange;
/**MQQueue
*@var\AMQPQueue
public$AMQPQueue;
/**conf
*@var
public$conf;
/**exchange
*@var
public$exchange;
/**link
*BaseMQconstructor.
*@throws\AMQPConnectionException
publicfunction__construct()
$conf=require'config.php';
if(!$conf)
thrownew\AMQPConnectionException('configerror!');
$this-conf=$conf['host'];
$this-exchange=$conf['exchange'];
$this-AMQPConnection=new\AMQPConnection($this-conf);
if(!$this-AMQPConnection-connect())
thrownew\AMQPConnectionException("Cannotconnecttothebroker!\n");
*closelink
publicfunctionclose()
$this-AMQPConnection-disconnect();
/**Channel
*@return\AMQPChannel
*@throws\AMQPConnectionException
publicfunctionchannel()
if(!$this-AMQPChannel){
$this-AMQPChannel=new\AMQPChannel($this-AMQPConnection);
return$this-AMQPChannel;
/**Exchange
*@return\AMQPExchange
*@throws\AMQPConnectionException
*@throws\AMQPExchangeException
publicfunctionexchange()
if(!$this-AMQPExchange){
$this-AMQPExchange=new\AMQPExchange($this-channel());
$this-AMQPExchange-setName($this-exchange);
return$this-AMQPExchange;
/**queue
*@return\AMQPQueue
*@throws\AMQPConnectionException
*@throws\AMQPQueueException
publicfunctionqueue()
if(!$this-AMQPQueue){
$this-AMQPQueue=new\AMQPQueue($this-channel());
return$this-AMQPQueue;
/**Envelope
*@return\AMQPEnvelope
publicfunctionenvelope()
if(!$this-AMQPEnvelope){
$this-AMQPEnvelope=new\AMQPEnvelope();
return$this-AMQPEnvelope;
}
ProductMQ.php
//生產(chǎn)者P
namespaceMyObjSummary\rabbitMQ;
require'BaseMQ.php';
classProductMQextendsBaseMQ
private$routes=['hello','word'];//路由key
*ProductMQconstructor.
*@throws\AMQPConnectionException
publicfunction__construct()
parent::__construct();
/**只控制發(fā)送成功不接受消費(fèi)者是否收到
*@throws\AMQPChannelException
*@throws\AMQPConnectionException
*@throws\AMQPExchangeException
publicfunctionrun()
//頻道
$channel=$this-channel();
//創(chuàng)建交換機(jī)對(duì)象
$ex=$this-exchange();
//消息內(nèi)容
$message='productmessage'.rand(1,99999);
//開始事務(wù)
$channel-startTransaction();
$sendEd=true;
foreach($this-routesas$route){
$sendEd=$ex-publish($message,$route);
echo"SendMessage:".$sendEd."\n";
if(!$sendEd){
$channel-rollbackTransaction();
$channel-commitTransaction();//提交事務(wù)
$this-close();
die;
try{
(newProductMQ())-run();
}catch(\Exception$exception){
var_dump($exception-getMessage());
}
ConsumerMQ.php
//消費(fèi)者C
namespaceMyObjSummary\rabbitMQ;
require'BaseMQ.php';
classConsumerMQextendsBaseMQ
private$q_name='hello';//隊(duì)列名
private$route='hello';//路由key
*ConsumerMQconstructor.
*@throws\AMQPConnectionException
publicfunction__construct()
parent::__construct();
/**接受消息如果終止重連時(shí)會(huì)有消息
*@throws\AMQPChannelException
*@throws\AMQPConnectionException
*@throws\AMQPExchangeException
*@throws\AMQPQueueException
publicfunctionrun()
//創(chuàng)建交換機(jī)
$ex=$this-exchange();
$ex-setType(AMQP_EX_TYPE_DIRECT);//direct類型
$ex-setFlags(AMQP_DURABLE);//持久化
//echo"ExchangeStatus:".$ex-declare()."\n";
//創(chuàng)建隊(duì)列
$q=$this-queue();
//var_dump($q-declare());exit();
$q-setName($this-q_name);
$q-setFlags(AMQP_DURABLE);//持久化
//echo"MessageTotal:".$q-declareQueue()."\n";
//綁定交換機(jī)與隊(duì)列,并指定路由鍵
echo'QueueBind:'.$q-bind($this-exchange,$this-route)."\n";
//阻塞模式接收消息
echo"Message:\n";
while(True){
$q-consume(function($envelope,$queue){
$msg=$envelope-getBody();
echo$msg."\n";//處理消息
$queue-ack($envelope-getDeliveryTag());//
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 物流企業(yè)智能分揀設(shè)備數(shù)據(jù)庫(kù)租賃與數(shù)據(jù)安全評(píng)估協(xié)議
- 互聯(lián)網(wǎng)廣告聯(lián)盟精準(zhǔn)投放服務(wù)合同
- 繼子女撫養(yǎng)權(quán)解除與監(jiān)護(hù)責(zé)任轉(zhuǎn)移協(xié)議
- 汽車美容連鎖品牌合作經(jīng)營(yíng)普通合伙合同樣本
- 農(nóng)村土地流轉(zhuǎn)與農(nóng)業(yè)產(chǎn)業(yè)升級(jí)合作協(xié)議
- 《初中數(shù)學(xué)公式復(fù)習(xí)課件完善版》
- 內(nèi)部人員溝通培訓(xùn)流程圖
- 《幼兒園中班家長(zhǎng)會(huì)課件》
- 《眉山市房地產(chǎn)市場(chǎng)分析》課件
- 聚氯乙烯生產(chǎn)工藝流程
- 2023年第37屆中國(guó)化學(xué)奧林匹克競(jìng)賽(江蘇賽區(qū))初賽真題(學(xué)生版+解析版)
- 動(dòng)物實(shí)驗(yàn)生物安全
- 埃里克森的人格發(fā)展八階段
- 霧都孤兒讀書報(bào)告
- 職業(yè)生涯規(guī)劃家庭影響因素
- 2024年江蘇交通文化傳媒有限公司招聘筆試參考題庫(kù)含答案解析
- 安心護(hù)行 從個(gè)案分析看創(chuàng)傷骨科患者VTE管理低分子肝素合理應(yīng)用版本
- 實(shí)驗(yàn) 驗(yàn)證牛頓第二定律
- 鉆孔水文地質(zhì)工程地質(zhì)綜合編錄一覽表模板
- 備用柴油發(fā)電機(jī)定期啟動(dòng)試驗(yàn)記錄表
- 國(guó)企食堂運(yùn)作方案
評(píng)論
0/150
提交評(píng)論