Python Selenium Beginners Series Part 3: Storing Data in AWS S3, MySQL & Postgres DBs
In Part 1 and Part 2 of this Python Selenium 6-Part Beginner Series, we learned how to build a basic web scraper and extract data from websites, as well as how to clean the scraped data.
In Part 3, we'll explore various methods for saving the data in formats suitable for common use cases. We'll cover saving data to JSON files, as well as storing the data in databases or AWS S3 buckets.
- Saving Data to a JSON File
- Saving Data to Amazon S3 Storage
- Saving Data to MySQL Database
- Saving Data to Postgres Database
Python Selenium 6-Part Beginner Series
- Part 1: Basic Python Selenium Scraper - We'll go over the basics of scraping with Python and build our first Python scraper. (Part 1)
- Part 2: Cleaning Dirty Data & Dealing With Edge Cases - Web data can be messy, unstructured, and full of edge cases. In this tutorial, we'll make our scraper robust to these edge cases using data classes and data cleaning pipelines. (Part 2)
- Part 3: Storing Data in AWS S3, MySQL & Postgres DBs - There are many ways to store scraped data, from databases and CSV files to JSON files and S3 buckets. We'll explore several of them and discuss their pros and cons and the situations in which you would use each. (This article)
- Part 4: Managing Retries & Concurrency - Make our scraper more robust and scalable by handling failed requests and using concurrency. (Part 4)
- Part 5: Faking User-Agents & Browser Headers - Make our scraper production-ready by using fake user agents and browser headers so that it looks more like a real user. (Coming Soon)
- Part 6: Using Proxies To Avoid Getting Blocked - Explore how to use proxies to bypass anti-bot systems by hiding your real IP address and location. (Coming Soon)
Saving Data to a JSON File
💡Note: All methods for storing data in JSON files, databases, or S3 buckets will be implemented within the ProductDataPipeline class, which was created in Part 2 of this series.
To save data to a JSON file, we’ll create a method called save_to_json(). The method performs the following actions:
- It iterates over each product in the products_to_save list. For each product, it calls the asdict() function to convert it into a dictionary. The resulting dictionaries are appended to a list called json_data.
- It opens the JSON file specified by self.json_filename in read mode. If the file exists, it uses json.load() to load the existing data into the existing_data list. Otherwise, it initializes existing_data as an empty list.
- It appends the json_data list of newly converted dictionaries to the end of the existing_data list, combining both datasets.
- Finally, it opens the JSON file in write mode and uses json.dump() to write the combined data (now stored in existing_data) back to the file.
import os
import json
from dataclasses import dataclass, field, fields, InitVar, asdict

class ProductDataPipeline:
    """
    Previous code
    """

    def save_to_json(self):
        products_to_save = []
        json_data = []
        products_to_save.extend(self.storage_queue)
        if not products_to_save:
            return
        # Convert each Product dataclass instance into a plain dictionary
        for product in products_to_save:
            json_data.append(asdict(product))
        # Load any data already in the file, or start with an empty list
        try:
            with open(self.json_filename, mode="r", encoding="utf-8") as output_file:
                existing_data = json.load(output_file)
        except FileNotFoundError:
            existing_data = []
        existing_data.extend(json_data)
        # Write the combined data back to the file
        with open(self.json_filename, mode="w", encoding="utf-8") as output_file:
            json.dump(existing_data, output_file, indent=2)
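To try the read, extend, and write cycle on its own, here is a minimal sketch that runs without the rest of the pipeline. The Item dataclass and the append_to_json() helper are hypothetical stand-ins for Product and save_to_json(), not part of the series code.

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class Item:
    # Hypothetical stand-in for the Product dataclass from Part 2.
    name: str
    price_gb: float


def append_to_json(filename, items):
    """Append dataclass instances to the JSON array stored in `filename`."""
    new_records = [asdict(item) for item in items]
    try:
        with open(filename, mode="r", encoding="utf-8") as json_file:
            existing = json.load(json_file)
    except FileNotFoundError:
        # First run: there is no file yet, so start with an empty list.
        existing = []
    existing.extend(new_records)
    with open(filename, mode="w", encoding="utf-8") as json_file:
        json.dump(existing, json_file, indent=2)
    # Return the total number of records now in the file.
    return len(existing)
```

Calling append_to_json("items.json", [Item("Dark Chocolate", 5.0)]) twice leaves two records in the file, because each call rewrites the whole array. For very large outputs that rewrite becomes slow, which is one reason to prefer a database.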
Here’s the JSON file:
Saving Data to Amazon S3 Storage
We've already covered how to export the data to a CSV file in Part 2 of this series. Nonetheless, we've also included the code for that, which you can find in the last section. Now, let's save the created CSV file directly to an Amazon S3 bucket (you'll need to have one set up already).
If you don't have a bucket yet, you can follow the instructions on how to set one up.
Once you have a bucket, we'll start by installing Boto3, an external Python library created by Amazon to help with connecting to your S3 bucket.
pip install boto3
To get started, set up the necessary credentials to connect to Amazon S3. Next, the method creates an S3 client using the Boto3 library. It then uses the put_object method of the S3 client to upload the file "product_data.csv" to the specified S3 bucket (self.aws_s3_bucket) with the Key "chocolate_data.csv".
Within the bucket, the Key is the name the file will have in Amazon S3. The data to upload is passed in the Body parameter.
In this case, Body=open("product_data.csv", "rb") opens the file in read-binary mode and sends its contents to S3. After the upload, the code checks the HTTP status code of the response to confirm that the operation succeeded.
import boto3

class ProductDataPipeline:
    """
    Previous code
    """

    def save_to_s3_bucket(self):
        aws_access_key_id = "YOUR_ACCESS_KEY"
        aws_secret_access_key = "YOUR_SECRET_KEY"
        # Create an S3 client with the given credentials
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        # Upload the local CSV file under the object name given in Key
        response = s3_client.put_object(
            Bucket=self.aws_s3_bucket,
            Key="chocolate_data.csv",
            Body=open("product_data.csv", "rb"),
        )
        # The HTTP status code tells us whether the upload succeeded
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status == 200:
            print("Successfully put on S3 bucket!")
        else:
            print(f"Unsuccessful operation: Status - {status}")
You’ll need to replace the values of aws_access_key_id and aws_secret_access_key with your own AWS access key ID and secret access key. Also, pass your own bucket name and the correct file path.
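Keys written directly into source code are easy to leak, so a common alternative is to read them from environment variables. The sketch below assumes the variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set in your shell. The helper names load_aws_credentials() and upload_csv() are hypothetical, and upload_csv() uses the S3 client's upload_file() method rather than put_object().

```python
import os


def load_aws_credentials(environ=os.environ):
    """Read AWS keys from environment variables instead of source code."""
    access_key = environ.get("AWS_ACCESS_KEY_ID")
    secret_key = environ.get("AWS_SECRET_ACCESS_KEY")
    if not access_key or not secret_key:
        raise RuntimeError("Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY first.")
    return {"aws_access_key_id": access_key, "aws_secret_access_key": secret_key}


def upload_csv(bucket, local_path="product_data.csv", key="chocolate_data.csv"):
    """Upload a local file to S3 using credentials from the environment."""
    import boto3  # imported here so load_aws_credentials() works without boto3

    s3_client = boto3.client("s3", **load_aws_credentials())
    # upload_file() takes a file path and handles opening and closing the file.
    s3_client.upload_file(local_path, bucket, key)
```

Unlike put_object(), upload_file() does not return a response to inspect; it raises an exception if the upload fails.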
Here’s the snapshot showing that the file has been uploaded successfully:
Note: When saving data to AWS S3, employ delayed file uploads. This means that the file is first temporarily saved locally on the machine running the scraper, and then uploaded to AWS S3 once the scraper's job is complete.
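A delayed upload can be sketched roughly as follows. The function scrape_then_upload() and its upload argument are hypothetical: upload is any callable that takes a local path and a key, for example a small wrapper around an S3 client. The column names are also just an illustration.

```python
import csv
import os
import tempfile


def scrape_then_upload(rows, upload, key="chocolate_data.csv"):
    """Write rows to a local temporary CSV, then pass the finished file to `upload`."""
    fd, local_path = tempfile.mkstemp(suffix=".csv")
    try:
        with os.fdopen(fd, mode="w", newline="", encoding="utf-8") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=["name", "price_gb"])
            writer.writeheader()
            # In a real scraper, rows would be written as pages are scraped.
            writer.writerows(rows)
        # Upload once, only after the local file is complete.
        upload(local_path, key)
    finally:
        # Remove the temporary file whether or not the upload succeeded.
        os.remove(local_path)
```

Because the upload happens once at the end, a failure partway through the scrape never leaves a half-written object in the bucket.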
Saving Data to MySQL Database
Here, we'll show you how to save data to MySQL databases. To follow along, we assume you already have a database named chocolatedb set up.
Downloading MySQL
For setting up a MySQL database, check out the following resources:
Creating MySQL Table
We assume you already have a database set up and a table called chocolate_products in your database. If not, you can log in to your database and run the following command to create the table:
CREATE TABLE IF NOT EXISTS chocolate_products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255),
    price_gb VARCHAR(255),
    price_usd VARCHAR(255),
    url TEXT
);
Connecting Scraper To MySQL Database
The first step is to establish a connection to your MySQL database and the specific table where you'll store the scraped data. To interact with your MySQL database from Python, you'll need the appropriate MySQL connector. You can easily install it using pip:
pip install mysql-connector-python
To establish a connection with a MySQL server, you'll need the connect() function from the mysql.connector module. This function takes parameters such as host, user, and password, and returns a MySQLConnection object, which the code below stores in self.connection. This object is used to interact with your MySQL server.
Since you already have a database and just need to connect to it, you can pass an additional parameter called database to the connect() function.
import mysql.connector
from dataclasses import dataclass, field, fields, InitVar, asdict

class ProductDataPipeline:
    """
    Previous code
    """

    def save_to_mysql(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        if not products_to_save:
            return
        self.connection = mysql.connector.connect(
            host="localhost",
            user="root",
            password="mypass@654",
            database=self.mysql_db_name,
        )
        # Create cursor object
        with self.connection.cursor() as cursor:
            for product in products_to_save:
                item = asdict(product)
                # The %s placeholders are filled from the tuple of values
                cursor.execute(
                    """ insert into chocolate_products ( name, price_gb, price_usd, url) values (%s,%s,%s,%s)""",
                    (item["name"], item["price_gb"], item["price_usd"], item["url"]),
                )
        self.connection.commit()
To insert data into the chocolate_products table, you write an INSERT INTO query as a string and pass it to the cursor.execute() method, together with a tuple of values for the %s placeholders. This method executes the query on the connected database.
Notice the self.connection.commit() statement at the end of the code. By default, the MySQL connector does not commit transactions automatically. In MySQL, modifications made within a transaction are only applied to the database when you explicitly issue a COMMIT. Remember to call commit() after each transaction to ensure that your changes are reflected in the actual table.
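The effect of committing can be seen in a small experiment. The sketch below is an illustration only: it uses Python's built-in sqlite3 module as a stand-in so that it runs without a MySQL server, and the two-column table is a simplified, hypothetical version of chocolate_products.

```python
import sqlite3

# Assumption: sqlite3 stands in for MySQL so the sketch runs without a server.
# sqlite3 uses "?" placeholders where mysql-connector-python uses "%s".
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE chocolate_products (name TEXT, price_gb REAL)")
connection.commit()

insert_sql = "INSERT INTO chocolate_products (name, price_gb) VALUES (?, ?)"
count_sql = "SELECT COUNT(*) FROM chocolate_products"
cursor = connection.cursor()

# An insert that is rolled back never reaches the table.
cursor.execute(insert_sql, ("Dark Chocolate", 5.0))
connection.rollback()
rows_after_rollback = cursor.execute(count_sql).fetchone()[0]

# An insert followed by commit() is made permanent.
cursor.execute(insert_sql, ("Milk Chocolate", 4.5))
connection.commit()
rows_after_commit = cursor.execute(count_sql).fetchone()[0]

connection.close()
print(rows_after_rollback, rows_after_commit)
```

The first count is 0 and the second is 1: only the insert that was followed by commit() survives.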
Here’s the snapshot of the data stored in the MySQL database:
Saving Data to Postgres Database
Here, we'll show you how to save data to Postgres databases. To follow along, we assume you already have a database named chocolatedb set up.
Downloading Postgres
For setting up a Postgres database, check out the following resources:
Creating Postgres Table
As mentioned earlier, we're assuming you have a Postgres database set up and that a table named chocolate_products already exists in it. If not, please log in to your Postgres database and execute the following command to create the table:
CREATE TABLE IF NOT EXISTS chocolate_products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255),
    price_gb VARCHAR(255),
    price_usd VARCHAR(255),
    url TEXT
);
Connecting Scraper To Postgres Database
To save data to a PostgreSQL database, the main change is how the connection is created. To do so, we’ll install the Python package psycopg2.
pip install psycopg2
Then update the connection code:
import psycopg2
from dataclasses import dataclass, field, fields, InitVar, asdict

class ProductDataPipeline:
    """
    Previous code
    """

    def save_to_postgresql(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        if not products_to_save:
            return
        self.connection = psycopg2.connect(
            host="localhost",
            database=self.postgre_db_name,
            user="postgres",
            password="mypass@654",
        )
        with self.connection.cursor() as cursor:
            for product in products_to_save:
                item = asdict(product)
                # The %s placeholders are filled from the tuple of values
                cursor.execute(
                    """ insert into chocolate_products ( name, price_gb, price_usd, url) values (%s,%s,%s,%s)""",
                    (item["name"], item["price_gb"], item["price_usd"], item["url"]),
                )
        self.connection.commit()
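One possible variation, shown here only as a sketch, is to let the connection manage the transaction and to close it when done. The helpers product_rows() and insert_products() are hypothetical names. The function expects an already open psycopg2 connection and relies on psycopg2's behavior of committing when a with connection block succeeds and rolling back when it raises.

```python
INSERT_SQL = (
    "INSERT INTO chocolate_products (name, price_gb, price_usd, url) "
    "VALUES (%s, %s, %s, %s)"
)


def product_rows(items):
    """Turn product dictionaries into parameter tuples in column order."""
    return [(i["name"], i["price_gb"], i["price_usd"], i["url"]) for i in items]


def insert_products(connection, items):
    """Insert all items in one transaction on an open psycopg2 connection."""
    try:
        # With psycopg2, `with connection` commits on success and rolls back
        # if the block raises. It does not close the connection.
        with connection:
            with connection.cursor() as cursor:
                cursor.executemany(INSERT_SQL, product_rows(items))
    finally:
        # Close explicitly so connections do not pile up between batches.
        connection.close()
```

You could call it as insert_products(psycopg2.connect(...), [asdict(p) for p in products_to_save]), using the same connection arguments as above.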
Here’s the snapshot of the data stored in the PostgreSQL database:
Complete Code
When you execute the code, it will create two files, product_data.csv and product_data.json. It will then store the data in MySQL, PostgreSQL, and the Amazon S3 bucket.
Execute the following command to see the data in your MySQL and PostgreSQL databases:
select * from chocolate_products;
Here’s the complete code:
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from dataclasses import dataclass, field, fields, InitVar, asdict
import csv
import json
import time
import os
import boto3
import psycopg2
import mysql.connector


@dataclass
class Product:
    name: str = ""
    price_string: InitVar[str] = ""
    price_gb: float = field(init=False)
    price_usd: float = field(init=False)
    url: str = ""

    def __post_init__(self, price_string):
        self.name = self.clean_name()
        self.price_gb = self.clean_price(price_string)
        self.price_usd = self.convert_price_to_usd()
        self.url = self.create_absolute_url()

    def clean_name(self):
        if self.name == "":
            return "missing"
        return self.name.strip()

    def clean_price(self, price_string):
        price_string = price_string.strip()
        price_string = price_string.replace("Sale price\n£", "")
        price_string = price_string.replace("Sale price\nFrom £", "")
        if price_string == "":
            return 0.0
        return float(price_string)

    def convert_price_to_usd(self):
        return round(self.price_gb * 1.21, 2)

    def create_absolute_url(self):
        if self.url == "":
            return "missing"
        return self.url


class ProductDataPipeline:
    def __init__(
        self,
        csv_filename="",
        json_filename="",
        mysql_db_name="",
        postgre_db_name="",
        aws_s3_bucket="",
        storage_queue_limit=5,
    ):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.json_filename = json_filename
        self.mysql_db_name = mysql_db_name
        self.postgre_db_name = postgre_db_name
        self.aws_s3_bucket = aws_s3_bucket
        self.data_stored = False  # Flag to track if data is stored

    def save_to_csv(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        # save_to_csv() runs last in each batch, so it empties the queue
        self.storage_queue.clear()
        if not products_to_save:
            return
        keys = [field.name for field in fields(products_to_save[0])]
        file_exists = (
            os.path.isfile(self.csv_filename)
            and os.path.getsize(self.csv_filename) > 0
        )
        with open(
            self.csv_filename, mode="a", newline="", encoding="utf-8"
        ) as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for product in products_to_save:
                writer.writerow(asdict(product))

    def save_to_json(self):
        products_to_save = []
        json_data = []
        products_to_save.extend(self.storage_queue)
        if not products_to_save:
            return
        for product in products_to_save:
            json_data.append(asdict(product))
        try:
            with open(self.json_filename, mode="r", encoding="utf-8") as output_file:
                existing_data = json.load(output_file)
        except FileNotFoundError:
            existing_data = []
        existing_data.extend(json_data)
        with open(self.json_filename, mode="w", encoding="utf-8") as output_file:
            json.dump(existing_data, output_file, indent=2)

    def save_to_mysql(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        if not products_to_save:
            return
        self.connection = mysql.connector.connect(
            host="localhost",
            user="root",
            port="3306",
            password="mypass@654",
            database=self.mysql_db_name,
        )
        # Create cursor object
        with self.connection.cursor() as cursor:
            for product in products_to_save:
                item = asdict(product)
                cursor.execute(
                    """ insert into chocolate_products ( name, price_gb, price_usd, url) values (%s,%s,%s,%s)""",
                    (item["name"], item["price_gb"],
                     item["price_usd"], item["url"]),
                )
        self.connection.commit()

    def save_to_postgresql(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        if not products_to_save:
            return
        self.connection = psycopg2.connect(
            host="localhost",
            database=self.postgre_db_name,
            user="postgres",
            password="mypass@654",
        )
        with self.connection.cursor() as cursor:
            for product in products_to_save:
                item = asdict(product)
                cursor.execute(
                    """ insert into chocolate_products ( name, price_gb, price_usd, url) values (%s,%s,%s,%s)""",
                    (item["name"], item["price_gb"],
                     item["price_usd"], item["url"]),
                )
        self.connection.commit()

    def save_to_s3_bucket(self):
        aws_access_key_id = "YOUR_ACCESS_KEY"
        aws_secret_access_key = "YOUR_SECRET_KEY"
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        response = s3_client.put_object(
            Bucket=self.aws_s3_bucket,
            Key="chocolate_data.csv",
            Body=open("product_data.csv", "rb"),
        )
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status == 200:
            print("Successfully uploaded data to S3 bucket!")
        else:
            print(f"Failed to upload data to S3 bucket. Status code: {status}")

    def clean_raw_product(self, scraped_data):
        return Product(
            name=scraped_data.get("name", ""),
            price_string=scraped_data.get("price", ""),
            url=scraped_data.get("url", ""),
        )

    def is_duplicate(self, product_data):
        if product_data.name in self.names_seen:
            print(f"Duplicate item found: {product_data.name}. Item dropped.")
            return True
        self.names_seen.append(product_data.name)
        return False

    def add_product(self, scraped_data):
        product = self.clean_raw_product(scraped_data)
        if not self.is_duplicate(product):
            self.storage_queue.append(product)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_json()
                self.save_to_mysql()
                self.save_to_postgresql()
                self.save_to_csv()
                self.data_stored = True  # Set flag to True when data is stored

    def close_pipeline(self):
        # Flush whatever is still waiting in the queue
        if len(self.storage_queue) > 0:
            self.save_to_json()
            self.save_to_mysql()
            self.save_to_postgresql()
            self.save_to_csv()
            self.data_stored = True  # The final batch counts as stored data too
        if self.data_stored:  # Check if data is stored before printing
            print("Data pipeline closed. Saved data to files and databases.")
        else:
            print("No data to save. Closing data pipeline.")


list_of_urls = [
    "https://www.chocolate.co.uk/collections/all",
]


def start_scrape():
    # list_of_urls grows while we iterate, so newly found pages are visited too
    for url in list_of_urls:
        driver.get(url)
        products = driver.find_elements(By.CLASS_NAME, "product-item")
        for product in products:
            name = product.find_element(
                By.CLASS_NAME, "product-item-meta__title").text
            price = product.find_element(
                By.CLASS_NAME, "price").text
            product_url = product.find_element(
                By.CLASS_NAME, "product-item-meta__title"
            ).get_attribute("href")
            data_pipeline.add_product(
                {"name": name, "price": price, "url": product_url})
        try:
            next_page = driver.find_element(By.CSS_SELECTOR, "a[rel='next']")
            if next_page:
                list_of_urls.append(next_page.get_attribute("href"))
                print("Scraped page", len(list_of_urls), "...")
                time.sleep(1)
        except NoSuchElementException:
            # find_element raises this when there is no "next" link
            print("No more pages found!")


if __name__ == "__main__":
    options = Options()
    options.add_argument("--headless")  # Enables headless mode
    # Using ChromedriverManager to automatically download and install Chromedriver
    driver = webdriver.Chrome(
        options=options, service=Service(ChromeDriverManager().install())
    )
    data_pipeline = ProductDataPipeline(
        csv_filename="product_data.csv",
        json_filename="product_data.json",
        mysql_db_name="chocolatedb",
        postgre_db_name="chocolatedb",
        aws_s3_bucket="chocolate-products",
    )
    start_scrape()
    data_pipeline.close_pipeline()
    print("Congratulations! Data saved successfully on MySQL, PostgreSQL, and JSON.")
    time.sleep(3)
    data_pipeline.save_to_s3_bucket()
    driver.quit()  # Close the browser window after finishing
    print("Scraping completed successfully. Browser closed.")
Next Steps
We hope you now have a good understanding of how to save the data you've scraped into the file or database you need! If you have any questions, leave them in the comments below and we'll do our best to help out!
If you would like the code from this example, please check it out on GitHub here!
The next tutorial covers how to make our scraper more robust and scalable by handling failed requests and using concurrency.