How to Scrape Target With Requests and BeautifulSoup
Target is one of the top 10 largest online retailers in the United States. The company was founded in 1902 and opened its first discount store in 1962. As a conglomerate, Target has owned numerous other companies and continues to do so, which allows it to source a very large amount of inventory both in stores and online. Scraping Target's data can be valuable for gathering pricing information, tracking product availability, or conducting market research.
In this tutorial, we're going to learn how to scrape Target.
- TLDR: How to Scrape Target
- How To Architect Our Scraper
- Understanding How To Scrape Target
- Setting Up Our Target Scraper
- Build A Target Search Crawler
- Build A Target Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
TLDR - How to Scrape Target
If you need to scrape products from Target, use the script below!
- Create a new project folder with a config.json file inside.
- Add your ScrapeOps API key to the config file: {"api_key": "your-super-secret-api-key"}.
- Then, copy and paste the code below into a Python file.
import os
import csv
import requests
import json
import logging
import time  # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ProductData:
name: str = ""
price: str = ""
rating: float = 0.0
review_count: int = 0
details: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)
for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"
search_data = SearchData(
name=name,
url=link,
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_product(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
title = soup.select_one("h1[data-test='product-title']").text
rating = "n/a"
review_count = 0
rating_holder = soup.select_one("span[data-test='ratings']")
if rating_holder:
rating_array = rating_holder.text.split(" ")
rating = rating_array[0]
review_count = rating_array[-2]
price_holder = soup.select_one("span[data-test='product-price']")
price = price_holder.text
details = soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text
product_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
product_data = ProductData(
name=title,
price=price,
rating=rating,
review_count=review_count,
details=details
)
product_pipeline.add_data(product_data)
product_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_product,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To change your results, feel free to change any of the following:
- MAX_RETRIES: Determines the maximum number of retries the script will attempt if a request fails (e.g., due to a network issue or a non-200 status code).
- MAX_THREADS: Defines the number of concurrent threads used during the scraping and processing tasks.
- PAGES: Specifies the number of pages to scrape for each keyword. Each page typically contains a set of search results.
- LOCATION: Sets the location/country code for the scraping requests. It is passed to the proxy URL to simulate requests coming from a specific region.
- keyword_list: Contains the list of keywords for which you want to scrape data. Each keyword corresponds to a separate search query on the Target website.
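For example, a hypothetical configuration that crawls three pages of two different searches from the UK would look like this (these values are just for illustration):

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3                                  # crawl 3 pages per keyword
LOCATION = "uk"                            # route requests through the UK
keyword_list = ["laptop", "office chair"]  # one crawl report per keyword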
You can run it with python name_of_your_file.py.
How To Architect Our Target Scraper
As mentioned above, to scrape Target's products properly, we're going to need two scrapers.
- We need a result crawler to scrape both product names and URLs.
- Then, we'll make an item scraper that pings each item URL and scrapes the specific details about the item.
Target's layout is different from other online retailers because our search results initially hold only the item title and a link to the product page. All other information is loaded dynamically. With Requests, we can't load JavaScript, so we'll grab just the name and link. The item scraper will then go through and scrape each individual product URL.
Here is the process for our result crawler:
- Perform a search and parse the results.
- Use pagination to control our results.
- Safely store our data inside a CSV file for later use.
- Concurrently perform steps 1 through 3 on multiple search pages.
- Proxy Integration will get us past Target's anti-bots.
After each crawl, our item scraper will perform these actions:
- Read our crawler report into an array of dict objects.
- Parse each item from the array.
- Store the data safely.
- Run steps 2 and 3 concurrently on multiple items.
- Bypass anti-bots with a proxy.
Understanding How To Scrape Target
Content on Target is loaded dynamically. This can definitely create some issues when trying to scrape.
In order to load our pages, we'll tell ScrapeOps to wait before sending us our responses. This allows the initial page to load.
However, ratings and prices are loaded dynamically as we scrape the page. This is where the real issues come from.
Since we can't scroll the page, we'll scrape our product names and URLs. Afterward, we'll go through and scrape each item.
In the next few sections, we'll take a look at all this from a high level. Then, we'll be able to properly plan out our code.
Step 1: How To Request Target Pages
We'll start with how to GET a Target page. The search URL is laid out like this:
https://www.target.com/s?searchTerm={formatted_keyword}
If you want to perform a search for laptops, your URL would be:
https://www.target.com/s?searchTerm=laptop
You can see this in practice below.
Our individual item pages are laid out like this:
https://www.target.com/p/{name-of-item}/-/some-other-stuff
If we really needed to, we could probably rebuild these links from scratch, but our crawler is finding these, so we don't need to. The page below holds this link:
https://www.target.com/p/hp-15-6-34-fhd-laptop-intel-core-i5-8gb-ram-512gb-ssd-storage-silver-15-fd0075tg/-/A-89476632#lnk=sametab
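As a quick sketch, here is a minimal way to build and request one of these search pages with Requests (the keyword here is just an example). Keep in mind that an unproxied request like this will often get blocked, which is exactly why we add proxy support later on:

import requests

# Build a Target search URL from a keyword (spaces become "+").
keyword = "gaming laptop"
url = f"https://www.target.com/s?searchTerm={keyword.replace(' ', '+')}"

# An unproxied GET like this is likely to be blocked by Target's anti-bots;
# later we route the same URL through the ScrapeOps Proxy API instead.
response = requests.get(url)
print(response.status_code, url)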
Step 2: How To Extract Data From Target Results and Pages
To extract data from our pages, we first need to look at the data that's available. The data we get from regular access is going to be different from the data we get using Requests and the ScrapeOps Proxy API.
Look at the pages below. Their CSS links are broken, so they look quite different from how they'd normally appear in a browser.
Take a look at the page when we inspect it. You can see the last fully loaded item at the top of the picture. The rest of the items are presented simply as links with titles.
However, these items are embedded within a div. This div has a data-test value of @web/site-top-of-funnel/ProductCardWrapper. We'll use these elements to extract our link and title.
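To make that selector concrete, here is a small, self-contained sketch. The HTML below is a stripped-down stand-in for a real search results page, not actual Target markup:

from bs4 import BeautifulSoup

# A tiny stand-in for a search results page (real pages come back through the proxy).
html = """
<div data-test="@web/site-top-of-funnel/ProductCardWrapper">
  <a href="/p/example-laptop/-/A-12345678">Example Laptop</a>
</div>
"""

soup = BeautifulSoup(html, "html.parser")
cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']")
for card in cards:
    href = card.find("a").get("href")
    name = href.split("/")[2]                      # the product slug from the href
    print(name, f"https://www.target.com{href}")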
Below are some screenshots of an item page through the ScrapeOps Proxy. If you look at these images, you can see where the item details and price are located.
Step 3: How To Control Pagination
We can control pagination with the Nao parameter. When you go to page 2, you'll see Nao=24 in the URL. Our full URLs are laid out like this:
https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}
Page 1 would be 0 * 24, page 2 would be 1 * 24, and so on.
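Here is a quick sketch of how those offsets translate into real URLs:

# Build the first three pages of search URLs for a keyword.
keyword = "laptop"
for page_number in range(3):
    print(f"https://www.target.com/s?searchTerm={keyword}&Nao={page_number * 24}")
# Nao=0 is page 1, Nao=24 is page 2, Nao=48 is page 3.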
Step 4: Geolocated Data
To handle geolocated data, we'll be using the ScrapeOps Proxy API. When we use the ScrapeOps Proxy, we can pass a country param into our URL.
- If we wish to appear in the US, we can set "country": "us".
- If we wish to appear in the UK, we can set "country": "uk".
You can view a full list of our supported countries here.
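Concretely, the country value just gets added to the proxy payload. Here is a minimal sketch with a placeholder API key:

from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder

payload = {
    "api_key": API_KEY,
    "url": "https://www.target.com/s?searchTerm=laptop",
    "country": "us",  # swap to "uk" to route the request through the UK
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))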
Setting Up Our Target Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir target-scraper
cd target-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build A Target Search Crawler
Now that we understand exactly what we want to do, we'll go through and build it. We're going to start with our crawler. We'll build all of the following features in order.
- Parsing
- Pagination
- Data Storage
- Concurrency
- Proxy integration
Step 1: Create Simple Search Data Parser
Time to get started with our crawler. We'll start by writing a basic script with a parsing function.
If you look at the script below, you'll see that we have error handling, retry logic and a parser. This sets the stage for everything else that we're going to do in this project.
Pay special attention to scrape_search_results(); it's our parsing function.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)
for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"
search_data = {
"name": name,
"url": link,
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
From within our parser, we:
- Find all of our a elements.
- Find our href with a_tags[0].get("href").
- Extract our name from the href with some string splitting.
- Fix our link by adding "https://www.target.com" to our href.
Step 2: Add Pagination
As you read earlier, we're going to use the Nao parameter to control our pagination, Nao={page_number*24} to be more precise. Page 2 starts with Nao=24, so when we count our pages, we'll start at 0.
- Page 1: 0 * 24 = 0
- Page 2: 1 * 24 = 24
- Page 3: 2 * 24 = 48
Here is what our full URL looks like:
https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}
We'll also create a function that allows us to crawl through a list of pages. Take a look at start_scrape().
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
You can see how it all fits together in our full code below.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)
for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"
search_data = {
"name": name,
"url": link,
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
- The Nao parameter is used to control our pagination.
- start_scrape() gives us the ability to crawl a whole list of pages.
Step 3: Storing the Scraped Data
We need to store our data. If we don't store it, we can't review it later and our item scraper won't be able to read it.
To store it, we'll write two new classes: SearchData and DataPipeline.
Take a look at SearchData; we use it to hold each item's title and URL.
@dataclass
class SearchData:
name: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is our DataPipeline. We feed it SearchData objects and it pipes them to a CSV file. Not only does it pipe them to a CSV, but it also checks for and removes duplicate items based on their name.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
In the full code below, we open a DataPipeline and pass our SearchData objects into it.
import os
import csv
import requests
import json
import logging
import time  # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)
for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"
search_data = SearchData(
name=name,
url=link,
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- Our SearchData class is used to represent actual results from our search.
- DataPipeline pipes our results into a CSV file.
Step 4: Adding Concurrency
Everything seems to be running correctly now. It's time to add concurrency. We'll use ThreadPoolExecutor to give us multithreading support. Then, on each available thread, we'll call scrape_search_results().
Here is our updated start_scrape() function.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
These are our arguments to executor.map():
- scrape_search_results: the function we want to call on each thread.
- [keyword] * pages: our keyword as an array the size of our page list.
- [location] * pages: our location as an array the size of our page list.
- range(pages): our list of page numbers.
- [data_pipeline] * pages: our data_pipeline passed in as an array the size of our page list.
- [retries] * pages: our retries passed in as an array the size of our page list.
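If the list-of-arguments pattern looks odd, here is a small toy example (unrelated to Target) that shows how executor.map() lines the arrays up, passing one element from each array to every call:

import concurrent.futures

def label(keyword, location, page):
    return f"{keyword} | {location} | page {page}"

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each call gets element i from every iterable, just like start_scrape()
    # hands scrape_search_results() one page number per thread.
    results = executor.map(label, ["laptop"] * pages, ["us"] * pages, range(pages))
    print(list(results))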
Put it all together, and this is what you get.
import os
import csv
import requests
import json
import logging
import time  # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)
for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"
search_data = SearchData(
name=name,
url=link,
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 5: Bypassing Anti-Bots
To bypass anti-bots, we need to add proxy support. Here, we'll create a relatively simple function that uses string operations to take in a URL and give us a fully proxied version of that same URL.
Take a look at the function below; this is where our proxy support comes from.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
Take a look at the payload as well:
- "api_key": our ScrapeOps API key.
- "url": the URL we want to scrape.
- "country": the country we want to appear in.
- "wait": the amount of time we want the server to wait before sending our response.
- "residential": a boolean. If we want a residential IP address, we set this to True.
Our full production crawler is available below.
import os
import csv
import requests
import json
import logging
import time  # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)
for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"
search_data = SearchData(
name=name,
url=link,
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Let's get a feel for this thing in production. We'll scrape 2 pages of laptops. Take a look at our updated main below.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
To change your results, go ahead and change any of the following:
- MAX_RETRIES: Determines the maximum number of retries the script will attempt if a request fails (e.g., due to a network issue or a non-200 status code).
- MAX_THREADS: Defines the number of concurrent threads used during the scraping and processing tasks.
- PAGES: Specifies the number of pages to scrape for each keyword. Each page typically contains a set of search results.
- LOCATION: Sets the location/country code for the scraping requests. It is passed to the proxy URL to simulate requests coming from a specific region.
- keyword_list: Contains the list of keywords for which you want to scrape data. Each keyword corresponds to a separate search query on the Target website.
Here are our results.
We crawled 2 pages of results and generated a report with 53 items in 18.711 seconds. This comes out to 9.35 seconds per page.
Build A Target Scraper
Now, we need to build a scraper for all the items we retrieved during our crawl. Our scraper needs to be able to do the following:
- Read the CSV file into an array.
- Parse each row from the array.
- Store our parsed data inside a new CSV file.
- Concurrently run steps 2 and 3 on multiple items.
- Use the ScrapeOps Proxy API to get past anti-bots.
Step 1: Create Simple Item Data Parser
Let's get started by building a parsing function just like we did earlier. We'll add error handling, retries, and our parsing logic. We'll call this function process_product().
def process_product(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
title = soup.select_one("h1[data-test='product-title']").text
rating = "n/a"
review_count = 0
rating_holder = soup.select_one("span[data-test='ratings']")
if rating_holder:
rating_array = rating_holder.text.split(" ")
rating = rating_array[0]
review_count = rating_array[-2]
price_holder = soup.select_one("span[data-test='product-price']")
price = price_holder.text
details = soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text
product_data = {
"name": title,
"price": price,
"rating": rating,
"review_count": review_count,
"details": details
}
print(product_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
title = soup.select_one("h1[data-test='product-title']").text
is used to extract our title.- We set our
rating
andreview_count
to"n/a"
and0
by default. - If there is a
rating
orreview_count
present, we reassign it to its respective variable. soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text
is used to find ourdetails
.
Step 2: Loading URLs To Scrape
In order to use our parsing function, we need to load the URLs from our CSV file. This is why we saved them earlier.
Take a look at the function below; it reads the CSV file and then runs process_product() on each item from the array.
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_product(row, location, retries=retries)
After we put everything together, we get a script that looks like this.
import os
import csv
import requests
import json
import logging
import time  # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)
for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"
search_data = SearchData(
name=name,
url=link,
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_product(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
title = soup.select_one("h1[data-test='product-title']").text
rating = "n/a"
review_count = 0
rating_holder = soup.select_one("span[data-test='ratings']")
if rating_holder:
rating_array = rating_holder.text.split(" ")
rating = rating_array[0]
review_count = rating_array[-2]
price_holder = soup.select_one("span[data-test='product-price']")
price = price_holder.text
details = soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text
product_data = {
"name": title,
"price": price,
"rating": rating,
"review_count": review_count,
"details": details
}
print(product_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_product(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
- process_results() is used to read our CSV file and iterate through it.
- We call process_product() on each item from the array as we iterate.
Step 3: Storing the Scraped Data
In order to store our data, we need to add another dataclass. We'll call this one ProductData. It holds the following fields:
- name
- price
- rating
- review_count
- details
Here is our ProductData class.
@dataclass
class ProductData:
name: str = ""
price: str = ""
rating: float = 0.0
review_count: int = 0
details: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
In our full code below, we open a DataPipeline inside our parsing function and then pass a ProductData object into it.
import os
import csv
import requests
import json
import logging
import time  # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ProductData:
name: str = ""
price: str = ""
rating: float = 0.0
review_count: int = 0
details: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)
for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"
search_data = SearchData(
name=name,
url=link,
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_product(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
title = soup.select_one("h1[data-test='product-title']").text
rating = "n/a"
review_count = 0
rating_holder = soup.select_one("span[data-test='ratings']")
if rating_holder:
rating_array = rating_holder.text.split(" ")
rating = rating_array[0]
review_count = rating_array[-2]
price_holder = soup.select_one("span[data-test='product-price']")
price = price_holder.text
details = soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text
product_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
product_data = ProductData(
name=title,
price=price,
rating=rating,
review_count=review_count,
details=details
)
product_pipeline.add_data(product_data)
product_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_product(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 4: Adding Concurrency
Now, we're going to once again add concurrency. Just like we did earlier, we'll use ThreadPoolExecutor to call our parsing function on all available threads.
Take a look at our refactored process_results().
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_product,
reader,
[location] * len(reader),
[retries] * len(reader)
)
Like before, look at executor.map():
- process_product is the function we want to call on each thread.
- reader is the array of items we wish to parse.
- All other arguments get passed in as arrays, just like we did earlier.
Step 5: Bypassing Anti-Bots
We'll bypass anti-bots exactly the way we did earlier. We already have get_scrapeops_url(); we just need to use it in the right place. We'll change one line from within our parser:
response = requests.get(get_scrapeops_url(url, location=location))
You can view our production-ready code below.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ProductData:
name: str = ""
price: str = ""
rating: float = 0.0
review_count: int = 0
details: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)
for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"
search_data = SearchData(
name=name,
url=link,
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_product(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
title = soup.select_one("h1[data-test='product-title']").text
rating = "n/a"
review_count = 0
rating_holder = soup.select_one("span[data-test='ratings']")
if rating_holder:
rating_array = rating_holder.text.split(" ")
rating = rating_array[0]
review_count = rating_array[-2]
price_holder = soup.select_one("span[data-test='product-price']")
price = price_holder.text
details = soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text
product_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
product_data = ProductData(
name=title,
price=price,
rating=rating,
review_count=review_count,
details=details
)
product_pipeline.add_data(product_data)
product_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_product,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Now, let's test everything in production. We'll set PAGES to 2 and MAX_THREADS to 5, just like we did earlier. If you need a refresher, here is our main.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here are our results.
We generated a report with 52 results. If you remember from earlier, our crawl took 18.711 seconds. Our full crawl and scrape took 283.987 seconds. 283.987 - 18.711 = 265.276 seconds spent scraping items. 265.276 seconds / 52 items = 5.101 seconds per item. This is nearly twice as fast as our crawler.
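The tutorial doesn't show the timing code, but if you'd like to reproduce numbers like these yourself, a minimal approach is to wrap the crawl and the scrape in time.time() calls inside main, along these lines:
import time

start_time = time.time()
# ... run start_scrape() for each keyword here, exactly as in main above ...
crawl_time = time.time() - start_time

# ... run process_results() for each aggregate file here ...
total_time = time.time() - start_time

scrape_time = total_time - crawl_time
items_scraped = 52  # taken from the number of rows in this run's reports
print(f"Crawl: {crawl_time:.3f}s | Scrape: {scrape_time:.3f}s | {scrape_time / items_scraped:.3f}s per item")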
Legal and Ethical Considerations
When we scrape the web, there are certain rules we need to follow. In particular, we need to pay attention to privacy and intellectual property. If data isn't gated behind a login page, it is generally considered public. If you need to log in to access it, it's considered private data.
When accessing any site, you are subject to its Terms and Conditions and its robots.txt. You can view Target's terms here. Their robots.txt is available here.
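Target's robots.txt lives at the standard location, and if you'd like to check a URL against it programmatically before scraping, Python's standard library ships urllib.robotparser. This is a minimal sketch; the search URL is just an example:
from urllib.robotparser import RobotFileParser

robots = RobotFileParser()
robots.set_url("https://www.target.com/robots.txt")  # standard robots.txt location
robots.read()

test_url = "https://www.target.com/s?searchTerm=laptop"  # example URL to check
print(robots.can_fetch("*", test_url))  # True if a generic crawler may fetch this URL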
If you're unsure of your scraper, you should talk to an attorney.
Conclusion
Congrats! You've successfully crawled and scraped Target. At this point, you've had a crash course in using Requests and BeautifulSoup and you've also learned about parsing, pagination, data storage, concurrency, and proxy integration.
If you'd like to learn more about the tech from this article, check out the links below.
More Python Web Scraping Guides
At ScrapeOps, we have tons of learning resources available. Whether you're brand new to coding or a seasoned developer, you can pick up something useful here. Take a look at our Python Web Scraping Playbook.
If you want to read more from our "How To Scrape" series, check out the articles below.