

How to Scrape Bing With Requests and BeautifulSoup

There are many popular search engines out there. For most people, Google comes to mind immediately, but the second most popular one is Bing. Launched in 2009, Bing is an evolution of earlier search engines owned by Microsoft, and if you need it to, it can work as a full-fledged replacement for Google.

Today, we'll build a Bing crawler and a metadata scraper.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Bing

If you need to scrape Bing but you don't have time to read this article and then write your own scraper, you can use this one. To use this scraper:

  1. Create a new project folder and add a config.json file with your ScrapeOps API key.
  2. Then copy this code and paste it into a Python file of your choice.
  3. Afterward, simply run python name_of_your_script.py.
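Your config.json file just needs to hold your ScrapeOps API key. A minimal example (the key below is a placeholder) looks like this:

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}

With that in place, here is the full scraper: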
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    base_url: str = ""
    url: str = ""
    page: int = 0
    result_number: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class MetaData:
    name: str = ""
    url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    result_number = page_number * 10
    url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            headers = soup.find_all("h2")

            excluded_words = ["explore further"]
            for header in headers:
                if header.text.lower() in excluded_words:
                    continue
                link = header.find("a")
                h2 = header.text
                if not link:
                    continue
                href = link.get("href")
                if "https://" not in href:
                    href = f"https://www.bing.com{href}"
                rank = result_number

                parsed_url = urlparse(href)
                base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

                search_data = SearchData(
                    name=h2,
                    base_url=base_url,
                    url=href,
                    page=page_number,
                    result_number=rank
                )
                data_pipeline.add_data(search_data)
                result_number += 1

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_result(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                head = soup.find("head")

                title = head.find("title").text
                meta_tags = head.find_all("meta")

                meta_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")

                description = "n/a"
                description_holder = head.select_one("meta[name='description']")
                if description_holder:
                    description = description_holder.get("content")

                meta_data = MetaData(
                    name=title,
                    url=row["url"],
                    description=description
                )
                meta_pipeline.add_data(meta_data)
                success = True

                meta_pipeline.close_pipeline()

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_result,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Feel free to change any of the following constants from main:

  • MAX_THREADS: Determines the number of threads (parallel tasks) the script will run simultaneously.
  • MAX_RETRIES: Specifies the maximum number of retry attempts for a failed request.
  • PAGES: Controls how many pages of search results the script will process.
  • LOCATION: Defines the geographical location from which the scraping requests should appear to originate.
  • keyword_list: A list of keywords or phrases that will be used as search queries on Bing.
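As an illustration only, a run that crawls 3 pages for two different queries while appearing to come from the UK would set the constants in main like this:

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "uk"

## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust", "learn python"]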

How To Architect Our Bing Scraper

Our Bing scraper project will consist of two scrapers.

  1. The first will be a Bing crawler.
    • The crawler extracts data directly from Bing search results.
  2. The second scraper will be a metadata scraper.
    • The metadata scraper will look up all the sites we pulled with the crawler and extract some of their metadata.

We'll need to do the following tasks with the crawler:

  1. Parse search results.
  2. Paginate our result batches.
  3. Store the data we've parsed.
  4. Perform steps 1 through 3 with concurrency.
  5. Use a proxy to avoid getting blocked.

The metadata scraper will perform these tasks:

  1. Read the CSV file generated by the crawler.
  2. Parse each row from the file.
  3. Store the parsed data.
  4. Perform steps 2 and 3 concurrently on multiple pages.
  5. Integrate with a proxy to bypass anti-bots.

Understanding How To Scrape Bing

Step 1: How To Request Bing Pages

We can request Bing pages with a simple GET request. Take a look at the URL in the screenshot below. Our URL is laid out like this:

https://www.bing.com/search?q=learn+rust
  • ? marks the start of the query string, and the query we perform is q=learn+rust.
  • If we wanted to look up online banks, we would instead pass in q=online+banks.

Bing Search Results Page
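As a minimal sketch (no proxy yet), building and requesting a Bing search URL like this with Requests looks roughly as follows:

import requests
from urllib.parse import urlencode

# urlencode turns {"q": "learn rust"} into "q=learn+rust"
query_string = urlencode({"q": "learn rust"})
url = f"https://www.bing.com/search?{query_string}"

response = requests.get(url)
print(response.status_code)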


Step 2: How To Extract Data From Bing Results and Pages

To extract data from Bing, we need to find all of the h2 elements. These h2 tags contain our links. So we first find our h2 and then find the link within it. In the screenshot below, you can see the a element embedded within the h2.

Bing Search Results Page HTML Inspection

After we've finished our crawl, we're going to access a bunch of different websites with different layouts. What all these sites have in common though is metadata.

meta and title tags get embedded within the head element on the page. Take a look at the screenshot below and you can see this in action.

Bing Search Results Meta Data Inspection
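Here is a rough sketch of both lookups with BeautifulSoup; html_text is assumed to hold the raw HTML of whichever page we're parsing:

from bs4 import BeautifulSoup

soup = BeautifulSoup(html_text, "html.parser")

# On a Bing results page: each organic result title is an h2 with an a tag inside it
for header in soup.find_all("h2"):
    link = header.find("a")
    if link:
        print(header.text, link.get("href"))

# On any site we visit afterward: the title and meta tags live inside the head element
head = soup.find("head")
if head:
    title = head.find("title")
    description = head.select_one("meta[name='description']")
    print(title.text if title else "No title")
    print(description.get("content") if description else "No description")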


Step 3: How To Control Pagination

For pagination control, we simply need to add one param to our URL. first={result_number} holds the key to our pagination.

Like many other sites, Bing gives each result a unique number. Page 0 holds results 1 through 10, page 1 holds 11 through 20... you get the idea: our result_number is page_number * 10.
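A quick sketch of how the paginated URLs come together:

# Each Bing results page starts at result number page_number * 10
for page_number in range(3):
    result_number = page_number * 10
    url = f"https://www.bing.com/search?q=learn+rust&first={result_number}"
    print(url)

# https://www.bing.com/search?q=learn+rust&first=0
# https://www.bing.com/search?q=learn+rust&first=10
# https://www.bing.com/search?q=learn+rust&first=20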


Step 4: Geolocated Data

To handle geolocation, we'll once again be using the ScrapeOps Proxy API.

The ScrapeOps API takes in a country parameter. This parameter allows us to specify our location with the proxy.

  • If we want to appear in the US, we tell ScrapeOps: "country": "us".
  • If we want to appear in the UK, we could tell the server: "country": "uk".

Setting Up Our Bing Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir bing-scraper

cd bing-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build a Bing Search Crawler

Let's get started on our crawler. Our crawler is going to perform a search on Bing. Then it will parse and store the results from the search. We're going to go through and build the following into our crawler step-by-step.

  1. Parsing
  2. Pagination
  3. Data Storage
  4. Concurrency
  5. Proxy Integration

Step 1: Create Simple Search Data Parser

Let's get started by building a basic parsing function.

In the code below, we add our dependencies, some error handling, and some basic retry logic. While we still have tries left and the operation hasn't succeeded, we attempt to parse the data. We first find the h2 elements and extract our relevant data from there.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number=0, retries=3):
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
url = f"https://www.bing.com/search?q={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
headers = soup.find_all("h2")


excluded_words = ["explore further"]
for header in headers:
if header.text.lower() in excluded_words:
continue
link = header.find("a")
h2 = header.text
if not link:
continue
href = link.get("href")
if "https://" not in href:
href = f"https://www.bing.com{href}"
rank = result_number

parsed_url = urlparse(href)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

search_data = {
"name": h2,
"base_url": base_url,
"url": url,
"page": page_number,
"result_number": result_number
}

print(search_data)
result_number += 1

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")

Once we've found the h2, we do the following:

  • Find our link element with header.find("a").
  • Extract the link with link.get("href").
  • If we receive an incomplete link, we complete it: href = f"https://www.bing.com{href}".
  • We assign a unique number to each result: rank = result_number.
  • After parsing the url, we extract the base domain name as well: f"{parsed_url.scheme}://{parsed_url.netloc}".
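As a quick illustration of that last step, urlparse() splits a link into named parts that we can reassemble into the base domain:

from urllib.parse import urlparse

href = "https://doc.rust-lang.org/book/ch01-00-getting-started.html"
parsed_url = urlparse(href)

# scheme is "https", netloc is "doc.rust-lang.org"
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
print(base_url)  # https://doc.rust-lang.org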

Step 2: Add Pagination

Now that we're parsing pages properly, we need to be able to paginate them. To paginate our URL, we're going to add one param to it, first.

Along with this, we'll add another function, start_scrape(). This function will allow us to call scrape_search_results() on a full set of pages.

Here is start_scrape().

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)

This function is relatively simple. It just uses a for loop to scrape each page in our list of pages.

Here is the fully updated code we're working with.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
headers = soup.find_all("h2")


excluded_words = ["explore further"]
for header in headers:
if header.text.lower() in excluded_words:
continue
link = header.find("a")
h2 = header.text
if not link:
continue
href = link.get("href")
if "https://" not in href:
href = f"https://www.bing.com{href}"
rank = result_number

parsed_url = urlparse(href)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

search_data = {
"name": h2,
"base_url": base_url,
"url": url,
"page": page_number,
"result_number": result_number
}

print(search_data)
result_number += 1

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")
  • Our URL now holds a parameter for pagination: "https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
  • start_scrape() allows us to scrape a list of pages.

Step 3: Storing the Scraped Data

Without proper storage, our parsed data is completely useless. To hold our data, we'll use a dataclass, SearchData. This class holds the information for each individual result we extract during the crawl.

We then pass each SearchData object into a DataPipeline. The DataPipeline opens a pipe to a CSV file and pushes our SearchData through it, removing duplicates along the way.

Here is our SearchData class. It holds all of the fields we extracted in the parse along with our page number as well.

@dataclass
class SearchData:
    name: str = ""
    base_url: str = ""
    url: str = ""
    page: int = 0
    result_number: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here, you can get a better look at our DataPipeline as well.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
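To make the intended usage concrete, here is a small, hypothetical sketch of the pipeline's lifecycle: create it with a filename, feed it SearchData objects, and close it so anything still queued gets flushed to the CSV.

# Hypothetical usage of the DataPipeline and SearchData defined above
pipeline = DataPipeline(csv_filename="example.csv")

pipeline.add_data(SearchData(name="Rust Programming Language", url="https://www.rust-lang.org/", page=0, result_number=0))
# Same name again: is_duplicate() catches it and the row is dropped
pipeline.add_data(SearchData(name="Rust Programming Language", url="https://www.rust-lang.org/", page=0, result_number=1))

pipeline.close_pipeline()  # flushes the remaining queue to example.csv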

Our fully updated code is available below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
base_url: str = ""
url: str = ""
page: int = 0
result_number: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
headers = soup.find_all("h2")


excluded_words = ["explore further"]
for header in headers:
if header.text.lower() in excluded_words:
continue
link = header.find("a")
h2 = header.text
if not link:
continue
href = link.get("href")
if "https://" not in href:
href = f"https://www.bing.com{href}"
rank = result_number

parsed_url = urlparse(href)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

search_data = SearchData(
name=h2,
base_url=base_url,
url=href,
page=page_number,
result_number=rank
)
data_pipeline.add_data(search_data)
result_number += 1

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • Before calling start_scrape() we create a DataPipeline and pass it into start_scrape().
  • Once we've parsed our data, we turn it into a SearchData object.
  • We then pass our SearchData into the DataPipeline via the add_data() method.

Step 4: Adding Concurrency

We now need to add concurrency to our crawler. Concurrency allows us to scrape multiple pages at the same time. In order to do this, we need to refactor start_scrape() by removing our for loop and replacing it with a call to ThreadPoolExecutor.

Take a look at the finished function below.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

Look closely at our arguments to executor.map():

  • Our first arg, scrape_search_results is the function we want to call on each open thread.
  • All other arguments are passed to executor.map() as lists; it then hands one element from each list to scrape_search_results on each thread it runs.
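If that argument-as-lists pattern is new to you, here is a tiny standalone sketch of the same idea, unrelated to scraping:

import concurrent.futures

def greet(name, greeting):
    return f"{greeting}, {name}!"

names = ["Alice", "Bob", "Carol"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map() hands one element from each iterable to greet() on each thread
    results = executor.map(greet, names, ["Hello"] * len(names))

print(list(results))  # ['Hello, Alice!', 'Hello, Bob!', 'Hello, Carol!']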

Our code for the crawler is almost finished. Here is what it looks like now.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
base_url: str = ""
url: str = ""
page: int = 0
result_number: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
headers = soup.find_all("h2")


excluded_words = ["explore further"]
for header in headers:
if header.text.lower() in excluded_words:
continue
link = header.find("a")
h2 = header.text
if not link:
continue
href = link.get("href")
if "https://" not in href:
href = f"https://www.bing.com{href}"
rank = result_number

parsed_url = urlparse(href)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

search_data = SearchData(
name=h2,
base_url=base_url,
url=href,
page=page_number,
result_number=rank
)
data_pipeline.add_data(search_data)
result_number += 1

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 5: Bypassing Anti-Bots

Now we need to unlock the power of a proxy. With the ScrapeOps Proxy API, we can get past pretty much any anti-bot system that comes our way. The proxy gives us a new IP address in the country of our choosing.

We pass the following params into ScrapeOps: "api_key", "url", "country".

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
  • "api_key" holds our ScrapeOps API key.
  • "url" is the url we're trying to scrape.
  • "country" is the country we'd like to be routed through.
  • The function takes all these params and returns a url configured to the ScrapeOps proxy.
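For example, wrapping a Bing URL for a UK-routed request would look roughly like this (the key and output shown are placeholders):

bing_url = "https://www.bing.com/search?q=learn+rust&first=0"
proxied_url = get_scrapeops_url(bing_url, location="uk")
print(proxied_url)
# https://proxy.scrapeops.io/v1/?api_key=YOUR-SCRAPEOPS-API-KEY&url=https%3A%2F%2Fwww.bing.com%2Fsearch%3Fq%3Dlearn%2Brust%26first%3D0&country=uk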

Step 6: Production Run

Now that our crawler is finished, we need to run it in production and get a good feel for its performance. We'll scrape 5 pages of results and see what happens. Here is our updated main.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

You can see our results in the screenshot below.

Crawler Performance Terminal

We parsed 5 pages in 7.05 seconds. That comes out to 1.41 seconds per page, which is lightning fast for scraping any page. Your results may vary depending on your hardware, the quality of your internet connection, and server latency.


Build a Bing Scraper

Now that we're crawling Bing and generating results, we need to build a scraper that does something with those results. The problem with this is that all of our URLs go to different websites with different layouts.

However, there is one thing that all of these sites have in common... metadata. All sites contain a head tag, and within it they embed the title of the site along with a bunch of metadata elements in meta tags.

This scraper is going to run the following processes in order:

  1. Read the data from the CSV file.
  2. Parse the metadata from the sites we saved in the CSV.
  3. Store the parsed data inside a new CSV file.
  4. Run steps 2 and 3 concurrently on multiple websites.
  5. Once again, integrate with a proxy to bypass anti-bots.

Step 1: Create Simple Website Data Parser

In this section, we'll start our scraper. Very similar to how we started earlier, we'll start with basic parsing, error handling and retry logic. This gives us a structure that we can continue to build on easily. Take a look at our parsing function.

def process_result(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                head = soup.find("head")

                title = head.find("title").text
                meta_tags = head.find_all("meta")

                description = "n/a"
                description_holder = head.select_one("meta[name='description']")
                if description_holder:
                    description = description_holder.get("content")

                meta_data = {
                    "name": title,
                    "url": row["url"],
                    "description": description
                }

                print(meta_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

While we still have retries left and the operation has not succeeded, we do the following:

  • We first find the head tag: soup.find("head").
  • Then, we find the title: head.find("title").text.
  • Afterward, we set the default description to "n/a".
  • If there is a description present, we set that value to our description variable. Otherwise, we retain the "n/a" value.
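Since the description lookup is the only branch in the parse, here it is in isolation on a tiny, made-up HTML snippet:

from bs4 import BeautifulSoup

sample_html = "<html><head><title>Example</title><meta name='description' content='An example page.'></head></html>"
head = BeautifulSoup(sample_html, "html.parser").find("head")

description = "n/a"  # default when the site has no description meta tag
description_holder = head.select_one("meta[name='description']")
if description_holder:
    description = description_holder.get("content")

print(description)  # An example page.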

Step 2: Loading URLs To Scrape

In order to use our parsing function, it needs a url. To get our urls, we're going to read the CSV file we generated with the crawler. Once we've read the file, we'll call process_result() on each row that we read from the CSV file.

Here is our new process_results() function.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_result(row, location, retries=retries)

After we put it all together, the full code looks like this.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
base_url: str = ""
url: str = ""
page: int = 0
result_number: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
headers = soup.find_all("h2")


excluded_words = ["explore further"]
for header in headers:
if header.text.lower() in excluded_words:
continue
link = header.find("a")
h2 = header.text
if not link:
continue
href = link.get("href")
if "https://" not in href:
href = f"https://www.bing.com{href}"
rank = result_number

parsed_url = urlparse(href)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

search_data = SearchData(
name=h2,
base_url=base_url,
url=href,
page=page_number,
result_number=rank
)
data_pipeline.add_data(search_data)
result_number += 1

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_result(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
head = soup.find("head")

title = head.find("title").text
meta_tags = head.find_all("meta")

description = "n/a"
description_holder = head.select_one("meta[name='description']")
if description_holder:
description = description_holder.get("content")

meta_data = {
"name": title,
"url": row["url"],
"description": description
}

print(meta_data)
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_result(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

process_results() does all of the following things for us:

  • Read the CSV file into an array.
  • Iterate through the rows of the CSV.
  • Call process_result() on each row from the CSV.

This gives us the final structure for how our code will be laid out.
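If you haven't used csv.DictReader before, each row comes back as a dict keyed by the CSV header, which is why process_result() can do row["url"]. A minimal sketch, assuming the crawler already produced learn-rust.csv:

import csv

with open("learn-rust.csv", newline="") as file:
    reader = list(csv.DictReader(file))

for row in reader:
    # Each row looks like {"name": ..., "base_url": ..., "url": ..., "page": ..., "result_number": ...}
    print(row["name"], row["url"])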


Step 3: Storing the Scraped Data

Just like earlier, we need to store the data we've parsed. In order to do that, we'll add one more dataclass. We'll call this one MetaData. Its sole purpose is to hold the site metadata we've been parsing.

You can take a look at it below; it's virtually identical to SearchData.

@dataclass
class MetaData:
    name: str = ""
    url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

We then need to open a DataPipeline within our parsing function and pass MetaData into it with add_data(). In the full code below, we do just that.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
base_url: str = ""
url: str = ""
page: int = 0
result_number: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class MetaData:
name: str = ""
url: str = ""
description: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
headers = soup.find_all("h2")


excluded_words = ["explore further"]
for header in headers:
if header.text.lower() in excluded_words:
continue
link = header.find("a")
h2 = header.text
if not link:
continue
href = link.get("href")
if "https://" not in href:
href = f"https://www.bing.com{href}"
rank = result_number

parsed_url = urlparse(href)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

search_data = SearchData(
name=h2,
base_url=base_url,
url=href,
page=page_number,
result_number=rank
)
data_pipeline.add_data(search_data)
result_number += 1

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_result(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
head = soup.find("head")

title = head.find("title").text
meta_tags = head.find_all("meta")

meta_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")

description = "n/a"
description_holder = head.select_one("meta[name='description']")
if description_holder:
description = description_holder.get("content")

meta_data = MetaData(
name=title,
url=row["url"],
description=description
)
meta_pipeline.add_data(meta_data)
success = True

meta_pipeline.close_pipeline()

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_result(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
  • We now open up a DataPipeline in our parsing function: meta_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
  • We pass our MetaData object into it: meta_pipeline.add_data(meta_data).
  • After the operation has succeeded, we go ahead and close the pipeline.

Step 4: Adding Concurrency

Adding concurrency is relatively simple now that we've done it once before in this tutorial. In the code below, we refactor process_results() to add multithreading, just like we did on the crawler earlier.

Here is our new process_results() function.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_result,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )
  • process_result is the function we want to call on all of our open threads.
  • All args to process_result get passed in as arrays just like when we added multithreading earlier.

Step 5: Bypassing Anti-Bots

Finally, we need to add proxy integration to our scraper as well. We've already got get_scrapeops_url(), we just need to call it from within our parsing function.

We'll change one line and unlock the full power of the proxy.

response = requests.get(get_scrapeops_url(url, location=location))

You can take a look at our production-ready code below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
base_url: str = ""
url: str = ""
page: int = 0
result_number: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class MetaData:
name: str = ""
url: str = ""
description: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
url = f"https://www.bing.com/search?q={formatted_keyword}&first={result_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
headers = soup.find_all("h2")


excluded_words = ["explore further"]
for header in headers:
if header.text.lower() in excluded_words:
continue
link = header.find("a")
h2 = header.text
if not link:
continue
href = link.get("href")
if "https://" not in href:
href = f"https://www.bing.com{href}"
rank = result_number

parsed_url = urlparse(href)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

search_data = SearchData(
name=h2,
base_url=base_url,
url=href,
page=page_number,
result_number=rank
)
data_pipeline.add_data(search_data)
result_number += 1

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_result(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
head = soup.find("head")

title = head.find("title").text
meta_tags = head.find_all("meta")

meta_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")

description = "n/a"
description_holder = head.select_one("meta[name='description']")
if description_holder:
description = description_holder.get("content")

meta_data = MetaData(
name=title,
url=row["url"],
description=description
)
meta_pipeline.add_data(meta_data)
success = True

meta_pipeline.close_pipeline()

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_result,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Now, let's test this out in production! Like before, we'll set pages to 5. If you need to look at our main again, here it is.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

As always, feel free to tweak any of the constants in main to change your results. Here are ours.

Scraper Performance Terminal

If you remember from earlier, the crawl alone took roughly 7 seconds, while this full crawl and scrape took 35.065 seconds. The crawl generated a file with 17 results, so we'll estimate the scrape portion at roughly 28 seconds: 28 seconds / 17 results = about 1.65 seconds per result.


Legal and Ethical Considerations

When you access a website, you're subject to its Terms of Service as well as its robots.txt. Since scraping is a form of access, you are subject to Bing's terms when you scrape it.

You can view Bing's Terms of Service here. Since Bing is a Microsoft product, it falls under Microsoft's terms. You can view Bing's robots.txt here.

It is generally legal to scrape data that is publicly available on the web. If you don't have to log in to view the data, it's considered public. Any data gated behind a login page is considered private.

If you're not sure that your scraper is legal, consult an attorney.


Conclusion

You now know how to build a full-fledged crawl-and-scrape project on Bing. You've got a solid understanding of parsing, pagination, data storage, concurrency, and proxy integration, and you should also have a decent grasp of how to use Requests and BeautifulSoup. Take this new knowledge and go build something!


More Python Web Scraping Guides

If you're looking to learn more or you're in the mood to binge read, take a look at the guided projects below. You'll continue mastering Requests and BeautifulSoup, and you'll also implement the concepts we worked on in this guide.

Check out The Python Web Scraping Playbook or take a look at some of the guides below!