进程就是操作系统中执行的一个程序,操作系统以进程为单位分配存储空间,每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据,操作系统管理所有进程的执行,为它们合理的分配资源。进程可以通过fork或spawn的方式来创建新的进程来执行其他的任务,不过新的进程也有自己独立的内存空间,因此必须通过进程间通信机制(IPC,Inter-Process Communication)来实现数据共享,具体的方式包括管道、信号、套接字、共享内存区等。

一个进程还可以拥有多个并发的执行线索,简单的说就是拥有多个可以获得CPU调度的执行单元,这就是所谓的线程。由于线程在同一个进程下,它们可以共享相同的上下文,因此相对于进程而言,线程间的信息共享和通信更加容易。

Python实现并发编程主要有3种方式:多进程、多线程、多进程+多线程。

多进程

Unix和Linux操作系统上提供了fork()系统调用来创建进程,调用fork()函数的是父进程,创建出的是子进程,子进程是父进程的一个拷贝,但是子进程拥有自己的PID。fork()函数非常特殊它会返回两次,父进程中可以通过fork()函数的返回值得到子进程的PID,而子进程中的返回值永远都是0。Python的os模块提供了fork()函数。由于Windows系统没有fork()调用,因此要实现跨平台的多进程编程,可以使用multiprocessing模块的Process类来创建子进程,而且该模块还提供了更高级的封装,例如批量启动进程的进程池(Pool)、用于进程间通信的队列(Queue)和管道(Pipe)等。

from multiprocessing import Process
from os import getpid
from random import randint
from time import time,sleep

def down_task(filename):
    print('启动下载进程,进程号[%d].'%getpid())
    print('开始下载%s...'%filename)
    time_to_download=randint(5,10)
    sleep(time_to_download)
    print('%s下载完成!耗费了%d秒'%(filename,time_to_download))

def main():
    start=time()
    p1=Process(target=down_task,args=('Python程序设计.pdf',))
    p1.start()
    p2=Process(target=down_task,args=('Python自动化运维.pdf',))
    p2.start()
    p1.join()
    p2.join()
    end=time()
    print('总共耗费了%.2f秒'%(end-start))

if __name__ == '__main__':
    main()

'''
输出结果:
启动下载进程,进程号[3944].
开始下载Python程序设计.pdf...
启动下载进程,进程号[3912].
开始下载Python自动化运维.pdf...
Python程序设计.pdf下载完成!耗费了6秒
Python自动化运维.pdf下载完成!耗费了7秒
总共耗费了7.22秒
'''

在上面的代码中,我们通过Process类创建了进程对象,通过target参数我们传入一个函数来表示进程启动后要执行的代码,后面的args是一个元组,它代表了传递给函数的参数。Process对象的start方法用来启动进程,而join方法表示等待进程执行结束。运行上面的代码可以明显发现两个下载任务“同时”启动了,而且程序的执行时间将大大缩短,不再是两个任务的时间总和。

如果要启动大量的子进程,可以用进程池的方式批量创建子进程

from multiprocessing import Pool
from os import getpid
from random import randint
from time import time,sleep

def down_task(filename):
    print('启动下载进程,子进程号[%d].'% getpid())
    print('开始下载%s...'%filename)
    time_to_download=randint(5,10)
    sleep(time_to_download)
    print('%s下载完成!耗费了%d秒'%(filename,time_to_download))

if __name__ == '__main__':
    start = time()
    print('启动下载进程,父进程号[%d].'% getpid())
    p=Pool(4)
    for i in range(1,6):
        filename='python程序设计第%d版本.pdf' % i
        p.apply_async(down_task,args=(filename,))
    p.close()
    p.join()
    end=time()
    print('总共耗费了%.2f秒'%(end-start))


'''
输出结果:
启动下载进程,父进程号[3220].
启动下载进程,子进程号[7404].
开始下载python程序设计第1版本.pdf...
启动下载进程,子进程号[12776].
开始下载python程序设计第2版本.pdf...
启动下载进程,子进程号[2324].
开始下载python程序设计第3版本.pdf...
启动下载进程,子进程号[7680].
开始下载python程序设计第4版本.pdf...
python程序设计第2版本.pdf下载完成!耗费了7秒
启动下载进程,子进程号[12776].
开始下载python程序设计第5版本.pdf...
python程序设计第4版本.pdf下载完成!耗费了8秒
python程序设计第1版本.pdf下载完成!耗费了9秒
python程序设计第3版本.pdf下载完成!耗费了9秒
python程序设计第5版本.pdf下载完成!耗费了9秒
总共耗费了16.40秒
'''

对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:

import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

如果子进程还需要输入,则可以通过communicate()方法输入:

import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('gbk'))
print('Exit code:', p.returncode)

上面的代码相当于在命令行执行命令nslookup,然后手动输入:

set q=mx
python.org
exit

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()


'''
输出结果:
Process to write: 2112
Put A to queue...
Process to read: 11132
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
'''

多线程

在Python早期的版本中就引入了thread模块(现在名为_thread)来实现多线程编程,然而该模块过于底层,而且很多功能都没有提供,因此目前的多线程开发我们推荐使用threading模块,该模块对多线程编程提供了更好的面向对象的封装。

from random import randint
from threading import Thread
from time import time,sleep

def down_task(filename):
    print('开始下载%s...'%filename)
    time_to_download=randint(5,10)
    sleep(time_to_download)
    print('%s下载完成!耗费了%d秒'%(filename,time_to_download))

def main():
    start=time()
    t1=Thread(target=down_task,args=('Python程序设计.pdf',))
    t1.start()
    t2=Thread(target=down_task,args=('Python自动化运维.pdf',))
    t2.start()
    t1.join()
    t2.join()
    end=time()
    print('总共耗费了%.2f秒'%(end-start))

if __name__ == '__main__':
    main()

'''
输出结果:
开始下载Python程序设计.pdf...
开始下载Python自动化运维.pdf...
Python程序设计.pdf下载完成!耗费了8秒
Python自动化运维.pdf下载完成!耗费了10秒
总共耗费了10.00秒
'''

也可以通过继承Thread类的方式来创建自定义的线程类,然后再创建线程对象并启动线程。

from random import randint
from threading import Thread
from time import time,sleep

class down_task(Thread):
    def __init__(self,filename):
        super().__init__()
        self._filename=filename
    def run(self):
        print('开始下载%s...'%self._filename)
        time_to_download=randint(5,10)
        sleep(time_to_download)
        print('%s下载完成!耗费了%d秒'%(self._filename,time_to_download))

def main():
    start=time()
    t1=down_task('Python程序设计.pdf')
    t1.start()
    t2=down_task('Python自动化运维.pdf')
    t2.start()
    t1.join()
    t2.join()
    end=time()
    print('总共耗费了%.2f秒'%(end-start))

if __name__ == '__main__':
    main()

'''
输出结果:
开始下载Python程序设计.pdf...
开始下载Python自动化运维.pdf...
Python程序设计.pdf下载完成!耗费了8秒
Python自动化运维.pdf下载完成!耗费了10秒
总共耗费了10.00秒
'''

因为多个线程可以共享进程的内存空间,因此要实现多个线程间的通信相对简单,大家能想到的最直接的办法就是设置一个全局变量,多个线程共享这个全局变量即可。但是当多个线程共享同一个变量(我们通常称之为“资源”)的时候,很有可能产生不可控的结果从而导致程序失效甚至崩溃。如果一个资源被多个线程竞争使用,那么我们通常称之为“临界资源”,对“临界资源”的访问需要加上保护,否则资源会处于“混乱”的状态。

下面的例子演示了100个线程向同一个银行账户转账(转入1元钱)的场景,在这个例子中,银行账户就是一个临界资源,在没有保护的情况下我们很有可能会得到错误的结果。

from time import sleep
from threading import Thread


class Account(object):

    def __init__(self):
        self._balance = 0

    def deposit(self, money):
        # 计算存款后的余额
        new_balance = self._balance + money
        # 模拟受理存款业务需要0.01秒的时间
        sleep(0.01)
        # 修改账户余额
        self._balance = new_balance

    @property
    def balance(self):
        return self._balance


class AddMoneyThread(Thread):

    def __init__(self, account, money):
        super().__init__()
        self._account = account
        self._money = money

    def run(self):
        self._account.deposit(self._money)


def main():
    account = Account()
    threads = []
    # 创建100个存款的线程向同一个账户中存钱
    for _ in range(100):
        t = AddMoneyThread(account, 1)
        threads.append(t)
        t.start()
    # 等所有存款的线程都执行完毕
    for t in threads:
        t.join()
    print('账户余额为: ¥%d元' % account.balance)


if __name__ == '__main__':
    main()

'''
输出结果:
账户余额为: ¥2元
'''

100个线程分别向账户中转入1元钱,结果居然远远小于100元。之所以出现这种情况是因为我们没有对银行账户这个“临界资源”加以保护,多个线程同时向账户中存钱时,会一起执行到new_balance = self._balance + money这行代码,多个线程得到的账户余额都是初始状态下的0,所以都是0上面做了+1的操作,因此得到了错误的结果。在这种情况下,“锁”就可以派上用场了。我们可以通过“锁”来保护“临界资源”,只有获得“锁”的线程才能访问“临界资源”,而其他没有得到“锁”的线程只能被阻塞起来,直到获得“锁”的线程释放了“锁”,其他线程才有机会获得“锁”,进而访问被保护的“临界资源”。下面的代码演示了如何使用“锁”来保护对银行账户的操作,从而获得正确的结果。

from time import sleep
from threading import Thread, Lock


class Account(object):

    def __init__(self):
        self._balance = 0
        self._lock = Lock()

    def deposit(self, money):
        # 先获取锁才能执行后续的代码
        self._lock.acquire()
        try:
            new_balance = self._balance + money
            sleep(0.01)
            self._balance = new_balance
        finally:
            # 在finally中执行释放锁的操作保证正常异常锁都能释放
            self._lock.release()

    @property
    def balance(self):
        return self._balance


class AddMoneyThread(Thread):

    def __init__(self, account, money):
        super().__init__()
        self._account = account
        self._money = money

    def run(self):
        self._account.deposit(self._money)


def main():
    account = Account()
    threads = []
    for _ in range(100):
        t = AddMoneyThread(account, 1)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print('账户余额为: ¥%d元' % account.balance)


if __name__ == '__main__':
    main()

'''
输出结果:
账户余额为: ¥100元
'''

Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。
所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。

import threading
    
# 创建全局ThreadLocal对象:
local_school = threading.local()

def process_student():
    # 获取当前线程关联的student:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(arg):
    # 绑定ThreadLocal的student:
    local_school.student = arg
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。你可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
可以理解为全局变量local_school是一个dict,不但可以用local_school.student,还可以绑定其他变量,如local_school.teacher等等。
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。

分布式进程

在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。
Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?
原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。
我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:

import random,time,queue
from multiprocessing.managers import BaseManager
#发送任务队列
task_queue=queue.Queue()
#接收结果队列
result_queue=queue.Queue()

#从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass
def return_task_queue():
    #global 用于函数内部,修改全局变量的值
    global task_queue
    return task_queue
def return_result_queue():
    global result_queue
    return result_queue
if __name__=='__main__':
    #将两个Queue注册到网络上,callable参数关联Queue对象
    QueueManager.register('get_task_queue',callable=return_task_queue)
    QueueManager.register('get_result_queue',callable=return_result_queue)
    #绑定端口5000,设置验证码abc,通过QueueManager将Queue暴露出去
    manager=QueueManager(address=('127.0.0.1',5000),authkey=b'abc')
    manager.start()
    # 获得通过网络访问的Queue对象:
    task=manager.get_task_queue()
    result=manager.get_result_queue()
    #放10个任务进去
    for i in range(10):
        n=random.randint(0,1000)
        print('Put task %d...'%n)
        #将数据放到任务队列
        task.put(n)
    #取任务执行结果
    print('Try get results...')
    for i in range(10):
        #从结果队列中取结果
        r=result.get(timeout=10)
        print('REsult:%s'%r)
    #关闭
    manager.shutdown()
    print('master end')

当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。

然后,在另一台机器上启动任务进程(本机上启动也可以):

import time, sys, queue
from multiprocessing.managers import BaseManager


# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass


if __name__ == '__main__':

    # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')

    # 连接到服务器,也就是运行task_master.py的机器:
    server_addr = '127.0.0.1'
    print('Connect to server %s...' % server_addr)
    # 端口设置和task_master.py中一样
    m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
    # 从网络连接:
    m.connect()

    # 获取Queue的对象:
    task = m.get_task_queue()
    result = m.get_result_queue()

    for i in range(10):
        try:
            # 接收任务队列中的数据
            n = task.get(timeout=1)
            print('Run task %d*%d' % (n, n))
            r = '%d*%d=%d' % (n, n, n * n)
            time.sleep(1)
            # 放进结果队列
            result.put(r)
        except queue.Queue.Empty:
            print('task queue is empty')
    print('work done')

这个Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。

List of learning reference documents

  • https://github.com/lovevantt/Python-100-Days
  • https://www.liaoxuefeng.com/wiki/1016959663602400