网络抓取是一种从网站提取大量数据的强大技术。然而,按需抓取数据可能具有挑战性。在本综合指南中,我们将学习如何使用 FastAPI 将 Python 网络抓取工具转变为实时数据 API。
为什么要构建抓取 API?
以下是通过 API 提供抓取数据的一些主要优势:
实时数据 – API 可以按需抓取数据,而不是依赖陈旧的缓存批次。
自定义查询 – API 允许自定义数据查询,而不仅仅是转储整个网站。
可扩展性 – API 可处理流量峰值并扩展到数千名用户。
值得信赖 – API 重试失败的请求并实现强大的抓取逻辑。
高度灵活 – API 可以实现各种数据传输方法,例如 Webhooks。
抽象化 – API 对 API 消费者隐藏了复杂的抓取逻辑和基础设施。
抓取API架构
在较高层面上,网络抓取 API 遵循以下架构:
- API接收请求
- 爬虫获取数据
- 数据被缓存
- API返回抓取的数据
关键组件是:
API 服务器 – 处理客户请求并提供抓取的数据。
刮刀 – 根据需要从目标站点获取数据。
缓存 – 存储抓取的数据以避免冗余抓取。
数据库 – 可选择存储抓取的数据以进行历史分析。
为什么使用 FastAPI?
有许多优秀的 API 框架。对于刮刀,我推荐 FastAPI 因为:
FastAPI 非常快——非常适合数据抓取 API。
它提供自动文档、验证、序列化等。
支持 asyncio 进行异步抓取。
易于使用和学习。灵活适用于小型到大型 API。
拥有强大的抓取工具生态系统,如 httpx、parsel 等。
抓取 API 设置
我们将使用以下核心库:
FastAPI – 我们的 API 框架
httpx – 异步HTTP客户端
包 – HTML/XML 解析器
洛古鲁 – 日志实用程序
安装软件包:
pip install fastapi uvicorn httpx parsel loguru
在本指南中,我们将从雅虎财经获取一些基本股票数据。
创建 API
让我们使用单个端点设置初始 FastAPI 应用程序:
from fastapi import FastAPI
app = FastAPI()
@app.get("/stock/{symbol}")
async def get_stock(symbol: str):
return { "stock": symbol }
这个基本 API 仅返回我们提供的股票代码。
让我们启动服务器并测试它:
uvicorn main:app --reload
import httpx
print(httpx.get("http://localhost:8000/stock/AAPL").json())
# {‘stock‘: ‘AAPL‘}
我们的 API 已准备好接收请求。接下来让我们让它抓取一些数据。
抓取股票数据
要获取股票的数据,我们将:
- 从符号构建雅虎财经 URL
- 获取 HTML 页面
- 使用 XPath 解析值
from parsel import Selector # for xpath parsing
async def scrape_stock(symbol: str):
url = f"https://finance.yahoo.com/quote/{symbol}"
async with httpx.AsyncClient() as client:
response = await client.get(url)
sel = Selector(response.text)
# parse summary values
values = sel.xpath(‘//div[contains(@data-test,"summary-table")]//tr‘)
data = {}
for value in values:
label = value.xpath("./td[1]/text()").get()
val = value.xpath("./td[2]/text()").get()
data[label] = val
# parse price
data["price"] = sel.xpath(‘//fin-streamer[@data-symbol=$symbol]/@value‘, symbol=symbol).get()
return data
该抓取器返回包含解析数据的字典。让我们将它连接到我们的 API:
from fastapi import FastAPI
from yahoo_finance import scrape_stock
app = FastAPI()
@app.get("/stock/{symbol}")
async def get_stock(symbol: str):
data = await scrape_stock(symbol)
return data
现在,当我们调用 API 时,它将获取最新的数据:
http http://localhost:8000/stock/AAPL
HTTP/1.1 200 OK
Content-Length: 340
{
"52 Week Range": "142.00 - 182.94",
"Beta (5Y Monthly)": "1.25",
"Diluted EPS (ttm)": "6.05",
"Earnings Date": "Oct 27, 2022",
"Ex-Dividend Date": "Aug 05, 2022",
"Forward Dividend & Yield": "0.92 (0.59%)",
"Market Cap": "2.44T",
"Open": "156.76",
"PE Ratio (ttm)": "25.60",
"Previous Close": "153.72",
"Price": "155.33",
"Volume": "53,978,024"
}
添加缓存
抓取每个请求是浪费的。让我们添加缓存,这样我们每 5 分钟只抓取一次股票。
我们将使用一个简单的 dict
存储按股票代码键入的抓取数据:
STOCK_CACHE = {}
async def scrape_stock(symbol):
if symbol in STOCK_CACHE:
return STOCK_CACHE[symbol]
data = ... # scrape
STOCK_CACHE[symbol] = data
return data
现在,重复的请求将返回缓存的数据,而不是每次都进行抓取。
我们还可以定期清除旧缓存:
import time
CACHE_MAX_AGE = 300 # seconds
async def clear_expired_cache():
curr_time = time.time()
for symbol, data in STOCK_CACHE.items():
if curr_time - data["ts"] > CACHE_MAX_AGE:
del STOCK_CACHE[symbol]
# run every 5 minutes
clear_cache_task = asyncio.create_task(clear_expired_cache())
这确保我们的缓存不会无限制地增长。
添加网络钩子
对于长时间的抓取作业,我们可以使用 webhooks 异步返回结果:
@app.get("/stock/{symbol}")
async def get_stock(symbol: str, webhook: str):
if webhook:
task = asyncio.create_task(fetch_stock(symbol, webhook))
return {"msg": "Fetching data"}
data = await scrape_stock(symbol)
return data
async def fetch_stock(symbol, url):
data = await scrape_stock(symbol)
async with httpx.AsyncClient() as client:
await client.post(url, json=data)
现在,我们的 API 将立即返回状态并将数据异步传递到回调 Webhook,而不是等待抓取完成。
我们可以使用类似的工具来测试它 Webhook.site.
扩展抓取 API
随着流量的增加,以下是一些扩展技术:
添加API缓存 – 使用 Redis 等缓存来减少抓取负载。
运行多个进程 – 使用gunicorn 跨核心/服务器进行扩展。
卸载抓取 – 将抓取工具移至 Celery 或 RabbitMQ 等工作人员处。
使用抓取服务 – 利用 Scrapfly 或 ScraperAPI 等抓取 API。
优化抓取工具 – 确保抓取工具高效并避免禁令。
添加数据库 – 将抓取的数据存储在数据库中以供进一步分析。
结论
在本指南中,我们使用 FastAPI 在 Python 中构建了一个网页抓取 API。关键要点是:
FastAPI 为抓取 API 提供了一个优秀的框架。
我们可以生成抓取工具来按需获取数据。
缓存和网络钩子有助于克服抓取限制。
随着流量的增长,有许多优化和扩展策略。
抓取 API 可以解锁网站上的大量数据。通过生成 API 而不是静态抓取,我们可以大规模提供低延迟、定制的数据。