Bỏ để qua phần nội dung

Cách biến trình thu thập dữ liệu web thành API dữ liệu

Quét web là một kỹ thuật mạnh mẽ để trích xuất lượng lớn dữ liệu từ các trang web. Tuy nhiên, việc cạo dữ liệu theo yêu cầu có thể là một thách thức. Trong hướng dẫn toàn diện này, chúng ta sẽ tìm hiểu cách biến trình quét web Python thành API dữ liệu thời gian thực bằng FastAPI.

Tại sao nên xây dựng API Scraping?

Dưới đây là một số lợi ích chính của việc phân phối dữ liệu cóp nhặt thông qua API:

  • Dữ liệu theo thời gian thực – API có thể cạo dữ liệu theo yêu cầu thay vì dựa vào các lô cũ, được lưu trong bộ nhớ đệm.

  • Truy vấn tùy chỉnh – API cho phép truy vấn dữ liệu tùy chỉnh thay vì chỉ hủy toàn bộ trang web.

  • khả năng mở rộng – API xử lý lưu lượng truy cập tăng đột biến và mở rộng quy mô tới hàng nghìn người dùng.

  • Độ tin cậy – API thử lại các yêu cầu không thành công và triển khai logic quét mạnh mẽ.

  • Linh hoạt – API có thể triển khai nhiều phương thức phân phối dữ liệu khác nhau như webhook.

  • Trừu tượng – API ẩn logic và cơ sở hạ tầng phức tạp khỏi người tiêu dùng API.

Quét kiến ​​trúc API

Ở cấp độ cao, API quét web tuân theo kiến ​​trúc này:

Quét kiến ​​trúc API

  1. API nhận yêu cầu
  2. Scraper tìm nạp dữ liệu
  3. Dữ liệu được lưu vào bộ nhớ đệm
  4. API trả về dữ liệu cóp nhặt

Các thành phần chính là:

  • Máy chủ API – Xử lý các yêu cầu của khách hàng và cung cấp dữ liệu cóp nhặt.

  • cái nạo – Tìm nạp dữ liệu theo yêu cầu từ các trang web mục tiêu.

  • Bộ nhớ cache – Lưu trữ dữ liệu đã được cạo để tránh việc cạo dư thừa.

  • Cơ sở dữ liệu – Tùy chọn lưu trữ dữ liệu cóp nhặt để phân tích lịch sử.

Tại sao nên sử dụng FastAPI?

Có rất nhiều khung API tuyệt vời hiện có. Đối với máy cạp, tôi khuyên bạn nên API nhanh bởi vì:

  • FastAPI rất nhanh – hoàn hảo cho các API quét dữ liệu.

  • Nó cung cấp tài liệu tự động, xác nhận, tuần tự hóa, v.v.

  • Hỗ trợ asyncio để quét không đồng bộ.

  • Đơn giản để sử dụng và tìm hiểu. Linh hoạt cho các API từ nhỏ đến lớn.

  • Có hệ sinh thái tuyệt vời gồm các công cụ thu thập dữ liệu như httpx, Parsel, v.v.

Thiết lập API quét

Chúng tôi sẽ sử dụng các thư viện cốt lõi sau:

  • API nhanh – Khung API của chúng tôi

  • httpx – Máy khách HTTP không đồng bộ

  • Bưu kiện – Trình phân tích cú pháp HTML/XML

  • loguru – Tiện ích ghi nhật ký

Cài đặt các gói:

pip install fastapi uvicorn httpx parsel loguru

Đối với hướng dẫn này, chúng tôi sẽ lấy một số dữ liệu chứng khoán cơ bản từ Yahoo Finance.

Tạo API

Hãy thiết lập ứng dụng FastAPI ban đầu với một điểm cuối duy nhất:

from fastapi import FastAPI

app = FastAPI() 

@app.get("/stock/{symbol}")  
async def get_stock(symbol: str):
    return { "stock": symbol } 

API cơ bản này chỉ trả về ký hiệu chứng khoán mà chúng tôi cung cấp.

Hãy khởi động máy chủ và kiểm tra nó:

uvicorn main:app --reload
import httpx

print(httpx.get("http://localhost:8000/stock/AAPL").json())

# {‘stock‘: ‘AAPL‘}

API của chúng tôi sẵn sàng nhận yêu cầu. Tiếp theo hãy làm cho nó cạo một số dữ liệu.

Quét dữ liệu chứng khoán

Để có được dữ liệu về cổ phiếu, chúng tôi sẽ:

  1. Xây dựng URL Yahoo Finance từ biểu tượng
  2. Tìm nạp trang HTML
  3. Phân tích giá trị bằng XPath
from parsel import Selector # for xpath parsing

async def scrape_stock(symbol: str):

    url = f"https://finance.yahoo.com/quote/{symbol}"

    async with httpx.AsyncClient() as client:
       response = await client.get(url)

    sel = Selector(response.text)

    # parse summary values 
    values = sel.xpath(‘//div[contains(@data-test,"summary-table")]//tr‘)
    data = {}
    for value in values:
        label = value.xpath("./td[1]/text()").get()
        val = value.xpath("./td[2]/text()").get()
        data[label] = val

    # parse price 
    data["price"] = sel.xpath(‘//fin-streamer[@data-symbol=$symbol]/@value‘, symbol=symbol).get()

    return data

Scraper này trả về một từ điển chứa dữ liệu được phân tích cú pháp. Hãy kết nối nó với API của chúng tôi:

from fastapi import FastAPI
from yahoo_finance import scrape_stock

app = FastAPI()

@app.get("/stock/{symbol}")
async def get_stock(symbol: str):
    data = await scrape_stock(symbol) 
    return data

Bây giờ khi chúng ta gọi API, nó sẽ tìm nạp dữ liệu mới nhất:

http http://localhost:8000/stock/AAPL

HTTP/1.1 200 OK
Content-Length: 340

{
  "52 Week Range": "142.00 - 182.94",
  "Beta (5Y Monthly)": "1.25", 
  "Diluted EPS (ttm)": "6.05",
  "Earnings Date": "Oct 27, 2022",
  "Ex-Dividend Date": "Aug 05, 2022",
  "Forward Dividend & Yield": "0.92 (0.59%)",
  "Market Cap": "2.44T",
  "Open": "156.76",
  "PE Ratio (ttm)": "25.60",
  "Previous Close": "153.72",
  "Price": "155.33",  
  "Volume": "53,978,024"
}

Thêm bộ nhớ đệm

Cạo theo mọi yêu cầu là lãng phí. Hãy thêm bộ nhớ đệm để chúng tôi chỉ xóa kho 5 phút một lần.

Chúng ta sẽ sử dụng một cách đơn giản dict để lưu trữ dữ liệu đã được loại bỏ được khóa bằng ký hiệu chứng khoán:

STOCK_CACHE = {} 

async def scrape_stock(symbol):

   if symbol in STOCK_CACHE:
       return STOCK_CACHE[symbol] 

   data = ... # scrape 
   STOCK_CACHE[symbol] = data

   return data

Giờ đây, các yêu cầu lặp lại sẽ trả về dữ liệu được lưu trong bộ nhớ đệm thay vì phải thu thập dữ liệu mỗi lần.

Chúng tôi cũng có thể xóa bộ nhớ đệm cũ theo định kỳ:

import time

CACHE_MAX_AGE = 300 # seconds

async def clear_expired_cache():
   curr_time = time.time()
   for symbol, data in STOCK_CACHE.items():
       if curr_time - data["ts"] > CACHE_MAX_AGE:
           del STOCK_CACHE[symbol]

# run every 5 minutes   
clear_cache_task = asyncio.create_task(clear_expired_cache()) 

Điều này đảm bảo bộ đệm của chúng tôi sẽ không bị giới hạn.

Thêm Webhook

Đối với các công việc thu thập dữ liệu dài, chúng ta có thể sử dụng webhooks để trả về kết quả không đồng bộ:

@app.get("/stock/{symbol}")
async def get_stock(symbol: str, webhook: str):

   if webhook:
      task = asyncio.create_task(fetch_stock(symbol, webhook))
      return {"msg": "Fetching data"}

   data = await scrape_stock(symbol)  
   return data

async def fetch_stock(symbol, url):
   data = await scrape_stock(symbol)

   async with httpx.AsyncClient() as client:
      await client.post(url, json=data)

Giờ đây, thay vì đợi quá trình thu thập dữ liệu hoàn tất, API của chúng tôi sẽ ngay lập tức trả về trạng thái và phân phối dữ liệu một cách không đồng bộ đến webhook gọi lại.

Chúng ta có thể kiểm tra điều này bằng cách sử dụng một công cụ như Webhook.site.

Mở rộng API Scraping

Khi lưu lượng truy cập tăng lên, đây là một số kỹ thuật mở rộng quy mô:

  • Thêm bộ nhớ đệm API – Sử dụng bộ nhớ đệm như Redis để giảm tải cho việc thu thập dữ liệu.

  • Chạy nhiều tiến trình – Mở rộng quy mô trên các lõi/máy chủ bằng gunicorn.

  • Cạo giảm tải – Di chuyển máy cạp cho các công nhân như Celery hoặc RabbitMQ.

  • Sử dụng dịch vụ cạo – Tận dụng các API quét như Scrapfly hoặc ScraperAPI.

  • Tối ưu hóa máy cạp – Đảm bảo người dọn dẹp hoạt động hiệu quả và tránh bị cấm.

  • Thêm cơ sở dữ liệu – Lưu trữ dữ liệu đã được thu thập trong cơ sở dữ liệu để phân tích thêm.

Kết luận

Trong hướng dẫn này, chúng tôi đã xây dựng API quét web bằng Python bằng FastAPI. Những điểm mấu chốt là:

  • FastAPI cung cấp một khuôn khổ tuyệt vời để thu thập các API.

  • Chúng tôi có thể tạo các trình dọn dẹp để tìm nạp dữ liệu theo yêu cầu.

  • Bộ nhớ đệm và webhooks giúp khắc phục các hạn chế về việc thu thập dữ liệu.

  • Có nhiều chiến lược tối ưu hóa và mở rộng quy mô khi lưu lượng truy cập tăng lên.

API quét mở khóa kho dữ liệu dồi dào trên các trang web. Bằng cách tạo API thay vì quét tĩnh, chúng tôi có thể cung cấp dữ liệu tùy chỉnh, có độ trễ thấp trên quy mô lớn.

tags:

Tham gia vào cuộc đối thoại

Chúng tôi sẽ không công khai email của bạn. Các ô đánh dấu * là bắt buộc *