Introduction
In this article, we'll walk through the steps of building an automated data pipeline that pulls data from the OpenWeather and AeroDataBox APIs and stores real-time weather and flight information in a MySQL database. Along the way, we'll look at why data pipelines matter for smooth, scalable data processing workflows and how to secure sensitive credentials such as API keys and database passwords.
The Importance of Database Pipelines in Modern Data Engineering
In today’s data-driven world, businesses and organizations rely on vast amounts of data to make informed decisions, optimize operations, and predict future trends. This data comes from multiple sources: customer transactions, IoT devices, weather APIs, flight information, and more. The challenge lies not only in collecting this data but also in storing, processing, and analyzing it effectively.
This is where database pipelines come into play. A well-structured database pipeline ensures data flows seamlessly from source to storage and then to analysis. It connects different systems and data sources, ensuring the right information is available in the right format at the right time.
What is a Database Pipeline?
A database pipeline is a series of steps that data goes through from collection to storage in a database, and then further to processing or analysis. The pipeline typically involves multiple stages:
- Data Collection: Fetching data from various sources like APIs, internal systems, or third-party services.
- Data Transformation: Cleaning, formatting, and preparing the data for storage and use.
- Data Storage: Storing the processed data in a database or data warehouse.
- Data Retrieval: Querying the database to retrieve stored data for analysis, reporting, or decision-making.
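The four stages above can be sketched end to end in a few lines. This is a minimal illustration rather than the project's code: the sample records, the function names, and the in-memory SQLite database (standing in for MySQL) are all assumptions, chosen so the example runs without any external service.

```python
import sqlite3


def collect():
    """Data Collection: stand-in for an API call; returns hypothetical raw records."""
    return [
        {"city": " Berlin ", "temp_c": "12.5"},
        {"city": "Hamburg", "temp_c": "10.0"},
    ]


def transform(raw_records):
    """Data Transformation: strip whitespace and convert strings to numbers."""
    return [(r["city"].strip(), float(r["temp_c"])) for r in raw_records]


def store(rows, conn):
    """Data Storage: write the cleaned rows to a table."""
    conn.execute("CREATE TABLE IF NOT EXISTS weather (city TEXT, temp_c REAL)")
    conn.executemany("INSERT INTO weather VALUES (?, ?)", rows)
    conn.commit()


def retrieve(conn):
    """Data Retrieval: query the stored rows back for analysis."""
    return conn.execute(
        "SELECT city, temp_c FROM weather ORDER BY city"
    ).fetchall()


# SQLite in memory is only a stand-in for the MySQL database used in this article
conn = sqlite3.connect(":memory:")
store(transform(collect()), conn)
print(retrieve(conn))  # [('Berlin', 12.5), ('Hamburg', 10.0)]
```

Keeping each stage in its own function makes it easier to swap one out, for example replacing the stand-in collector with a real API call, without touching the others.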
Building a Real-Time Data Pipeline
Let's dive into building a real-time data pipeline that fetches weather and flight information from APIs and stores it in a MySQL database. We'll use Python for data collection and transformation, and MySQL for data storage. Here's an overview of the pipeline:
Architecture Overview
The real-time data pipeline consists of:
- Wikipedia (scraped with BeautifulSoup) - City coordinates and country
- OpenWeather API - Weather data
- AeroDataBox API - Scheduled flight arrivals
- MySQL (via SQLAlchemy) - Data storage & retrieval
- Cron job - Scheduled, automated runs
1. Data Collection: Fetching City Data from Wikipedia
Using BeautifulSoup, we scrape the coordinates and country for each city from Wikipedia:
from bs4 import BeautifulSoup
import requests
from lat_lon_parser import parse
import pandas as pd


def cities_data(cities):
    """Scrape country and coordinates for each city from its Wikipedia page."""
    city_data = []
    for city in cities:
        url = f"https://www.wikipedia.org/wiki/{city}"
        response = requests.get(url)
        city_soup = BeautifulSoup(response.content, 'html.parser')
        # Coordinates are read from the elements carrying these CSS classes
        latitude = city_soup.find(class_="latitude").get_text()
        longitude = city_soup.find(class_="longitude").get_text()
        # The first infobox value is taken as the country
        country = city_soup.find(class_="infobox-data").get_text()
        city_data.append({
            "City": city,
            "Country": country,
            # parse() converts the coordinate text to decimal degrees
            "Latitude": parse(latitude),
            "Longitude": parse(longitude),
        })
    return pd.DataFrame(city_data)
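To make the coordinate conversion concrete, here is a small sketch of the arithmetic behind turning a degrees/minutes/seconds string into decimal degrees. In the code above this job is done by `parse` from lat_lon_parser; the function name `dms_to_decimal` is hypothetical, and the sketch assumes only one common notation (for example `52°31′12″N`), not every format that library accepts.

```python
import re

# Degrees, optional minutes (′), optional seconds (″), then a hemisphere letter
_DMS = re.compile(r"(\d+)°(?:(\d+)′)?(?:(\d+(?:\.\d+)?)″)?([NSEW])")


def dms_to_decimal(text):
    """Convert a string such as '52°31′12″N' to signed decimal degrees."""
    match = _DMS.fullmatch(text.strip())
    if match is None:
        raise ValueError(f"Unrecognized coordinate: {text!r}")
    degrees, minutes, seconds, hemisphere = match.groups()
    value = int(degrees) + int(minutes or 0) / 60 + float(seconds or 0) / 3600
    # Southern and western hemispheres are negative by convention
    return -value if hemisphere in "SW" else value


print(dms_to_decimal("52°31′12″N"))  # approximately 52.52
print(dms_to_decimal("13°24′18″E"))  # approximately 13.405
```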
2. Fetching Weather Data from OpenWeather API
We use the OpenWeather API to retrieve weather data such as temperature, humidity, and weather conditions for the cities. The following is a brief sketch of the code. (The full code is available on GitHub.)
def get_weather_data(lat, lon, api_key):
    url = f"https://api.openweathermap.org/data/2.5/forecast?lat={lat}&lon={lon}&appid={api_key}&units=metric"
    response = requests.get(url)
    if response.status_code == 200:
        weather_data = response.json()
        return weather_data
    else:
        print(f"Failed to retrieve data: {response.status_code}")
        return None
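The JSON returned by `get_weather_data` is nested, so it has to be flattened before it can go into a table. The sketch below shows one way to do that. It assumes the forecast response keeps its entries under a `list` key, each with `dt_txt`, `main`, and `weather` fields; that layout should be checked against the OpenWeather documentation. The function name and the output column names are hypothetical.

```python
def forecast_to_rows(weather_json, city):
    """Flatten a forecast response into one flat dict per forecast entry."""
    rows = []
    for entry in weather_json.get("list", []):
        # "weather" is expected to be a list; fall back to an empty dict if absent
        conditions = entry.get("weather") or [{}]
        rows.append({
            "city": city,
            "forecast_time": entry.get("dt_txt"),
            "temperature": entry.get("main", {}).get("temp"),
            "humidity": entry.get("main", {}).get("humidity"),
            "conditions": conditions[0].get("main"),
        })
    return rows


# Hand-written sample payload for illustration, not real API output
sample = {
    "list": [
        {
            "dt_txt": "2024-10-10 12:00:00",
            "main": {"temp": 14.2, "humidity": 71},
            "weather": [{"main": "Clouds"}],
        }
    ]
}
print(forecast_to_rows(sample, "Berlin"))
```

The resulting list of dicts can be passed straight to `pd.DataFrame(...)` before it is written to the database.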
3. Fetching Flight Data from AeroDataBox API
The AeroDataBox API provides flight information such as scheduled arrivals for specific airports. The following is a brief sketch of the code. (The full code is available on GitHub.)
def get_flight_data(icao_list, api_key):
    flight_items = []
    for icao in icao_list:
        # The time window is hard-coded in this sketch
        url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{icao}/2024-10-10T00:00/2024-10-10T23:59"
        headers = {
            'x-rapidapi-key': api_key
        }
        response = requests.get(url, headers=headers)
        flights_json = response.json()
        # Keep only the fields needed for the flights table
        for item in flights_json["arrivals"]:
            flight_items.append({
                "arrival_airport_icao": icao,
                "flight_number": item.get("number", None),
                "scheduled_arrival_time": item["arrival"]["scheduledTime"].get("local", None)
            })
    # Return once all airports have been processed
    return pd.DataFrame(flight_items)
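The sketch above hard-codes the date in the request URL, which would go stale in a pipeline that runs every day. A minimal way to build the same URL for any date is shown below. The helper name `build_flight_url` is hypothetical, and the airport code in the usage line is only an example; the URL pattern itself is copied from the code above.

```python
from datetime import date, timedelta

BASE_URL = "https://aerodatabox.p.rapidapi.com/flights/airports/icao"


def build_flight_url(icao, day):
    """Build the arrivals URL for one airport, covering 00:00 to 23:59 on `day`."""
    day_str = day.isoformat()  # e.g. "2024-10-10"
    return f"{BASE_URL}/{icao}/{day_str}T00:00/{day_str}T23:59"


# Example: request tomorrow's arrivals instead of a fixed date
tomorrow = date.today() + timedelta(days=1)
print(build_flight_url("EDDB", tomorrow))
```

If the API limits how long a single time window may be, the same helper could be extended to emit several shorter windows; the AeroDataBox documentation is the place to check the current limits.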
4. Storing Data in MySQL Database
The retrieved data is stored in a MySQL database for future retrieval and analysis. We use SQLAlchemy to interact with the database. The following is a brief sketch of the code. (The full code is available on GitHub.)
from sqlalchemy import create_engine


def connect_database(config):
    # Build the connection URL from the config dictionary
    connect_string = f'mysql+pymysql://{config["user"]}:{config["password"]}@{config["host"]}:{config["port"]}/{config["database"]}'
    engine = create_engine(connect_string)
    return engine


def push_to_database(df, table_name, engine):
    # Append rows to the table, creating it if it does not exist yet
    df.to_sql(table_name, con=engine, if_exists='append', index=False)
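The `config` dictionary passed to `connect_database` contains the database password, so it should not be written into the source code. One common approach is to read the values from environment variables. The sketch below assumes that approach; the variable names (`MYSQL_USER` and so on), the defaults, and the function name `load_db_config` are hypothetical and not taken from the project.

```python
import os


def load_db_config(env=os.environ):
    """Build the database config dict from environment variables."""
    required = ["MYSQL_USER", "MYSQL_PASSWORD", "MYSQL_DATABASE"]
    missing = [name for name in required if name not in env]
    if missing:
        # Fail early with a clear message instead of a confusing connection error
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "user": env["MYSQL_USER"],
        "password": env["MYSQL_PASSWORD"],
        "host": env.get("MYSQL_HOST", "localhost"),  # assumed default
        "port": int(env.get("MYSQL_PORT", "3306")),  # MySQL's standard port
        "database": env["MYSQL_DATABASE"],
    }
```

With the variables exported in the shell (or in the environment the scheduler uses), the engine can then be created with `connect_database(load_db_config())`. API keys can be handled the same way.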
Retrieving and Analyzing Data
Once the data is stored, it can be queried and analyzed from the database. Here’s how we retrieve the weather data for analysis:
def retrieve_from_db(table, engine):
    query = f"SELECT * FROM {table}"
    return pd.read_sql(query, con=engine)


engine = connect_database(config)
weather_df = retrieve_from_db('weather_data', engine)
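Because `retrieve_from_db` inserts the table name directly into the SQL string, it is worth checking that name against a fixed list before building the query, especially if it ever comes from user input or a command-line argument. A minimal sketch follows. The helper name `build_select_query` is hypothetical, and the allowed names are the tables listed in the schema overview of this article.

```python
# Table names taken from this article's schema overview
ALLOWED_TABLES = {"cities", "airports", "weather_data", "flight_data"}


def build_select_query(table):
    """Return a SELECT statement only for a known table name."""
    if table not in ALLOWED_TABLES:
        raise ValueError(f"Unknown table: {table!r}")
    return f"SELECT * FROM {table}"


print(build_select_query("weather_data"))  # SELECT * FROM weather_data
```

The returned string can be passed to `pd.read_sql` in place of the f-string built inside `retrieve_from_db`.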
The main workflow lives in a main.py file, which serves as the central entry point for executing the code and controlling the various components of the data pipeline. You can edit this file to adjust what data is fetched, stored, or retrieved, depending on the desired output.
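The automation script in this article calls main.py with `--weather` and `--flights` flags. The project's actual main.py is not shown here, so the following is only a sketch of how such flags could be handled with `argparse`. The function names and the placeholder steps are assumptions.

```python
import argparse


def parse_args(argv=None):
    """Parse the command-line flags that select which part of the pipeline runs."""
    parser = argparse.ArgumentParser(description="Run parts of the data pipeline.")
    parser.add_argument("--weather", action="store_true",
                        help="fetch and store weather data")
    parser.add_argument("--flights", action="store_true",
                        help="fetch and store flight data")
    return parser.parse_args(argv)


def main(argv=None):
    """Run the selected steps and return their names."""
    args = parse_args(argv)
    steps = []
    if args.weather:
        # Placeholder: the real script would fetch and store weather data here
        steps.append("weather")
    if args.flights:
        # Placeholder: the real script would fetch and store flight data here
        steps.append("flights")
    return steps


print(main(["--weather"]))  # ['weather']
```

In a real main.py, `main()` would be called without arguments under an `if __name__ == "__main__":` guard so that the flags are read from the command line.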
Automating with a Cron Job
To ensure the pipeline runs automatically at regular intervals, I used a cron job for scheduling. This allows the system to fetch weather and flight data, store it in the database, and send notifications, all without manual input. For example, to run the script daily at midnight, one can set up the following cron job:
# Open the crontab editor
crontab -e
# Add this line to schedule the Python script
0 0 * * * /path/to/your/project/automation.sh
The automation.sh script looks like the following; tailor it to your needs:
#!/bin/bash
# Navigate to your project directory
# (cron starts in your home directory, so uncomment and set this path)
#cd /path/to/your/project
# Activate your virtual environment if needed
# source /path/to/venv/bin/activate
# Log file location
LOG_FILE="automation.log"
# Execute your Python script for weather data
echo "Fetching weather data..." >> "$LOG_FILE"
if python3 main.py --weather >> "$LOG_FILE" 2>&1; then
    echo "Weather data fetched successfully!" >> "$LOG_FILE"
else
    echo "Weather data fetch failed." >> "$LOG_FILE"
fi
# Execute your Python script for flight data
echo "Fetching flight data..." >> "$LOG_FILE"
if python3 main.py --flights >> "$LOG_FILE" 2>&1; then
    echo "Flight data fetched successfully!" >> "$LOG_FILE"
else
    echo "Flight data fetch failed." >> "$LOG_FILE"
fi
# Send a desktop notification (optional, for Linux)
notify-send "Automation Complete" "Weather and Flight data fetching completed."
# Optionally, send an email notification if required
For further details, you can explore the full project on my GitHub.
Database Schema Overview
The diagram above illustrates the database schema used in my data pipeline project. It highlights how different tables, such as weather, flights, airports, and cities, are structured and interconnected:
- cities: Contains information about cities, including their names, countries, and coordinates.
- airports: Stores details about airports, such as their ICAO codes and locations.
- weather_data: Holds weather information for each city, including temperature, humidity, and weather conditions.
- flight_data: Contains flight data, such as flight numbers and scheduled arrival times.
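As a rough illustration of how these four tables could relate to one another, the sketch below creates them in an in-memory SQLite database. It is not the project's schema: the column names, types, and foreign keys are assumptions based on the descriptions above, and SQLite stands in for MySQL only so the example runs on its own.

```python
import sqlite3

# Hypothetical columns inferred from the table descriptions in this article
SCHEMA = """
CREATE TABLE cities (
    city_id INTEGER PRIMARY KEY,
    city TEXT NOT NULL,
    country TEXT,
    latitude REAL,
    longitude REAL
);
CREATE TABLE airports (
    icao TEXT PRIMARY KEY,
    city_id INTEGER REFERENCES cities(city_id)
);
CREATE TABLE weather_data (
    weather_id INTEGER PRIMARY KEY,
    city_id INTEGER REFERENCES cities(city_id),
    forecast_time TEXT,
    temperature REAL,
    humidity REAL,
    conditions TEXT
);
CREATE TABLE flight_data (
    flight_id INTEGER PRIMARY KEY,
    arrival_airport_icao TEXT REFERENCES airports(icao),
    flight_number TEXT,
    scheduled_arrival_time TEXT
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

# List the tables that were created
tables = [row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
)]
print(tables)  # ['airports', 'cities', 'flight_data', 'weather_data']
```

Linking weather rows to cities and flight rows to airports through keys, rather than repeating city names in every row, keeps the tables small and makes joins straightforward.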
Key Takeaways:
- Database pipelines are essential for managing real-time data and ensuring seamless data flow.
- Automating data pipelines can save time, reduce errors, and improve data accuracy.
- Integrating APIs with Python and MySQL can help fetch, store, and analyze data efficiently.
- Securing sensitive information like API keys and database credentials is crucial for data privacy and security.
- Using cron jobs for scheduling can automate data pipelines and ensure timely data updates.
Conclusion
Database pipelines are indispensable in today’s data-centric world, especially when they are automated and designed to handle real-time data. In this project, we demonstrated how to integrate weather and flight APIs into a MySQL database, automate the process using cron jobs, and ensure seamless data flow.
By automating data pipelines, you not only eliminate manual tasks but also ensure that your data is always up to date, enabling accurate and timely data-driven decisions. Whether you’re managing weather data, flight schedules, or any other real-time information, building a well-structured and automated pipeline can streamline the process and unlock the full potential of your data.
With proper database pipeline infrastructure, organizations can scale their data operations, improve efficiency, and make more accurate data-driven decisions. For further details, explore the full project on my GitHub.