300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > python项目开发:用RabbitMQ实现异步RPC

python项目开发:用RabbitMQ实现异步RPC

时间:2023-11-14 21:32:48

相关推荐

python项目开发:用RabbitMQ实现异步RPC

程序要求:

1. 用Rabbit MQ实现RPC

2. 可以异步地执行多条命令

3. 可以一次性对多个机器执行命令

程序效果:

---》run dir host1 host2 。。。。

---》get task_id

---》taskId:xxxx host: xxxxxx

---》check task_id

--->打印结果

程序分析:

为了达到异步的效果,可以使用多线程或协程,即每执行一条命令就启动一个线程或协程。客户端“发送命令到队列”与“从返回队列接收结果”这两个步骤要分离,不能写到一起。

业务逻辑:

代码实现:

README

#author: Wu zhiHao
#博客地址: /BUPT-MrWu/p/10364619.html

#程序目录框架:
|--RPC
    |--RPC_server            #服务端
        |--bin
            |--start.py      #程序入口
        |--core
            |--RpcServer.py  #服务端主要逻辑
    |--RPC_client            #客户端
        |--bin
            |--start.py      #程序入口
        |--core
            |--main.py       #程序主要逻辑
        |--modules
            |--RpcClient.py  #客户端主要逻辑
        |--conf
            |--settings.py   #配置文件
    |--READ_ME

#命令格式:
1. run command host1 host2.....  #执行命令
2. all_task                      #获取全部task_id
3. check task_id                 #获取命令结果

View Code

RPC_server\\bin\\start.py

"""Entry point of the RPC server: build the server and start consuming."""
import os
import sys

# Resolve the directory two levels above this file and put it on the import
# path so that the `core` package can be imported.
# abspath() is applied BEFORE the dirname() calls: with a relative __file__
# (e.g. "start.py" when launched from inside bin/ on Python < 3.9) the
# original dirname(dirname(__file__)) collapsed to "" and abspath("") then
# returned the current working directory instead of the project root.
BASE_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_dir)

from core import RpcServer

if __name__ == '__main__':
    obj = RpcServer.RpcServer()
    # Hand control to pika's consume loop on the server's channel.
    obj.channel.start_consuming()

View Code

RPC_server\\core\\RpcServer.py

"""RPC server side: execute commands received over RabbitMQ and reply."""
import os
import socket

import pika

from conf import settings


class RpcServer(object):
    """Consume commands routed to this host and publish their output.

    The server declares a private queue and binds it to the direct exchange
    ``Rpc`` with its own IP address as routing key, so a client targets a
    machine by publishing with that IP as the routing key.

    SECURITY: every message body is executed as a shell command through
    ``os.popen``. Anyone who can publish to the ``Rpc`` exchange can run
    arbitrary commands on this host; restrict access to the broker.

    NOTE(review): ``queue_declare(exclusive=True)`` and
    ``basic_consume(callback, queue=...)`` look like the pika < 1.0 call
    signatures -- confirm against the installed pika version.
    """

    def __init__(self):
        """Connect to RabbitMQ and register the command consumer."""
        # RabbitMQ user authentication.
        self.credentials = pika.PlainCredentials(
            settings.RabbitMq_name,
            settings.RabbitMq_password,
        )
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                settings.RabbitMq_ip,
                settings.RabbitMq_port,
                "/",
                self.credentials,
            )
        )
        # IP address of this server; used as the routing key below.
        self.My_Ip = self.get_ip()
        self.channel = self.connection.channel()
        # No queue name is given, so the broker generates one.
        self.result = self.channel.queue_declare(exclusive=True)
        self.queue_name = self.result.method.queue
        self.channel.exchange_declare(
            exchange="Rpc",
            exchange_type="direct",
        )
        self.channel.queue_bind(
            exchange="Rpc",
            queue=self.queue_name,
            routing_key=self.My_Ip,
        )
        self.channel.basic_consume(
            self.on_response,
            queue=self.queue_name,
        )

    def on_response(self, ch, method, properties, body):
        """Run the received command and publish its output to the reply queue.

        The reply goes through the default exchange to
        ``properties.reply_to`` and carries the request's correlation id.
        """
        command = body.decode()
        command_result = self.on_request(command)
        self.channel.basic_publish(
            exchange="",
            routing_key=properties.reply_to,
            properties=pika.BasicProperties(
                correlation_id=properties.correlation_id,
            ),
            body=command_result,
        )
        # The consumer is registered without no_ack, so each delivery must be
        # acknowledged explicitly; the original never did, leaving every
        # processed command unacknowledged on the broker.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def on_request(self, command):
        """Execute ``command`` in a shell and return what it wrote to stdout."""
        # Close the pipe deterministically instead of relying on garbage
        # collection of the popen object.
        with os.popen(command) as pipe:
            return pipe.read()

    def get_ip(self):
        """Return the IP address the local host name resolves to.

        NOTE(review): depending on the machine's name resolution this may be
        a loopback address -- verify that it is the address clients use.
        """
        computer_name = socket.getfqdn(socket.gethostname())
        computer_Ip = socket.gethostbyname(computer_name)
        return computer_Ip

View Code

RPC_client\\bin\\start.py

"""Entry point of the RPC client: start the interactive command loop."""
import os
import sys

# Resolve the directory two levels above this file and put it on the import
# path so that the `core` package can be imported.
# abspath() is applied BEFORE the dirname() calls: with a relative __file__
# (e.g. "start.py" when launched from inside bin/ on Python < 3.9) the
# original dirname(dirname(__file__)) collapsed to "" and abspath("") then
# returned the current working directory instead of the project root.
BASE_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_dir)

from core import main

if __name__ == '__main__':
    obj = main.run()
    obj.start()

View Code

RPC_client\\core\\main.py

"""Interactive client shell: parse commands and track pending RPC tasks."""
import random
import threading

from modules import RpcClient


class run(object):
    """Read commands from stdin and run each one in its own thread.

    Supported commands (one per input line)::

        run command host1 host2 ...   send ``command`` to every host
        all_task                      list pending task ids and their hosts
        check task_id                 fetch and print the results of a task

    NOTE(review): all worker threads share one ``RpcClient`` (one pika
    connection/channel). pika connections are generally not meant to be
    shared across threads -- confirm whether concurrent ``run``/``check``
    commands are safe here.
    """

    # Only these method names may be invoked from user input. The original
    # dispatched through getattr() on any word the user typed, which made
    # every attribute of the object (``start``, ``select``, ...) reachable.
    _COMMANDS = ("run", "check", "all_task")

    def __init__(self):
        self.client = RpcClient.RpcClient()
        # task_id -> {host: [correlation_id, callback_queue]}
        self.information = {}

    def start(self):
        """Prompt for commands until stdin is closed."""
        while True:
            try:
                command = input("-->")
            except EOFError:
                # stdin is closed: input() would raise again on every
                # iteration, so the original loop printed errors forever.
                break
            try:
                if not command.strip():
                    continue
                t = threading.Thread(target=self.select, args=(command,))
                t.start()
            except Exception as e:
                print(e)

    def select(self, command):
        """Parse the command and dispatch it to the matching method."""
        try:
            words = command.split()
            if not words:
                return
            keyword = words[0]
            if keyword not in self._COMMANDS:
                print("unknown command: %s" % keyword)
                return
            func = getattr(self, keyword)
            func(command)
        except Exception as e:
            print(e)

    def _new_task_id(self):
        """Reserve an unused task id and return it.

        Ids stay in the original 100..1000 range. ``dict.setdefault`` both
        checks for and claims the id in one step, so a pending task with the
        same random id is never overwritten. Assumes far fewer than 901
        tasks are pending at once.
        """
        while True:
            task_id = str(random.randint(100, 1000))
            slot = {}
            if self.information.setdefault(task_id, slot) is slot:
                return task_id

    def run(self, command):
        """Send a command to every listed host and record the pending replies."""
        try:
            words = command.split()
            if len(words) < 3:
                # Validate before reserving a task id so that a malformed
                # command does not leave an empty task behind.
                print("usage: run command host1 host2 ...")
                return
            keyword = words[1]
            task_id = self._new_task_id()
            for host in words[2:]:
                result = self.client.on_request(host, keyword)
                self.information[task_id][host] = [result[0], result[1]]
        except Exception as e:
            print(e)

    def check(self, command):
        """Print the result of every host of a task, then forget the task."""
        try:
            words = command.split()
            if len(words) < 2:
                print("usage: check task_id")
                return
            task_id = words[1]
            if task_id not in self.information:
                print("unknown task_id: %s" % task_id)
                return
            # Iterate over a snapshot: another thread may still be adding
            # hosts to this task.
            for host, entry in list(self.information[task_id].items()):
                corr_id = entry[0]
                callback_queue = entry[1]
                command_result = self.client.get_response(corr_id, callback_queue)
                print("%s:\n%s" % (host, command_result))
            self.information.pop(task_id)  # drop the finished task
        except Exception as e:
            print(e)

    def all_task(self, command):
        """Print every pending task id together with its hosts."""
        try:
            # Snapshot the mapping; other threads may add or remove tasks.
            for task_id, hosts in list(self.information.items()):
                all_host = list(hosts)
                print("task_id: %s host: %s\n" % (task_id, all_host))
        except Exception as e:
            print(e)

View Code

RPC_client\\conf\\settings.py

# Connection settings for the RabbitMQ broker.
# Each setting sits on its own line: when they were fused onto a single line,
# everything after the first "#" was a comment and only RabbitMq_name was
# actually defined.
RabbitMq_name = "XXX"  # RabbitMQ user name
RabbitMq_password = "XXX"  # RabbitMQ user password
RabbitMq_ip = "XXX"  # IP address of the RabbitMQ host
RabbitMq_port = 5672  # port of the RabbitMQ host

View Code

RPC_client\\modules\\RpcClient.py

"""RPC client side: publish commands and fetch their replies later."""
import uuid

import pika

from conf import settings


class RpcClient(object):
    """Send commands through the ``Rpc`` exchange and collect replies.

    Sending (``on_request``) and receiving (``get_response``) are separate
    steps: ``on_request`` returns the correlation id and the name of the
    reply queue, and the caller passes both to ``get_response`` later.

    NOTE(review): ``queue_declare(exclusive=False)`` and
    ``basic_consume(callback, queue=...)`` look like the pika < 1.0 call
    signatures -- confirm against the installed pika version.
    """

    def __init__(self):
        """Connect to RabbitMQ and open a channel."""
        # RabbitMQ user authentication.
        self.credentials = pika.PlainCredentials(
            settings.RabbitMq_name,
            settings.RabbitMq_password,
        )
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                settings.RabbitMq_ip,
                settings.RabbitMq_port,
                "/",
                self.credentials,
            )
        )
        self.channel = self.connection.channel()

    def get_response(self, corr_id, callback_queue):
        """Wait for the reply identified by ``corr_id`` and return it.

        Blocks until a message with a matching correlation id arrives on
        ``callback_queue``. Afterwards the consumer is cancelled and the
        reply queue deleted: the original left one consumer and one queue
        behind on the broker for every request. A reply can therefore be
        fetched only once.
        """
        self.corr_id = corr_id
        self.response = None
        consumer_tag = self.channel.basic_consume(
            self.on_response,
            queue=callback_queue,
        )
        while self.response is None:
            # Non-blocking counterpart of start_consuming().
            self.connection.process_data_events()
        self.channel.basic_cancel(consumer_tag)
        self.channel.queue_delete(queue=callback_queue)
        return self.response

    def on_response(self, ch, method, properties, body):
        """Store the reply body when its correlation id is the awaited one."""
        if self.corr_id == properties.correlation_id:
            self.response = body.decode()
            # The consumer is registered without no_ack, so the delivery has
            # to be acknowledged explicitly.
            ch.basic_ack(delivery_tag=method.delivery_tag)

    def on_request(self, host, command):
        """Publish ``command`` to ``host`` and return where the reply will land.

        Returns:
            A ``(corr_id, callback_queue)`` tuple to hand to
            ``get_response``.
        """
        # Non-exclusive, so this queue is not tied to being the only one
        # declared; each request gets its own reply queue.
        result = self.channel.queue_declare(exclusive=False)
        callback_queue = result.method.queue
        corr_id = str(uuid.uuid4())  # token matching the reply to this request
        self.channel.exchange_declare(exchange="Rpc", exchange_type="direct")
        self.channel.basic_publish(
            exchange="Rpc",
            routing_key=host,
            properties=pika.BasicProperties(
                correlation_id=corr_id,
                reply_to=callback_queue,
            ),
            body=command,
        )
        return corr_id, callback_queue

View Code

程序执行实例:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。