Automating Real-Time Data Pipelines: Integrating Weather and Flight APIs with Python and MySQL

Learn how to build a real-time data pipeline using Python to fetch weather and flight information and store it in a MySQL database.

By Janmajay Kumar | October 2024

Introduction

In this article, we'll walk you through the steps of building an automated data pipeline that integrates data from the OpenWeather API and the AeroDataBox API and stores real-time weather and flight information in a MySQL database. Along the way, we'll learn why data pipelines matter for smooth, scalable data processing workflows and how to secure sensitive credentials such as API keys and database passwords.

[Figure: Google Cloud architecture diagram]

The Importance of Database Pipelines in Modern Data Engineering

In today’s data-driven world, businesses and organizations rely on vast amounts of data to make informed decisions, optimize operations, and predict future trends. This data comes from multiple sources: customer transactions, IoT devices, weather APIs, flight information, and more. The challenge lies not only in collecting this data but also in storing, processing, and analyzing it effectively.

This is where database pipelines come into play. A well-structured database pipeline ensures data flows seamlessly from source to storage and then to analysis. It connects different systems and data sources, ensuring the right information is available in the right format at the right time.

What is a Database Pipeline?

A database pipeline is a series of steps that data goes through from collection to storage in a database, and then on to processing or analysis. The pipeline typically involves multiple stages:

- Collection: gathering raw data from sources such as APIs, web pages, or files
- Transformation: cleaning and reshaping the raw data into the desired format
- Storage: writing the prepared data to a database
- Analysis: querying the stored data for reporting and decision-making

A pipeline’s primary function is to automate and streamline the flow of data, minimizing manual intervention and ensuring consistency across systems.
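As a rough illustration (a minimal sketch with made-up function names, not the project's actual code), these stages map onto a few small functions that get chained together; the concrete pipeline below follows the same pattern:

import pandas as pd
import requests

def extract(url):
    # Collection: pull raw records from a source (an API in this project)
    return requests.get(url, timeout=30).json()

def transform(records):
    # Transformation: reshape the raw records into a tabular form
    return pd.DataFrame(records)

def load(df, table_name, engine):
    # Storage: append the prepared rows to a database table
    df.to_sql(table_name, con=engine, if_exists="append", index=False)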

Building a Real-Time Data Pipeline

Let's dive into building a real-time data pipeline that fetches weather and flight information from APIs and stores it in a MySQL database. We'll use Python for data collection and transformation, and MySQL for data storage. Here's an overview of the pipeline:

Architecture Overview

The real-time data pipeline consists of four main steps, each described below: collecting city data, fetching weather data, fetching flight data, and storing everything in a MySQL database.

1. Data Collection: Fetching City Data from Wikipedia

Using BeautifulSoup, we scrape the coordinates and country for each city from Wikipedia:


from bs4 import BeautifulSoup
import requests
from lat_lon_parser import parse
import pandas as pd
def cities_data(cities):
    """Scrape latitude, longitude, and country for each city from its Wikipedia page."""
    city_data = []
    for city in cities:
        url = f"https://www.wikipedia.org/wiki/{city}"
        response = requests.get(url)
        city_soup = BeautifulSoup(response.content, 'html.parser')
        # Coordinates and country come from the infobox of the city's article
        latitude = city_soup.find(class_="latitude").get_text()
        longitude = city_soup.find(class_="longitude").get_text()
        country = city_soup.find(class_="infobox-data").get_text()
        city_data.append({
            "City": city,
            "Country": country,
            # parse() converts DMS strings such as 52°31′N into decimal degrees
            "Latitude": parse(latitude),
            "Longitude": parse(longitude),
        })
    return pd.DataFrame(city_data)
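A quick usage sketch (the city names here are only examples):

cities_df = cities_data(["Berlin", "Hamburg", "Munich"])
print(cities_df.head())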
    

2. Fetching Weather Data from OpenWeather API

We use the OpenWeather API to retrieve weather data such as temperature, humidity, and weather conditions for the cities. The following is a brief sketch of the code (the full code can be found on GitHub).


def get_weather_data(lat, lon, api_key):
    # Request the 5-day/3-hour forecast for the given coordinates, in metric units
    url = f"https://api.openweathermap.org/data/2.5/forecast?lat={lat}&lon={lon}&appid={api_key}&units=metric"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Failed to retrieve data: {response.status_code}")
        return None
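To turn the returned JSON into rows for the database, the forecast list can be flattened; the sketch below assumes the standard fields of the OpenWeather 5-day/3-hour forecast response and is not the project's exact code:

def weather_to_dataframe(weather_data, city):
    # One row per forecast timestamp
    rows = []
    for entry in weather_data.get("list", []):
        rows.append({
            "City": city,
            "forecast_time": entry.get("dt_txt"),
            "temperature": entry.get("main", {}).get("temp"),
            "humidity": entry.get("main", {}).get("humidity"),
            "condition": (entry.get("weather") or [{}])[0].get("description"),
        })
    return pd.DataFrame(rows)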

3. Fetching Flight Data from AeroDataBox API

The AeroDataBox API provides flight information such as scheduled arrivals for specific airports. The following is a brief sketch of the code (the full code can be found on GitHub).


def get_flight_data(icao_list, api_key):
    flight_items = []
    for icao in icao_list:
        # Scheduled arrivals for a fixed one-day window (hardcoded here for brevity)
        url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{icao}/2024-10-10T00:00/2024-10-10T23:59"
        headers = {
            'x-rapidapi-key': api_key
        }
        response = requests.get(url, headers=headers)
        flights_json = response.json()

        for item in flights_json.get("arrivals", []):
            flight_items.append({
                "arrival_airport_icao": icao,
                "flight_number": item.get("number"),
                "scheduled_arrival_time": item.get("arrival", {}).get("scheduledTime", {}).get("local"),
            })
    return pd.DataFrame(flight_items)
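Example usage (the ICAO codes and API key are placeholders):

flights_df = get_flight_data(["EDDB", "EDDH"], api_key="YOUR_RAPIDAPI_KEY")
print(flights_df.head())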
        

4. Storing Data in MySQL Database

The retrieved data is stored in a MySQL database for future retrieval and analysis. We use SQLAlchemy to interact with the database. The following is a brief sketch of the code (the full code can be found on GitHub).


from sqlalchemy import create_engine

def connect_database(config):
    # Build a SQLAlchemy engine from the connection details in the config dictionary
    connect_string = f'mysql+pymysql://{config["user"]}:{config["password"]}@{config["host"]}:{config["port"]}/{config["database"]}'
    engine = create_engine(connect_string)
    return engine

def push_to_database(df, table_name, engine):
    # Append the DataFrame's rows to the given table
    df.to_sql(table_name, con=engine, if_exists='append', index=False)
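The introduction mentioned keeping credentials such as API keys and database passwords out of the code. One simple approach (a sketch, not necessarily the exact setup used in the project) is to read them from environment variables when building the config dictionary:

import os

# Connection details come from environment variables instead of hardcoded strings;
# the variable names and the "cities" table are illustrative
config = {
    "user": os.environ["DB_USER"],
    "password": os.environ["DB_PASSWORD"],
    "host": os.environ.get("DB_HOST", "127.0.0.1"),
    "port": os.environ.get("DB_PORT", "3306"),
    "database": os.environ["DB_NAME"],
}

engine = connect_database(config)
push_to_database(cities_df, "cities", engine)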
     

Retrieving and Analyzing Data

Once the data is stored, it can be queried and analyzed from the database. Here’s how we retrieve the weather data for analysis:


            
def retrieve_from_db(table, engine):
    query = f"SELECT * FROM {table}"
    return pd.read_sql(query, con=engine)

engine = connect_database(config)
weather_df = retrieve_from_db('weather_data', engine)  
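From here, the DataFrame can be analyzed with regular pandas operations, for example (the column names depend on how the weather table was flattened):

# Average temperature per city across all stored forecasts
avg_temp = weather_df.groupby("City")["temperature"].mean().sort_values(ascending=False)
print(avg_temp)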
        

The main workflow lives in a main.py file, which serves as the central point for executing the code and controlling the various components of the data pipeline. One can adjust this file to control what data is fetched, stored, or retrieved, depending on the desired output.
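A rough sketch of such an entry point is shown below; the --weather and --flights flags match the calls in automation.sh later in this article, while the city list, ICAO codes, API key, and table names are placeholders:

# main.py -- illustrative entry point; connect_database, cities_data, get_flight_data,
# push_to_database, and config are defined as in the snippets above
import argparse

def main():
    parser = argparse.ArgumentParser(description="Weather and flight data pipeline")
    parser.add_argument("--weather", action="store_true", help="fetch and store weather data")
    parser.add_argument("--flights", action="store_true", help="fetch and store flight data")
    args = parser.parse_args()

    engine = connect_database(config)

    if args.weather:
        cities_df = cities_data(["Berlin", "Hamburg", "Munich"])
        push_to_database(cities_df, "cities", engine)
        # ... fetch weather per city with get_weather_data() and push it as well
    if args.flights:
        flights_df = get_flight_data(["EDDB", "EDDH"], api_key="YOUR_RAPIDAPI_KEY")
        push_to_database(flights_df, "flights", engine)

if __name__ == "__main__":
    main()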

Automating with a Cron Job

To ensure the pipeline runs automatically at regular intervals, I used a cron job for scheduling. This allows the system to fetch weather and flight data, store it in the database, and send notifications — without manual input. For example, to run the script daily at midnight, one can set up the following cron job:


# Open the crontab editor
crontab -e

# Add this line to schedule the Python script
0 0 * * * /path/to/your/project/automation.sh

The automation.sh script itself looks like the following (it must be executable, e.g. chmod +x automation.sh, so cron can run it); one can tailor it to their needs:


#!/bin/bash

# Navigate to your project directory (needed under cron so main.py and the log file are found)
cd /path/to/your/project
# Activate your virtual environment if needed
# source /path/to/venv/bin/activate

# Log file location
LOG_FILE="automation.log"

# Execute your Python script for weather data
echo "Fetching weather data..." >> $LOG_FILE
python3 main.py --weather >> $LOG_FILE 2>&1
echo "Weather data fetched successfully!" >> $LOG_FILE

# Execute your Python script for flight data
echo "Fetching flight data..." >> $LOG_FILE
python3 main.py --flights >> $LOG_FILE 2>&1
echo "Flight data fetched successfully!" >> $LOG_FILE

# Send a desktop notification (optional, for Linux)
notify-send "Automation Complete" "Weather and Flight data fetching completed."

# Optionally, send an email notification if required

For further details, you can explore the full project on my GitHub.

Database Schema Overview

The database schema diagram illustrates how the different tables in my data pipeline project, such as weather, flights, airports, and cities, are structured and interconnected.

Key Takeaways: