跳到内容

如何将网络爬虫转变为数据 API

网络抓取是一种从网站提取大量数据的强大技术。然而,按需抓取数据可能具有挑战性。在本综合指南中,我们将学习如何使用 FastAPI 将 Python 网络抓取工具转变为实时数据 API。

为什么要构建抓取 API?

以下是通过 API 提供抓取数据的一些主要优势:

  • 实时数据 – API 可以按需抓取数据,而不是依赖陈旧的缓存批次。

  • 自定义查询 – API 允许自定义数据查询,而不仅仅是转储整个网站。

  • 可扩展性 – API 可处理流量峰值并扩展到数千名用户。

  • 值得信赖 – API 重试失败的请求并实现强大的抓取逻辑。

  • 高度灵活 – API 可以实现各种数据传输方法,例如 Webhooks。

  • 抽象化 – API 对 API 消费者隐藏了复杂的抓取逻辑和基础设施。

抓取API架构

在较高层面上,网络抓取 API 遵循以下架构:

抓取API架构

  1. API接收请求
  2. 爬虫获取数据
  3. 数据被缓存
  4. API返回抓取的数据

关键组件是:

  • API 服务器 – 处理客户请求并提供抓取的数据。

  • 刮刀 – 根据需要从目标站点获取数据。

  • 缓存 – 存储抓取的数据以避免冗余抓取。

  • 数据库 – 可选择存储抓取的数据以进行历史分析。

为什么使用 FastAPI?

有许多优秀的 API 框架。对于刮刀,我推荐 FastAPI 因为:

  • FastAPI 非常快——非常适合数据抓取 API。

  • 它提供自动文档、验证、序列化等。

  • 支持 asyncio 进行异步抓取。

  • 易于使用和学习。灵活适用于小型到大型 API。

  • 拥有强大的抓取工具生态系统,如 httpx、parsel 等。

抓取 API 设置

我们将使用以下核心库:

  • FastAPI – 我们的 API 框架

  • httpx – 异步HTTP客户端

  • – HTML/XML 解析器

  • 洛古鲁 – 日志实用程序

安装软件包:

pip install fastapi uvicorn httpx parsel loguru

在本指南中,我们将从雅虎财经获取一些基本股票数据。

创建 API

让我们使用单个端点设置初始 FastAPI 应用程序:

from fastapi import FastAPI

app = FastAPI() 

@app.get("/stock/{symbol}")  
async def get_stock(symbol: str):
    return { "stock": symbol } 

这个基本 API 仅返回我们提供的股票代码。

让我们启动服务器并测试它:

uvicorn main:app --reload
import httpx

print(httpx.get("http://localhost:8000/stock/AAPL").json())

# {‘stock‘: ‘AAPL‘}

我们的 API 已准备好接收请求。接下来让我们让它抓取一些数据。

抓取股票数据

要获取股票的数据,我们将:

  1. 从符号构建雅虎财经 URL
  2. 获取 HTML 页面
  3. 使用 XPath 解析值
from parsel import Selector # for xpath parsing

async def scrape_stock(symbol: str):

    url = f"https://finance.yahoo.com/quote/{symbol}"

    async with httpx.AsyncClient() as client:
       response = await client.get(url)

    sel = Selector(response.text)

    # parse summary values 
    values = sel.xpath(‘//div[contains(@data-test,"summary-table")]//tr‘)
    data = {}
    for value in values:
        label = value.xpath("./td[1]/text()").get()
        val = value.xpath("./td[2]/text()").get()
        data[label] = val

    # parse price 
    data["price"] = sel.xpath(‘//fin-streamer[@data-symbol=$symbol]/@value‘, symbol=symbol).get()

    return data

该抓取器返回包含解析数据的字典。让我们将它连接到我们的 API:

from fastapi import FastAPI
from yahoo_finance import scrape_stock

app = FastAPI()

@app.get("/stock/{symbol}")
async def get_stock(symbol: str):
    data = await scrape_stock(symbol) 
    return data

现在,当我们调用 API 时,它将获取最新的数据:

http http://localhost:8000/stock/AAPL

HTTP/1.1 200 OK
Content-Length: 340

{
  "52 Week Range": "142.00 - 182.94",
  "Beta (5Y Monthly)": "1.25", 
  "Diluted EPS (ttm)": "6.05",
  "Earnings Date": "Oct 27, 2022",
  "Ex-Dividend Date": "Aug 05, 2022",
  "Forward Dividend & Yield": "0.92 (0.59%)",
  "Market Cap": "2.44T",
  "Open": "156.76",
  "PE Ratio (ttm)": "25.60",
  "Previous Close": "153.72",
  "Price": "155.33",  
  "Volume": "53,978,024"
}

添加缓存

抓取每个请求是浪费的。让我们添加缓存,这样我们每 5 分钟只抓取一次股票。

我们将使用一个简单的 dict 存储按股票代码键入的抓取数据:

STOCK_CACHE = {} 

async def scrape_stock(symbol):

   if symbol in STOCK_CACHE:
       return STOCK_CACHE[symbol] 

   data = ... # scrape 
   STOCK_CACHE[symbol] = data

   return data

现在,重复的请求将返回缓存的数据,而不是每次都进行抓取。

我们还可以定期清除旧缓存:

import time

CACHE_MAX_AGE = 300 # seconds

async def clear_expired_cache():
   curr_time = time.time()
   for symbol, data in STOCK_CACHE.items():
       if curr_time - data["ts"] > CACHE_MAX_AGE:
           del STOCK_CACHE[symbol]

# run every 5 minutes   
clear_cache_task = asyncio.create_task(clear_expired_cache()) 

这确保我们的缓存不会无限制地增长。

添加网络钩子

对于长时间的抓取作业,我们可以使用 webhooks 异步返回结果:

@app.get("/stock/{symbol}")
async def get_stock(symbol: str, webhook: str):

   if webhook:
      task = asyncio.create_task(fetch_stock(symbol, webhook))
      return {"msg": "Fetching data"}

   data = await scrape_stock(symbol)  
   return data

async def fetch_stock(symbol, url):
   data = await scrape_stock(symbol)

   async with httpx.AsyncClient() as client:
      await client.post(url, json=data)

现在,我们的 API 将立即返回状态并将数据异步传递到回调 Webhook,而不是等待抓取完成。

我们可以使用类似的工具来测试它 Webhook.site.

扩展抓取 API

随着流量的增加,以下是一些扩展技术:

  • 添加API缓存 – 使用 Redis 等缓存来减少抓取负载。

  • 运行多个进程 – 使用gunicorn 跨核心/服务器进行扩展。

  • 卸载抓取 – 将抓取工具移至 Celery 或 RabbitMQ 等工作人员处。

  • 使用抓取服务 – 利用 Scrapfly 或 ScraperAPI 等抓取 API。

  • 优化抓取工具 – 确保抓取工具高效并避免禁令。

  • 添加数据库 – 将抓取的数据存储在数据库中以供进一步分析。

结论

在本指南中,我们使用 FastAPI 在 Python 中构建了一个网页抓取 API。关键要点是:

  • FastAPI 为抓取 API 提供了一个优秀的框架。

  • 我们可以生成抓取工具来按需获取数据。

  • 缓存和网络钩子有助于克服抓取限制。

  • 随着流量的增长,有许多优化和扩展策略。

抓取 API 可以解锁网站上的大量数据。通过生成 API 而不是静态抓取,我们可以大规模提供低延迟、定制的数据。

标签:

加入谈话

您的电邮地址不会被公开。 必填带 *