python 分布式进程


参考廖雪峰的官方网站 https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上

如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?

原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了

# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')

请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。

然后,在另一台机器上启动任务进程(本机上启动也可以):

# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 处理结束:
print('worker exit.')
mac@1987demac script % python task_master.py
Put task 7166...
Put task 9589...
Put task 8483...
Put task 7304...
Put task 1052...
Put task 8938...
Put task 2517...
Put task 1625...
Put task 5954...
Put task 4582...
Try get results...

Result: 7166 * 7166 = 51351556
Result: 9589 * 9589 = 91948921
Result: 8483 * 8483 = 71961289
Result: 7304 * 7304 = 53348416
Result: 1052 * 1052 = 1106704
Result: 8938 * 8938 = 79887844
Result: 2517 * 2517 = 6335289
Result: 1625 * 1625 = 2640625
Result: 5954 * 5954 = 35450116
Result: 4582 * 4582 = 20994724
master exit.
mac@1987demac script %
mac@1987demac script % python task_work.py 
Connect to server 127.0.0.1...
run task 7166 * 7166...
run task 9589 * 9589...
run task 8483 * 8483...
run task 7304 * 7304...
run task 1052 * 1052...
run task 8938 * 8938...
run task 2517 * 2517...
run task 1625 * 1625...
run task 5954 * 5954...
run task 4582 * 4582...
worker exit.