How to Scrape Bing With Requests and BeautifulSoup
There are many popular search engines out there. For most people, Google comes to mind immediately. The second most popular one is Bing. Launched in 2009, Bing is an evolution of earlier search engines owned by Microsoft. It is very useful and, if you need it to, it can work as a full-fledged replacement for Google.
Today, we'll build a Bing crawler and a metadata scraper.
- TLDR How to Scrape Bing
- How To Architect Our Scraper
- Understanding How To Scrape Bing
- Setting Up Our Bing Scraper
- Build A Bing Search Crawler
- Build A Bing Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
TLDR - How to Scrape Bing
If you need to scrape Bing but you don't have time to read this article and then write your own scraper, you can use this one. To use this scraper:
- Create a new project folder and add a config.json file that holds your ScrapeOps API key under the "api_key" key.
- Then copy this code and paste it into a Python file of your choice.
- Afterward, simply run python name_of_your_script.py.
import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    base_url: str = ""
    url: str = ""
    page: int = 0
    result_number: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class MetaData:
    name: str = ""
    url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            # Only write the header when the file is new or empty
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    result_number = page_number * 10
    url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            headers = soup.find_all("h2")
            excluded_words = ["explore further"]
            for header in headers:
                if header.text.lower() in excluded_words:
                    continue
                link = header.find("a")
                h2 = header.text
                if not link:
                    continue
                href = link.get("href")
                if "https://" not in href:
                    href = f"https://www.bing.com{href}"
                rank = result_number
                parsed_url = urlparse(href)
                base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                search_data = SearchData(
                    name=h2,
                    base_url=base_url,
                    url=href,
                    page=page_number,
                    result_number=rank
                )
                data_pipeline.add_data(search_data)
                result_number += 1
            logger.info(f"Successfully parsed data from: {url}")
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt so the loop cannot run forever
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

def process_result(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request sits inside the try block so network errors are retried too
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                head = soup.find("head")
                title = head.find("title").text
                meta_tags = head.find_all("meta")
                meta_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
                description = "n/a"
                description_holder = head.select_one("meta[name='description']")
                if description_holder:
                    description = description_holder.get("content")
                meta_data = MetaData(
                    name=title,
                    url=row["url"],
                    description=description
                )
                meta_pipeline.add_data(meta_data)
                success = True
                meta_pipeline.close_pipeline()
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_result,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Feel free to change any of the following constants from main:
- MAX_THREADS: Determines the number of threads (parallel tasks) the script will run simultaneously.
- MAX_RETRIES: Specifies the maximum number of retry attempts for a failed request.
- PAGES: Controls how many pages of search results the script will process.
- LOCATION: Defines the geographical location from which the scraping requests should appear to originate.
- keyword_list: A list of keywords or phrases that will be used as search queries on Bing.
How To Architect Our Bing Scraper
Our Bing scraper project will consist of two scrapers.
- The first will be a Bing crawler.
- The crawler extracts data directly from Bing search results.
- The second scraper will be a metadata scraper.
- The metadata scraper will look up all the sites we pulled with the crawler and extract some of their metadata.
We'll need to do the following tasks with the crawler:
- Parse search results.
- Paginate our result batches.
- Store the data we've parsed.
- Perform the three tasks above with concurrency.
- Use a proxy to avoid getting blocked.
Our metadata scraper will perform these tasks:
- Read the CSV file from the crawler.
- Parse each row from the file.
- Store the data from the step above.
- Concurrently perform tasks 2 and 3 on multiple pages simultaneously.
- Proxy Integration to bypass anti-bots.
Understanding How To Scrape Bing
Step 1: How To Request Bing Pages
We can request Bing pages with a simple GET request. Take a look at the URL in the screenshot below. Our URL is laid out like this:
https://www.bing.com/search?q=learn+rust
- ? denotes the fact that we want to perform a query, and the query we perform is q=learn+rust.
- If we wanted to look up online banks, we would instead pass in q=online+banks.
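As a small illustration of that URL layout, here is a sketch that builds the search URL from a keyword. The helper name build_bing_search_url is made up for this example, and it uses quote_plus from the standard library rather than the plain replace(" ", "+") used in the scraper, so that characters such as + or & in a keyword are also escaped.

```python
from urllib.parse import quote_plus


def build_bing_search_url(keyword):
    """Return a Bing search URL for the keyword (hypothetical helper)."""
    # quote_plus turns spaces into "+" and escapes reserved characters
    return f"https://www.bing.com/search?q={quote_plus(keyword)}"


rust_url = build_bing_search_url("learn rust")
banks_url = build_bing_search_url("online banks")
print(rust_url)
print(banks_url)
```

For simple keywords the result matches what the scraper builds; the difference only shows up when a keyword contains reserved characters.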
Step 2: How To Extract Data From Bing Results and Pages
To extract data from Bing, we need to find all of the h2 elements. These h2 tags contain our links. So we first find our h2 and then find the link within it. In the screenshot below, you can see the a element embedded within the h2.
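To make the h2-then-link idea concrete, here is a rough sketch that runs on a hand-written HTML fragment. It uses Python's built-in html.parser instead of BeautifulSoup so that it runs without extra installs; the H2LinkParser class and the sample markup are invented for illustration and are not Bing's real page structure.

```python
from html.parser import HTMLParser


class H2LinkParser(HTMLParser):
    """Collect (title, href) pairs for each <h2> that contains an <a>."""

    def __init__(self):
        super().__init__()
        self.in_h2 = False
        self.current_href = None
        self.current_text = []
        self.results = []

    def handle_starttag(self, tag, attrs):
        if tag == "h2":
            # Start tracking a new heading
            self.in_h2 = True
            self.current_href = None
            self.current_text = []
        elif tag == "a" and self.in_h2 and self.current_href is None:
            self.current_href = dict(attrs).get("href")

    def handle_data(self, data):
        if self.in_h2:
            self.current_text.append(data)

    def handle_endtag(self, tag):
        if tag == "h2" and self.in_h2:
            # Headings without a link are skipped, as in the crawler
            if self.current_href:
                title = "".join(self.current_text).strip()
                self.results.append((title, self.current_href))
            self.in_h2 = False


# Made-up sample markup, not a real Bing response
sample_html = """
<ol>
  <li><h2><a href="https://www.rust-lang.org/learn">Learn Rust</a></h2></li>
  <li><h2>Explore further</h2></li>
  <li><h2><a href="/search?q=rust+book">The Rust Book</a></h2></li>
</ol>
"""

parser = H2LinkParser()
parser.feed(sample_html)
h2_links = parser.results
print(h2_links)
```

The heading without a link is dropped, which mirrors the "if not link: continue" check in the crawler.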
After we've finished our crawl, we're going to access a bunch of different websites with different layouts. What all these sites have in common though is metadata.
The meta and title tags get embedded within the head element on the page. Take a look at the screenshot below and you can see this in action.
Step 3: How To Control Pagination
For pagination control, we simply need to add one param to our URL: first={result_number} holds the key to our pagination.
Like many other sites, Bing gives each result a unique number. Page 0 holds results 1 through 10, page 1 holds 11 through 20... You get the idea: our result_number is page_number * 10.
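The page-to-offset arithmetic can be sketched in a few lines. The helper names page_to_offset and build_page_urls are hypothetical; the URL format and the page_number * 10 rule are the ones used in this article.

```python
def page_to_offset(page_number, results_per_page=10):
    """Convert a zero-indexed page number into the value passed as 'first'."""
    return page_number * results_per_page


def build_page_urls(keyword, pages):
    """Build one search URL per page (hypothetical helper)."""
    formatted_keyword = keyword.replace(" ", "+")
    return [
        f"https://www.bing.com/search?q={formatted_keyword}&first={page_to_offset(page)}"
        for page in range(pages)
    ]


page_urls = build_page_urls("learn rust", 3)
for page_url in page_urls:
    print(page_url)
```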
Step 4: Geolocated Data
To handle geolocation, we'll once again be using the ScrapeOps Proxy API.
The ScrapeOps API takes in a country parameter. This parameter allows us to specify our location with the proxy.
- If we want to appear in the US, we tell ScrapeOps: "country": "us".
- If we want to appear in the UK, we could tell the server: "country": "uk".
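Here is a minimal sketch of how the country value ends up in the proxy URL. It mirrors the get_scrapeops_url() function used in this article, but takes the key as an argument with a placeholder value (PLACEHOLDER_API_KEY is an assumption for the example) so it can run without a config.json file. No request is sent; the snippet only builds the URL and decodes it again.

```python
from urllib.parse import urlencode, urlparse, parse_qs

# Placeholder value for illustration only; use your real key in practice
PLACEHOLDER_API_KEY = "YOUR-API-KEY"


def get_scrapeops_url(url, location="us", api_key=PLACEHOLDER_API_KEY):
    payload = {
        "api_key": api_key,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


uk_url = get_scrapeops_url("https://www.bing.com/search?q=learn+rust", location="uk")
print(uk_url)

# Decode the query string again to check what the proxy would receive
query = parse_qs(urlparse(uk_url).query)
print(query["country"], query["url"])
```

Because urlencode() escapes the target URL, the original Bing URL survives the round trip unchanged, including its own ? and = characters.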
Setting Up Our Bing Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir bing-scraper
cd bing-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build a Bing Search Crawler
Let's get started on our crawler. Our crawler is going to perform a search on Bing. Then it will parse and store the results from the search. We're going to go through and build the following into our crawler step-by-step.
- Parsing
- Pagination
- Data Storage
- Concurrency
- Proxy Integration
Step 1: Create Simple Search Data Parser
Let's get started by building a basic parsing function.
In the code below, we add our dependencies, some error handling and some basic retry logic. While we still have tries left and the operation hasn't succeeded, we attempt to parse the data. We first find the h2 elements and get our relevant data from there.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    # Pagination is added in Step 2, so for now we only look at the first page
    page_number = 0
    result_number = page_number * 10
    url = f"https://www.bing.com/search?q={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            headers = soup.find_all("h2")
            excluded_words = ["explore further"]
            for header in headers:
                if header.text.lower() in excluded_words:
                    continue
                link = header.find("a")
                h2 = header.text
                if not link:
                    continue
                href = link.get("href")
                if "https://" not in href:
                    href = f"https://www.bing.com{href}"
                rank = result_number
                parsed_url = urlparse(href)
                base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                search_data = {
                    "name": h2,
                    "base_url": base_url,
                    # Store the result link, not the Bing search URL
                    "url": href,
                    "page": page_number,
                    "result_number": result_number
                }
                print(search_data)
                result_number += 1
            logger.info(f"Successfully parsed data from: {url}")
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt so the loop cannot run forever
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
Once we've found the h2, we do the following:
- Find our link element with header.find("a").
- Extract the link with link.get("href").
- If we receive an incomplete link, we complete it: href = f"https://www.bing.com{href}".
- We assign a unique number to each result: rank = result_number.
- After parsing the url, we extract the base domain name as well: f"{parsed_url.scheme}://{parsed_url.netloc}".
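The link clean-up steps above can be tried in isolation. In this sketch, normalize_href and get_base_url are hypothetical helper names wrapping the same two expressions the parser uses, and the sample links are made up.

```python
from urllib.parse import urlparse


def normalize_href(href):
    """Complete relative Bing links, leaving full https links untouched."""
    if "https://" not in href:
        href = f"https://www.bing.com{href}"
    return href


def get_base_url(href):
    """Return only the scheme and domain portion of a URL."""
    parsed_url = urlparse(href)
    return f"{parsed_url.scheme}://{parsed_url.netloc}"


relative_link = normalize_href("/search?q=rust+book")
absolute_link = normalize_href("https://doc.rust-lang.org/book/")
print(relative_link, get_base_url(relative_link))
print(absolute_link, get_base_url(absolute_link))
```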
Step 2: Add Pagination
Now that we're parsing pages properly, we need to be able to paginate them. To paginate our URL, we're going to add one param to it, first.
Along with this, we'll add another function, start_scrape(). This function will allow us to call scrape_search_results() on a full set of pages.
Here is start_scrape().
def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
This function is relatively simple. It just uses a for loop to scrape each page in our list of pages.
Here is the fully updated code we're working with.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, page_number, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    result_number = page_number * 10
    url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            headers = soup.find_all("h2")
            excluded_words = ["explore further"]
            for header in headers:
                if header.text.lower() in excluded_words:
                    continue
                link = header.find("a")
                h2 = header.text
                if not link:
                    continue
                href = link.get("href")
                if "https://" not in href:
                    href = f"https://www.bing.com{href}"
                rank = result_number
                parsed_url = urlparse(href)
                base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                search_data = {
                    "name": h2,
                    "base_url": base_url,
                    # Store the result link, not the Bing search URL
                    "url": href,
                    "page": page_number,
                    "result_number": result_number
                }
                print(search_data)
                result_number += 1
            logger.info(f"Successfully parsed data from: {url}")
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt so the loop cannot run forever
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
- Our URL now holds a parameter for pagination: "https://www.bing.com/search?q={formatted_keyword}&first={result_number}".
- start_scrape() allows us to scrape a list of pages.
Step 3: Storing the Scraped Data
Without proper storage, our parsed data is completely useless. To hold our data, we'll use a dataclass, SearchData. This class holds individual information about each result we extract during the crawl.
We then pass the SearchData object into a DataPipeline. Our DataPipeline opens a pipe to a CSV file and puts our SearchData through it. This DataPipeline also goes through and removes duplicates from our storage.
Here is our SearchData class. It holds all of the fields we extracted in the parse along with our page number as well.
@dataclass
class SearchData:
    name: str = ""
    base_url: str = ""
    url: str = ""
    page: int = 0
    result_number: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
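To see what the string clean-up in __post_init__ does, here is a short self-contained sketch. It repeats a condensed version of the SearchData class so the snippet runs on its own; the sample values are made up.

```python
from dataclasses import dataclass, fields


@dataclass
class SearchData:
    name: str = ""
    base_url: str = ""
    url: str = ""
    page: int = 0
    result_number: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for f in fields(self):
            value = getattr(self, f.name)
            if isinstance(value, str):
                if value == "":
                    # Empty strings get a readable default
                    setattr(self, f.name, f"No {f.name}")
                else:
                    # Everything else is stripped of surrounding whitespace
                    setattr(self, f.name, value.strip())


result = SearchData(
    name="  Learn Rust  ",
    url="https://www.rust-lang.org/learn",
    page=0,
    result_number=1,
)
print(result)
```

The padded name is trimmed and the missing base_url is replaced with a default, while the integer fields are left alone.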
Here, you can get a better look at our DataPipeline as well.
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            # Only write the header when the file is new or empty
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            # time.sleep() needs "import time" at the top of the script
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
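The queue-and-flush behaviour is easier to follow on a toy example. The sketch below is a simplified stand-in, not the article's DataPipeline: MiniPipeline and Item are hypothetical names, the queue limit is lowered to 2 so a flush happens quickly, and the file is written to a temporary directory.

```python
import csv
import os
import tempfile
from dataclasses import dataclass, asdict, fields


@dataclass
class Item:
    name: str = ""
    url: str = ""


class MiniPipeline:
    """Simplified illustration of dedup + batched CSV writes."""

    def __init__(self, csv_filename, storage_queue_limit=2):
        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename

    def add_data(self, item):
        if item.name in self.names_seen:
            return False  # duplicate, dropped
        self.names_seen.add(item.name)
        self.storage_queue.append(item)
        if len(self.storage_queue) >= self.storage_queue_limit:
            self.save_to_csv()
        return True

    def save_to_csv(self):
        if not self.storage_queue:
            return
        rows, self.storage_queue = self.storage_queue, []
        is_new = not os.path.isfile(self.csv_filename) or os.path.getsize(self.csv_filename) == 0
        with open(self.csv_filename, "a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=[fld.name for fld in fields(rows[0])])
            if is_new:
                writer.writeheader()
            for row in rows:
                writer.writerow(asdict(row))

    def close_pipeline(self):
        # Flush whatever is still waiting in the queue
        self.save_to_csv()


with tempfile.TemporaryDirectory() as tmp_dir:
    csv_path = os.path.join(tmp_dir, "demo.csv")
    pipeline = MiniPipeline(csv_path)
    accepted = [
        pipeline.add_data(Item("Learn Rust", "https://www.rust-lang.org/learn")),
        pipeline.add_data(Item("Learn Rust", "https://example.com/duplicate")),
        pipeline.add_data(Item("Rust Book", "https://doc.rust-lang.org/book/")),
        pipeline.add_data(Item("Rust by Example", "https://doc.rust-lang.org/rust-by-example/")),
    ]
    pipeline.close_pipeline()
    with open(csv_path, newline="", encoding="utf-8") as f:
        saved_rows = list(csv.DictReader(f))

print(accepted)
print([row["name"] for row in saved_rows])
```

The second item is rejected because its name was already seen, and the last item only reaches the file when close_pipeline() flushes the queue, which is why the crawler always calls close_pipeline() at the end.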
Our fully updated code is available below.
import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    base_url: str = ""
    url: str = ""
    page: int = 0
    result_number: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            # Only write the header when the file is new or empty
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    result_number = page_number * 10
    url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            headers = soup.find_all("h2")
            excluded_words = ["explore further"]
            for header in headers:
                if header.text.lower() in excluded_words:
                    continue
                link = header.find("a")
                h2 = header.text
                if not link:
                    continue
                href = link.get("href")
                if "https://" not in href:
                    href = f"https://www.bing.com{href}"
                rank = result_number
                parsed_url = urlparse(href)
                base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                search_data = SearchData(
                    name=h2,
                    base_url=base_url,
                    url=href,
                    page=page_number,
                    result_number=rank
                )
                data_pipeline.add_data(search_data)
                result_number += 1
            logger.info(f"Successfully parsed data from: {url}")
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt so the loop cannot run forever
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
- Before calling start_scrape(), we create a DataPipeline and pass it into start_scrape().
- Once we've parsed our data, we turn it into a SearchData object.
- We then pass our SearchData into the DataPipeline via the add_data() method.
Step 4: Adding Concurrency
We now need to add concurrency to our crawler. Concurrency allows us to scrape multiple pages at the same time. In order to do this, we need to refactor start_scrape() by removing our for loop and replacing it with a call to ThreadPoolExecutor.
Take a look at the finished function below.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
Look closely at our arguments to executor.map():
- Our first arg, scrape_search_results, is the function we want to call on each open thread.
- All arguments to scrape_search_results get passed into executor.map() as iterables (lists or a range). The executor takes one element from each iterable and passes them into scrape_search_results on each individual thread that runs.
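To see how executor.map() lines up those iterables, here is a small sketch that swaps the real scraping function for a harmless stand-in. describe_page is a made-up function that just returns a string, so nothing is requested over the network.

```python
import concurrent.futures


def describe_page(keyword, location, page_number, retries):
    """Stand-in for scrape_search_results(); only reports its arguments."""
    return f"{keyword}|{location}|page={page_number}|retries={retries}"


pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Each call receives one element from every iterable, in order
    results = list(
        executor.map(
            describe_page,
            ["learn rust"] * pages,
            ["us"] * pages,
            range(pages),
            [3] * pages,
        )
    )

for line in results:
    print(line)
```

One design note: executor.map() returns its results lazily, and an exception raised inside a worker is only re-raised when that result is read. The crawler in this article does not read the results, so wrapping the call in list() as above is one way to surface worker errors if you want them.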
Our code for the crawler is almost finished. Here is what it looks like now.
import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    base_url: str = ""
    url: str = ""
    page: int = 0
    result_number: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            # Only write the header when the file is new or empty
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    result_number = page_number * 10
    url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            headers = soup.find_all("h2")
            excluded_words = ["explore further"]
            for header in headers:
                if header.text.lower() in excluded_words:
                    continue
                link = header.find("a")
                h2 = header.text
                if not link:
                    continue
                href = link.get("href")
                if "https://" not in href:
                    href = f"https://www.bing.com{href}"
                rank = result_number
                parsed_url = urlparse(href)
                base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                search_data = SearchData(
                    name=h2,
                    base_url=base_url,
                    url=href,
                    page=page_number,
                    result_number=rank
                )
                data_pipeline.add_data(search_data)
                result_number += 1
            logger.info(f"Successfully parsed data from: {url}")
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt so the loop cannot run forever
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
Step 5: Bypassing Anti-Bots
Now, we need to unlock the power of a proxy. With the ScrapeOps Proxy API, we can get past most of the anti-bot systems that come our way. This proxy gives us a new IP address in the country of our choosing.
We pass the following params into ScrapeOps: "api_key", "url", "country".
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
- "api_key" holds our ScrapeOps API key.
- "url" is the url we're trying to scrape.
- "country" is the country we'd like to be routed through.
- The function takes all these params and returns a url configured to the ScrapeOps proxy.
Inside scrape_search_results(), we then send our request to get_scrapeops_url(url, location=location) instead of requesting url directly.
Step 6: Production Run
Now that our crawler is finished, we need to run it in production and get a good feel for its performance. We'll scrape 5 pages of results and see what happens. Here is our updated main.
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
You can see our results in the screenshot below.
We parsed 5 pages in 7.05 seconds. This comes out to 1.41 seconds per page, which is very fast. Your results may vary depending on your hardware, the quality of your internet connection and server latency.
Build a Bing Scraper
Now that we're crawling Bing and generating results, we need to build a scraper that does something with those results. The problem with this is that all of our URLs go to different websites with different layouts.
However, there is one thing that nearly all of these sites have in common: metadata. A site's head tag holds the title of the site along with a bunch of metadata elements, each one contained in a meta tag.
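To make the idea concrete, here is a minimal sketch that pulls the title and the description meta tag out of a hard-coded HTML string. It uses Python's built-in html.parser instead of BeautifulSoup purely so that it runs without extra dependencies. The sample HTML and the HeadMetadataParser class are made up for illustration and are not part of the scraper.

```python
from html.parser import HTMLParser

# Invented sample page used only for this demonstration.
SAMPLE_HTML = """
<html>
  <head>
    <title>Learn Rust</title>
    <meta charset="utf-8">
    <meta name="description" content="A language empowering everyone.">
  </head>
  <body><h1>Hello</h1></body>
</html>
"""

class HeadMetadataParser(HTMLParser):
    def __init__(self):
        super().__init__()
        self.in_title = False
        self.title = ""
        self.meta = {}  # maps a meta tag's "name" to its "content"

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if tag == "title":
            self.in_title = True
        elif tag == "meta" and "name" in attrs:
            self.meta[attrs["name"]] = attrs.get("content", "")

    def handle_endtag(self, tag):
        if tag == "title":
            self.in_title = False

    def handle_data(self, data):
        # Only collect text that sits between <title> and </title>.
        if self.in_title:
            self.title += data

parser = HeadMetadataParser()
parser.feed(SAMPLE_HTML)

# Fall back to "n/a" when a page has no description tag.
description = parser.meta.get("description", "n/a")
print(parser.title, "|", description)
```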
This scraper is going to run the following processes in order:
- Read the data from the CSV file.
- Parse the metadata from the sites we saved in the CSV.
- Store the parsed data inside a new CSV file.
- Run steps 2 and 3 concurrently on multiple websites.
- Once again, integrate with a proxy to bypass anti-bots.
Step 1: Create Simple Website Data Parser
In this section, we'll start our scraper. Very similar to how we started earlier, we'll start with basic parsing, error handling and retry logic. This gives us a structure that we can continue to build on easily. Take a look at our parsing function.
def process_result(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request sits inside the try block so that
            # connection errors also count against our retries
            response = requests.get(url)
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                head = soup.find("head")
                title = head.find("title").text
                meta_tags = head.find_all("meta")
                description = "n/a"
                description_holder = head.select_one("meta[name='description']")
                if description_holder:
                    description = description_holder.get("content")
                meta_data = {
                    "name": title,
                    "url": row["url"],
                    "description": description
                }
                print(meta_data)
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
While we still have retries left and the operation has not succeeded:
- We first find the head tag: soup.find("head").
- Then, we find the title: head.find("title").text.
- Afterward, we set the default description to "n/a".
- If there is a description present, we set that value to our description variable. Otherwise, we retain the "n/a" value.
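The retry pattern itself is independent of Bing or BeautifulSoup. The sketch below isolates it using a hypothetical flaky_fetch() function that fails twice before succeeding, so the flow of tries, retries and success is easy to follow without making any network requests. Both function names are assumptions made for this example.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

attempts = {"count": 0}

def flaky_fetch():
    # Hypothetical stand-in for requests.get(): fails on the first two calls.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise Exception(f"Simulated failure on attempt {attempts['count']}")
    return "<head><title>ok</title></head>"

def fetch_with_retries(retries=3):
    tries = 0
    success = False
    result = None
    while tries <= retries and not success:
        try:
            result = flaky_fetch()
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Retries left: {retries - tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    return result

html = fetch_with_retries(retries=3)
print(html, "after", attempts["count"], "attempts")
```

Because the loop condition is tries <= retries, a value of retries=3 allows up to four attempts in total: the initial request plus three retries.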
Step 2: Loading URLs To Scrape
In order to use our parsing function, it needs a url. To get our urls, we're going to read the CSV file we generated with the crawler. Once we've read the file, we'll call process_result() on each row that we read from the CSV file.
Here is our new process_results() function.
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_result(row, location, retries=retries)
After we put it all together, the full code looks like this.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
base_url: str = ""
url: str = ""
page: int = 0
result_number: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
headers = soup.find_all("h2")
excluded_words = ["explore further"]
for header in headers:
if header.text.lower() in excluded_words:
continue
link = header.find("a")
h2 = header.text
if not link:
continue
href = link.get("href")
if "https://" not in href:
href = f"https://www.bing.com{href}"
rank = result_number
parsed_url = urlparse(href)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
search_data = SearchData(
name=h2,
base_url=base_url,
url=href,
page=page_number,
result_number=rank
)
data_pipeline.add_data(search_data)
result_number += 1
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_result(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
head = soup.find("head")
title = head.find("title").text
meta_tags = head.find_all("meta")
description = "n/a"
description_holder = head.select_one("meta[name='description']")
if description_holder:
description = description_holder.get("content")
meta_data = {
"name": title,
"url": row["url"],
"description": description
}
print(meta_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_result(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
process_results() does all of the following things for us:
- Read the CSV file into an array.
- Iterate through the rows of the CSV.
- Call process_result() on each row from the CSV.
This gives us the final structure for how our code will be laid out.
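If csv.DictReader is new to you, this sketch shows the shape of the rows that process_result() receives. It reads from an in-memory string rather than a file. The two sample rows are invented for illustration, but the column names match the fields of SearchData.

```python
import csv
import io

# Invented sample data with the same columns as the crawler's CSV.
sample_csv = io.StringIO(
    "name,base_url,url,page,result_number\n"
    "Rust Programming Language,https://www.rust-lang.org,https://www.rust-lang.org/learn,0,0\n"
    "The Rust Book,https://doc.rust-lang.org,https://doc.rust-lang.org/book/,0,1\n"
)

rows = list(csv.DictReader(sample_csv))

for row in rows:
    # Each row is a dict keyed by the header line, and every value is a string.
    print(row["name"], "->", row["url"])
```

Note that numeric columns such as page and result_number come back as strings, so they would need converting with int() before any arithmetic.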
Step 3: Storing the Scraped Data
Just like earlier, we need to store the data we've parsed. In order to do that, we'll add one more dataclass. We'll call this one MetaData. The sole purpose of this one is to hold the site metadata we've been parsing.
You can take a look at it below; it's virtually identical to SearchData.
@dataclass
class MetaData:
    name: str = ""
    url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
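To see what check_string_fields() does in practice, the sketch below repeats the MetaData class and creates one instance with a missing description and a padded name. The sample values are invented for illustration.

```python
from dataclasses import dataclass, fields, asdict

@dataclass
class MetaData:
    name: str = ""
    url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                # Empty strings are replaced with a readable default.
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Everything else has surrounding whitespace stripped.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

# The description is left out on purpose, and the name has extra spaces.
item = MetaData(name="  Learn Rust  ", url="https://www.rust-lang.org/learn")
print(asdict(item))
```

The missing description becomes "No description" and the name is trimmed, so every row written to the CSV has a usable value in each column.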
We then need to open a DataPipeline within our parsing function and pass MetaData into it with add_data(). In the full code below, we do just that.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
base_url: str = ""
url: str = ""
page: int = 0
result_number: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class MetaData:
name: str = ""
url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
headers = soup.find_all("h2")
excluded_words = ["explore further"]
for header in headers:
if header.text.lower() in excluded_words:
continue
link = header.find("a")
h2 = header.text
if not link:
continue
href = link.get("href")
if "https://" not in href:
href = f"https://www.bing.com{href}"
rank = result_number
parsed_url = urlparse(href)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
search_data = SearchData(
name=h2,
base_url=base_url,
url=href,
page=page_number,
result_number=rank
)
data_pipeline.add_data(search_data)
result_number += 1
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_result(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
head = soup.find("head")
title = head.find("title").text
meta_tags = head.find_all("meta")
meta_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
description = "n/a"
description_holder = head.select_one("meta[name='description']")
if description_holder:
description = description_holder.get("content")
meta_data = MetaData(
name=title,
url=row["url"],
description=description
)
meta_pipeline.add_data(meta_data)
success = True
meta_pipeline.close_pipeline()
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_result(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
- We now open up a DataPipeline in our parsing function: meta_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
- We pass our MetaData object into it: meta_pipeline.add_data(meta_data).
- After the operation has succeeded, we go ahead and close the pipeline.
Step 4: Adding Concurrency
Adding concurrency is relatively simple now that we've done it once before in this tutorial. In the code below, we refactor process_results() to add multithreading just like we did on the crawler earlier.
Here is our new process_results() function.
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_result,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
- process_result is the function we want to call on all of our open threads.
- All args to process_result get passed in as arrays just like when we added multithreading earlier.
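Here is a small, self-contained sketch of how executor.map() pairs up those arrays. The fake_process_result() function and the example.com rows are hypothetical stand-ins that only format their arguments, so nothing is fetched.

```python
import concurrent.futures

def fake_process_result(row, location, retries):
    # Hypothetical stand-in for process_result(); no network access.
    return f"{row['url']} via {location} (retries={retries})"

rows = [
    {"url": "https://example.com/a"},
    {"url": "https://example.com/b"},
    {"url": "https://example.com/c"},
]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # The nth call receives the nth element of each iterable.
    results = list(
        executor.map(
            fake_process_result,
            rows,
            ["us"] * len(rows),
            [3] * len(rows),
        )
    )

for line in results:
    print(line)
```

Wrapping the call in list() also surfaces any exception raised inside a worker. When the iterator returned by executor.map() is never consumed, as in process_results(), exceptions raised in a worker are not re-raised in the main thread.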
Step 5: Bypassing Anti-Bots
Finally, we need to add proxy integration to our scraper as well. We've already got get_scrapeops_url(), we just need to call it from within our parsing function.
We'll change one line and unlock the full power of the proxy.
response = requests.get(get_scrapeops_url(url, location=location))
You can look at our production ready code below.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    base_url: str = ""
    url: str = ""
    page: int = 0
    result_number: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class MetaData:
    name: str = ""
    url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    result_number = page_number * 10
    url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            headers = soup.find_all("h2")
            excluded_words = ["explore further"]
            for header in headers:
                if header.text.lower() in excluded_words:
                    continue
                link = header.find("a")
                h2 = header.text
                if not link:
                    continue
                href = link.get("href")
                if "https://" not in href:
                    href = f"https://www.bing.com{href}"
                rank = result_number
                parsed_url = urlparse(href)
                base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                search_data = SearchData(
                    name=h2,
                    base_url=base_url,
                    url=href,
                    page=page_number,
                    result_number=rank
                )
                data_pipeline.add_data(search_data)
                result_number += 1
            logger.info(f"Successfully parsed data from: {url}")
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt so the loop cannot run forever
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_result(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request sits inside the try block so that
            # connection errors also count against our retries
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                head = soup.find("head")
                title = head.find("title").text
                meta_tags = head.find_all("meta")
                meta_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
                description = "n/a"
                description_holder = head.select_one("meta[name='description']")
                if description_holder:
                    description = description_holder.get("content")
                meta_data = MetaData(
                    name=title,
                    url=row["url"],
                    description=description
                )
                meta_pipeline.add_data(meta_data)
                success = True
                meta_pipeline.close_pipeline()
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_result,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Now, let's test this out in production! Like before, we'll set pages to 5. If you need to look at our main again, here it is.
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
As always, feel free to tweak any of the constants from your main to change your results. Here are our results.
If you remember from earlier, we crawled 5 pages in roughly 7 seconds, and this full crawl and scrape took 35.065 seconds. Our crawl generated a file with 17 results. Subtracting the crawl time, we'll estimate our scrape at 28 seconds. 28 seconds / 17 results = roughly 1.65 seconds per result.
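The estimate above can be reproduced with a few lines of arithmetic. The figures are the ones quoted in this section, with the crawl time rounded to 7 seconds.

```python
total_seconds = 35.065   # full crawl and scrape
crawl_seconds = 7        # crawl time, rounded from 7.05
results = 17             # rows in the crawl CSV

# Whatever time is left after the crawl is attributed to the scrape.
scrape_seconds = round(total_seconds - crawl_seconds)
seconds_per_result = scrape_seconds / results

print(f"{scrape_seconds} seconds / {results} results = {seconds_per_result:.2f} seconds per result")
```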
Legal and Ethical Considerations
When you access a website, you're subject to its Terms of Service as well as its robots.txt. Since scraping is a type of access, you are subject to Bing's terms whenever you scrape it.
You can view their terms of service here. Since Bing is a Microsoft product, it is subject to Microsoft's terms. You can view their robots.txt here.
It is generally legal to scrape data that is publicly available on the web. If you don't have to log in to see it, it's considered public data. Any data gated behind a login page is considered private data.
If you're not sure that your scraper is legal, consult an attorney.
Conclusion
You now know how to build a full-fledged crawl and scrape project on Bing. You've got a solid understanding of parsing, pagination, data storage, concurrency, and proxy integration. You should also have a decent grasp of how to use Requests and BeautifulSoup. Take this new knowledge and go build something!
More Python Web Scraping Guides
If you're looking to learn more or you're in the mood to binge read, take a look at the guided projects below. You'll continue mastering Requests and BeautifulSoup and you'll also implement the concepts we worked on in this guide.
Check out The Python Web Scraping Playbook or take a look at some of the guides below!