盒子
盒子
文章目录
  1. 进程
    1. multiprocessing
    2. concurrent.futures.ProcessPoolExecutor
  2. 线程
    1. threading
    2. concurrent.futures.ThreadPoolExecutor

进程 / 线程

进程

multiprocessing

  • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。
  • multiprocessing提供的threading包中没有的IPC(比如Pipe和Queue),效率上更高。
  • 应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
  • 所有进程的任务在打印时都会向同一个标准输出(stdout)输出
# -*- coding: gbk -*-
import multiprocessing
import time

def func(i):
print("start", i)
time.sleep(1)
print("end", i)

def start():
pros = list()
# 创建进程池
for i in range(10):
pros.append(multiprocessing.Process(target=func, args=(i, )))
# 启动
for p in pros:
p.start()
# 活动子进程
print(multiprocessing.active_children())
# 等待终止
for p in pros:
p.join()
# 返回值
print('%15s.exitcode = %s' % (p.name, p.exitcode))

if __name__ == '__main__':
start()
# 是否活动
p.is_alive()
# 强制终止
p.terminate()
# 返回值
p.join()
print('%15s.exitcode = %s' % (p.name, p.exitcode))
  • 进程同步:管道,队列
# 进程安全:管道
from multiprocessing import Process, Pipe

def f(conn):
conn.send([42, None, 'hello'])
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()
# 进程安全:队列
from multiprocessing import Process, Queue

def f(q):
#队列满时阻塞
q.put([42, None, 'hello'])

if __name__ == '__main__':
q = Queue(10) #队列最大存储10条数据, 不指定则代表无限定
p = Process(target=f, args=(q,))
p.start()
print q.get() #无数据时阻塞
p.join()

concurrent.futures.ProcessPoolExecutor

# 批量创建:map机制, 自动等待结束
from concurrent.futures import ProcessPoolExecutor

def fun(params):
time.sleep(0.1)
print(params)

if __name__ == "__main__":
start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
results = list(pool.map(fun, [(i,) for i in range(10)]))
print('Took %.3f seconds.' % (time.time() - start))
# 单个创建:submit机制, 程序控制结束
if __name__ == "__main__":
with ProcessPoolExecutor(max_workers=2) as pool:
futures = [pool.submit(fun, (i, )) for i in range(10)]

# result等待结束, 并获得返回值
print('results: %s' % [future.result() for future in futures])

# wait等待全部结束: ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
# 返回(done, unfinished),分别为已完成/未完成future对象列表
done, unfinished = wait(futures, timeout=2, return_when=concurrent.futures.ALL_COMPLETED)

线程

threading

  • threading.currentThread(): 返回当前的线程变量。
  • threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
  • 线程对象方法:
    • isAlive(): 返回线程是否活动的。
    • getName(): 返回线程名。
    • setName(): 设置线程名。
  • 线程队列import queue
    • q = queue.Queue(10) 创建同步的、线程安全的队列
    • qsize() 返回队列的大小
    • empty() 如果队列为空,返回True,反之False
    • full() 如果队列满了,返回True,反之False, full 与 maxsize 大小对应
    • get([block[, timeout]]) 获取队列,timeout等待时间
    • get_nowait() 相当Queue.get(False)
    • put(item) 写入队列,timeout等待时间
    • put_nowait(item) 相当Queue.put(item, False)
    • task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
    • join() 实际上意味着等到队列为空,再执行别的操作
import time
import threading

def hello(i):
time.sleep(0.1)
print(i)

if __name__ == '__main__':
t = threading.Thread(target=hello, args=(1,))
t.setDaemon(True) # 设置跟随父线程
t.setName('t') # 设置线程名称
t.start()
t.join()
print(t.getName()) # 获取线程名称
print(t.isAlive()) # 是否活动
# 继承模式,创建自己的线程类
class MyThread(threading.Thread):
def __init__(self, name, lock):
threading.Thread.__init__(self)
self.name = name
self.lock = lock

def run(self):
print("开始线程:" + self.name)
time.sleep(0.1)
self.lock.acquire() # 锁定
print("退出线程:" + self.name)
self.lock.release() # 解锁

if __name__ == '__main__':
lock = threading.Lock() # 线程锁
ts = [MyThread("Thread-"+str(i), lock) for i in range(10)]
# 设置跟随父线程
[t.setDaemon(True) for t in ts]
[t.start() for t in ts]
[t.join() for t in ts]

concurrent.futures.ThreadPoolExecutor

# map机制,支持submit控制单个进程,见concurrent.futures.ProcessPoolExecutor例子
import time
from concurrent.futures import ThreadPoolExecutor

def fun(params):
time.sleep(0.1)
print(params)

if __name__ == "__main__":
start = time.time()
with ThreadPoolExecutor(max_workers=5) as pool:
results = list(pool.map(fun, [(i,) for i in range(10)]))
print('Took %.3f seconds.' % (time.time() - start))
支持一下
扫一扫,支持一下
  • 微信扫一扫
  • 支付宝扫一扫