Web スクレイピングは、Web サイトから大量のデータを抽出する強力な技術です。ただし、オンデマンドでデータをスクレイピングするのは困難な場合があります。この包括的なガイドでは、FastAPI を使用して Python Web スクレイパーをリアルタイム データ API に変える方法を学びます。
スクレイピング API を構築する理由
API を介してスクレイピングされたデータを配信する主な利点をいくつか紹介します。
リアルタイムデータ – API は、キャッシュされた古くなったバッチに依存するのではなく、オンデマンドでデータをスクレイピングできます。
カスタムクエリ – API を使用すると、Web サイト全体をダンプするだけでなく、データのカスタム クエリが可能になります。
スケーラビリティ – API はトラフィックの急増を処理し、数千のユーザーに拡張します。
信頼性の向上 – API は失敗したリクエストを再試行し、堅牢なスクレイピング ロジックを実装します。
柔軟性 – API は Webhook などのさまざまなデータ配信方法を実装できます。
抽象化 – API は、複雑なスクレイピング ロジックとインフラストラクチャを API コンシューマーから隠します。
スクレイピング API アーキテクチャ
大まかに言うと、Web スクレイピング API は次のアーキテクチャに従います。
- APIがリクエストを受信
- スクレーパーがデータを取得する
- データがキャッシュされる
- API はスクレイピングされたデータを返します
主要なコンポーネントは次のとおりです。
APIサーバー – クライアントのリクエストを処理し、スクレイピングされたデータを配信します。
スクレーパー – ターゲット サイトからオンデマンドでデータを取得します。
キャッシュ – 冗長なスクレイピングを避けるために、スクレイピングされたデータを保存します。
データベース – オプションで、履歴分析のためにスクレイピングされたデータを保存します。
FastAPI を使用する理由
世の中には優れた API フレームワークがたくさんあります。スクレーパーの場合は、こちらがおすすめです FastAPI
FastAPI は非常に高速なので、データ スクレイピング API に最適です。
自動ドキュメント、検証、シリアル化などを提供します。
非同期スクレイピングのための asyncio をサポートします。
使い方も学習も簡単です。小規模から大規模な API まで柔軟に対応します。
httpx、parsel などのスクレイピング ツールの優れたエコシステムがあります。
スクレイピングAPIのセットアップ
次のコア ライブラリを使用します。
FastAPI – 当社の API フレームワーク
httpx – 非同期HTTPクライアント
パーセル – HTML/XMLパーサー
ログル – ロギングユーティリティ
パッケージをインストールします。
pip install fastapi uvicorn httpx parsel loguru
このガイドでは、Yahoo Finance からいくつかの基本的な株式データを収集します。
APIを作成する
単一のエンドポイントを使用して最初の FastAPI アプリをセットアップしましょう。
from fastapi import FastAPI
app = FastAPI()
@app.get("/stock/{symbol}")
async def get_stock(symbol: str):
return { "stock": symbol }
この基本的な API は、提供された銘柄記号を返すだけです。
サーバーを起動してテストしてみましょう。
uvicorn main:app --reload
import httpx
print(httpx.get("http://localhost:8000/stock/AAPL").json())
# {‘stock‘: ‘AAPL‘}
API はリクエストを受信する準備ができています。次に、データをスクレイピングしてみましょう。
株式データのスクレイピング
株のデータを取得するには、次のようにします。
- シンボルから Yahoo Finance URL を構築する
- HTMLページを取得する
- XPath を使用して値を解析する
from parsel import Selector # for xpath parsing
async def scrape_stock(symbol: str):
url = f"https://finance.yahoo.com/quote/{symbol}"
async with httpx.AsyncClient() as client:
response = await client.get(url)
sel = Selector(response.text)
# parse summary values
values = sel.xpath(‘//div[contains(@data-test,"summary-table")]//tr‘)
data = {}
for value in values:
label = value.xpath("./td[1]/text()").get()
val = value.xpath("./td[2]/text()").get()
data[label] = val
# parse price
data["price"] = sel.xpath(‘//fin-streamer[@data-symbol=$symbol]/@value‘, symbol=symbol).get()
return data
このスクレイパーは、解析されたデータを含む辞書を返します。これを API に接続しましょう。
from fastapi import FastAPI
from yahoo_finance import scrape_stock
app = FastAPI()
@app.get("/stock/{symbol}")
async def get_stock(symbol: str):
data = await scrape_stock(symbol)
return data
ここで API を呼び出すと、最新のデータが取得されます。
http http://localhost:8000/stock/AAPL
HTTP/1.1 200 OK
Content-Length: 340
{
"52 Week Range": "142.00 - 182.94",
"Beta (5Y Monthly)": "1.25",
"Diluted EPS (ttm)": "6.05",
"Earnings Date": "Oct 27, 2022",
"Ex-Dividend Date": "Aug 05, 2022",
"Forward Dividend & Yield": "0.92 (0.59%)",
"Market Cap": "2.44T",
"Open": "156.76",
"PE Ratio (ttm)": "25.60",
"Previous Close": "153.72",
"Price": "155.33",
"Volume": "53,978,024"
}
キャッシュの追加
リクエストごとにスクレイピングするのは無駄です。キャッシュを追加して、ストックを 5 分に XNUMX 回だけスクレイピングするようにしましょう。
シンプルなものを使用します dict
株式記号をキーとしてスクレイピングされたデータを保存するには:
STOCK_CACHE = {}
async def scrape_stock(symbol):
if symbol in STOCK_CACHE:
return STOCK_CACHE[symbol]
data = ... # scrape
STOCK_CACHE[symbol] = data
return data
繰り返されるリクエストは、毎回スクレイピングするのではなく、キャッシュされたデータを返すようになりました。
古いキャッシュを定期的にクリアすることもできます。
import time
CACHE_MAX_AGE = 300 # seconds
async def clear_expired_cache():
curr_time = time.time()
for symbol, data in STOCK_CACHE.items():
if curr_time - data["ts"] > CACHE_MAX_AGE:
del STOCK_CACHE[symbol]
# run every 5 minutes
clear_cache_task = asyncio.create_task(clear_expired_cache())
これにより、キャッシュが際限なく増大することがなくなります。
Webhook の追加
長時間のスクレイピング ジョブの場合は、Webhook を使用して結果を非同期的に返すことができます。
@app.get("/stock/{symbol}")
async def get_stock(symbol: str, webhook: str):
if webhook:
task = asyncio.create_task(fetch_stock(symbol, webhook))
return {"msg": "Fetching data"}
data = await scrape_stock(symbol)
return data
async def fetch_stock(symbol, url):
data = await scrape_stock(symbol)
async with httpx.AsyncClient() as client:
await client.post(url, json=data)
スクレイピングが完了するのを待つ代わりに、API はすぐにステータスを返し、データを非同期でコールバック Webhook に配信します。
次のようなツールを使用してこれをテストできます Webhook.サイト.
スクレイピングAPIのスケーリング
トラフィックの増加に応じて、いくつかのスケーリング手法を次に示します。
APIキャッシュを追加する – Redis などのキャッシュを使用してスクレイピングの負荷を軽減します。
複数のプロセスを実行する – gunicorn を使用してコア/サーバー全体に拡張します。
オフロードスクレイピング – スクレーパーを Celery や RabbitMQ などのワーカーに移動します。
スクレイピングサービスを利用する – Scrapfly や ScraperAPI などのスクレイピング API を活用します。
スクレーパーの最適化 – スクレイパーが効率的であることを確認し、禁止を回避します。
データベースの追加 – さらに分析するために、スクレイピングしたデータをデータベースに保存します。
まとめ
このガイドでは、FastAPI を使用して Python で Web スクレイピング API を構築しました。重要なポイントは次のとおりです。
FastAPI は、API をスクレイピングするための優れたフレームワークを提供します。
スクレイパーを生成して、オンデマンドでデータを取得できます。
キャッシュと Webhook は、スクレイピングの制限を克服するのに役立ちます。
トラフィックの増加に応じて、多くの最適化およびスケーリング戦略が存在します。
API をスクレイピングすると、Web サイト上の豊富なデータが解放されます。静的スクレイピングの代わりに API を生成することで、低レイテンシーのカスタマイズされたデータを大規模に配信できます。