300字范文 > 跨数据库分布式实时事务 - 基于RabbitMQ实时消息队列服务实现

时间:2023-07-13 15:36:52


如果有任何问题,或者更完善的方案 请加我创建的群 568752806 来一起探讨吧



处理器:Intel(R) Core(TM) i5-4750 CPU @ 3.2GHz 4核处理器

内存: 共32G 金士顿HyperX 骇客神条 DDR3 1866 8G x 4

固态硬盘:GLOWAY STK512GS3-S7 实际容量 476G

主板:Z97-PRO Wi-Fi ac

网卡:Broadcom 802.11ac

Intel(R) Ethernet Connection(2) I218-V

操作系统:Windows 10 64位教育版系统


VMware Workstation 12 Pro 版本 12.5.7 build-5813279

创建4台虚拟Cent OS 7 系统

主机名称 IP 内存 磁盘大小 说明 10G 100G PHP for 开发者 4G 80G Mariadb for db1 4G 80G Mariadb for db2 6G 80G RabbitMQ for 消息存储

在主机 上配置 IP到自定义域名之间的映射关系

[root@contoso ~]#cat > /etc/hosts localhost localhost.localdomain localhost4 localhost4.localdomain4

::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 user. contoso

优化内核 配置

[root@contoso ~]#cat >> /etc/security/limits.conf

* soft nofile 65536

* hard nofile 65536

[root@contoso ~]#ulimit -n 65536

[root@contoso ~]#cat > /etc/sysctl.conf

# sysctl settings are defined through files in

# /usr/lib/sysctl.d/, /run/sysctl.d/, and /etc/sysctl.d/.


# Vendors settings live in /usr/lib/sysctl.d/.

# To override a whole file, create a new file with the same in

# /etc/sysctl.d/ and put new settings there. To override

# only specific settings, add a file with a lexically later

# name in /etc/sysctl.d/ and put new settings there.


# For more information, see sysctl.conf(5) and sysctl.d(5).

net.ipv4.tcp_syn_retries = 1

net.ipv4.tcp_synack_retries = 1

net.ipv4.tcp_keepalive_time = 600

net.ipv4.tcp_keepalive_probes = 3

net.ipv4.tcp_keepalive_intvl = 15

net.ipv4.tcp_retries2 = 5

net.ipv4.tcp_fin_timeout = 2

net.ipv4.tcp_max_tw_buckets = 36000

net.ipv4.tcp_tw_recycle = 1

net.ipv4.tcp_tw_reuse = 1

net.ipv4.tcp_max_orphans = 32768

net.ipv4.tcp_syncookies = 1

net.ipv4.tcp_max_syn_backlog = 16384

net.ipv4.tcp_wmem = 8192 131072 16777216

net.ipv4.tcp_rmem = 32768 131072 16777216

net.ipv4.tcp_mem = 786432 1048576 1572864

net.ipv4.ip_local_port_range = 1024 65000

[root@contoso ~]#sysctl -p


[myth@contoso ~]$yum install /pub/epel/epel-release-latest-7.noarch.rpm

[myth@contoso ~]$yum install /enterprise/remi-release-7.rpm

[myth@contoso ~]$yum --enablerepo=remi-php71,remi,epel -y install php php-devel php-mysql php-fpm php-pecl-xdebug php-gd php-intl php-freetype php-mcrypt php-mbstring php-pecl-memcached php-pecl-redis php-pecl-swoole


[myth@contoso ~]$cat > /etc/php.d/15-xdebug.ini

; Enable xdebug extension module







; see /docs/all_settings


[root@contoso ~]#ll /etc/httpd/conf.modules.d

total 32

-rw-r--r-- 1 root root 3739 Apr 12 21:50 00-base.conf

-rw-r--r-- 1 root root 139 Apr 12 21:50 00-dav.conf

-rw-r--r-- 1 root root 41 Apr 12 21:50 00-lua.conf

-rw-r--r-- 1 root root 950 Aug 30 03:43 00-mpm.conf

-rw-r--r-- 1 root root 957 Apr 12 21:50 00-proxy.conf

-rw-r--r-- 1 root root 88 Apr 12 21:50 00-systemd.conf

-rw-r--r-- 1 root root 451 Apr 12 21:50 01-cgi.conf

-rw-r--r-- 1 root root 423 Aug 2 18:21 15-php.conf

[root@contoso ~]#cat > /etc/httpd/conf.modules.d/00-mpm.conf

# Select the MPM module which should be used by uncommenting exactly

# one of the following LoadModule lines:

# prefork MPM: Implements a non-threaded, pre-forking web server

# See: /docs/2.4/mod/prefork.html

LoadModule mpm_prefork_module modules/mod_mpm_prefork.so

<IfModule mpm_prefork_module>

StartServers 100

MinSpareServers 100

MaxSpareServers 120

ServerLimit 3000

MaxClients 3000

MaxRequestsPerChild 500000


# worker MPM: Multi-Processing Module implementing a hybrid

# multi-threaded multi-process web server

# See: /docs/2.4/mod/worker.html


#LoadModule mpm_worker_module modules/mod_mpm_worker.so

# event MPM: A variant of the worker MPM with the goal of consuming

# threads only for connections with active processing

# See: /docs/2.4/mod/event.html


#LoadModule mpm_event_module modules/mod_mpm_event.so

[root@contoso ~]#

在主机 上配置 IP到自定义域名之间的映射关系

[root@mariadb1 ~]#cat > /etc/hosts localhost localhost.localdomain localhost4 localhost4.localdomain4

::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

[root@mariadb1 ~]#

[root@mariadb1 ~]#cat > /etc/yum.repos.d/MariaDB.repo

# MariaDB 10.2 CentOS repository list - created -07-08 12:50 UTC

# /mariadb/repositories/


name = MariaDB

baseurl = /10.2/centos7-amd64




[root@mariadb1 ~]# cat > /etc/yum.repos.d/MariaDB.repo

# MariaDB 10.2 CentOS repository list - created -07-08 12:50 UTC

# /mariadb/repositories/


name = MariaDB

baseurl = https://mirrors.tuna./mariadb/mariadb-10.2.7/yum/centos7-amd64/



[root@mariadb1 ~]#yum install -y MariaDB-server MariaDB-client

接下来配置数据库服务器存储数据的编码格式,配置查询日志 慢查询日志 binlog日志存储路径,默认这些功能他们是不开启的

[root@mariadb1~]#mkdir -p /var/log/mariadb && touch /var/log/mariadb/queries.log && touch /var/log/mariadb/mariadb-error.log && touch /var/log/mariadb/mariadb-slow.log && touch /var/log/mariadb/mariadb-log-bin && touch /var/log/mariadb/mariadb-log-bin.index && chown -R mysql:mysql /var/log/mariadb && chmod 755 /var/log/mariadb && chmod 666 /var/log/mariadb/*


[root@mariadb1~]#cat > /etc/f.d/f


# These groups are read by MariaDB server.

# Use it for options that only the server (but not clients) should see


# See the examples of server f files in /usr/share/mysql/


# this is read by the standalone daemon and embedded servers


# this is only for the mysqld standalone daemon














# * Galera-related settings



# Mandatory settings








# Allow server to accept connections on all interfaces.




# Optional setting



# this is only for embedded server


# This group is only read by MariaDB servers, not by MySQL.

# If you use the same .cnf file for MySQL and MariaDB,

# you can put MariaDB-only options here


# This group is only read by MariaDB-10.1 servers.

# If you use the same .cnf file for MariaDB of different versions,

# use this group for options that older servers don't understand



在主机 上配置 IP到自定义域名之间的映射关系

[root@mariadb2 ~]#cat > /etc/hosts localhost localhost.localdomain localhost4 localhost4.localdomain4

::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

[root@mariadb2 ~]#

[root@mariadb2 ~]#cat > /etc/yum.repos.d/MariaDB.repo

# MariaDB 10.2 CentOS repository list - created -07-08 12:50 UTC

# /mariadb/repositories/


name = MariaDB

baseurl = /10.2/centos7-amd64




[root@mariadb2 ~]# cat > /etc/yum.repos.d/MariaDB.repo

# MariaDB 10.2 CentOS repository list - created -07-08 12:50 UTC

# /mariadb/repositories/


name = MariaDB

baseurl = https://mirrors.tuna./mariadb/mariadb-10.2.7/yum/centos7-amd64/



[root@mariadb2 ~]#yum install -y MariaDB-server MariaDB-client

接下来配置数据库服务器存储数据的编码格式,配置查询日志 慢查询日志 binlog日志存储路径,默认这些功能他们是不开启的

[root@mariadb2 ~]#mkdir -p /var/log/mariadb && touch /var/log/mariadb/queries.log && touch /var/log/mariadb/mariadb-error.log && touch /var/log/mariadb/mariadb-slow.log && touch /var/log/mariadb/mariadb-log-bin && touch /var/log/mariadb/mariadb-log-bin.index && chown -R mysql:mysql /var/log/mariadb && chmod 755 /var/log/mariadb && chmod 666 /var/log/mariadb/*

[root@mariadb2 ~]#

[root@mariadb2~]#cat > /etc/f.d/f


# These groups are read by MariaDB server.

# Use it for options that only the server (but not clients) should see


# See the examples of server f files in /usr/share/mysql/


# this is read by the standalone daemon and embedded servers


# this is only for the mysqld standalone daemon














# * Galera-related settings



# Mandatory settings








# Allow server to accept connections on all interfaces.




# Optional setting



# this is only for embedded server


# This group is only read by MariaDB servers, not by MySQL.

# If you use the same .cnf file for MySQL and MariaDB,

# you can put MariaDB-only options here


# This group is only read by MariaDB-10.1 servers.

# If you use the same .cnf file for MariaDB of different versions,

# use this group for options that older servers don't understand


[root@mariadb2 ~]#

在主机 上配置 IP到自定义域名之间的映射关系

[root@rabbitmq1 ~]#cat > /etc/hosts localhost localhost.localdomain localhost4 localhost4.localdomain4

::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

[root@rabbitmq1 ~]#

消息服务器的安装请按照在CentOS 7系统上安装RabbitMQ /zhengzizhi/article/details/77018658

在主机 上配置虚拟主机:

[root@contoso ~]#cat > /etc/httpd/conf.d/httpd-vhosts.conf

<Directory "/home/myth/www/think">

Options +Indexes +FollowSymLinks

Order allow,deny

Allow from all

AllowOverride All

Require all granted


<VirtualHost *:80>

ServerAdmin zhengzizhi@

DocumentRoot "/home/myth/www/think/public"



ErrorLog "/home/myth/log/httpd/contoso-error_log"

CustomLog "/home/myth/log/httpd/contoso-access_log" common


<Directory "/home/myth/www/think">

Options +Indexes +FollowSymLinks

Order allow,deny

Allow from all

AllowOverride All

Require all granted


<VirtualHost *:80>

ServerAdmin zhengzizhi@

DocumentRoot "/home/myth/www/think/public"



ErrorLog "/home/myth/log/httpd/corp-contoso-error_log"

CustomLog "/home/myth/log/httpd/corp-contoso-access_log" common


<Directory "/home/myth/www/think">

Options +Indexes +FollowSymLinks

Order allow,deny

Allow from all

AllowOverride All

Require all granted


<VirtualHost *:80>

ServerAdmin zhengzizhi@

DocumentRoot "/home/myth/www/think/public"

ServerName user.

ServerAlias user.

ErrorLog "/home/myth/log/httpd/user-corp-contoso-error_log"

CustomLog "/home/myth/log/httpd/user-corp-contoso-access_log" common


[root@contoso ~]#

[myth@contoso ~]$mkdir -p /home/myth/log/httpd#注意:日志文件的目录需要提前创建好,

[root@contoso ~]#sed -i -- 's/^#ServerName :80/ServerName :80/g' /etc/httpd/conf/httpd.conf

直接贴消费者的实现代码 think\apps\index\controller\Consumer.php:

<?php/** 作者:zhengzizhi@* 日期:二O一七年 七夕节*/namespace app\index\controller;use PDO;use think\Db;use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;class Consumer{private $connection;private $channel;public function __construct(){}/*** Special remind: before updating consumer code to debug,first,please close consumer listen! * Listens for incoming messages* * [root@rabbitmq1 ~]# rabbitmqctl delete_vhost / && rabbitmqctl add_vhost / && rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'* [root@rabbitmq1 ~]# systemctl restart rabbitmq-server && rabbitmqadmin list exchanges* * [root@contoso ~]# cd /home/myth/www/think && php public/index.php index/Consumer/listen*/public function listen(){$this-> connection = new AMQPStreamConnection('', 5672, 'guest', 'guest');$this->channel = $this-> connection->channel();/*** Declares queue, creates if needed** @param string $queue* @param bool $passive* @param bool $durable* @param bool $exclusive* @param bool $auto_delete* @param bool $nowait* @param array $arguments* @param int $ticket* @return mixed|null*/$this->channel->queue_declare('bank.transfers1',#queue - Should be unique in direct exchangefalse, #passive - false Don't check if a queue with the same name existstrue, #durable - true The queue will survive(exist) server restartsfalse, #exclusive - false The queue might be accessed by other channelsfalse #auto_delete - false The queue won't be deleted once the channel is closed);/*** Declares exchange** @param string $exchange* @param string $type* @param bool $passive* @param bool $durable* @param bool $auto_delete* @param bool $internal* @param bool $nowait* @param array $arguments* @param int $ticket* @return mixed|null*/$this->channel->exchange_declare('corp1.fanout', #exchange - That is the exchange(corp.direct)'fanout',#type - That is the type(direct) of exchange(corp.direct)false, #passive - false Don't check if a exchange with the same name existstrue,#durable - true The exchange will survive(exist) server restartsfalse#auto_delete - false The exchange won't be deleted once the channel is closed);/*** Binds queue to an exchange** @param string $queue* @param string $exchange* @param string $routing_key* @param bool $nowait* @param array $arguments* @param int $ticket* @return mixed|null*/$this->channel->queue_bind('bank.transfers1', 'corp1.fanout');/*** Specifies QoS* don't dispatch a new message to a worker until it has processed and* acknowledged the previous one. Instead, it will dispatch it to the* next worker that is not still busy.** @param int $prefetch_size* @param int $prefetch_count* @param bool $a_global* @return mixed*/$this->channel->basic_qos(null, #prefetch size - prefetch window size in octets, null meaning "no specific limit"1,#prefetch count - prefetch window in terms of whole messagesnull #a_global - null to mean that the QoS settings should apply per-consumer#a_global - true to mean that the QoS settings should apply per-channel);/*** Starts a queue consumer** @param string $queue* @param string $consumer_tag* @param bool $no_local* @param bool $no_ack* @param bool $exclusive* @param bool $nowait* @param callback|null $callback* @param int|null $ticket* @param array $arguments* @return mixed|string*/$this->channel->basic_consume('bank.transfers1',#queue - get the messages from the queue(bank.transfers)'',#consumer_tag - Consumer identifierfalse, #no_local - Don't receive messages published by this consumerfalse, #no_ack - false acks turned on, - true turned off. send a proper acknowledgment from the worker, once we're done with a taskfalse, #exclusive - false The queue(bank.transfers) may be accessed by the all connectionsfalse, #nowait - false Don't wait for a server response[$this, 'callback1'] #callback - A PHP callback);$this->channel->queue_declare('bank.transfers2', #queue - Should be unique in direct exchangefalse, #passive - false Don't check if a queue with the same name existstrue, #durable - true The queue will survive(exist) server restartsfalse, #exclusive - false The queue might be accessed by other channelsfalse #auto_delete - false The queue won't be deleted once the channel is closed);$this->channel->exchange_declare('corp2.fanout', #exchange - That is the exchange(corp.direct)'fanout', #type - That is the type(direct) of exchange(corp.direct)false,#passive - false Don't check if a exchange with the same name existstrue, #durable - true The exchange will survive(exist) server restartsfalse #auto_delete - false The exchange won't be deleted once the channel is closed);$this->channel->queue_bind('bank.transfers2', 'corp2.fanout');$this->channel->basic_consume('bank.transfers2',#queue - get the messages from the queue(bank.transfers)'',#consumer_tag - Consumer identifierfalse, #no_local - Don't receive messages published by this consumerfalse, #no_ack - false acks turned on, - true turned off. send a proper acknowledgment from the worker, once we're done with a taskfalse, #exclusive - false The queue(bank.transfers) may be accessed by the all connectionsfalse, #nowait - false Don't wait for a server response[$this, 'callback2'] #callback - A PHP callback);// 'Consuming from queue';# Loop as long as the channel has callbacks registered# After 10 seconds there will be a timeout exception# $channel->wait(null, false, 10)while(count($this->channel->callbacks)) {// 'Waiting for incoming messages'$this->channel->wait();}$this->channel->close();$this->connection->close();}/*** Executes when a message is received.** @param AMQPMessage $req*/public function callback1(AMQPMessage $req) {$account = json_decode($req->body);$msg_id = $req->get('correlation_id');$user_from = $account->user_from;$amount = $account->amount; $status = $account->status;$user_to = $account->user_to; $_isSuccess = 1;if($account->status == 0){$status = 1;$db2 = Db::connect('db2');$db2->startTrans(); try{$cnt = $db2->query('SELECT COUNT(*) AS cnt FROM think_message_apply a WHERE a.msg_id = ?',[$msg_id]);if ($cnt[0] == ['cnt'=>0]) {$db2->execute('INSERT INTO think_message_apply(msg_id,user_from,amount,status,user_to)VALUES(?,?,?,?,?)',[$msg_id,$user_from,$amount,$status,$user_to]);}$db2->commit();} catch (\Exception $e){$db2->rollback();$_isSuccess = 0;}}if($_isSuccess == 1 && $account->status == 0){$status = 1;/** Creating a reply message with the same correlation id than the incoming message*/$msg = new AMQPMessage(json_encode(['success' => true,'data' => ['msg_id' => $msg_id,'user_from' => $user_from,'amount' => $amount,'status' => $status,'user_to' => $user_to,],]), #message['correlation_id' => $msg_id]);/*** Publishes a message to the same channel from the incoming message** @param AMQPMessage $msg* @param string $exchange* @param string $routing_key* @param bool $mandatory* @param bool $immediate* @param int $ticket*/$req->delivery_info['channel']->basic_publish($msg, #msg'', #exchange$req->get('reply_to') #routing_key);//回复一条入账成功的消息给生产者(消息发送者) /*** Acknowledges one or more messages to delivery_tag* If a consumer dies without sending an acknowledgement the AMQP broker* will redeliver it to another consumer or, if none are available at the* time, the broker will wait until at least one consumer is registered* for the same queue before attempting redelivery** @param string $delivery_tag* @param bool $multiple*/$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag'] #delivery_tag = '1');//确认一条回复消息已经发送} else {$status = 0;/** Creating a reply message with the same correlation id than the incoming message*/$msg = new AMQPMessage(json_encode(['success' => false,'data' => ['msg_id' => $msg_id,'user_from' => $user_from,'amount' => $amount,'status' => $status,'user_to' => $user_to,],]), #message['correlation_id' => $msg_id]);/*** Publishes a message to the same channel from the incoming message** @param AMQPMessage $msg* @param string $exchange* @param string $routing_key* @param bool $mandatory* @param bool $immediate* @param int $ticket*/$req->delivery_info['channel']->basic_publish($msg, #msg'', #exchange$req->get('reply_to') #routing_key);/*** Acknowledges one or more messages to delivery_tag* If a consumer dies without sending an acknowledgement the AMQP broker* will redeliver it to another consumer or, if none are available at the* time, the broker will wait until at least one consumer is registered* for the same queue before attempting redelivery** @param string $delivery_tag* @param bool $multiple*/$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag'] #delivery_tag = '1');}}/*** Executes when a message is received.** @param AMQPMessage $req*/public function callback2(AMQPMessage $req) {$account = json_decode($req->body);$msg_id = $req->get('correlation_id');$user_from = $account->user_from;$amount = $account->amount; $status = $account->status;$user_to = $account->user_to; $_isSuccess = 1;if($account->status == 1){$status = 2;$db2 = Db::connect('db2');$db2->startTrans();try{$db2->execute('UPDATE think_account a SET a.amount = a.amount + ? WHERE a.user_id = ?',[$amount,$user_to]);$db2->execute('UPDATE think_message_apply b SET b.STATUS = ? WHERE b.msg_id = ?',[$status,$msg_id]);$db2->commit();} catch (\Exception $e){$db2->rollback();$_isSuccess = 0;}}if($_isSuccess == 1 && $account->status == 1){$status = 2;/** Creating a reply message with the same correlation id than the incoming message*/$msg = new AMQPMessage(json_encode(['success' => true,'data' => ['msg_id' => $msg_id,'user_from' => $user_from,'amount' => $amount,'status' => $status,'user_to' => $user_to,],]), #message['correlation_id' => $msg_id]);/*** Publishes a message to the same channel from the incoming message** @param AMQPMessage $msg* @param string $exchange* @param string $routing_key* @param bool $mandatory* @param bool $immediate* @param int $ticket*/$req->delivery_info['channel']->basic_publish($msg, #msg'', #exchange$req->get('reply_to') #routing_key);//回复一条入账成功的消息给生产者 /*** Acknowledges one or more messages to delivery_tag* If a consumer dies without sending an acknowledgement the AMQP broker* will redeliver it to another consumer or, if none are available at the* time, the broker will wait until at least one consumer is registered* for the same queue before attempting redelivery** @param string $delivery_tag* @param bool $multiple*/$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag'] #delivery_tag = '2');//确认一条回复消息已经发送} else {$status = 1;/** Creating a reply message with the same correlation id than the incoming message*/$msg = new AMQPMessage(json_encode(['success' => false,'data' => ['msg_id' => $msg_id,'user_from' => $user_from,'amount' => $amount,'status' => $status,'user_to' => $user_to,],]), #message['correlation_id' => $msg_id]);/*** Publishes a message to the same channel from the incoming message** @param AMQPMessage $msg* @param string $exchange* @param string $routing_key* @param bool $mandatory* @param bool $immediate* @param int $ticket*/$req->delivery_info['channel']->basic_publish($msg, #msg'', #exchange$req->get('reply_to') #routing_key);/*** Acknowledges one or more messages to delivery_tag* If a consumer dies without sending an acknowledgement the AMQP broker* will redeliver it to another consumer or, if none are available at the* time, the broker will wait until at least one consumer is registered* for the same queue before attempting redelivery** @param string $delivery_tag* @param bool $multiple*/$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag'] #delivery_tag = '2');//确认一条回复消息已经发送}}}

直接贴生产者的实现代码 think\apps\index\controller\Producer.php:

<?php/** 作者:zhengzizhi@* 日期:二O一七年 七夕节*/namespace app\index\controller;use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;use think\Request;use think\Db;use PDO;class Producer{private $connection;private $channel;private $callback1_queue;private $callback2_queue;private $pass_msg1 = true;private $pass_msg2 = true;/*** @var string*/private $response;private $waiting1;private $waiting2;/*** @var string*/private $corr_id;private $suffix1 = ['01','02','03','04','05','06','07','08','09'];private $suffix2 = ['10','20','30','40','50','60','70','80','90'];public function __construct() {$this->connection = new AMQPStreamConnection('', 5672, 'guest', 'guest');$this->channel = $this->connection->channel();}/*** @return string* * [root@contoso ~]# chown -R apache:apache /home/myth/www/think/apps/bank-data/data.csv && ll /home/myth/www/think/apps* [root@contoso ~]# chmod -R 0755 /home/myth/www/think/apps/bank-data && ll /home/myth/www/think/apps/bank-data* [root@contoso ~]# cat /home/myth/www/think/apps/bank-data/data.csv* [root@contoso ~]# cat /dev/null > /home/myth/www/think/apps/bank-data/data.csv** [root@mariadbxxx ~]# cat /dev/null > /var/log/mariadb/queries.log && cat /dev/null > /var/log/mariadb/mariadb-slow.log && cat /dev/null > /var/log/mariadb/mariadb-error.log* [root@mariadbxxx ~]# mysql -uroot -p123456 -h127.0.0.1 -e "reset master"* * GET /index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2* * [root@rabbitmq1 ~]# rabbitmqadmin list bindings* [root@rabbitmq1 ~]# rabbitmqadmin list queues* [root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers1 requeue=true count=30* [root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers2 requeue=true count=10* * [myth@contoso ~]$ ab -r -t 7200 -s 7200 -k -n 100000 -c 500 "/index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2"*/public function transfer(Request $request){$this->response = null;$msg_id = session_create_id();//uniqid(); //$index = random_int(1,9);/** $this->corr_id has a value like 53e26b393313a*/$this->corr_id = $msg_id;$user_from = $request->param('account.user_from');$amount = $request->param('account.amount');$status = $request->param('account.status');$user_to = $request->param('account.user_to');$account = ['msg_id' => $msg_id,'user_from' => $user_from,'amount' => $amount,'status' => $status,'user_to' => $user_to,]; $this->channel->exchange_declare("corp1.fanout",'fanout',false,true,false);list($this->callback1_queue,, ) = $this->channel->queue_declare('', #queue $msg_id.$this->suffix1[$index]false, #passivetrue, #durabletrue, #exclusivefalse#auto delete);$this->channel->queue_declare("bank.transfers1", #queuefalse, #passivetrue, #durablefalse, #exclusivefalse #auto delete);$this->channel->queue_bind('', "corp1.fanout");$this->channel->queue_bind("bank.transfers1", "corp1.fanout");/** create a message with two properties: reply_to, which is set to the* callback queue and correlation_id, which is set to a unique value for* every request*/$msg1 = new AMQPMessage(json_encode($account), #body['correlation_id' => $this->corr_id,'reply_to' => $this->callback1_queue,'delivery_mode' => 2,]#properties);/*** Publishes a message** @param AMQPMessage $msg* @param string $exchange* @param string $routing_key* @param bool $mandatory* @param bool $immediate* @param int $ticket*/$this->channel->basic_publish($msg1, #message"corp1.fanout", #exchange"bank.transfers1",#routing keytrue, #mandatoryfalse);$this->channel->basic_consume($this->callback1_queue, #queue'', #consumer_tag = amq.ctag-bzBXVZr5iF7R16bq1NYgYwfalse, #no localfalse, #no ackfalse, #exclusivefalse, #no wait[$this, 'onCallback1']#callback);$this->waiting1 = false;while(!$this->waiting1) {$this->channel->wait();}if($this->pass_msg1 == false){$this->channel->close();$this->connection->close();return $this->response;}$this->channel->exchange_declare("cor2.fanout",'fanout',false,true,false);list($this->callback2_queue,, ) = $this->channel->queue_declare('', #queue $msg_id.$this->suffix2[$index]false, #passivetrue, #durabletrue, #exclusivefalse#auto delete);$this->channel->queue_declare("bank.transfers2", #queuefalse,#passivetrue,#durablefalse,#exclusivefalse#auto delete);$this->channel->queue_bind('', "corp2.fanout");$this->channel->queue_bind("bank.transfers2", "corp2.fanout");$account['status'] = 1;$msg2 = new AMQPMessage(json_encode($account), #body['correlation_id' => $this->corr_id,'reply_to' => $this->callback2_queue,'delivery_mode' => 2,]#properties);$this->channel->basic_publish($msg2, #message"corp2.fanout", #exchange "bank.transfers2",#routing keytrue, #mandatoryfalse);$this->channel->basic_consume($this->callback2_queue, #queue'', #consumer_tag = amq.ctag-bzBXVZr5iF7R16bq1NYgYwfalse, #no localfalse, #no ackfalse, #exclusivefalse, #no wait[$this, 'onCallback2']#callback);$this->waiting2 = false;while(!$this->waiting2) {$this->channel->wait();}$this->channel->close();$this->connection->close();return $this->response;}/*** When a message appears, it checks the correlation_id property. If it* matches the value from the request it returns the response to the* application.** @param AMQPMessage $rep*/public function onCallback1(AMQPMessage $rep) {if($rep->get('correlation_id') == $this->corr_id) { $this->waiting1 = true;$body = json_decode($rep->body);if($body->success == true){$msg_id = $body->data->msg_id;$user_from = $body->data->user_from;$amount = $body->data->amount;$status = $body->data->status;$user_to = $body->data->user_to;if($status == 1){$db1 = Db::connect('db1');$db1->startTrans();try {$db1->execute('INSERT INTO think_message_supply(msg_id,user_from,amount,status,user_to) VALUES (?,?,?,?,?)',[$msg_id,$user_from,$amount,$status,$user_to]);$db1->commit();} catch (\Exception $e){$db1->rollback();$this->response = json(['success'=>0,'msg'=>"First Transaction db1 has failed."]);$this->pass_msg1 = false;}} } else {$this->response = json(['success'=>0,'msg'=>"First Transaction db2 has failed."]);$this->pass_msg1 = false;}}}/*** When a message appears, it checks the correlation_id property. If it* matches the value from the request it returns the response to the* application.** @param AMQPMessage $rep*/public function onCallback2(AMQPMessage $rep) {if($rep->get('correlation_id') == $this->corr_id) {$this->waiting2 = true;$body = json_decode($rep->body);if( $body->success == true){ $msg_id = $body->data->msg_id;$user_id = $body->data->user_from;$amount = $body->data->amount;$status = $body->data->status;if($status == 2){$db1 = Db::connect('db1');$db1->startTrans();try {$db1->execute('UPDATE think_message_supply a SET a.status = ? WHERE a.msg_id = ?',[$status,$msg_id]);$db1->execute('UPDATE think_account b SET b.amount = b.amount - ? WHERE b.user_id = ?',[$amount,$user_id]);$db1->commit();} catch (\Exception $e){$db1->rollback();file_put_contents(APP_PATH.'bank-data/data.csv', $rep->body.' ## '.date('Y-m-d H:i:s',time()).PHP_EOL, FILE_APPEND);$this->response = json(['success'=>1,'msg'=>"Avoid to rededuct money,now you must contact our customer."]);$this->pass_msg2 = false;}if($this->pass_msg2 == true){$this->response = json(['success'=>1,'msg'=>'Bank Transaction is successfull.']);}} } else {$this->response = json(['success'=>0,'msg'=>"Second Transaction db2 has failed."]);}}}}




USE `db1`;

DROP TABLE IF EXISTS `think_account`;

CREATE TABLE `think_account` (

`user_id` int(4) NOT NULL,

`amount` decimal(18,2) DEFAULT NULL,

PRIMARY KEY (`user_id`)


insert into `think_account`(`user_id`,`amount`) values (1,'9999851572743.00'),(3,'9999999999999.00'),(5,'9999999999999.00');

DROP TABLE IF EXISTS `think_message_supply`;

CREATE TABLE `think_message_supply` (

`msg_id` varchar(40) NOT NULL,

`user_from` int(11) DEFAULT NULL,

`amount` decimal(18,2) DEFAULT NULL,

`status` tinyint(4) DEFAULT 0,

`user_to` int(11) DEFAULT NULL,

`time` timestamp NULL DEFAULT current_timestamp(),

PRIMARY KEY (`msg_id`)




USE `db2`;

DROP TABLE IF EXISTS `think_account`;

CREATE TABLE `think_account` (

`user_id` int(4) NOT NULL,

`amount` decimal(18,2) DEFAULT NULL,

PRIMARY KEY (`user_id`)


insert into `think_account`(`user_id`,`amount`) values (2,'0.00'),(4,'500.00'),(6,'500.00');

DROP TABLE IF EXISTS `think_message_apply`;

CREATE TABLE `think_message_apply` (

`msg_id` varchar(40) NOT NULL,

`user_from` int(11) DEFAULT NULL,

`amount` decimal(18,2) DEFAULT NULL,

`status` tinyint(4) DEFAULT 0,

`user_to` int(11) DEFAULT NULL,

`time` timestamp NULL DEFAULT current_timestamp(),

PRIMARY KEY (`msg_id`)




[root@contoso ~]#cd /home/myth/www/think && php public/index.php index/Consumer/listen

测试500个并发,可能有点高了在虚拟机环境里,因为有请求丢失了(ab 工具因为socket接收数据出现错误,关闭了出现错误的请求连接,相当于这条请求没发送成功)

[myth@contoso ~]$ab -r -t 7200 -s 7200 -k -n 100000 -c 500 "/index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2"

查看一下消息服务器 队列瞬间执行的情形:


看看启动的httpd进程个数吧(没有直接统计,而是把SecureCRT终端输出的全部httpd进程贴到NotePad++里滚屏截图(FastStone Capture)打印的) 进程总的个数 = 第623行 - 第3行 + 1 = 621个 httpd进程 空闲httpd个数是100个 + 500个并发 + 默认启动的1个httpd = 刚好等于621个httpd进程


[root@rabbitmq1 ~]#rabbitmqctl delete_vhost / && rabbitmqctl add_vhost / && rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'

Deleting vhost "/"

Creating vhost "/"

Setting permissions for user "guest" in vhost "/"

[root@rabbitmq1 ~]#systemctl restart rabbitmq-server && rabbitmqadmin list exchanges


| name | type |


| | direct |

| amq.direct | direct |

| amq.fanout | fanout |

| amq.headers | headers |

| amq.match | headers |

| amq.rabbitmq.log | topic |

| amq.rabbitmq.trace | topic |

| amq.topic | topic |


[root@rabbitmq1 ~]#

读写文件的用户是httpd的用户apache 所以data.csv文件的读写权限是分配给apache用户的

如果你够细心runtime文件夹的读写权限也是apache 连root都无法写操作 要留意哦,PHP源码注释里我写这些东西不为了好看才写的,


[root@contoso ~]# chown -R apache:apache /home/myth/www/think/apps/bank-data/data.csv && ll /home/myth/www/think/apps

[root@contoso ~]# chmod -R 0755 /home/myth/www/think/apps/bank-data && ll /home/myth/www/think/apps/bank-data

[root@contoso ~]# cat /home/myth/www/think/apps/bank-data/data.csv

GET /index/producer/transfer?account[user_from]=1&account[amount]=1024&account[status]=0&account[user_to]=2

[root@rabbitmq1 ~]# rabbitmqadmin list bindings

[root@rabbitmq1 ~]# rabbitmqadmin list queues

[root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers1 requeue=true count=30

[root@rabbitmq1 ~]# rabbitmqadmin get queue=bank.transfers2 requeue=true count=10
