

How to Scrape Target With Requests and BeautifulSoup

Target is one of the top 10 largest online retailers in the United States. While the company was founded in 1902, it opened its first discount store in 1962. As a conglomerate, Target has owned numerous other companies and continues to do so, which lets it source a very large amount of inventory both in stores and online. Scraping Target's data can be valuable for gathering pricing information, tracking product availability, or conducting market research.

In this tutorial, we're going to learn how to scrape Target.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Target

If you need to scrape products from Target, use the pre-built scraper below!

  1. Create a new project folder with a config.json file inside.
  2. Add your ScrapeOps API key to the config file, {"api_key": "your-super-secret-api-key"}.
  3. Then, copy and paste the code below into a Python file.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
import time
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ProductData:
name: str = ""
price: str = ""
rating: float = 0.0
review_count: int = 0
details: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)

for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"

search_data = SearchData(
name=name,
url=link,
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_product(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:

logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")

title = soup.select_one("h1[data-test='product-title']").text
rating = "n/a"
review_count = 0
rating_holder = soup.select_one("span[data-test='ratings']")
if rating_holder:
rating_array = rating_holder.text.split(" ")
rating = rating_array[0]
review_count = rating_array[-2]

price_holder = soup.select_one("span[data-test='product-price']")
price = price_holder.text
details = soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text

product_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
product_data = ProductData(
name=title,
price=price,
rating=rating,
review_count=review_count,
details=details
)
product_pipeline.add_data(product_data)
product_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_product,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

To change your results, feel free to change any of the following:

  • MAX_RETRIES: Determines the maximum number of retries the script will attempt if a request fails (e.g., due to a network issue or a non-200 status code).
  • MAX_THREADS: Defines the number of concurrent threads used during the scraping and processing tasks.
  • PAGES: Specifies the number of pages to scrape for each keyword. Each page typically contains a set of search results.
  • LOCATION: Sets the location/country code for the scraping requests. It is passed to the proxy URL to simulate requests coming from a specific region.
  • keyword_list: Contains the list of keywords for which you want to scrape data. Each keyword corresponds to a separate search query on the Target website.

You can run it with python name_of_your_file.py.


How To Architect Our Target Scraper

As mentioned above, to scrape Target's products properly, we're going to need two scrapers.

  1. We need a result crawler to scrape both product names and URLs.
  2. Then, we'll make an item scraper that pings each item url and scrapes the specific details about the item.

Target's layout is different from other online retailers because our search results initially only hold the item title and a link to the product page. All other information is loaded dynamically. With Requests, we can't load JavaScript, so we'll take just our link and name. The item scraper will then go through and scrape each individual product URL.

Here is the process for our result crawler:

  1. Perform a search and parse the results.
  2. Use pagination to control our results.
  3. Safely store our data inside a CSV file for later use.
  4. Concurrently perform steps 1 through 3 on multiple search pages.
  5. Proxy Integration will get us past Target's anti-bots.

After each crawl, our item scraper will perform these actions:

  1. Read our crawler report into an array of dict objects.
  2. Parse each item from the array.
  3. Store the data safely.
  4. Run steps 2 and 3 concurrently on multiple items.
  5. Bypass anti-bots with a proxy.

Understanding How To Scrape Target

Content on Target is loaded dynamically. This can definitely create some issues when trying to scrape.

In order to load our pages, we'll tell ScrapeOps to wait before sending us our responses. This allows the initial page to load.

However, ratings and prices are loaded dynamically as we scrape the page. This is where the real issues come from.

Since we can't scroll the page, we'll scrape our product names and URLs. Afterward, we'll go through and scrape each item.

In the next few sections, we'll take a look at all this from a high level. Then, we'll be able to properly plan out our code.


Step 1: How To Request Target Pages

We'll start with how to GET a Target page. The search URL is laid out like this:

https://www.target.com/s?searchTerm={formatted_keyword}

If you want to perform a search for laptops, your URL would be:

https://www.target.com/s?searchTerm=laptop
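
If you'd like to see how a multi-word keyword turns into this kind of URL, here is a minimal sketch using the same keyword formatting our crawler uses later on ("gaming laptop" is just an example keyword).

keyword = "gaming laptop"
formatted_keyword = keyword.replace(" ", "+")
search_url = f"https://www.target.com/s?searchTerm={formatted_keyword}"
print(search_url)
# https://www.target.com/s?searchTerm=gaming+laptop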

You can see this in practice below.

Target Search Results Page

Our individual item pages are laid out like this:

https://www.target.com/p/{name-of-item}/-/some-other-stuff

If we really needed to, we could probably rebuild these links from scratch, but since our crawler finds them for us, we don't need to. The page below holds this link:

https://www.target.com/p/hp-15-6-34-fhd-laptop-intel-core-i5-8gb-ram-512gb-ssd-storage-silver-15-fd0075tg/-/A-89476632#lnk=sametab

Target Product Page


Step 2: How To Extract Data From Target Results and Pages

To extract data from our pages, we first need to look at the data that's available. The data we get from regular browser access is going to be different from the data we get using Requests and the ScrapeOps Proxy API.

Look at the pages below. Their CSS links are broken, so they look quite different from how the page normally renders.

Target Broken Page

Take a look at the page when we inspect it. You can see the last fully loaded item at the top of the picture. The rest of the items are presented simply as links with titles.

However, these items are embedded within a div. This div has a data-test value of @web/site-top-of-funnel/ProductCardWrapper. We'll use these elements to extract our link and title.

Search Results Page HTML Inspection

Below are some screenshots of an item page through the ScrapeOps Proxy. If you look at these images, you can see where the item details and price are located.

Product Page HTML Inspection Product Page HTML Inspection


Step 3: How To Control Pagination

We can control pagination with a Nao parameter. When you go to page 2, you'll see Nao=24 in the URL. Our full URLs are laid out like this:

https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}

Page 1 would be 0 * 24, page 2 would be 1 * 24 and so on and so forth.
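
Here is a quick sketch of how those offsets work out in code.

# Pages are zero-indexed, and each page holds 24 results.
for page_number in range(3):
    print(f"Page {page_number + 1}: Nao={page_number * 24}")
# Page 1: Nao=0
# Page 2: Nao=24
# Page 3: Nao=48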


Step 4: Geolocated Data

To handle geolocated data, we'll be using the ScrapeOps Proxy API. When we use a ScrapeOps Proxy, we can pass a country param into our URL.

  • If we wish to appear in the US, we can set "country": "us".
  • If we wish to appear in the UK, we can set "country": "uk".

You can view a full list of our supported countries here.


Setting Up Our Target Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir target-scraper

cd target-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4
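
If you want to confirm that both packages installed correctly, a quick check like the one below (run inside the activated environment) should print their version numbers.

import requests
import bs4

print(requests.__version__)
print(bs4.__version__)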

Build A Target Search Crawler

Now that we understand exactly what we want to do, we'll go through and build it. We're going to start with our crawler. We'll build all of the following features in order.

  1. Parsing
  2. Pagination
  3. Data Storage
  4. Concurrency
  5. Proxy integration

Step 1: Create Simple Search Data Parser

Time to get started with our crawler. We'll start by writing a basic script with a parsing function.

If you look at the script below, you'll see that we have error handling, retry logic and a parser. This sets the stage for everything else that we're going to do in this project.

Pay special attention to scrape_search_results(); it's our parsing function.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)

for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"

search_data = {
"name": name,
"url": link,
}

print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")

From within our parser, we:

  • Find all of our a elements.
  • Find our href with a_tags[0].get("href").
  • Extract our name from the href with some string splitting.
  • Fix our link by adding "https://www.target.com" to our href.
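
Here is that name extraction applied to the example href we saw earlier, just to make the string splitting concrete.

href = "/p/hp-15-6-34-fhd-laptop-intel-core-i5-8gb-ram-512gb-ssd-storage-silver-15-fd0075tg/-/A-89476632"
name = href.split("/")[2]
link = f"https://www.target.com{href}"
print(name)  # hp-15-6-34-fhd-laptop-intel-core-i5-8gb-ram-512gb-ssd-storage-silver-15-fd0075tg
print(link)  # https://www.target.com/p/hp-15-6-34-fhd-laptop-intel-core-i5-8gb-ram-512gb-ssd-storage-silver-15-fd0075tg/-/A-89476632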

Step 2: Add Pagination

As you read earlier, we're going to use the Nao parameter to control our pagination, Nao={page_number*24} to be more precise. Page 2 starts with Nao=24, so when we count our pages, we'll start at 0.

  • Page 1: 0 * 24 = 0
  • Page 2: 1 * 24 = 24
  • Page 3: 2 * 24 = 48

Here is what our full URL looks like:

https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}
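
For example, page 2 (page_number 1) of a search for a made-up "gaming laptop" keyword would look like this:

formatted_keyword = "gaming laptop".replace(" ", "+")
page_number = 1
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number * 24}"
print(url)
# https://www.target.com/s?searchTerm=gaming+laptop&Nao=24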

We'll also create a function that allows us to crawl through a list of pages. Take a look at start_scrape().

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)

You can see how it all fits together in our full code below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)

for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"

search_data = {
"name": name,
"url": link,
}

print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)




if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")
  • The Nao parameter is used to control our pagination.
  • start_scrape() gives us the ability to crawl multiple pages at once.

Step 3: Storing the Scraped Data

We need to store our data. If we don't store it, we won't be able to review it later, and our item scraper won't be able to read it.

To store it, we'll write two new classes, SearchData and DataPipeline.

Take a look at SearchData; we use it to hold each item's title and URL.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here is our DataPipeline. We feed it SearchData objects and it pipes them to a CSV file. Not only does it pipe them to a CSV, but it also checks and removes duplicate items based on their name.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
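
Here is a minimal usage sketch of the pipeline on its own, assuming the class definitions above are in scope. The filename and the SearchData values are just placeholders; the second add_data() call gets dropped as a duplicate because it shares its name with the first.

pipeline = DataPipeline(csv_filename="example.csv")
pipeline.add_data(SearchData(name="example-laptop", url="https://www.target.com/p/example-laptop/-/A-00000000"))
pipeline.add_data(SearchData(name="example-laptop", url="https://www.target.com/p/example-laptop/-/A-00000000"))  # duplicate, dropped
pipeline.close_pipeline()  # flushes anything still in the queue to example.csv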

In the full code below, we open a DataPipeline and pass our SearchData objects into it.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
import time
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)

for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"

search_data = SearchData(
name=name,
url=link,
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)




if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • Our SearchData class is used to represent actual results from our search.
  • DataPipeline pipes our results into a CSV file.

Step 4: Adding Concurrency

Everything seems to be running correctly now. It's time to add concurrency. We'll use ThreadPoolExecutor to give us multithreading support. Then on each available thread, we'll call scrape_search_results().

Here is our updated start_scrape() function.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

These are our arguments to executor.map():

  • scrape_search_results(): the function we want to call on each thread.
  • [keyword] * pages: our keyword repeated in an array the size of our page list.
  • [location] * pages: our location repeated in an array the size of our page list.
  • range(pages): our list of page numbers.
  • [data_pipeline] * pages: our data_pipeline repeated in an array the size of our page list.
  • [retries] * pages: our retries repeated in an array the size of our page list (see the standalone illustration below).
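
If those argument arrays feel abstract, here is a small standalone illustration. fake_scrape() is just a stand-in for our real parsing function; each call receives one element from each iterable.

import concurrent.futures

def fake_scrape(keyword, location, page_number):
    return f"{keyword} | {location} | page {page_number}"

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for result in executor.map(
        fake_scrape,
        ["laptop"] * pages,
        ["us"] * pages,
        range(pages)
    ):
        print(result)
# laptop | us | page 0
# laptop | us | page 1
# laptop | us | page 2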

Put it all together, and this is what you get.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
import time
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)

for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"

search_data = SearchData(
name=name,
url=link,
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)




if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 5: Bypassing Anti-Bots

To bypass anti-bots, we need to add proxy support. Here, we'll create a relatively simple function that uses string operations to take in a URL and give us a fully proxied version of that same URL.

Take a look at the function below; this is where our proxy support comes from.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

Take a look at the payload as well:

  • "api_key": our ScrapeOps API key.
  • "url": the url we want to scrape.
  • "country": the country we want to appear in.
  • "wait": the amount of time we want the server to wait before sending our response.
  • "residential": a boolean. If we want to a residential IP address, we set this to True.

Our full production crawler is available below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
import time
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)

for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"

search_data = SearchData(
name=name,
url=link,
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

Let's get a feel for this thing in production. We'll scrape 2 pages of laptops. Take a look at our updated main below.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

To change your results, go ahead and change any of the following:

  • MAX_RETRIES: Determines the maximum number of retries the script will attempt if a request fails (e.g., due to a network issue or a non-200 status code).
  • MAX_THREADS: Defines the number of concurrent threads used during the scraping and processing tasks.
  • PAGES: Specifies the number of pages to scrape for each keyword. Each page typically contains a set of search results.
  • LOCATION: Sets the location/country code for the scraping requests. It is passed to the proxy URL to simulate requests coming from a specific region.
  • keyword_list: Contains the list of keywords for which you want to scrape data. Each keyword corresponds to a separate search query on the Target website.

Here are our results.

Crawler Results Performance Terminal

We crawled 2 pages of results and generated a report with 53 items in 18.711 seconds. This comes out to roughly 9.36 seconds per page.


Build A Target Scraper

Now, we need to build a scraper for all the items we retrieved during our crawl. Our scraper needs to be able to do the following:

  1. Read the CSV file into an array.
  2. Parse each row from the array.
  3. Store our parsed data inside a new CSV file.
  4. Concurrently run steps 2 and 3 on multiple items.
  5. Use the ScrapeOps Proxy API to get past anti-bots.

Step 1: Create Simple Item Data Parser

Let's get started by building a parsing function just like we did earlier. We'll add error handling, retries, and our parsing logic. We'll call this function process_product().

def process_product(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:

                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                title = soup.select_one("h1[data-test='product-title']").text
                rating = "n/a"
                review_count = 0
                rating_holder = soup.select_one("span[data-test='ratings']")
                if rating_holder:
                    rating_array = rating_holder.text.split(" ")
                    rating = rating_array[0]
                    review_count = rating_array[-2]

                price_holder = soup.select_one("span[data-test='product-price']")
                price = price_holder.text
                details = soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text

                product_data = {
                    "name": title,
                    "price": price,
                    "rating": rating,
                    "review_count": review_count,
                    "details": details
                }
                print(product_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
  • title = soup.select_one("h1[data-test='product-title']").text is used to extract our title.
  • We set our rating and review_count to "n/a" and 0 by default.
  • If there is a rating or review_count present, we reassign it to its respective variable.
  • soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text is used to find our details.
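
Here is a rough sketch of that ratings split. The exact text is an assumption about what the ratings element renders (something like "4.5 out of 5 stars with 120 ratings"), so the indexes are tied to that format.

ratings_text = "4.5 out of 5 stars with 120 ratings"  # assumed format
rating_array = ratings_text.split(" ")
rating = rating_array[0]         # "4.5"
review_count = rating_array[-2]  # "120"
print(rating, review_count)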

Step 2: Loading URLs To Scrape

In order to use our parsing function, we need to load the URLs from our CSV file; this is why we saved them earlier.

Take a look at the function below. It reads the CSV file and then runs process_product() on each item from the array.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_product(row, location, retries=retries)

After we put everything together, we get a script that looks like this.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
import time
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)

for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"

search_data = SearchData(
name=name,
url=link,
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_product(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:

logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")

title = soup.select_one("h1[data-test='product-title']").text
rating = "n/a"
review_count = 0
rating_holder = soup.select_one("span[data-test='ratings']")
if rating_holder:
rating_array = rating_holder.text.split(" ")
rating = rating_array[0]
review_count = rating_array[-2]

price_holder = soup.select_one("span[data-test='product-price']")
price = price_holder.text
details = soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text

product_data = {
"name": title,
"price": price,
"rating": rating,
"review_count": review_count,
"details": details
}
print(product_data)
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_product(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
  • process_results() is used to read our CSV file and iterate through it.
  • We call process_product() on each item from the array as we iterate.

Step 3: Storing the Scraped Data

In order to store our data, we need to add another dataclass. We'll call this one ProductData. It holds the following fields:

  • name
  • price
  • rating
  • review_count
  • details

Here is our ProductData class.

@dataclass
class ProductData:
    name: str = ""
    price: str = ""
    rating: float = 0.0
    review_count: int = 0
    details: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

In our full code below, we open a DataPipeline inside our parsing function and then pass a ProductData object into it.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
import time
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ProductData:
name: str = ""
price: str = ""
rating: float = 0.0
review_count: int = 0
details: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)

for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"

search_data = SearchData(
name=name,
url=link,
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_product(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:

logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")

title = soup.select_one("h1[data-test='product-title']").text
rating = "n/a"
review_count = 0
rating_holder = soup.select_one("span[data-test='ratings']")
if rating_holder:
rating_array = rating_holder.text.split(" ")
rating = rating_array[0]
review_count = rating_array[-2]

price_holder = soup.select_one("span[data-test='product-price']")
price = price_holder.text
details = soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text

product_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
product_data = ProductData(
name=title,
price=price,
rating=rating,
review_count=review_count,
details=details
)
product_pipeline.add_data(product_data)
product_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_product(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

Step 4: Adding Concurrency

Now, we're going to once again add concurrency. Just like we did earlier, we'll use ThreadPoolExecutor to call our parsing function on all available threads.

Take a look at our refactored process_results().

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_product,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

Like before, look at executor.map():

  • process_product is the function we want to call on each thread.
  • reader is the array of items we wish to parse.
  • All other arguments get passed in as arrays, just like we did earlier. The short sketch after this list shows the same pattern on its own.
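
If that pattern is new to you, here is a minimal, standalone sketch (not part of our scraper) showing how executor.map() lines up one element from each iterable for every call:

import concurrent.futures

def greet(keyword, location, retries):
    # Each call receives one element from each iterable passed to executor.map()
    return f"{keyword} ({location}), retries={retries}"

keywords = ["laptop", "tablet", "phone"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        greet,
        keywords,                  # one keyword per call
        ["us"] * len(keywords),    # the same location repeated for every call
        [3] * len(keywords)        # the same retry count repeated for every call
    )
    for result in results:
        print(result)

Each worker thread receives one tuple of arguments per call, which is why every extra argument has to be repeated once per page or row.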

Step 5: Bypassing Anti-Bots

We'll bypass anti-bots exactly the way we did earlier. We already have get_scrapeops_url(); we just need to use it in the right place. We'll change one line inside our parser.

response = requests.get(get_scrapeops_url(url, location=location))
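
To see what this change actually does, here is a quick standalone sketch that reuses the get_scrapeops_url() function from above; the product URL in it is made up purely for illustration:

# Assumes get_scrapeops_url() from the scraper above is in scope.
# The product URL below is a hypothetical example, not a real listing.
product_url = "https://www.target.com/p/example-product/-/A-12345678"

proxied_url = get_scrapeops_url(product_url, location="us")
print(proxied_url)
# Prints something like:
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.target.com%2Fp%2Fexample-product%2F-%2FA-12345678&country=us&wait=5000&residential=True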

You can view our production-ready code below.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ProductData:
name: str = ""
price: str = ""
rating: float = 0.0
review_count: int = 0
details: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
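# "Nao" is Target's result-offset parameter; the scraper requests results in blocks of 24, so page_number * 24 skips to the right batch.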
url = f"https://www.target.com/s?searchTerm={formatted_keyword}&Nao={page_number*24}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-test='@web/site-top-of-funnel/ProductCardWrapper']", recursive=False)

for div_card in div_cards:
a_tags = div_card.find_all("a")
href = a_tags[0].get("href")
name = href.split("/")[2]
link = f"https://www.target.com{href}"

search_data = SearchData(
name=name,
url=link,
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_product(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:

logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")

title = soup.select_one("h1[data-test='product-title']").text
rating = "n/a"
review_count = 0
rating_holder = soup.select_one("span[data-test='ratings']")
if rating_holder:
rating_array = rating_holder.text.split(" ")
rating = rating_array[0]
review_count = rating_array[-2]

price_holder = soup.select_one("span[data-test='product-price']")
price = price_holder.text
details = soup.select_one("div[data-test='productDetailTabs-itemDetailsTab']").text

product_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
product_data = ProductData(
name=title,
price=price,
rating=rating,
review_count=review_count,
details=details
)
product_pipeline.add_data(product_data)
product_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_product,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Now, let's test everything in production. We'll set PAGES to 2 and MAX_THREADS to 5 just like we did earlier. If you need a refresher, here is our main.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["laptop"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Here are our results.

[Image: scraper results and performance shown in the terminal]

We generated a report with 52 results. If you remember from earlier, our crawl took 18.711 seconds. Our full crawl and scrape took 283.987 seconds. 283.987 - 18.711 = 265.276 seconds spent scraping items. 265.276 seconds / 52 items = 5.101 seconds per item. This is nearly twice as fast as our crawler.
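
Those timings come straight from the terminal output. If you'd like to measure your own runs, one simple option (not something the script above does) is to wrap each phase with time.perf_counter():

import time

run_start = time.perf_counter()
# ... run the crawl here (start_scrape + close_pipeline for each keyword) ...
crawl_seconds = time.perf_counter() - run_start

# ... run process_results() on each aggregate file here ...
total_seconds = time.perf_counter() - run_start

items = 52  # number of rows in the product report from the run above
scrape_seconds = total_seconds - crawl_seconds
print(f"Crawl: {crawl_seconds:.3f}s | Scrape: {scrape_seconds:.3f}s | {scrape_seconds / items:.3f}s per item")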


Legal and Ethical Considerations

When we scrape the web, there are certain rules we need to follow. In particular, we need to pay attention to privacy and intellectual property. If the data isn't gated behind a login page, it's generally considered public. If you need to log in to view it, it's considered private data.

When accessing any site, you are subject to their Terms and Conditions and their robots.txt.

You can view Target's terms here. Their robots.txt is available at https://www.target.com/robots.txt.
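
If you want a quick programmatic check on top of reading the terms, Python's built-in urllib.robotparser can load Target's robots.txt for you. This is a standalone precaution, not part of the scraper above:

from urllib import robotparser

# Load and parse Target's robots.txt
rp = robotparser.RobotFileParser()
rp.set_url("https://www.target.com/robots.txt")
rp.read()

# Check whether a generic user agent is allowed to fetch a search results page
print(rp.can_fetch("*", "https://www.target.com/s?searchTerm=laptop"))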

If you're unsure of your scraper, you should talk to an attorney.


Conclusion

Congrats! You've successfully crawled and scraped Target. At this point, you've had a crash course in using Requests and BeautifulSoup and you've also learned about parsing, pagination, data storage, concurrency, and proxy integration.

If you'd like to learn more about the tech from this article, check out the links below.


More Python Web Scraping Guides

At ScrapeOps, we have tons of learning resources available. Whether you're brand new to coding or a seasoned developer, you can pick up something useful here. Take a look at our Python Web Scraping Playbook.

If you want to read more from our "How To Scrape" series, check out the articles below.