Python Requests/BS4 Beginners Series Part 3: Storing Data

In Part 1 and Part 2 of this Python Requests/BeautifulSoup 6-Part Beginner Series, we learned how to build a basic web scraper and extract data from websites, as well as how to clean the scraped data.

In Part 3, we'll explore various methods for saving the data in formats suitable for common use cases. We'll cover saving data to JSON files, as well as storing the data in databases or AWS S3 buckets.


Python Requests/BeautifulSoup 6-Part Beginner Series

  • Part 1: Basic Python Requests/BeautifulSoup Scraper - We'll go over the basics of scraping with Python, and build our first Python scraper. (Part 1)

  • Part 2: Cleaning Dirty Data & Dealing With Edge Cases - Web data can be messy, unstructured, and have lots of edge cases. In this tutorial we'll make our scraper robust to these edge cases, using data classes and data cleaning pipelines. (Part 2)

  • Part 3: Storing Data in AWS S3, MySQL & Postgres DBs - There are many ways to store scraped data, from CSV and JSON files to databases and S3 buckets. We'll explore several of them and discuss their pros and cons and the situations in which you would use each. (Part 3)

  • Part 4: Managing Retries & Concurrency - Make our scraper more robust and scalable by handling failed requests and using concurrency. (Part 4)

  • Part 5: Faking User-Agents & Browser Headers - Make our scraper production ready by using fake user agents & browser headers to make our scrapers look more like real users. (Part 5)

  • Part 6: Using Proxies To Avoid Getting Blocked - Explore how to use proxies to bypass anti-bot systems by hiding your real IP address and location. (Part 6)

The code for this project is available on GitHub.


Saving Data to a JSON File

💡Note: All methods for storing data in JSON files, databases, or S3 buckets will be implemented within the ProductDataPipeline class, which was created in Part 2 of this series.

To save data to a JSON file, we'll create a method called save_to_json(). The method performs the following actions:

It iterates over each product in the products_to_save list. For each product, it calls the asdict() function to convert it into a dictionary. The resulting dictionaries are appended to a list called json_data.

It opens the JSON file specified by self.json_filename in read mode. If the file exists, it uses json.load() to load the existing data into the existing_data list. If the file does not exist (a FileNotFoundError is raised), it initializes existing_data as an empty list.

The json_data list containing the newly converted dictionaries is appended to the end of the existing_data list, combining both datasets.

Finally, the method opens the JSON file in write mode and uses json.dump() to write the combined data (now stored in existing_data) back to the file.

import os
import json
from dataclasses import dataclass, field, fields, InitVar, asdict


class ProductDataPipeline:
    """
    Previous code
    """

    def save_to_json(self):
        products_to_save = []
        json_data = []
        products_to_save.extend(self.storage_queue)

        if not products_to_save:
            return

        # Convert each Product dataclass instance into a plain dictionary
        for product in products_to_save:
            json_data.append(asdict(product))

        # Load the data already in the file; start from an empty list if it doesn't exist
        try:
            with open(self.json_filename, mode="r", encoding="utf-8") as output_file:
                existing_data = json.load(output_file)
        except FileNotFoundError:
            existing_data = []

        existing_data.extend(json_data)

        # Write the combined data back to the file
        with open(self.json_filename, mode="w", encoding="utf-8") as output_file:
            json.dump(existing_data, output_file, indent=2)
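The read-extend-write pattern in save_to_json() handles a missing file, but json.load() also raises json.JSONDecodeError when the file exists and is empty or truncated (for example, after an interrupted run). The following is a minimal standalone sketch of the same pattern that also tolerates that case. The function name append_records_to_json is hypothetical and not part of the series' code, and starting over from an empty list discards whatever was in an unreadable file, which may not be what you want in production.

```python
import json
import os
import tempfile


def append_records_to_json(json_filename, new_records):
    """Append a list of dicts to a JSON array file, creating the file if needed.

    Hypothetical helper: returns the total number of records now in the file.
    """
    try:
        with open(json_filename, mode="r", encoding="utf-8") as input_file:
            existing_data = json.load(input_file)
    except (FileNotFoundError, json.JSONDecodeError):
        # Missing, empty, or unparseable file: start again from an empty list.
        existing_data = []

    existing_data.extend(new_records)

    with open(json_filename, mode="w", encoding="utf-8") as output_file:
        json.dump(existing_data, output_file, indent=2)
    return len(existing_data)


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "product_data.json")
    open(path, "w").close()  # simulate an empty file left by an earlier run
    append_records_to_json(path, [{"name": "Dark Bar", "price_gb": 3.5}])
    total = append_records_to_json(path, [{"name": "Milk Bar", "price_gb": 2.0}])
    print(f"{total} records stored in {path}")
```

Note that this approach rewrites the whole file on every call, so it suits small datasets like the one in this tutorial.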

Here’s the JSON file:

[Screenshot: contents of the generated JSON file]


Saving Data to Amazon S3 Storage

We already covered exporting the data to a CSV file in Part 2 of this series; the code for that is also included in the Complete Code section at the end. Now, let's upload the created CSV file to an Amazon S3 bucket (you'll need to have one set up already).

If you don't have a bucket yet, you can follow the instructions on how to set one up.

Once you have a bucket, start by installing Boto3, an external Python library created by Amazon to help with connecting to your S3 bucket.

pip install boto3

To get started, set up the necessary credentials to connect to Amazon S3. Next, the method creates an S3 client using the Boto3 library. It then uses the put_object method of the S3 client to upload the file "product_data.csv" to the specified S3 bucket (self.aws_s3_bucket) with the Key "chocolate_data.csv".

Within the bucket, the Key is the name under which the file is stored in Amazon S3. The data to be uploaded is passed in the Body parameter.

In this case, the file "product_data.csv" is opened in read-binary mode ("rb") inside a with block and passed as Body, so its contents are sent to S3 and the file handle is closed once the upload finishes. After the upload, the code checks the status of the operation to ensure success.

import boto3


class ProductDataPipeline:
    """
    Previous code
    """

    def save_to_s3_bucket(self):
        aws_access_key_id = "YOUR_ACCESS_KEY"
        aws_secret_access_key = "YOUR_SECRET_KEY"

        s3_client = boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )

        # Open the local CSV in binary mode; the with block closes it after the upload
        with open("product_data.csv", "rb") as csv_file:
            response = s3_client.put_object(
                Bucket=self.aws_s3_bucket,
                Key="chocolate_data.csv",
                Body=csv_file,
            )

        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

        if status == 200:
            print("Successfully put on S3 bucket!")
        else:
            print(f"Unsuccessful operation: Status - {status}")

You'll need to replace the aws_access_key_id and aws_secret_access_key values with your own AWS access key ID and secret access key. Also, pass your own bucket name and the correct file path.
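Hardcoding the keys in the source file makes them easy to leak, for example by committing them to a repository. As one possible alternative, the sketch below reads them from the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables, which are the standard names Boto3 itself looks for. The helper names get_aws_credentials and upload_csv_to_s3 are hypothetical and not part of the series' code, and the upload function is only illustrative since it needs a real bucket to run.

```python
import os


def get_aws_credentials(env=None):
    """Return credential keyword arguments for boto3.client(), read from the environment.

    Hypothetical helper; `env` defaults to os.environ and exists to make testing easy.
    """
    env = os.environ if env is None else env
    required = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "aws_access_key_id": env["AWS_ACCESS_KEY_ID"],
        "aws_secret_access_key": env["AWS_SECRET_ACCESS_KEY"],
    }


def upload_csv_to_s3(local_path, bucket, key):
    """Upload a local file to S3 and return the HTTP status code (illustrative only)."""
    import boto3  # imported here so get_aws_credentials() works without boto3 installed

    s3_client = boto3.client("s3", **get_aws_credentials())
    with open(local_path, "rb") as csv_file:
        response = s3_client.put_object(Bucket=bucket, Key=key, Body=csv_file)
    return response.get("ResponseMetadata", {}).get("HTTPStatusCode")
```

With the variables exported in your shell, a call such as upload_csv_to_s3("product_data.csv", "chocolate-products", "chocolate_data.csv") would mirror the method above without any secrets in the code.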

Here’s the snapshot showing that the file has been uploaded successfully:

[Screenshot: the uploaded file in the Amazon S3 bucket]

Note: When saving data to AWS S3, employ delayed file uploads. This means that the file is first temporarily saved locally on the machine running the scraper, and then uploaded to AWS S3 once the scraper's job is complete.


Saving Data to MySQL Database

Here, we'll show you how to save data to MySQL databases. To follow along, we assume you already have a database named chocolatedb set up.

Downloading MySQL

For setting up a MySQL database, check out the following resources:

Creating MySQL Table

We assume you already have a database set up and a table called chocolate_products in your database. If not, you can log in to your database and run the following command to create the table:

CREATE TABLE IF NOT EXISTS chocolate_products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255),
    price_gb VARCHAR(255),
    price_usd VARCHAR(255),
    url TEXT
);

Connecting Scraper To MySQL Database

The first step is to establish a connection to your MySQL database and the specific table where you'll store the scraped data. To interact with your MySQL database from Python, you'll need the appropriate MySQL connector. You can easily install it using pip:

pip install mysql-connector-python

To establish a connection with a MySQL server, you'll need the connect() function from the mysql.connector module. This function takes parameters such as host, user, and password, and it returns a MySQLConnection object, which is stored in a variable called connection. This variable will be used to interact with your MySQL server.

Since you already have a database and just need to connect to it, you can pass an additional parameter called database to the connect() function.

import mysql.connector
from dataclasses import dataclass, field, fields, InitVar, asdict


class ProductDataPipeline:
    """
    Previous code
    """

    def save_to_mysql(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)

        if not products_to_save:
            return

        self.connection = mysql.connector.connect(
            host="localhost",
            user="root",
            password="Admin123@",
            database=self.mysql_db_name,
        )

        # Create cursor object
        with self.connection.cursor() as cursor:
            for product in products_to_save:
                item = asdict(product)
                cursor.execute(
                    """ insert into chocolate_products ( name, price_gb, price_usd, url) values (%s,%s,%s,%s)""",
                    (item["name"], item["price_gb"], item["price_usd"], item["url"]),
                )
        self.connection.commit()

To insert data into the chocolate_products table, you'll need to write an INSERT INTO query as a string and pass it to the cursor.execute() method. This method accepts MySQL queries and executes them on the connected database.

Notice the connection.commit() statement at the end of the code. By default, the MySQL connector does not automatically commit transactions. In MySQL, modifications made within a transaction are only applied to the database when you explicitly use a COMMIT command. Remember to call this method after each transaction to ensure that your changes are reflected in the actual table.
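To see why the commit matters, here is a small sketch of the commit and rollback behaviour. It uses Python's built-in sqlite3 module as a stand-in so it runs without a MySQL server; this is an assumption made for illustration, not the MySQL connector itself. Both follow the Python DB-API, so the commit() and rollback() calls are the same, although sqlite3 uses ? placeholders where the MySQL connector uses %s.

```python
import sqlite3

# In-memory SQLite database standing in for the MySQL server (illustration only)
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE chocolate_products (name TEXT, price_gb REAL)")
connection.commit()


def count_rows(conn):
    """Return the number of rows currently visible in chocolate_products."""
    return conn.execute("SELECT COUNT(*) FROM chocolate_products").fetchone()[0]


# Insert without committing, then roll back: the row is discarded.
connection.execute(
    "INSERT INTO chocolate_products VALUES (?, ?)", ("Dark Bar", 3.5)
)
connection.rollback()
rows_after_rollback = count_rows(connection)

# Insert and commit: the row is now permanent, and a later rollback cannot undo it.
connection.execute(
    "INSERT INTO chocolate_products VALUES (?, ?)", ("Milk Bar", 2.0)
)
connection.commit()
connection.rollback()
rows_after_commit = count_rows(connection)

print(rows_after_rollback, rows_after_commit)  # prints: 0 1
connection.close()
```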

Here’s the snapshot of the data stored in the MySQL database:

[Screenshot: the scraped data stored in the MySQL database]


Saving Data to Postgres Database

Here, we'll show you how to save data to Postgres databases. To follow along, we assume you already have a database named chocolatedb set up.

Downloading Postgres

For setting up a Postgres database, check out the following resources:

Creating Postgres Table

As mentioned earlier, we're assuming you have a Postgres database set up and a table named chocolate_products already exists in your database. If not, please log in to your Postgres database and execute the following command to create the table:

CREATE TABLE IF NOT EXISTS chocolate_products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255),
    price_gb VARCHAR(255),
    price_usd VARCHAR(255),
    url TEXT
);

Connecting Scraper To Postgres Database

To save data to a PostgreSQL database, the primary step is to update how the connection is created. To do so we’ll install the Python package psycopg2.

pip install psycopg2

Then update the connection call to use psycopg2.connect(). The rest of the method is the same as the MySQL version.

import psycopg2
from dataclasses import dataclass, field, fields, InitVar, asdict


class ProductDataPipeline:
    """
    Previous code
    """

    def save_to_postgresql(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)

        if not products_to_save:
            return

        self.connection = psycopg2.connect(
            host="localhost",
            database=self.postgre_db_name,
            user="postgres",
            password="mypass@123",
        )

        with self.connection.cursor() as cursor:
            for product in products_to_save:
                item = asdict(product)
                cursor.execute(
                    """ insert into chocolate_products ( name, price_gb, price_usd, url) values (%s,%s,%s,%s)""",
                    (item["name"], item["price_gb"], item["price_usd"], item["url"]),
                )
        self.connection.commit()
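Both database methods issue one execute() call per product. DB-API cursors, including those of mysql.connector and psycopg2, also provide executemany(), which takes the query once plus a list of parameter tuples. The sketch below shows that variant together with a rollback on failure. It is an illustration under stated assumptions: it runs against the built-in sqlite3 module so no database server is needed (hence the ? placeholders instead of %s), the Product class is a simplified stand-in for the one from Part 2, and insert_products is a hypothetical helper name.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class Product:
    """Simplified stand-in for the Product dataclass from Part 2."""
    name: str
    price_gb: float
    price_usd: float
    url: str


def insert_products(connection, products):
    """Insert all products in one executemany() call; roll back if anything fails."""
    rows = [(p.name, p.price_gb, p.price_usd, p.url) for p in products]
    cursor = connection.cursor()
    try:
        cursor.executemany(
            "INSERT INTO chocolate_products (name, price_gb, price_usd, url) "
            "VALUES (?, ?, ?, ?)",
            rows,
        )
        connection.commit()
    except Exception:
        # Undo the partial batch so the table is never left half-written
        connection.rollback()
        raise
    finally:
        cursor.close()
    return len(rows)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE chocolate_products ("
        "id INTEGER PRIMARY KEY, name TEXT, price_gb REAL, price_usd REAL, url TEXT)"
    )
    saved = insert_products(
        conn,
        [
            Product("Dark Bar", 3.5, 4.24, "https://www.chocolate.co.uk/dark"),
            Product("Milk Bar", 2.0, 2.42, "https://www.chocolate.co.uk/milk"),
        ],
    )
    print(f"Inserted {saved} rows")
    conn.close()
```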

Here’s the snapshot of the data stored in the PostgreSQL database:

[Screenshot: the scraped data stored in the PostgreSQL database]


Complete Code

When you execute the code, it will create two files, product_data.csv and product_data.json. It will then store the data in MySQL, PostgreSQL, and the Amazon S3 bucket.

Execute the following command to see the data in your MySQL and PostgreSQL databases:

select * from chocolate_products;

Here’s the complete code:

import os
import csv
import json
import time
import requests
import boto3
import psycopg2
import mysql.connector
from bs4 import BeautifulSoup
from dataclasses import dataclass, field, fields, InitVar, asdict

@dataclass
class Product:
    name: str = ""
    price_string: InitVar[str] = ""
    price_gb: float = field(init=False)
    price_usd: float = field(init=False)
    url: str = ""

    def __post_init__(self, price_string):
        self.name = self.clean_name()
        self.price_gb = self.clean_price(price_string)
        self.price_usd = self.convert_price_to_usd()
        self.url = self.create_absolute_url()

    def clean_name(self):
        if self.name == "":
            return "missing"
        return self.name.strip()

    def clean_price(self, price_string):
        price_string = price_string.strip()
        price_string = price_string.replace("Sale price£", "")
        price_string = price_string.replace("Sale priceFrom £", "")
        if price_string == "":
            return 0.0
        return float(price_string)

    def convert_price_to_usd(self):
        return round(self.price_gb * 1.21, 2)

    def create_absolute_url(self):
        if self.url == "":
            return "missing"
        return "https://www.chocolate.co.uk" + self.url


class ProductDataPipeline:
    def __init__(
        self,
        csv_filename="",
        json_filename="",
        mysql_db_name="",
        postgre_db_name="",
        aws_s3_bucket="",
        storage_queue_limit=5,
    ):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.json_filename = json_filename
        self.mysql_db_name = mysql_db_name
        self.postgre_db_name = postgre_db_name
        self.aws_s3_bucket = aws_s3_bucket

    def save_to_csv(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        # The queue is cleared here, so save_to_csv() must run after the other save methods
        self.storage_queue.clear()
        if not products_to_save:
            return
        keys = [field.name for field in fields(products_to_save[0])]
        file_exists = (
            os.path.isfile(self.csv_filename)
            and os.path.getsize(self.csv_filename) > 0
        )

        with open(
            self.csv_filename, mode="a", newline="", encoding="utf-8"
        ) as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()
            for product in products_to_save:
                writer.writerow(asdict(product))

    def save_to_json(self):
        products_to_save = []
        json_data = []
        products_to_save.extend(self.storage_queue)

        if not products_to_save:
            return
        for product in products_to_save:
            json_data.append(asdict(product))
        try:
            with open(self.json_filename, mode="r", encoding="utf-8") as output_file:
                existing_data = json.load(output_file)
        except FileNotFoundError:
            existing_data = []
        existing_data.extend(json_data)

        with open(self.json_filename, mode="w", encoding="utf-8") as output_file:
            json.dump(existing_data, output_file, indent=2)

    def save_to_mysql(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)

        if not products_to_save:
            return
        self.connection = mysql.connector.connect(
            host="localhost",
            user="root",
            password="mypass@123",
            database=self.mysql_db_name,
        )

        # Create cursor object
        with self.connection.cursor() as cursor:
            for product in products_to_save:
                item = asdict(product)
                cursor.execute(
                    """ insert into chocolate_products ( name, price_gb, price_usd, url) values (%s,%s,%s,%s)""",
                    (item["name"], item["price_gb"],
                     item["price_usd"], item["url"]),
                )
        self.connection.commit()

    def save_to_postgresql(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)

        if not products_to_save:
            return
        self.connection = psycopg2.connect(
            host="localhost",
            database=self.postgre_db_name,
            user="postgres",
            password="mypass@123",
        )

        with self.connection.cursor() as cursor:
            for product in products_to_save:
                item = asdict(product)
                cursor.execute(
                    """ insert into chocolate_products ( name, price_gb, price_usd, url) values (%s,%s,%s,%s)""",
                    (item["name"], item["price_gb"],
                     item["price_usd"], item["url"]),
                )
        self.connection.commit()

    def save_to_s3_bucket(self):
        aws_access_key_id = "YOUR_ACCESS_KEY"
        aws_secret_access_key = "YOUR_SECRET_KEY"

        s3_client = boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )

        # Open the local CSV in binary mode; the with block closes it after the upload
        with open("product_data.csv", "rb") as csv_file:
            response = s3_client.put_object(
                Bucket=self.aws_s3_bucket,
                Key="chocolate_data.csv",
                Body=csv_file,
            )

        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

        if status == 200:
            print("Successfully put on S3 bucket!")
        else:
            print(f"Unsuccessful operation: Status - {status}")

    def clean_raw_product(self, scraped_data):
        return Product(
            name=scraped_data.get("name", ""),
            price_string=scraped_data.get("price", ""),
            url=scraped_data.get("url", ""),
        )

    def is_duplicate(self, product_data):
        if product_data.name in self.names_seen:
            print(f"Duplicate item found: {product_data.name}. Item dropped.")
            return True
        self.names_seen.append(product_data.name)
        return False

    def add_product(self, scraped_data):
        product = self.clean_raw_product(scraped_data)
        if not self.is_duplicate(product):
            self.storage_queue.append(product)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_json()
                self.save_to_mysql()
                self.save_to_postgresql()
                self.save_to_csv()

    def close_pipeline(self):
        if len(self.storage_queue) > 0:
            self.save_to_json()
            self.save_to_mysql()
            self.save_to_postgresql()
            self.save_to_csv()


list_of_urls = [
    "https://www.chocolate.co.uk/collections/all",
]


def start_scrape():
    # Loop Through a List of URLs
    for url in list_of_urls:
        # Send Request
        response = requests.get(url)

        if response.status_code == 200:
            # Parse Data
            soup = BeautifulSoup(response.content, "html.parser")
            products = soup.select("product-item")
            for product in products:
                name = product.select(
                    "a.product-item-meta__title")[0].get_text()
                price = (
                    product.select("span.price")[0]
                    .get_text()
                    .replace("\nSale price£", "")
                )
                url = product.select("div.product-item-meta a")[0]["href"]

                # Add To Data Pipeline
                data_pipeline.add_product(
                    {"name": name, "price": price, "url": url})

            # Next Page
            next_page = soup.select('a[rel="next"]')
            if len(next_page) > 0:
                list_of_urls.append(
                    "https://www.chocolate.co.uk" + next_page[0]["href"]
                )


if __name__ == "__main__":
    data_pipeline = ProductDataPipeline(
        csv_filename="product_data.csv",
        json_filename="product_data.json",
        mysql_db_name="chocolatedb",
        postgre_db_name="chocolatedb",
        aws_s3_bucket="chocolate-products",
    )
    start_scrape()
    data_pipeline.close_pipeline()
    print("Congratulations! Data saved successfully on MySQL, PostgreSQL, and JSON.")
    time.sleep(3)
    data_pipeline.save_to_s3_bucket()

Next Steps

We hope you now have a good understanding of how to save the data you've scraped into the file or database you need! If you have any questions, leave them in the comments below and we'll do our best to help out!

If you would like the code from this example, please check it out on GitHub here!

The next tutorial covers how to make our scraper more robust and scalable by handling failed requests and using concurrency.