文档结构  
翻译进度:已翻译     翻译赏金:0 元 (?)    ¥ 我要打赏

Python多线程模块是在2.6的版本中添加的.最初是由Jesse Noller 和 Richard Oudkerk定义在 PEP 371中.与threading创建线程的方式一样,multiprocessing处理模块允许你创建进程.现在你可以创建多个进程,充分挖掘机器上的多个处理器性能,但需要注意全局锁 (GIL) 的问题.

multiprocessing模块包含一些连threading模块都没有的接口(API).例如:Pool类允许你多点输入且并行执行.我们将在后续章节对Pool做进一步介绍.现在我们首先以multiprocessing 模块中的Process来做介绍.

第 1 段(可获 1.64 积分)

多线程之旅

Process 类与线程模块下的Thread类是非常相似的. 让我们来尝试创建多个进程然后调用相同的方法,看看它是怎么工作的:

import os
 
from multiprocessing import Process
 
def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc = os.getpid()
    print('{0} doubled to {1} by process id: {2}'.format(
        number, result, proc))
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
 
    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()
 
    for proc in procs:
        proc.join()

本例中, 我们引入了 Process 类并且创建了 doubler 方法. 在空上方法内部, 我们将入参乘以2. 当然我们也可以使用Python中的 os模块来获取当前进程的id(或pid). 它将会告诉我们这个方法谁在调用. 在代码的底部, 我们创建了多个进程并启动他. 最后的一个 loop在每个进程上调用 join() 方法, 它将会告诉Python进程进入等待直到所有子进程结束. 如果需要结束一个进程, 你可以调用他的 terminate()方法.

当你运行这段代码时,将会看到如下的输出结果: 

第 2 段(可获 1.89 积分)
5 doubled to 10 by process id: 10468
10 doubled to 20 by process id: 10469
15 doubled to 30 by process id: 10470
20 doubled to 40 by process id: 10471
25 doubled to 50 by process id: 10472

有时,拥有便于阅读的进程名字将会更好. 非学幸运的是,  Process 类允许你获得相同的进程, 让我们来看一下:

import os
 
from multiprocessing import Process, current_process
 
 
def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc_name = current_process().name
    print('{0} doubled to {1} by: {2}'.format(
        number, result, proc_name))
 
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
    proc = Process(target=doubler, args=(5,))
 
    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()
 
    proc = Process(target=doubler, name='Test', args=(2,))
    proc.start()
    procs.append(proc)
 
    for proc in procs:
        proc.join()

这一次, 我们加了些额外的东西: current_process.  current_process 基本上与线程模块中 current_thread用法是一致的.我们用它来获取调用函数的线程的名称. 你需要注意,前五个进程, 我并没有设置名称. 从第六个开始, 我们设置进程的名称为 “Test”. 让我们来看一下输出结果: 

5 doubled to 10 by: Process-2
10 doubled to 20 by: Process-3
15 doubled to 30 by: Process-4
20 doubled to 40 by: Process-5
25 doubled to 50 by: Process-6
2 doubled to 4 by: Test
第 3 段(可获 1.38 积分)

输出表明了多线程模块默认的为每个进程名字中加入了一个数字. 当然,如果我们设置了名字,数字将不会被加入.

multiprocessing 模块与多线程模块一样也是支持锁的. 你需要做的就是导入 **Lock**, 获取锁, 执行代码然后释放锁. 让我们来看一下:

from multiprocessing import Process, Lock
 
lock = Lock()
 
 
def printer(item):
    """
    Prints out the item that was passed in
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
 
if __name__ == '__main__':
    items = ['tango', 'foxtrot', 10]
    for item in items:
        p = Process(target=printer, args=(item,))
        p.start()
第 4 段(可获 0.93 积分)

我们创建了一个简单的打印函数,打印任何传递给它的内容。 为了避免线程间的相互干扰,我们使用了一个锁对象. 代码将迭代集合中的3个元素,并为每个元素创建一个进程. 每个进程将调用printer函数 并把当前元素做为参数传递给他.由于我们使用了锁, 下一个进程在锁释放之前将一直阻塞.

日志输出

日志记录过程与日志线程略有不同. 原因就是Python的logging包并没有使用进程间共享锁, 所以在处理不同进程的消息时,很容易混淆. 让我们在前面的示例中添加最基本的日志记录. 代码如下:

第 5 段(可获 1.75 积分)
import logging
import multiprocessing
 
from multiprocessing import Process, Lock
 
lock = Lock()
 
def printer(item):
    """
    Prints out the item that was passed in
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
 
if __name__ == '__main__':
    items = ['tango', 'foxtrot', 10]
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    for item in items:
        p = Process(target=printer, args=(item,))
        p.start()

最简单的方式就是将所有的日志输出到标准输出. 我们可以调用 log_to_stderr() 函数来完成. 然后我们调用 get_logger 函数来获取 logger 并设置日志记录级别为 INFO. 其它代码都是一样的. 如果你按照这种方式做了, 你将会看到类似的输出:

[INFO/Process-1] child process calling self.run()
tango
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] child process calling self.run()
[INFO/MainProcess] process shutting down
foxtrot
[INFO/Process-2] process shutting down
[INFO/Process-3] child process calling self.run()
[INFO/Process-2] process exiting with exitcode 0
10
[INFO/MainProcess] calling join() for process Process-3
[INFO/Process-3] process shutting down
[INFO/Process-3] process exiting with exitcode 0
[INFO/MainProcess] calling join() for process Process-2

如果要将日志记录到硬盘上, 会有些麻烦. 我们将在下一个示例中找到答案: 

import logging
import multiprocessing
 
from multiprocessing import Process, Lock
 
lock = Lock()
 
def create_logger():
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
 
    fh = logging.FileHandler("process.log")
 
    fmt = '%(asctime)s - %(levelname)s - %(message)s'
    formatter = logging.Formatter(fmt)
    fh.setFormatter(formatter)
 
    logger.addHandler(fh)
    return logger
 
def printer(item):
    """
    Prints out the item that was passed in
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
 
if __name__ == '__main__':
    items = ['tango', 'foxtrot', 10]
    logger = create_logger()
    for item in items:
        p = Process(target=printer, args=(item,))
        p.start()
第 6 段(可获 1.15 积分)

刚刚我们添加了 create_logger() 函数,然后在 multiprocessing 模块内部调用 get_logger. 现在我们有一个记录日志的对象, 我们可以添加一个 FileHandlerFormatter, 就像我做的那样. 如果你现在运行代码, 日志将会记录到硬盘上, 并且在标准输出中你只能看到传递给 printer 函数输出内容. 在我的机器上,他的输出结果是这样的:

2016-07-28 16:23:28,877 - INFO - child process calling self.run()
2016-07-28 16:23:28,878 - INFO - process shutting down
2016-07-28 16:23:28,878 - INFO - process exiting with exitcode 0
2016-07-28 16:23:28,878 - INFO - child process calling self.run()
2016-07-28 16:23:28,878 - INFO - process shutting down
2016-07-28 16:23:28,878 - INFO - process exiting with exitcode 0
2016-07-28 16:23:28,879 - INFO - process shutting down
2016-07-28 16:23:28,879 - INFO - child process calling self.run()
2016-07-28 16:23:28,879 - INFO - calling join() for process Process-3
2016-07-28 16:23:28,879 - INFO - process shutting down
2016-07-28 16:23:28,879 - INFO - process exiting with exitcode 0
2016-07-28 16:23:28,880 - INFO - calling join() for process Process-2
第 7 段(可获 1.04 积分)

每次运行这段代码的时候,日志文件内容都是不一样的, 因为进程执行的顺序稍微有些不一样.

线程池

Pool 代表所有工作进程的线程池. 它包含一些方法,允许将工作进程停止掉. 让我们来看一个简单的例子:

from multiprocessing import Pool
 
def doubler(number):
    return number * 2
 
if __name__ == '__main__':
    numbers = [5, 10, 20]
    pool = Pool(processes=3)
    print(pool.map(doubler, numbers))

在这里我们创建了一个线程池,并在池子里创建了三个进程. 然后运用 map 方法对每个元素执行相同的函数. 最后打印出结果, 当然它的最终的结果是: [10, 20, 40].

第 8 段(可获 1.39 积分)

你也可以运用线程池调用 apply_async 函数得到相同的结果:

from multiprocessing import Pool
 
def doubler(number):
    return number * 2
 
if __name__ == '__main__':
    pool = Pool(processes=3)
    result = pool.apply_async(doubler, (25,))
    print(result.get(timeout=1))

它允许我们直接的请求线程的执行结果. 这就是 get 函数的由来. 它尝试获取执行结果. 需要注意的是,设置了timeout参数,当进程在执行过程中发生任何异常时超时返回. 我们并不希望它无限期的阻塞.

进程间通信

当进程间需要通信的时候,  multiprocessing 模块有两个主要的方法: Queues 和 Pipes.  Queue 实际上是线程和进程的安全实现. 让我们来看一个基于Queue的一个实例,发表在 线程相关文章:

from multiprocessing import Process, Queue
 
sentinel = -1
 
def creator(data, q):
    """
    Creates data to be consumed and waits for the consumer
    to finish processing
    """
    print('Creating data and putting it on the queue')
    for item in data:
 
        q.put(item)
 
 
def my_consumer(q):
    """
    Consumes some data and works on it
 
    In this case, all it does is double the input
    """
    while True:
        data = q.get()
        print('data found to be processed: {}'.format(data))
        processed = data * 2
        print(processed)
 
        if data is sentinel:
            break
 
 
if __name__ == '__main__':
    q = Queue()
    data = [5, 10, 13, -1]
    process_one = Process(target=creator, args=(data, q))
    process_two = Process(target=my_consumer, args=(q,))
    process_one.start()
    process_two.start()
 
    q.close()
    q.join_thread()
 
    process_one.join()
    process_two.join()
第 9 段(可获 1.65 积分)

在这里我们导入了 Queue 和 Process. 程序有两个功能:一个生产数据并将数据加入到 queue ,另一个消费数据并处理数据. 将数据加入到 Queue 调用的是Queue的put() 方法,相反的从Queue 中获取数据用 get 方法. 最后的一段代码创建 Queue 对象并创建了一组进程,然后启动进程. 你会注意到在进程上我调用用了 join() ,而不是在 Queue 上调用.

结束语

讲了很多案例. 你也学习了在函数中使用 multiprocessing 模块, 通过 Queues来实现进程间通信, 修改线程名称等等. 与Python 官方文档相比,还有许多在这里并没有涉及 , 所以需要你自己去深入学习. 与此同时, 你现在知道如何运用Python来发挖计算机的处理能力!

第 10 段(可获 2.13 积分)

文章评论