python使用多任务提高爬虫效率

2023-01-08 21:09:34

一.我们为什么要使用多任务?

 

通常在使用request的时候,响应数据比较大的时候,程序一直在那里等待服务器的响应,而使得程序进入了阻塞状态,不能有效地利用电脑的cpu,如果数据量比较大的时候,这个阻塞浪费的时间就很明显了,如何解决就要涉及到多任务。

1.多进程爬取

为什么不像java或者c一样是多线程呢?

在python的实际中有一个GIL锁,为了保证资源的统一性,只有当python的一个线程运行结束,才可以运行下一个线程,

我们来对比一组数据,用来模拟使用request

不使用多线程进行多组url请求的发起

# 正常使用request进行爬取3个请求数据
import time

# 模拟一组需要获取的网页
urls = [i for i in range(1, 4)]


# 发送请求的方法
def GetResp(url):
    # 模拟request获取响应时阻塞
    time.sleep(2)
    return f"抓取成功{url}"


if __name__ == '__main__':
    # 循环获取每一个url的响应数据
    # 计时开始
    t1 = time.time()
    for url in urls:
        Resp = GetResp(url)
        print(Resp)
    # 计时结束
    t2 = time.time()
    print("总运行时间是", t2 - t1)

抓取成功1
抓取成功2
抓取成功3
总运行时间是 6.002578973770142

使用多线程进行多组url请求的发起

# 多进程使用request进行爬取3个请求数据
import time
from multiprocessing import Process, Queue

# 模拟一组需要获取的网页
urls = [i for i in range(1, 4)]


# 发送请求的方法
def GetResp(url, queue):
    # 模拟request获取响应时阻塞
    time.sleep(2)
    queue.put(f"抓取成功{url}")


if __name__ == '__main__':
    # 循环获取每一个url的响应数据
    # 计时开始
    t1 = time.time()
    jobs = []
    resp = []
    queues = []
    for url in urls:
        # 使用队列来进行进程间的数据共享
        queue = Queue()
        queues.append(queue)
        p = Process(target=GetResp, args=(url, queue))
        jobs.append(p)
        p.start()

    # 等待进程全部运行结束是程序结束
    for j, i in enumerate(jobs):
        i.join()
        resp.append(queues[j].get())
    print(resp)
    # 计时结束
    t2 = time.time()
    print("总运行时间是", t2 - t1)

['抓取成功1', '抓取成功2', '抓取成功3']
总运行时间是 2.1951146125793457

对比两个运行时间可以发现,在使用进程的时候的抓取效率要比普通的方式要高效得多

注意:不推荐使用多进程 

  • 因为每个进程是具有独立资源的,所以每开启释放一个进程,都会消耗大量的系统资源
  • 进程会占据大量的运行内存与cpu,不可能无限制的去开启进程,进程开多了电脑会崩溃掉

当开启1000个进程时:

可以使用线程池或者协程代替

2.线程池

import time
from multiprocessing.dummy import Pool

# 模拟一组需要获取的网页
urls = [i for i in range(1, 10)]

resp = []
# 发送请求的方法
def GetResp(url):
    # 模拟request获取响应时阻塞
    time.sleep(2)
    resp.append(url)


if __name__ == '__main__':
    # 创建线程池,池的大小就是Pool中的参数
    t1 = time.time()

    pool = Pool(5)
    pool.map(GetResp, urls)

    print(resp)
    t2 = time.time()
    print("总运行时间是", t2 - t1)

[2, 3, 4, 1, 5, 9, 8, 6, 7]
总运行时间是 4.03861403465271

 优点:不需要频繁的去创建摧毁进程线程,同时限制了进程线程的数量,同时线程之间共享数据,不需要去使用队列共享数据

3.协程(推荐使用) 

# 协程(单线程+多任务)在python3.6之后才有
# event_loop:事件循环(死循环),把任务注册到事件循环中,满足一定条件就去执行
# coroutine:协程对象,可以把协程对象注册到事件循环之中,以满足被反复调用
# async:创建一个方法,在调用这个方法是,并不会立即执行,会返回个协程对象
# task:任务对写成对象的进一步封装,包含任务的各个状态
# future:即将要执行的任务
# await:挂起(会在后台继续执行,并切换到别的协程)


import asyncio
import time

# 模拟一组需要获取的网页
urls = [i for i in range(1, 100)]

resp = []


# 发送请求的方法
async def GetResp(url):
    # 模拟request获取响应时阻塞
    # time.sleep(2)  不能使用time.sleep同步操作,否则异步任务会转变成同步任务
    # await:遇到阻塞就挂起进行切换
    await asyncio.sleep(2)
    resp.append(url)


if __name__ == '__main__':
    # 计时开始
    t1 = time.time()
    tasks = []
    # 通过循环把每一个协程任务丢到tasks里面
    for url in urls:
        # 创建一个协程任务
        # task = asyncio.create_task(GetResp(url))
        task = asyncio.ensure_future(GetResp(url))
        tasks.append(task)
    # 创建一个事件循环对象
    loop = asyncio.get_event_loop()
    # 开始执行任务,直到任务全部结束asyncio.wait(tasks)
    loop.run_until_complete(asyncio.wait(tasks))
    print(resp)
    # 计时结束
    t2 = time.time()
    print("总运行时间是", t2 - t1)

[1, 3, 7, 15, 31, 63, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 62, 64, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 30, 32, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 14, 16, 13, 12, 11, 10, 9, 6, 8, 5, 2, 4]
总运行时间是 2.0129077434539795

如果对比多进程多线程来看,使用协程的效率要比多线程多进程好得多。 

二. 协程并发量的控制

1.为什么要进行并发量的控制

  • 请求过快导致服务器宕机
  • 占用系统资源

2.控制并发量的方法

  • 使用asyncio.Semaphore(并发数)方法获取Semaphore对象
  • 在需要控制并发的语句前面加上async with semaphore:
async def 协程方法():
     async with semaphore对象:
        // 需要控制并发量的语句

3.改进后的代码

# 协程(单线程+多任务)在python3.6之后才有
# event_loop:事件循环(死循环),把任务注册到事件循环中,满足一定条件就去执行
# coroutine:协程对象,可以把协程对象注册到事件循环之中,以满足被反复调用
# async:创建一个方法,在调用这个方法是,并不会立即执行,会返回个协程对象
# task:任务对写成对象的进一步封装,包含任务的各个状态
# future:即将要执行的任务
# await:挂起(会在后台继续执行,并切换到别的协程)


import asyncio
import time

# 模拟一组需要获取的网页
urls = [i for i in range(1, 100)]

resp = []
semaphore = asyncio.Semaphore(20)


# 发送请求的方法
async def GetResp(url):
    # 并发量控制
    async with semaphore:
        # 模拟request获取响应时阻塞
        # time.sleep(2)  不能使用time.sleep同步操作,否则异步任务会转变成同步任务
        # await:遇到阻塞就挂起进行切换
        await asyncio.sleep(2)
        resp.append(url)


if __name__ == '__main__':
    # 计时开始
    t1 = time.time()
    tasks = []
    # 通过循环把每一个协程任务丢到tasks里面
    for url in urls:
        # 创建一个协程任务
        # task = asyncio.create_task(GetResp(url))
        task = asyncio.ensure_future(GetResp(url))
        tasks.append(task)
    # 创建一个事件循环对象
    loop = asyncio.get_event_loop()
    # 开始执行任务,直到任务全部结束asyncio.wait(tasks)
    loop.run_until_complete(asyncio.wait(tasks))
    print(resp)
    # 计时结束
    t2 = time.time()
    print("总运行时间是", t2 - t1)

[1, 3, 7, 15, 20, 19, 18, 17, 14, 16, 13, 12, 11, 10, 9, 6, 8, 5, 2, 4, 21, 23, 27, 35, 40, 39, 38, 37, 34, 36, 33, 32, 31, 30, 29, 26, 28, 25, 22, 24, 41, 43, 47, 55, 60, 59, 58, 57, 54, 56, 53, 52, 51, 50, 49, 46, 48, 45, 42, 44, 61, 63, 67, 75, 80, 79, 78, 77, 74, 76, 73, 72, 71, 70, 69, 66, 68, 65, 62, 64, 81, 83, 87, 95, 99, 98, 97, 94, 96, 93, 92, 91, 90, 89, 86, 88, 85, 82, 84]
总运行时间是 10.04484486579895

 这里设置的最大并发数是20,在100个请求下最快完成时间为10秒。

三.使用协程去实现批量图片的下载

补充:

  • async定义的函数的调用被视为阻塞
  • 使用aiohttp代替request进行请求(request被视为同步操作,就像上面说的time.sleep一样,会影响异步操作,同时aiohttp是基于asyncio开发,适合协程中的网络请求的发送)
  • 作者:豪豪喜欢吃猪肉
  • 原文链接:https://blog.csdn.net/m0_46623754/article/details/123600613
    更新时间:2023-01-08 21:09:34