使用多进程和多线程最好的方式是使用Master-Worker
的设计模式
多进程
多进程不共享内存空间,所以变量将会拷贝一份副本到新的进程中。
进程可以在不同的CPU核心上切换,Unix使用的底层的fork()
函数,而windows支持的并不好,所以只能最大限度模拟。
通用的封装模块有两个。subprocess
和multiprocessing
调用外部程序使用的是subprocess
,而创建新的进程往往使用的是multiprocessing
调用外部程序
当调用的外部程序不需要复杂交互的时候,可以使用call
方法直接创建子进程,然后得到返回。call
方法是拥塞的,将会等到子进程结束才会继续执行父进程。
当调用的外部程序有复杂交互的时候,可以使用Popen
方法。Popen
方法是非拥塞的。
import subprocess
r = subprocess.call(['nslookup', 'python.org'])
print('Exit code:', r)
import subprocess
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = p.communicate(b'setq=mx\npython.org\nexit\n')
print(output.decode('utf-8', 'ignore'))
print('Return Code:', p.returncode)
创建子进程
创建子进程也有两种方式,一种是用multiprocessing.Process
进行单次创建,另一种就是使用multiprocessing.Pool
创建一个池,然后管理池中子进程.
multiprocessing.Process
from multiprocessing import Process
import os
from time import sleep
def run_child(name):
sleep(1)
print('Run child name %s, pid is %s' % (name, os.getpid()))
if __name__ == "__main__":
print('Parent process %s.' % os.getpid())
p = Process(target=run_child, args=('test',))
print('start child process')
p.start() # 会自动调用run_child函数
p.join() # 会等待子进程结束后再继续往下运行
print('Child process end')
multiprocessing.Pool
from multiprocessing import Pool, cpu_count
import os
import time
import random
def long_time_wait(name):
print("Run task %s, pid is %s" % (name, os.getpid()))
start_time = time.time()
time.sleep(random.random() * 3)
end_time = time.time()
print("Task %s runs %0.2f sec" % (name, (end_time - start_time)))
return f'hello {name}'
if __name__ == '__main__':
print("Parent pid is %s" % os.getpid())
p = Pool(cpu_count()) # 这个是进程池,默认是CPU核数,可以自己指定
jobs = []
print("subprocess Start")
for i in range(10):
jobs.append(p.apply_async(long_time_wait, args=(i,))) # 这个是异步的,异步的执行,不会阻塞当前的进程
# jobs.append(p.apply(long_time_wait, args=(1,))) # 这个是同步的,同步的执行,会阻塞当前的进程
print('before close')
p.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用
p.join() # 等待进程池中的所有进程执行完毕
print("All subprocess done")
for job in jobs:
print(job.get()) # 获取进程的返回值
## output:
#Parent pid is 14068
#subprocess Start
#before close
#Run task 0, pid is 7984
#Run task 1, pid is 12564
#Run task 2, pid is 12380
#Run task 3, pid is 5828
#Run task 4, pid is 14752
#Run task 5, pid is 7124
#Run task 6, pid is 16648
#Run task 7, pid is 2712
#Task 1 runs 0.36 sec
#Run task 8, pid is 12564
#Task 6 runs 0.87 sec
#Run task 9, pid is 16648
#Task 0 runs 0.93 sec
#Task 2 runs 1.30 sec
#Task 8 runs 1.20 sec
#Task 4 runs 2.73 sec
#Task 3 runs 2.84 sec
#Task 5 runs 2.84 sec
#Task 9 runs 2.02 sec
#Task 7 runs 2.97 sec
#All subprocess done
另外如果想使用装饰器的话是会有问题的,参考这篇文章, 解决办法只能使用等价形式调用。
进程间通讯
进程间的通讯使用的是multiprocessing.Queue
队列
from multiprocessing import Process, Queue
import time
import random
def write(q):
for value in ['A', 'B', 'C']:
print('Put %s in queue' % value)
q.put(value)
time.sleep(random.random() * 3)
def read(q):
while True:
value = q.get(True)
print('Get %s from queue' % value)
time.sleep(0.1)
if __name__ == '__main__':
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()
pr.terminate() # 由于read是死循环,需要强制结束
Process
中有terminate
方法强制结束,但是线程中没有。
多线程
多线程共享一个内存空间,调用的变量也是同一个。
多线程python支持的不错,但是以下几点必须注意:
- 在读写全局变量的时候必须加入线程锁,并且确保释放
- 多线程的变量传参使用
threading.Local
对象 - python的多线程只能利用一核,要使用多核就必须要用到多进程了
但是不得不说,多线程没有协程好用,并且效率没有协程高。python 异步IO
不带读写的多线程创建
import threading
import time
import random
def loop():
print('thread name %s' % threading.current_thread().name)
for i in range(6):
print('thread name %s >>> %s' % (threading.current_thread().name, i))
time.sleep(random.random())
print('thread name %s Done' % threading.current_thread().name)
print('Main thread: ', threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('Main thread %s Done' % threading.current_thread().name)
读写带进程锁的多线程创建
import threading
# 假定这是你的银行存款:
balance = 0
lock = threading.Lock()
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(2000000):
lock.acquire()
try:
change_it(n)
finally:
lock.release()
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
Local 传递变量
使用local其实是省略了dict[threading.current_thread()]
,来保证取到的变量是本线程的变量
import threading
local_s = threading.local()
def start_thread(name):
local_s.name = name
proc_name()
def proc_name():
name = local_s.name
print("%s, thread: %s" % (name, threading.current_thread().name))
p1 = threading.Thread(target=start_thread, args=("p1",), name="Thread_1")
p2 = threading.Thread(target=start_thread, args=("p2",), name="Thread_2")
p1.start()
p2.start()
p1.join()
p2.join()
线程池
使用multiprocessing.pool.ThreadPool
创建线程池
from multiprocessing.pool import ThreadPool
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('elin', 'foo',))
result = async_result.get() # 取返回值
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 365433079@qq.com