python 多进程提升模型效率


多进程内默认不共享全局变量!!!

使用到的全局变量,需要在函数中修改的话,就涉及到歧义问题,(而且进程内好像无法对全局变量赋值),因此,需要修改全局变量a,可以在”a = 2”之前加入global a声明,如:

#! /usr/bin/python
from multiprocessing import Process, Pool
import os
a = 1
b = [2, 3]


def func(name):
    global a
    print name
    a = 2
    print "in func a:", a
    b[0] = 1
    print "in func b:", b



if __name__ == '__main__':

    print "before func a:", a
    print "before func b:", b
    # func('diaoying')


    print 'Parent process %s.' % os.getpid()
    p = Process(target=func, args=('process',))
    print 'Process will start.'
    p.start()
    p.join()
    print 'Process end.'
    print "after func a:", a
    print "after func b:", b

# 输出结果
before func a: 1
before func b: [2, 3]
Parent process 29736.
Process will start.
process
in func a: 2
in func b: [1, 3]
Process end.
after func a: 1
after func b: [2, 3]

multiprocessing模块

multiprocessing模块就是跨平台版本的多进程模块。
开辟子进程
multiprocessing中提供了Process类来生成进程实例
Process([group [, target [, name [, args [, kwargs]]]]])

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Process(target=run_proc, args=('test',))
    print 'Process will start.'
    p.start()
    p.join()
    print 'Process end.'

# 结果:
Parent process 928.
Process will start.
Run child process test (929)...
Process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。

join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

Multiprocessing Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print 'Run task %s (%s)...' % (name, os.getpid())
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

多进程模块multiprocessing,这里主要是介绍multiprocessing下的Pool的几个函数
1. apply(func[, args[, kwds]])
apply用于传递不定参数,同python中的apply函数一致(不过内置的apply函数从2.3以后就不建议使用了),主进程会阻塞于函数。

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."

# 输出结果
msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done. 

这个时候主进程的执行流程同单进程一致
2. apply_async(func[, args[, kwds[, callback]]])
与apply用法一致,但它是非阻塞的且支持结果返回后进行回调。

import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."

# 输出结果
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

例二

result = [arg1,arg2,...]    
for x in gen_list(l):
    result = pool.apply_async(pool_test, (x, ))
    print 'main_process'
    pool.close()
    pool.join()
for res in result:
    print res1.get()
print "all subprocesses have run!"   
import multiprocessing
import time

def func(msg):
    return multiprocessing.current_process().name + '-' + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4) # 创建4个进程
    results = []
    for i in xrange(10):
        msg = "hello %d" %(i)
        results.append(pool.apply_async(func, (msg, )))
    pool.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用
    pool.join() # 等待进程池中的所有进程执行完毕
    print ("Sub-process(es) done.")

    for res in results:
        print (res.get())

# 输出结果
Sub-process(es) done.
PoolWorker-37-hello 0
PoolWorker-38-hello 1
PoolWorker-39-hello 2
PoolWorker-40-hello 3
PoolWorker-37-hello 4
PoolWorker-38-hello 5
PoolWorker-39-hello 6
PoolWorker-37-hello 7
PoolWorker-40-hello 8
PoolWorker-38-hello 9

这个时候主进程循环运行过程中不等待apply_async的返回结果,在主进程结束后,即使子进程还未返回整个程序也会就退出。虽然apply_async是非阻塞的,但其返回结果的get方法却是阻塞的,在本例中result.get()会阻塞主进程。因此可以这样来处理返回结果:
[x.get() for x in [pool.apply_async(pool_test, (x,)) for x in gen_list(l)]]
3. map(func, iterable[, chunksize])
map方法与内置的map函数行为基本一致,但是multiprocess中的map函数是多进程,而python内建map函数是单进程,在它会使进程阻塞与此直到结果返回。
但需注意的是其第二个参数虽然描述的为iterable, 但在实际使用中发现只有在整个队列全部就绪后,程序才会运行子进程。
4. map_async(func, iterable[, chunksize[, callback]])
与map用法一致,但是它是非阻塞的。其有关事项见apply_async。

  1. close()
    关闭进程池(pool),使其不在接受新的任务。

  2. terminate()
    结束工作进程,不在处理未处理的任务。

  3. join()
    主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。

#!/usr/bin/env python
#encoding: UTF-8

from multiprocessing import Pool
import time

def fun(x):
    time.sleep(2)
    return "subprocess run result : %d" % (x*x+x)

if __name__ == '__main__':
     pool = Pool(processes=10)     //开始4个进程
     result = []
     for i in range(10):
         result.append(pool.apply_async(fun,(i,)))    //异步调用fun
     pool.close()
     pool.join()
     for res in result:
           print res.get()                            //用get()获得结果
     print "[apply] all subprocesses have run!"

     pool = Pool(processes=10)
     result1 = []
     #for i in range(10):
     result1.append(pool.map_async(fun,range(10)))
     pool.close()
     pool.join()
     for res1 in result1:
         print res1.get()
     print "[map] all subprocesses have run!"       

使用多个进程池

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
    end = time.time()
    print 'Task Lee, runs %0.2f seconds.' %(end - start)

def Marlon():
    print "\nRun task Marlon-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print 'Task Marlon runs %0.2f seconds.' %(end - start)

def Allen():
    print "\nRun task Allen-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print 'Task Allen runs %0.2f seconds.' %(end - start)

def Frank():
    print "\nRun task Frank-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print 'Task Frank runs %0.2f seconds.' %(end - start)

if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print "parent process %s" %(os.getpid())

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print 'Waiting for all subprocesses done...'
    pool.close()
    pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print 'All subprocesses done.'

# 输出结果
parent process 7704

Waiting for all subprocesses done...
Run task Lee-6948

Run task Marlon-2896

Run task Allen-7304

Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.

Python map() 函数

map() 会根据提供的函数对指定序列做映射。
是单进程
第一个参数 function 以参数序列中的每一个元素调用function 函数,返回包含每次 function 函数返回值的新列表。

语法
map() 函数语法:
map(function, iterable, ...)

参数
function – 函数,有两个参数
iterable – 一个或多个序列

>>>def square(x) :            # 计算平方数
...     return x ** 2
... 
>>> map(square, [1,2,3,4,5])   # 计算列表各个元素的平方
[1, 4, 9, 16, 25]
>>> map(lambda x: x ** 2, [1, 2, 3, 4, 5])  # 使用 lambda 匿名函数
[1, 4, 9, 16, 25]

# 提供了两个列表,对相同位置的列表数据进行相加
>>> map(lambda x, y: x + y, [1, 3, 5, 7, 9], [2, 4, 6, 8, 10])
[3, 7, 11, 15, 19]

转载自:https://blog.csdn.net/u013381011/article/details/79541471

You may also like...