1. Coroutine Fundamentals

1.1 Introductory Example

import logging
import time
import requests

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')

URL = "https://www.httpbin.org/delay/5"
TOTAL_NUMBER = 10


start_time = time.time()

# Scrape sequentially: the total time is roughly TOTAL_NUMBER * 5 seconds
for _ in range(TOTAL_NUMBER):
    logging.info(f"scraping {URL}")
    response = requests.get(URL)

end_time = time.time()
logging.info(f"total time {end_time - start_time}")

1.2 Basic Concepts

Blocking: the program waits for the result of an operation and can do nothing else while it waits.

Non-blocking: the program can keep doing other work while an operation is still in progress.

Synchronous: operations run one after another, each waiting for the previous one to finish.

Asynchronous: operations can be issued without waiting for earlier ones to complete; results are handled when they arrive.

Process: an independently running instance of a program with its own memory space.

Coroutine: a lightweight, user-level unit of concurrency that can suspend itself (for example while waiting on I/O) and let other coroutines run in the same thread.
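
To make these concepts concrete, here is a minimal sketch (the 1-second waits and the count of three are arbitrary) that runs the same wait first in a blocking, sequential style and then concurrently with asyncio:

import asyncio
import time


def blocking_wait():
    # time.sleep blocks the whole thread: three sequential waits take about 3 seconds
    start = time.time()
    for _ in range(3):
        time.sleep(1)
    print(f"blocking: {time.time() - start:.1f}s")


async def non_blocking_wait():
    # asyncio.sleep yields control to the event loop,
    # so three concurrent waits finish in about 1 second
    start = time.time()
    await asyncio.gather(*(asyncio.sleep(1) for _ in range(3)))
    print(f"non-blocking: {time.time() - start:.1f}s")


blocking_wait()
asyncio.run(non_blocking_wait())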

1.3 Using Coroutines

1.4 Defining a Coroutine

# Run the coroutine object directly with run_until_complete
import asyncio


async def execute(x):
    print(f"Number: {x}")

coroutine = execute(1)
print(f"Coroutine: {coroutine}")
print("After calling execute")

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print("After calling loop")
# Wrap the coroutine in a Task explicitly with loop.create_task
import asyncio


async def execute(x):
    print(f"Number: {x}")

coroutine = execute(1)
print(f"Coroutine: {coroutine}")
print("After calling execute")

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(f"Task: {task}")

loop.run_until_complete(task)
print(f"Task: {task}")
print("After calling loop")
# Create the Task with asyncio.ensure_future (no explicit loop object needed yet)
import asyncio


async def execute(x):
    print(f"Number: {x}")

coroutine = execute(1)
print(f"Coroutine: {coroutine}")
print("After calling execute")

task = asyncio.ensure_future(coroutine)
print(f"Task: {task}")

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(f"Task: {task}")
print("After calling loop")

1.5 Binding Callbacks

import asyncio
import requests


async def request():
    url = "https://www.baidu.com/"
    # requests.get is a blocking call; it is used here only to keep the callback example simple
    status = requests.get(url)
    return status


def callback(task):
    print(f"Status: {task.result()}")


coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print(f"Task: {task}")

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(f"Task: {task}")
  • Equivalent to:
import asyncio
import requests


async def request():
    url = "https://www.baidu.com/"
    status = requests.get(url)
    return status


coroutine = request()
task = asyncio.ensure_future(coroutine)
print(f"Task: {task}")

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(f"Task: {task}")
print(f"Status: {task.result()}")

1.6 Multiple Tasks

import asyncio
import requests


async def request():
    url = "https://www.baidu.com/"
    status = requests.get(url)
    return status


tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print(f"Tasks: {tasks}")

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print(f"Task result: {task.result()}")

1.7 Coroutine Implementation

# Attempt 1: declare the function async but keep the blocking requests.get
import asyncio
import time
import requests


async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    # requests.get blocks, so the five coroutines still run one after another
    response = requests.get(url)
    print(f"Response: {response} from {url}")

start = time.time()

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")
# Attempt 2: await requests.get directly; this fails
import asyncio
import time
import requests


async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    # A requests Response is not awaitable, so this line raises
    # TypeError: object Response can't be used in 'await' expression
    response = await requests.get(url)
    print(f"Response: {response} from {url}")

start = time.time()

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")
# Attempt 3: wrap requests.get in another coroutine; it is still blocking, so still no speed-up
import asyncio
import time
import requests


async def get(url):
    # Still a blocking call inside a coroutine, so no concurrency is gained
    return requests.get(url)


async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    response = await get(url)
    print(f"Response: {response} from {url}")

start = time.time()

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")

1.8 Using aiohttp

Installation

pip3 install aiohttp

Usage

import asyncio
import time
import aiohttp


async def get(url):
    session = aiohttp.ClientSession()
    # session.get is awaitable: while this request waits for the server,
    # the event loop can run the other coroutines
    response = await session.get(url)
    await response.text()
    await session.close()
    return response


async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    response = await get(url)
    print(f"Response: {response} from {url}")

start = time.time()

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")

2. Usage of aiohttp

2.1 Introduction

2.2 Basic Example

import aiohttp
import asyncio


async def fetch(session, url):
    # The async context manager acquires and releases the request resources automatically
    async with session.get(url) as response:
        return await response.json(), response.status


async def main():
    # The async context manager opens and closes the session automatically
    async with aiohttp.ClientSession() as session:
        url = "https://www.httpbin.org/delay/5"
        html, status = await fetch(session, url)
        print(f"html: {html}")
        print(f"status: {status}")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    # On Python 3.7+ the event loop can be managed implicitly instead:
    # asyncio.run(main())

2.3 URL Parameters

import aiohttp
import asyncio


async def main():
    params = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.get("https://www.httpbin.org/get", params=params) as response:
            print(await response.text())

if __name__ == "__main__":
    asyncio.run(main())

2.4 Other Request Types

session.post("https://www.httpbin.org/post", data=b"data")
session.put("https://www.httpbin.org/put", data=b"data")
session.delete("https://www.httpbin.org/delete")
session.head("https://www.httpbin.org/get")
session.options("https://www.httpbin.org/get")
session.patch("https://www.httpbin.org/patch", data=b"data")
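
Each of these is used the same way as session.get, as an async context manager inside a session; here is a minimal sketch with a PUT request (the URL and payload are only placeholders):

import aiohttp
import asyncio


async def main():
    async with aiohttp.ClientSession() as session:
        # session.put works like session.get, with an optional request body
        async with session.put("https://www.httpbin.org/put", data=b"data") as response:
            print(await response.text())

if __name__ == "__main__":
    asyncio.run(main())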

2.5 POST Requests

Form submission

import aiohttp
import asyncio


async def main():
    data = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.post("https://www.httpbin.org/post", data=data) as response:
            print(await response.text())

if __name__ == "__main__":
    asyncio.run(main())

JSON submission

import aiohttp
import asyncio


async def main():
    data = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.post("https://www.httpbin.org/post", json=data) as response:
            print(await response.text())

if __name__ == "__main__":
    asyncio.run(main())

2.6 Responses

import aiohttp
import asyncio


async def main():
    data = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.post("https://www.httpbin.org/post", data=data) as response:
            print(f"status: {response.status}")
            print(f"headers: {response.headers}")
            print(f"body: {await response.text()}")
            print(f"bytes: {await response.read()}")
            print(f"json: {await response.json()}")


if __name__ == "__main__":
    asyncio.run(main())

2.7 Timeout Settings

import aiohttp
import asyncio


async def main():
    # Set a total timeout of 2 seconds
    timeout = aiohttp.ClientTimeout(total=2)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get("https://www.httpbin.org/get") as response:
            print(f"status: {response.status}")


if __name__ == "__main__":
    asyncio.run(main())
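
When the time limit is exceeded, the request raises asyncio.TimeoutError; a minimal sketch of catching it (the 1-second limit against the 5-second delay endpoint is chosen only to force a timeout):

import aiohttp
import asyncio


async def main():
    timeout = aiohttp.ClientTimeout(total=1)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get("https://www.httpbin.org/delay/5") as response:
                print(f"status: {response.status}")
        except asyncio.TimeoutError:
            print("request timed out")

if __name__ == "__main__":
    asyncio.run(main())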

2.8 Limiting Concurrency

import aiohttp
import asyncio

# Maximum number of concurrent requests
CONCURRENCY = 5
URL = "https://www.baidu.com/"

# Create a semaphore object to cap concurrency
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None


async def scrape_api():
    # The semaphore limits how many coroutines can scrape at the same time
    async with semaphore:
        print(f"Scraping {URL}")
        async with session.get(URL) as response:
            await asyncio.sleep(1)
            return await response.text()


async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [
        asyncio.ensure_future(
            scrape_api()) for _ in range(1000)]
    await asyncio.gather(*scrape_index_tasks)
    await session.close()


if __name__ == "__main__":
    asyncio.run(main())

3. Asynchronous Scraping in Practice with aiohttp

3.1 Case Introduction

3.2 Preparation

3.3 Page Analysis

3.4 Implementation Approach

3.5 Basic Configuration

import logging


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')


INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}"
DETAIL_URL = "https://spa5.scrape.center/detail/{id}"
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

3.6 Scraping the List Pages

Implementation

A general-purpose scraping method:
import asyncio
import aiohttp


async def scrape_api(url):
    async with semaphore:
        try:
            logging.info(f"Scraping: {url}")
            # verify_ssl=False disables SSL certificate verification
            async with session.get(url, verify_ssl=False) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.error(f"Error: {url}", exc_info=True)

Scrape a list page:
async def scrape_index(page):
    url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
    return await scrape_api(url)

Put everything together in main:
import json


async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [
        asyncio.ensure_future(
            scrape_index(page)) for page in range(
            1, PAGE_NUMBER + 1)]
    results = await asyncio.gather(*scrape_index_tasks)
    logging.info(
        f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}")

if __name__ == "__main__":
    asyncio.run(main())

Combined

import json
import asyncio
import logging
import aiohttp

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')


INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}"
DETAIL_URL = "https://spa5.scrape.center/detail/{id}"
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None


async def scrape_api(url):
    async with semaphore:
        try:
            logging.info(f"Scraping: {url}")
            async with session.get(url, verify_ssl=False) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.error(f"Error: {url}", exc_info=True)


async def scrape_index(page):
    url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
    return await scrape_api(url)


async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [
        asyncio.ensure_future(
            scrape_index(page)) for page in range(
            1, PAGE_NUMBER + 1)]
    results = await asyncio.gather(*scrape_index_tasks)
    logging.info(
        f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}")
    await session.close()


if __name__ == "__main__":
    asyncio.run(main())

3.7 Scraping the Detail Pages

Implementation

Extract the detail-page IDs from the list results in the main method:
ids = []
for index_data in results:
    if not index_data:
        continue
    for item in index_data.get("results"):
        ids.append(item.get("id"))

Scrape a detail page:
async def scrape_detail(id):
    url = DETAIL_URL.format(id=id)
    data = await scrape_api(url)
    logging.info(f"Saving: {data}")

Add the calls to scrape_detail in the main method:
    scrape_detail_tasks = [
        asyncio.ensure_future(
            scrape_detail(id)) for id in ids]
    await asyncio.wait(scrape_detail_tasks)
    await session.close()

Combined

import json
import asyncio
import logging
import aiohttp

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')


INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}"
DETAIL_URL = "https://spa5.scrape.center/api/book/{id}"
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None


async def scrape_api(url):
    async with semaphore:
        try:
            logging.info(f"Scraping: {url}")
            async with session.get(url, verify_ssl=False) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.error(f"Error: {url}", exc_info=True)


async def scrape_index(page):
    url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
    return await scrape_api(url)


async def scrape_detail(id):
    url = DETAIL_URL.format(id=id)
    data = await scrape_api(url)
    logging.info(f"Saving {url}: {data}")


async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [
        asyncio.ensure_future(
            scrape_index(page)) for page in range(
            1, PAGE_NUMBER + 1)]
    results = await asyncio.gather(*scrape_index_tasks)
    logging.info(
        f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}")

    ids = []
    for index_data in results:
        if not index_data:
            continue
        for item in index_data.get("results"):
            ids.append(item.get("id"))

    scrape_detail_tasks = [
        asyncio.ensure_future(
            scrape_detail(id)) for id in ids]
    await asyncio.wait(scrape_detail_tasks)
    await session.close()


if __name__ == "__main__":
    asyncio.run(main())

Original article: https://blog.csdn.net/BlackOrnate/article/details/134735439
