コンテンツにスキップ

Web スクレイパーをデータ API に変える方法

Web スクレイピングは、Web サイトから大量のデータを抽出する強力な技術です。ただし、オンデマンドでデータをスクレイピングするのは困難な場合があります。この包括的なガイドでは、FastAPI を使用して Python Web スクレイパーをリアルタイム データ API に変える方法を学びます。

スクレイピング API を構築する理由

API を介してスクレイピングされたデータを配信する主な利点をいくつか紹介します。

  • リアルタイムデータ – API は、キャッシュされた古くなったバッチに依存するのではなく、オンデマンドでデータをスクレイピングできます。

  • カスタムクエリ – API を使用すると、Web サイト全体をダンプするだけでなく、データのカスタム クエリが可能になります。

  • スケーラビリティ – API はトラフィックの急増を処理し、数千のユーザーに拡張します。

  • 信頼性の向上 – API は失敗したリクエストを再試行し、堅牢なスクレイピング ロジックを実装します。

  • 柔軟性 – API は Webhook などのさまざまなデータ配信方法を実装できます。

  • 抽象化 – API は、複雑なスクレイピング ロジックとインフラストラクチャを API コンシューマーから隠します。

スクレイピング API アーキテクチャ

大まかに言うと、Web スクレイピング API は次のアーキテクチャに従います。

スクレイピング API アーキテクチャ

  1. APIがリクエストを受信
  2. スクレーパーがデータを取得する
  3. データがキャッシュされる
  4. API はスクレイピングされたデータを返します

主要なコンポーネントは次のとおりです。

  • APIサーバー – クライアントのリクエストを処理し、スクレイピングされたデータを配信します。

  • スクレーパー – ターゲット サイトからオンデマンドでデータを取得します。

  • キャッシュ – 冗長なスクレイピングを避けるために、スクレイピングされたデータを保存します。

  • データベース – オプションで、履歴分析のためにスクレイピングされたデータを保存します。

FastAPI を使用する理由

世の中には優れた API フレームワークがたくさんあります。スクレーパーの場合は、こちらがおすすめです FastAPI

  • FastAPI は非常に高速なので、データ スクレイピング API に最適です。

  • 自動ドキュメント、検証、シリアル化などを提供します。

  • 非同期スクレイピングのための asyncio をサポートします。

  • 使い方も学習も簡単です。小規模から大規模な API まで柔軟に対応します。

  • httpx、parsel などのスクレイピング ツールの優れたエコシステムがあります。

スクレイピングAPIのセットアップ

次のコア ライブラリを使用します。

  • FastAPI – 当社の API フレームワーク

  • httpx – 非同期HTTPクライアント

  • パーセル – HTML/XMLパーサー

  • ログル – ロギングユーティリティ

パッケージをインストールします。

pip install fastapi uvicorn httpx parsel loguru

このガイドでは、Yahoo Finance からいくつかの基本的な株式データを収集します。

APIを作成する

単一のエンドポイントを使用して最初の FastAPI アプリをセットアップしましょう。

from fastapi import FastAPI

app = FastAPI() 

@app.get("/stock/{symbol}")  
async def get_stock(symbol: str):
    return { "stock": symbol } 

この基本的な API は、提供された銘柄記号を返すだけです。

サーバーを起動してテストしてみましょう。

uvicorn main:app --reload
import httpx

print(httpx.get("http://localhost:8000/stock/AAPL").json())

# {‘stock‘: ‘AAPL‘}

API はリクエストを受信する準備ができています。次に、データをスクレイピングしてみましょう。

株式データのスクレイピング

株のデータを取得するには、次のようにします。

  1. シンボルから Yahoo Finance URL を構築する
  2. HTMLページを取得する
  3. XPath を使用して値を解析する
from parsel import Selector # for xpath parsing

async def scrape_stock(symbol: str):

    url = f"https://finance.yahoo.com/quote/{symbol}"

    async with httpx.AsyncClient() as client:
       response = await client.get(url)

    sel = Selector(response.text)

    # parse summary values 
    values = sel.xpath(‘//div[contains(@data-test,"summary-table")]//tr‘)
    data = {}
    for value in values:
        label = value.xpath("./td[1]/text()").get()
        val = value.xpath("./td[2]/text()").get()
        data[label] = val

    # parse price 
    data["price"] = sel.xpath(‘//fin-streamer[@data-symbol=$symbol]/@value‘, symbol=symbol).get()

    return data

このスクレイパーは、解析されたデータを含む辞書を返します。これを API に接続しましょう。

from fastapi import FastAPI
from yahoo_finance import scrape_stock

app = FastAPI()

@app.get("/stock/{symbol}")
async def get_stock(symbol: str):
    data = await scrape_stock(symbol) 
    return data

ここで API を呼び出すと、最新のデータが取得されます。

http http://localhost:8000/stock/AAPL

HTTP/1.1 200 OK
Content-Length: 340

{
  "52 Week Range": "142.00 - 182.94",
  "Beta (5Y Monthly)": "1.25", 
  "Diluted EPS (ttm)": "6.05",
  "Earnings Date": "Oct 27, 2022",
  "Ex-Dividend Date": "Aug 05, 2022",
  "Forward Dividend & Yield": "0.92 (0.59%)",
  "Market Cap": "2.44T",
  "Open": "156.76",
  "PE Ratio (ttm)": "25.60",
  "Previous Close": "153.72",
  "Price": "155.33",  
  "Volume": "53,978,024"
}

キャッシュの追加

リクエストごとにスクレイピングするのは無駄です。キャッシュを追加して、ストックを 5 分に XNUMX 回だけスクレイピングするようにしましょう。

シンプルなものを使用します dict 株式記号をキーとしてスクレイピングされたデータを保存するには:

STOCK_CACHE = {} 

async def scrape_stock(symbol):

   if symbol in STOCK_CACHE:
       return STOCK_CACHE[symbol] 

   data = ... # scrape 
   STOCK_CACHE[symbol] = data

   return data

繰り返されるリクエストは、毎回スクレイピングするのではなく、キャッシュされたデータを返すようになりました。

古いキャッシュを定期的にクリアすることもできます。

import time

CACHE_MAX_AGE = 300 # seconds

async def clear_expired_cache():
   curr_time = time.time()
   for symbol, data in STOCK_CACHE.items():
       if curr_time - data["ts"] > CACHE_MAX_AGE:
           del STOCK_CACHE[symbol]

# run every 5 minutes   
clear_cache_task = asyncio.create_task(clear_expired_cache()) 

これにより、キャッシュが際限なく増大することがなくなります。

Webhook の追加

長時間のスクレイピング ジョブの場合は、Webhook を使用して結果を非同期的に返すことができます。

@app.get("/stock/{symbol}")
async def get_stock(symbol: str, webhook: str):

   if webhook:
      task = asyncio.create_task(fetch_stock(symbol, webhook))
      return {"msg": "Fetching data"}

   data = await scrape_stock(symbol)  
   return data

async def fetch_stock(symbol, url):
   data = await scrape_stock(symbol)

   async with httpx.AsyncClient() as client:
      await client.post(url, json=data)

スクレイピングが完了するのを待つ代わりに、API はすぐにステータスを返し、データを非同期でコールバック Webhook に配信します。

次のようなツールを使用してこれをテストできます Webhook.サイト.

スクレイピングAPIのスケーリング

トラフィックの増加に応じて、いくつかのスケーリング手法を次に示します。

  • APIキャッシュを追加する – Redis などのキャッシュを使用してスクレイピングの負荷を軽減します。

  • 複数のプロセスを実行する – gunicorn を使用してコア/サーバー全体に拡張します。

  • オフロードスクレイピング – スクレーパーを Celery や RabbitMQ などのワーカーに移動します。

  • スクレイピングサービスを利用する – Scrapfly や ScraperAPI などのスクレイピング API を活用します。

  • スクレーパーの最適化 – スクレイパーが効率的であることを確認し、禁止を回避します。

  • データベースの追加 – さらに分析するために、スクレイピングしたデータをデータベースに保存します。

まとめ

このガイドでは、FastAPI を使用して Python で Web スクレイピング API を構築しました。重要なポイントは次のとおりです。

  • FastAPI は、API をスクレイピングするための優れたフレームワークを提供します。

  • スクレイパーを生成して、オンデマンドでデータを取得できます。

  • キャッシュと Webhook は、スクレイピングの制限を克服するのに役立ちます。

  • トラフィックの増加に応じて、多くの最適化およびスケーリング戦略が存在します。

API をスクレイピングすると、Web サイト上の豊富なデータが解放されます。静的スクレイピングの代わりに API を生成することで、低レイテンシーのカスタマイズされたデータを大規模に配信できます。

タグ:

参加する

あなたのメールアドレスは公開されません。 必須フィールドは、マークされています *