Using Apache Airflow to Build a Robust Pipeline for Scraped Data

As web scraping projects grow in scale, the amount of unstructured data they generate can quickly get out of hand. Without a proper pipeline, critical data can end up lost or disorganized.

In this comprehensive guide, we’ll explore how to leverage Apache Airflow to build resilient pipelines for web scraped data.

Why Airflow is a Game Changer for Data Pipelines

Apache Airflow provides a framework to programmatically author, schedule and monitor complex workflows. Surveys run by Astronomer consistently find Airflow among the most widely used tools for ETL and data integration.

Here are some of the key advantages Airflow delivers:

Visual pipeline authoring

Airflow uses Python to define Directed Acyclic Graphs (DAGs), which provide a bird's-eye view of the workflow pipeline. Nodes represent tasks, while edges show dependencies, which makes the pipeline logic easy to comprehend at a glance.
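
As a tiny illustration (the DAG name and callables below are made up, and import paths may vary slightly between Airflow versions), three tasks and the edges between them take only a handful of lines:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('catalog_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
  fetch = PythonOperator(task_id='fetch', python_callable=lambda: print('fetch'))
  scrape = PythonOperator(task_id='scrape', python_callable=lambda: print('scrape'))
  store = PythonOperator(task_id='store', python_callable=lambda: print('store'))

  # Nodes are the three tasks; the >> operator draws the edges between them
  fetch >> scrape >> store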

Dynamic scheduling with smart retry logic

Tasks can run on standard cron schedules, while also supporting dynamic date-based scheduling. Failed tasks are automatically retried using exponential backoff. The scheduler is highly configurable.
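
A minimal sketch of those knobs, set through default_args (the values are arbitrary):

from datetime import timedelta

default_args = {
  'retries': 3,                           # re-run a failed task up to three times
  'retry_delay': timedelta(minutes=5),    # wait five minutes before the first retry
  'retry_exponential_backoff': True,      # double the delay on each subsequent retry
  'max_retry_delay': timedelta(hours=1),  # never wait longer than an hour between retries
}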

Scalable distributed execution

With the Celery executor, Airflow distributes tasks across many workers in parallel, letting pipelines scale to millions of task runs. Asana, for example, reportedly saw a 5x speedup after switching to Airflow.

Advanced monitoring and alerting

Airflow has built-in charts, logs and task views to monitor pipelines. It can push metadata to enterprise APM tools. Task success and failure alerts can be configured using webhooks or email.
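
Email alerts, for example, take only a couple of entries in default_args (the address is a placeholder):

default_args = {
  'email': ['alerts@example.com'],  # placeholder address
  'email_on_failure': True,         # notify when a task fails for good
  'email_on_retry': False,          # stay quiet during automatic retries
}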

Extensible and open

Airflow has a modular plug-in architecture. The rich Python API makes it easy to customize and extend capabilities. The open-source community contributes operators for almost every tool.
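
For instance, a custom operator is just a small Python class. The operator below is a made-up sketch for illustration, not part of any provider package:

from airflow.models.baseoperator import BaseOperator
import requests

class ScrapePageOperator(BaseOperator):
  """Toy operator that fetches a URL and pushes the HTTP status code to XCom."""

  def __init__(self, url, **kwargs):
    super().__init__(**kwargs)
    self.url = url

  def execute(self, context):
    response = requests.get(self.url, timeout=30)
    return response.status_code  # return values are automatically pushed to XCom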

For complex web scraping projects, Airflow addresses many common pain points out of the box through its enterprise-grade workflow orchestration capabilities. Next, let's walk through a simple web scraping example.

Scraping Product Data

To demonstrate core concepts, we’ll build a scraper for an e-commerce site using Python, Requests, and BeautifulSoup. While basic, it illustrates the key tasks in a web scraping pipeline.

Let's start by scraping data from a single page:

import requests
from bs4 import BeautifulSoup

URL = 'https://www.ecommerce.com/p/1'

def scrape_page(url):
  response = requests.get(url)

  # Parse the HTML and extract fields into a dict
  soup = BeautifulSoup(response.text, 'html.parser')
  data = {
    'title': soup.find('h1').text,
    'price': soup.find('div', class_='price').text,
  }

  return data

page_data = scrape_page(URL)
print(page_data)

This makes a request to the URL, parses the HTML, extracts fields like title and price into a Python dict, and returns the structured data.

With some additional logic, we can loop through multiple product pages:

URLS = [...] # List of 1000 urls 

all_data = []

for url in URLS:
  data = scrape_page(url)
  all_data.append(data) 

print(all_data)

This allows batch scraping of an entire catalog. But even for a few thousand pages, issues can arise:

  • Slow performance – Requests are made sequentially, limited by network I/O
  • Fragility – Any single failure breaks the entire loop
  • Duplicates – No deduplication if a page is scraped multiple times
  • No work tracking – Can't identify pages that still need scraping

Without robust orchestration, these problems are amplified at scale. Airflow provides the solution.

Creating an Airflow DAG for Web Scraping

In Airflow, pipelines are defined through DAGs (Directed Acyclic Graphs). A DAG outlines the steps and their dependencies as code.

We'll create a DAG to:

  1. Fetch URL list – Get CSV of product URLs from S3
  2. Scrape Page – Download and parse each URL
  3. Store Data – Insert JSON output into PostgreSQL

First we define some helper functions:

# extractor.py
import requests, json
from bs4 import BeautifulSoup

def scrape(url):
  """Scrape page and return JSON data"""

  response = requests.get(url)
  soup = BeautifulSoup(response.text, 'html.parser')

  data = {
   "title": soup.find("h1").text,  
   "price": soup.find("div", class_="price").text,
   # ... extract other fields
  }

  return json.dumps(data)

# loader.py
import psycopg2

DATABASE_URI = "postgresql://user:password@localhost:5432/scraper"  # placeholder connection string

def insert_row(data):
  """Insert JSON data into Postgres"""

  conn = psycopg2.connect(DATABASE_URI)
  cur = conn.cursor()

  # Assumes a products table with a single JSONB column named data (placeholder schema)
  cur.execute("INSERT INTO products (data) VALUES (%s)", [data])

  conn.commit()
  cur.close()
  conn.close()

These contain the main business logic for scraping and data loading.

Now we import them into our DAG file:

# scraper_dag.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

from extractor import scrape
from loader import insert_row

default_args = {
  'owner': 'My Name',
  'depends_on_past': False,
  'email': ['[email protected]'],
  'retries': 2,
  'retry_delay': timedelta(minutes=5)
}

dag = DAG(
  'webscraper',
  default_args=default_args,
  start_date=datetime(2019, 1, 1),
  schedule_interval='@daily',
)

fetch_task = PythonOperator(
  task_id='fetch',
  python_callable=fetch_url_list, # Loads CSV from S3
  dag=dag
)

scrape_task = PythonOperator(
  task_id='scrape',
  python_callable=scrape,
  op_kwargs={'url': '{{ ti.xcom_pull(task_ids="fetch") }}'},
  dag=dag,
)

store_task = PythonOperator(
  task_id='store',
  python_callable=insert_row,
  op_kwargs={'data': '{{ ti.xcom_pull(task_ids="scrape") }}'},
  dag=dag,
)

fetch_task >> scrape_task >> store_task

This DAG encapsulates the workflow by:

  • Instantiating a PythonOperator for each step
  • Passing data between tasks using XComs
  • Setting directional dependencies between tasks

When run, Airflow will:

  1. Call fetch_url_list (sketched below) to load the CSV of URLs
  2. Pass the fetched URLs to the scrape task via XCom for downloading and parsing
  3. Take the JSON output of scraping and insert it into Postgres using insert_row
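
The fetch_url_list callable referenced in step 1 isn't shown above. A minimal sketch, assuming the URLs sit in a one-column CSV on S3 (the bucket and key names are placeholders), could look like this:

import csv
import io
import boto3

def fetch_url_list():
  """Download the CSV of product URLs from S3 and return them as a list."""
  s3 = boto3.client('s3')
  obj = s3.get_object(Bucket='my-scraper-bucket', Key='product_urls.csv')  # placeholder names
  body = obj['Body'].read().decode('utf-8')
  return [row[0] for row in csv.reader(io.StringIO(body)) if row]

Because the PythonOperator pushes its return value to XCom, the downstream scrape task can pull it with ti.xcom_pull as shown in the DAG.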

This provides a robust, scalable pipeline for web scraping!

Visualizing the DAG

The Airflow UI provides a visual DAG representation:

[Screenshot: the DAG graph view in the Airflow UI]

We can check:

  • Number of task instances currently running
  • Overall progress and health
  • Logs and metadata
  • Success/failure breakdown

The pipeline logic is transparently documented. When issues occur, Airflow surfaces them immediately so they can be diagnosed and fixed.

Comparing Airflow to Other Workflow Tools

Airflow is commonly compared to tools like Apache Oozie and Azkaban. Here's how it stacks up for web scraping:

  • Airflow – Strengths: Python DAGs, pluggable architecture, great UI, scalability. Weaknesses: steep learning curve, can get complex fast.
  • Oozie – Strengths: built-in Hadoop support, simple XML workflows. Weaknesses: designed for batch, limited flexibility.
  • Azkaban – Strengths: simple key-value configs, great visualization. Weaknesses: not as extensible, smaller community.

For general-purpose pipelines, Airflow provides more power and flexibility. Its pluggable architecture makes it easy to slot in new data sources, tools, and processing logic as needed.

Scraping Data From Multiple Sources

A key strength of Airflow is supporting complex workflows that span multiple systems.

Let's look at an example DAG that scrapes two different sites, then combines and aggregates the data:

from airflow import DAG
from airflow.operators.python import PythonOperator

# Scraping operators

scrape_site1 = PythonOperator(task_id='scrape_site1', ...)
scrape_site2 = PythonOperator(task_id='scrape_site2', ...)

# Transformation operators

clean_data = PythonOperator(task_id='clean_data', ...)
combine_data = PythonOperator(task_id='combine_data', ...)

# Aggregate and load

calculate_stats = PythonOperator(task_id='stats', ...)
load_warehouse = PythonOperator(task_id='load_warehouse', ...)

# Dependencies (a list on the left fans in to clean_data)

[scrape_site1, scrape_site2] >> clean_data >> combine_data >> calculate_stats >> load_warehouse

This demonstrates a complex branching pipeline with aggregation logic. Airflow's code-based approach makes it straightforward to build multi-step workflows.

Moving to Production

While the example provided demonstrates core concepts, more care is needed when moving to production. Here are some best practices:

Error Handling

  • Set up SMTP/SMS/Slack alerts for task failures (see the sketch after this list)
  • Build custom Airflow plugins to implement retry policies
  • Perform ad-hoc scrapes through API/CLI when needed
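
As a sketch of the alerting bullet above, a failure callback can post to a Slack-style incoming webhook (the webhook URL is a placeholder):

import requests

def notify_failure(context):
  """Called by Airflow with the task context whenever a task instance fails."""
  ti = context['task_instance']
  message = f"Task {ti.task_id} in DAG {ti.dag_id} failed on {context['ds']}"
  requests.post('https://hooks.slack.com/services/XXX/YYY/ZZZ', json={'text': message})  # placeholder URL

default_args = {
  'on_failure_callback': notify_failure,
  # ... other defaults as before
}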

Logging & Metrics

  • Stream logs to ElasticSearch for analysis
  • Record business metrics like number of rows scraped
  • Add sensors to check SLAs and data quality (task-level SLAs are sketched below)
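
Task-level SLAs, for example, are a one-line addition to the scrape task from the earlier DAG; the one-hour window below is arbitrary:

scrape_task = PythonOperator(
  task_id='scrape',
  python_callable=scrape,
  sla=timedelta(hours=1),  # flag the run if this task hasn't finished within an hour of the scheduled run
  dag=dag,
)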

Scaling Out

  • Run Airflow on Kubernetes for auto-scaling
  • Add queues and pools for different work priority levels (example below)
  • Split work across regions using multiple DAGs
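
Pools and queues are also set per task; reusing the scrape task from earlier, the pool and queue names below are placeholders that must exist in your Airflow and Celery setup:

scrape_task = PythonOperator(
  task_id='scrape',
  python_callable=scrape,
  pool='scraping_pool',   # placeholder pool limiting how many scrapes run at once
  queue='scraper_queue',  # placeholder Celery queue routed to dedicated workers
  dag=dag,
)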

Security

  • Enable authentication and RBAC
  • Encrypt credentials and sensitive data
  • Use private VPCs with firewall rules to isolate workers

Monitoring

  • Set up external APM tools like Datadog to track performance
  • Create custom metrics on queue lag, data processed, pipeline duration etc.

Testing

  • Leverage the Airflow CLI to run one-off task tests
  • Mock outputs to simulate different scenarios
  • Implement integration tests that validate pipeline correctness (see the sketch below)
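
A cheap first test is one that simply loads every DAG file and fails on import errors; a minimal pytest sketch:

# test_dags.py -- run with pytest
from airflow.models import DagBag

def test_dags_import_cleanly():
  """Fail if any file in the DAGs folder raises an error on import."""
  dag_bag = DagBag(include_examples=False)
  assert dag_bag.import_errors == {}, f"DAG import errors: {dag_bag.import_errors}"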

These are just a few tips – entire books have been written on mastering Airflow in production!

The key is to incrementally evolve the pipeline's robustness as your use case grows while leaning on Airflow's architecture every step of the way.

Conclusion

Apache Airflow is invaluable for creating resilient, scalable data pipelines. For web scraping projects, it provides an enterprise-grade orchestration framework out of the box.

Some of the key benefits we explored include:

  • Modeling complex workflows as code through Python DAGs
  • Dynamic task scheduling and retries
  • Distributed scaling to handle large workloads
  • Advanced monitoring capabilities
  • Flexible framework to incorporate new data sources and processing logic

Airflow has a bit of a learning curve, but it pays dividends in taming unwieldy scraping pipelines. If you found this guide useful, check out the Official Airflow Docs for many more examples of production-grade workflows. Feel free to reach out if you have any other questions!
