# Preface

Multithreading and multiprocessing come with no built-in thread-safety or process-safety guarantees. Without proper control measures, multiple threads or processes can easily access the same resource at the same time, causing a race condition and producing results that differ from what you expect. There are two common ways to avoid this:

  1. Manage the resource through a queue (Queue). The queue's underlying implementation already accounts for thread and process safety, so it effectively avoids races.

  2. Synchronize access with a process lock or thread lock, so that at any given moment only one thread or process can touch the resource.

Either approach reliably prevents races in multithreaded and multiprocess code and makes a program more stable. Everything below uses the queue approach; a minimal lock-based sketch follows for contrast.
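A minimal sketch of the lock-based approach (option 2); the shared counter and thread count here are illustrative and not part of the examples below:

```python
# -*- coding: utf-8 -*-
import threading

lock = threading.Lock()
counter = 0  # the shared resource


def worker():
    global counter
    for _ in range(100_000):
        with lock:        # only one thread at a time may run this block,
            counter += 1  # so the read-modify-write stays atomic


threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 1000000 every run; without the lock the total can fall short
```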

# Multithreading + Queue

```python
# -*- coding: utf-8 -*-
import time
from queue import Queue
import threading

data_q = Queue()  # queue holding the task parameters
th_number = 10    # number of threads


def put_data():
    """Collect the parameters the task function needs and put them in the queue."""
    for i in range(0, 1001, 5):
        data_q.put(i)
    # Enqueue one end-of-work sentinel per thread: each sentinel breaks one
    # worker out of its `while True` loop. With fewer sentinels than threads,
    # some workers would never exit.
    for _ in range(th_number):
        data_q.put(None)


def job(item):
    """Task function."""
    print(f'{item} --> consumed')
    time.sleep(1)


def thread_func():
    """Start the worker threads and wait for them to drain the queue."""
    def run():
        while True:
            item = data_q.get()     # take a parameter from the queue
            if item is None:
                data_q.task_done()  # mark the sentinel as processed
                break
            job(item)               # run the task function
            data_q.task_done()      # mark this item as processed

    tasks = []
    for _ in range(th_number):
        t = threading.Thread(target=run)
        tasks.append(t)
    for t in tasks:
        t.daemon = True  # daemon threads die with the main thread
        t.start()
    for t in tasks:
        t.join()  # wait for every worker to finish


if __name__ == '__main__':
    put_data()
    start = time.time()
    thread_func()
    print(f'{th_number} threads, cost time: {time.time() - start}')

    # 5 threads, cost time: 41.34183621406555
    # 10 threads, cost time: 21.197285175323486
    # 20 threads, cost time: 11.106141805648804
```
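As a side note, the sentinel pattern can be replaced by `Queue.join()`: since every `get()` is balanced by a `task_done()`, the main thread can block on the queue itself and let the daemon workers be reaped at exit. A minimal self-contained variant of the example above, with no `None` sentinels:

```python
# -*- coding: utf-8 -*-
import threading
import time
from queue import Queue

data_q = Queue()
th_number = 10

for i in range(0, 1001, 5):
    data_q.put(i)  # no None sentinels in this variant


def run():
    while True:
        item = data_q.get()
        print(f'{item} --> consumed')
        time.sleep(1)
        data_q.task_done()


for _ in range(th_number):
    # daemon threads are killed when the main thread exits, so workers
    # left blocking on get() do not keep the program alive
    threading.Thread(target=run, daemon=True).start()

data_q.join()  # returns once every put() has a matching task_done()
```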

# Multiprocessing + Queue

  - Processes do not share state, so a multiprocessing queue must be passed to the workers as an argument; it cannot be shared through a global variable (a quick demonstration follows this list);
  - The function a process runs cannot be a nested (inner) function, or the process will fail to start;
  - The queue cannot come from `from queue import Queue`, or it will also fail;
  - The process-spawning code must live under an `if __name__ == '__main__':` guard.
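To demonstrate the first and third points: a thread queue handed to a child process is not a shared channel. A minimal sketch (the exact behavior depends on the start method: under spawn, the default on Windows and macOS, `start()` typically fails with a pickling error; under fork on Linux the child silently works on its own copy):

```python
# -*- coding: utf-8 -*-
from multiprocessing import Process
from queue import Queue  # thread queue -- the wrong kind on purpose


def child(q):
    q.put('from child')  # at best this lands in the child's private copy


if __name__ == '__main__':
    q = Queue()
    p = Process(target=child, args=(q,))
    p.start()         # spawn: typically raises a pickling TypeError here
    p.join()
    print(q.empty())  # fork: True -- the parent's queue never saw the item
```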
```python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue
import time

th_number = 10  # number of processes


def put_data():
    data_q = Queue()  # queue holding the task parameters
    for i in range(0, 1001, 5):
        data_q.put(i)
    # one sentinel per process, same as in the threaded version
    for _ in range(th_number):
        data_q.put(None)
    return data_q


def job(item):
    print(f'{item} --> consumed')
    time.sleep(1)


def run_func(data_q: Queue):
    while True:
        item = data_q.get()
        if item is None:
            break
        job(item)


def process_func(data_q):
    tasks = []
    for _ in range(th_number):
        p = Process(target=run_func, args=(data_q,))
        tasks.append(p)
    for t in tasks:
        t.daemon = True  # daemon processes are terminated with the parent
        t.start()
    for t in tasks:
        t.join()  # wait for every worker process to finish


if __name__ == '__main__':
    data_q = put_data()
    start = time.time()
    process_func(data_q)
    print(f'{th_number} process, cost time: {time.time() - start}')

    # 10 process, cost time: 23.576398372650146
```
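`multiprocessing` also ships a `JoinableQueue` with the same `task_done()`/`join()` protocol as `queue.Queue`, in case you prefer joining on the queue rather than passing sentinels. A minimal sketch under that assumption:

```python
# -*- coding: utf-8 -*-
from multiprocessing import Process, JoinableQueue
import time


def worker(q):
    while True:
        item = q.get()
        print(f'{item} --> consumed')
        time.sleep(1)
        q.task_done()


if __name__ == '__main__':
    q = JoinableQueue()
    for i in range(0, 1001, 5):
        q.put(i)
    for _ in range(10):
        # daemon processes are terminated when the parent exits
        Process(target=worker, args=(q,), daemon=True).start()
    q.join()  # returns once every item has been marked task_done()
```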

# Coroutines + Queue

  - A coroutine function is declared by putting `async` before `def`;
  - Inside a coroutine, a time-consuming operation must be prefixed with `await` so the event loop can run other tasks in the meantime.
```python
# -*- coding: utf-8 -*-
import time
from queue import Queue
import asyncio

data_q = Queue()  # queue holding the task parameters


async def job(item):
    print(f'{item} --> consumed')
    await asyncio.sleep(1)


async def coro_func():
    tasks = []
    while True:
        item = data_q.get()
        if item is None:
            data_q.task_done()
            break
        task = asyncio.create_task(job(item))  # schedule the coroutine on the loop
        tasks.append(task)
        data_q.task_done()
    await asyncio.gather(*tasks)  # wait until every scheduled task finishes


def put_data():
    for i in range(0, 1001, 5):
        data_q.put(i)
    # a single sentinel suffices: only one coroutine drains the queue
    data_q.put(None)


if __name__ == '__main__':
    put_data()
    start = time.time()
    asyncio.run(coro_func())
    print(f'cost time: {time.time() - start}')

    # cost time: 1.019110918045044
```
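One caveat about the snippet above: it gets away with the blocking `queue.Queue` only because every item is enqueued before the event loop starts. For a live producer/consumer pipeline, `asyncio.Queue` is the idiomatic tool, since its `get()` and `join()` can be awaited. A minimal sketch (the worker count and names are illustrative):

```python
# -*- coding: utf-8 -*-
import asyncio


async def consumer(q: asyncio.Queue):
    while True:
        item = await q.get()  # yields to the event loop while waiting
        print(f'{item} --> consumed')
        await asyncio.sleep(1)
        q.task_done()


async def main():
    q = asyncio.Queue()
    workers = [asyncio.create_task(consumer(q)) for _ in range(10)]
    for i in range(0, 1001, 5):
        await q.put(i)
    await q.join()  # every put() has been matched by a task_done()
    for w in workers:
        w.cancel()  # the workers are now idle on get(); shut them down


if __name__ == '__main__':
    asyncio.run(main())
```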

# Thread Pool

```python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import time

data_q = []     # a plain list works here: the pool does the scheduling
th_number = 10  # number of threads


def job(item):
    print(f'{item} --> consumed')
    time.sleep(1)


def put_data():
    for i in range(0, 1001, 5):
        data_q.append(i)


def main():
    put_data()
    with ThreadPoolExecutor(max_workers=th_number) as pool:
        pool.map(job, data_q)


if __name__ == '__main__':
    start = time.time()
    main()
    print(f'{th_number} threads, cost time: {time.time() - start}')

    # 5 threads, cost time: 41.31677436828613
    # 10 threads, cost time: 21.17139196395874
    # 20 threads, cost time: 11.089375019073486
```
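`pool.map` is the simplest interface, but it hides the individual tasks. When you need results or exceptions as tasks finish, `submit` plus `as_completed` is the usual alternative. A brief self-contained sketch (`job_with_result` and its return value are illustrative):

```python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def job_with_result(item):
    time.sleep(1)
    return item * 2  # illustrative return value


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = [pool.submit(job_with_result, i) for i in range(0, 1001, 5)]
        for fut in as_completed(futures):  # yields futures as they finish
            print(fut.result())            # re-raises any task exception
```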

# Process Pool

  - Use process pools with care: a logic bug can keep resources from being released in time, or spawn too many child processes, making the machine extremely sluggish, disrupting other programs, or even freezing it.
```python
# -*- coding: utf-8 -*-
from multiprocessing import Pool
import time

th_number = 10  # number of processes


def job(item):
    print(f'{item} --> consumed')
    time.sleep(1)


def put_data():
    data_q = []
    for i in range(0, 1001, 5):
        data_q.append(i)
    return data_q


def main():
    data_q = put_data()
    # `processes` sets the pool size, i.e. the number of worker processes
    with Pool(processes=th_number) as pool:
        pool.map(job, data_q)


if __name__ == '__main__':
    start = time.time()
    main()
    print(f'{th_number} process, cost time: {time.time() - start}')

    # 10 process, cost time: 16.681106567382812
```
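For reference, in `multiprocessing.Pool` the first parameter, `processes`, sets the pool size (defaulting to the CPU count), while `maxtasksperchild` caps how many tasks a worker handles before it is replaced by a fresh process, which helps contain memory leaks in long runs. A minimal sketch combining both (the values are illustrative):

```python
# -*- coding: utf-8 -*-
from multiprocessing import Pool
import time


def job(item):
    time.sleep(1)
    return item


if __name__ == '__main__':
    # 10 workers; each worker is recycled after handling 50 tasks
    with Pool(processes=10, maxtasksperchild=50) as pool:
        for result in pool.imap_unordered(job, range(0, 1001, 5)):
            print(f'{result} --> consumed')
```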

Everything above is only a basic implementation of concurrent crawlers. To dig deeper, combine Baidu and ChatGPT as references, especially for coroutines, thread pools, and process pools.