python 进程与线程

使用多进程和多线程最好的方式是使用Master-Worker的设计模式

多进程

多进程不共享内存空间,所以变量将会拷贝一份副本到新的进程中。

进程可以在不同的CPU核心上切换,Unix使用的底层的fork()函数,而windows支持的并不好,所以只能最大限度模拟。

通用的封装模块有两个。subprocessmultiprocessing

调用外部程序使用的是subprocess,而创建新的进程往往使用的是multiprocessing

调用外部程序

当调用的外部程序不需要复杂交互的时候,可以使用call方法直接创建子进程,然后得到返回。
call方法是拥塞的,将会等到子进程结束才会继续执行父进程。

当调用的外部程序有复杂交互的时候,可以使用Popen方法。
Popen方法是非拥塞的。

import subprocess
r = subprocess.call(['nslookup', 'python.org'])
print('Exit code:', r)
import subprocess
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)

output, error = p.communicate(b'setq=mx\npython.org\nexit\n') print(output.decode('utf-8', 'ignore')) print('Return Code:', p.returncode)

创建子进程

创建子进程也有两种方式,一种是用multiprocessing.Process进行单次创建,另一种就是使用multiprocessing.Pool创建一个池,然后管理池中子进程.

multiprocessing.Process

from multiprocessing import Process
import os
from time import sleep


def run_child(name): sleep(1) print('Run child name %s, pid is %s' % (name, os.getpid()))

if __name__ == "__main__": print('Parent process %s.' % os.getpid()) p = Process(target=run_child, args=('test',)) print('start child process') p.start() # 会自动调用run_child函数 p.join() # 会等待子进程结束后再继续往下运行 print('Child process end')

multiprocessing.Pool

from multiprocessing import Pool, cpu_count
import os
import time
import random


def long_time_wait(name): print("Run task %s, pid is %s" % (name, os.getpid())) start_time = time.time() time.sleep(random.random() * 3) end_time = time.time() print("Task %s runs %0.2f sec" % (name, (end_time - start_time))) return f'hello {name}'

if __name__ == '__main__': print("Parent pid is %s" % os.getpid()) p = Pool(cpu_count()) # 这个是进程池,默认是CPU核数,可以自己指定 jobs = [] print("subprocess Start") for i in range(10): jobs.append(p.apply_async(long_time_wait, args=(i,))) # 这个是异步的,异步的执行,不会阻塞当前的进程 # jobs.append(p.apply(long_time_wait, args=(1,))) # 这个是同步的,同步的执行,会阻塞当前的进程 print('before close') p.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用 p.join() # 等待进程池中的所有进程执行完毕 print("All subprocess done") for job in jobs: print(job.get()) # 获取进程的返回值
## output: #Parent pid is 14068 #subprocess Start #before close #Run task 0, pid is 7984 #Run task 1, pid is 12564 #Run task 2, pid is 12380 #Run task 3, pid is 5828 #Run task 4, pid is 14752 #Run task 5, pid is 7124 #Run task 6, pid is 16648 #Run task 7, pid is 2712 #Task 1 runs 0.36 sec #Run task 8, pid is 12564 #Task 6 runs 0.87 sec #Run task 9, pid is 16648 #Task 0 runs 0.93 sec #Task 2 runs 1.30 sec #Task 8 runs 1.20 sec #Task 4 runs 2.73 sec #Task 3 runs 2.84 sec #Task 5 runs 2.84 sec #Task 9 runs 2.02 sec #Task 7 runs 2.97 sec #All subprocess done

另外如果想使用装饰器的话是会有问题的,参考这篇文章, 解决办法只能使用等价形式调用。

进程间通讯

进程间的通讯使用的是multiprocessing.Queue队列

from multiprocessing import Process, Queue
import time
import random


def write(q): for value in ['A', 'B', 'C']: print('Put %s in queue' % value) q.put(value) time.sleep(random.random() * 3)

def read(q): while True: value = q.get(True) print('Get %s from queue' % value) time.sleep(0.1)

if __name__ == '__main__': q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() pr.terminate() # 由于read是死循环,需要强制结束

Process中有terminate方法强制结束,但是线程中没有。

多线程

多线程共享一个内存空间,调用的变量也是同一个。

多线程python支持的不错,但是以下几点必须注意:

  1. 在读写全局变量的时候必须加入线程锁,并且确保释放
  2. 多线程的变量传参使用threading.Local对象
  3. python的多线程只能利用一核,要使用多核就必须要用到多进程了

但是不得不说,多线程没有协程好用,并且效率没有协程高。python 异步IO

不带读写的多线程创建

import threading
import time
import random


def loop(): print('thread name %s' % threading.current_thread().name) for i in range(6): print('thread name %s >>> %s' % (threading.current_thread().name, i)) time.sleep(random.random())
print('thread name %s Done' % threading.current_thread().name)

print('Main thread: ', threading.current_thread().name) t = threading.Thread(target=loop, name='LoopThread') t.start() t.join() print('Main thread %s Done' % threading.current_thread().name)

读写带进程锁的多线程创建

import threading

# 假定这是你的银行存款: balance = 0 lock = threading.Lock()

def change_it(n): # 先存后取,结果应该为0: global balance balance = balance + n balance = balance - n

def run_thread(n): for i in range(2000000): lock.acquire() try: change_it(n) finally: lock.release()

t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance)

Local 传递变量

使用local其实是省略了dict[threading.current_thread()],来保证取到的变量是本线程的变量

import threading

local_s = threading.local()

def start_thread(name): local_s.name = name proc_name()

def proc_name(): name = local_s.name print("%s, thread: %s" % (name, threading.current_thread().name))

p1 = threading.Thread(target=start_thread, args=("p1",), name="Thread_1") p2 = threading.Thread(target=start_thread, args=("p2",), name="Thread_2") p1.start() p2.start() p1.join() p2.join()

线程池

使用multiprocessing.pool.ThreadPool创建线程池

from multiprocessing.pool import ThreadPool

def foo(bar, baz): print 'hello {0}'.format(bar) return 'foo' + baz
pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('elin', 'foo',))
result = async_result.get() # 取返回值

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 365433079@qq.com