Then check out ScrapeOps, the complete toolkit for web scraping.

Each of our results is nested inside a set of <div> elements. We pull the title from the <h3> tag and the href
that links to a websiteimport requestsfrom bs4 import BeautifulSoupfrom urllib.parse import urlparse, parse_qs, urlencodeimport csvimport concurrentfrom concurrent.futures import ThreadPoolExecutorimport osimport loggingimport timefrom dataclasses import dataclass, field, fields, asdictheaders = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRET-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str base_url: str link: str page: int result_number: int def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_open = False def save_to_csv(self): self.csv_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) with open(self.csv_filename, mode="a", encoding="UTF-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate Item Found: {input_data.name}. Item dropped") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url): payload = {'api_key': API_KEY, 'url': url, 'country': 'us'} proxy_url = 'https://proxy.scrapeops.io/v1/?' 
+ urlencode(payload) return proxy_url def search_page(query, page, location="United States", headers=headers, pipeline=None, num=100, retries=3): url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}" payload = { "api_key": API_KEY, "url": url, } tries = 0 success = False while tries <= retries and not success: try: response = requests.get(get_scrapeops_url(url)) soup = BeautifulSoup(response.text, 'html.parser') divs = soup.find_all("div") index = 0 last_link = "" for div in divs: h3s = div.find_all("h3") if len(h3s) > 0: link = div.find("a", href=True) parsed_url = urlparse(link["href"]) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" site_info = {'title': h3s[0].text, "base_url": base_url, 'link': link["href"], "page": page, "result_number": index} search_data = SearchData( name = site_info["title"], base_url = site_info["base_url"], link = site_info["link"], page = site_info["page"], result_number = site_info["result_number"] ) if site_info["link"] != last_link: index += 1 last_link = site_info["link"] if pipeline: pipeline.add_data(search_data) success = True except: print(f"Failed to scrape page {page}") print(f"Retries left: {retries-tries}") tries += 1 if not success: print(f"Failed to scrape page {page}, no retries left") raise Exception(f"Max retries exceeded: {retries}") else: print(f"Scraped page {page} with {retries-tries} retries left") def full_search(query, pages=3, location="us", MAX_THREADS=5, MAX_RETRIES=3, num=10): with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: pipeline = DataPipeline(csv_filename=f"{query.replace(' ', '-')}.csv") tasks = [executor.submit(search_page, query, page, location, None, pipeline, num, MAX_RETRIES) for page in range(pages)] for future in tasks: future.result() pipeline.close_pipeline() if __name__ == "__main__": MAX_THREADS = 5 MAX_RETRIES = 5 queries = ["cool stuff"] logger.info("Starting full search...") for query in queries: full_search(query, pages=3, num=10) logger.info("Search complete.")
If you'd like more search_results, you can change the pages argument in the following line: full_search(query, pages=20). You can even try full_search(query, pages=100).

To search for different things, change the QUERIES array, for example: ["cool stuff", "boring stuff"].

Other things you can tweak are:

- location
- MAX_THREADS
- MAX_RETRIES
- num

Save the script as yourscript.py. Obviously you can name it whatever you want. Once you have your script and dependencies installed, run the following command:

python yourscript.py
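For instance, assuming the full script above, a tweaked main block might look like the sketch below (the query list, page count, and batch size are just example values):

if __name__ == "__main__":
    MAX_THREADS = 5
    MAX_RETRIES = 5
    # example values -- tweak these to suit your own search
    queries = ["cool stuff", "boring stuff"]

    logger.info("Starting full search...")
    for query in queries:
        # 20 pages of 10 results each, per query
        full_search(query, pages=20, num=10)
    logger.info("Search complete.")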
https://www.google.com/search?q=cool+stuff

Pay attention to the end of the address: ?q=cool+stuff. In the address bar, a question mark, ?, denotes a query (in this case we're querying q), and the value of the query follows the equals operator, =.

So ?q=cool+stuff means that our search query is for cool stuff. If we wanted to search for boring stuff, we could instead use ?q=boring+stuff.

In the days of old, at the bottom of the page we would see a list of page numbers, which made search results incredibly easy to scrape. While Google doesn't exactly give us page numbers anymore, it does give us a start query that we can use in order to paginate our results. We get our results in batches of 10. With variables figured in, our url will look like this:

https://www.google.com/search?q={query}&start={page * 10}
Google also gives us a num query that we can use to control the number of results that we get. Taking num into account, our url would look more like this:

https://www.google.com/search?q={query}&start={page * num}&num={num}

We can use num to request up to 100 results, but Google's response doesn't always give us these results when we request them. Multiple times throughout the writing of this article, I've used num=100 and been blocked or gotten smaller results. Other times I have gotten proper results.
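As a quick illustration, here is a small standalone sketch that builds these search urls for a few pages (build_search_url is just a hypothetical helper, not part of the scraper below):

from urllib.parse import quote_plus

def build_search_url(query, page, num=10):
    # q gets url-encoded; start tells Google which result to begin at
    return f"https://www.google.com/search?q={quote_plus(query)}&start={page * num}&num={num}"

for page in range(3):
    print(build_search_url("cool stuff", page))
# https://www.google.com/search?q=cool+stuff&start=0&num=10
# https://www.google.com/search?q=cool+stuff&start=10&num=10
# https://www.google.com/search?q=cool+stuff&start=20&num=10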
Each result holds its title in an <h3> tag. To find our results, we can simply use BeautifulSoup's .find_all() method. Some websites like to nest a bunch of different things inside of an element, and Google is no exception.

Here is the full HTML of our first result: <h3 class="LC20lb MBeuO DKV0Md">Cool Stuff</h3>. As you can see, the class name is a bunch of jumbled garbage and there is no link within the tag! This is because Google (like many other sites) nests all of our important information within a <div>.

If the class name of each result were more legible and not subject to change, I would recommend using it to parse the results. Since the class name is likely to change over time, we're simply going to get all of the <div> elements and find the <h3> elements nested inside of them. We'll use soup.find_all(), and we'll use a last_link variable. For each result we get, we'll compare its link to the last link. If the current link is the same as the last link, we'll ignore this element and move on to the next one (there's a short illustration of this idea after the url examples below).

So far, our url looks like this:

https://www.google.com/search?q={query}&start={page * 10}

To control where our results come from, we can add a geo_location parameter to our request. At the moment, our full request looks like this:

https://www.google.com/search?q={query}&start={page * 10}

With the location added, it becomes:

https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}
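Here is a minimal sketch of the nested-div idea on a made-up HTML snippet (the markup is heavily simplified; real Google results are nested far more deeply):

from bs4 import BeautifulSoup

# simplified, made-up markup -- the same result shows up inside several nested divs
html = """
<div><div><h3>Cool Stuff</h3><a href="https://www.example.com/cool">link</a></div></div>
<div><h3>More Cool Stuff</h3><a href="https://www.example.com/more">link</a></div>
"""

soup = BeautifulSoup(html, "html.parser")
last_link = ""
for div in soup.find_all("div"):
    h3 = div.find("h3")
    link = div.find("a", href=True)
    if not h3 or not link:
        continue
    # skip repeats caused by nested divs wrapping the same result
    if link["href"] == last_link:
        continue
    last_link = link["href"]
    print(h3.text, link["href"])
# Cool Stuff https://www.example.com/cool
# More Cool Stuff https://www.example.com/more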
Let's get started by creating a new project folder called google-search-requests. You can create a new folder through your file explorer or enter the following command:

mkdir google-search-requests

This tutorial also uses Python3.10-venv. First, we'll create a new virtual environment:

Linux/Mac

python3 -m venv google-search

Windows

python -m venv google-search

Next, activate the environment:

Linux/Mac

source google-search/bin/activate

Windows

.\google-search\Scripts\Activate.ps1

Finally, install our dependencies, requests and beautifulsoup4:

pip install requests beautifulsoup4
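Before moving on, you can quickly confirm that both packages installed correctly (this snippet just prints the versions and isn't part of the scraper):

import requests
import bs4

print("requests:", requests.__version__)
print("beautifulsoup4:", bs4.__version__)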
With everything installed, here is our first iteration, a simple script that scrapes a single page of results:

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs, urlencode

#search a single page
def google_search(query, retries=3):
    tries = 0
    #runtime loop for the scrape
    while tries <= retries:
        try:
            url = f"https://www.google.com/search?q={query}"
            response = requests.get(url)
            results = []
            last_link = ""
            soup = BeautifulSoup(response.text, 'html.parser')
            index = 0
            for result in soup.find_all('div'):
                title = result.find('h3')
                if title:
                    title = title.text
                else:
                    continue
                base_url = ""
                link = result.find('a', href=True)
                if link:
                    link = link['href']
                    parsed_url = urlparse(link)
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                else:
                    continue
                #this is the full site info we wish to extract
                site_info = {'title': title, "base_url": base_url, 'link': link, "result_number": index}
                #only save the result if it isn't a repeat of the last link
                if last_link != site_info["link"]:
                    results.append(site_info)
                    index += 1
                    last_link = site_info["link"]
            #return our list of results
            print(f"Finished scrape with {tries} retries")
            return results
        except:
            print("Failed to scrape the page")
            print("Retries left:", retries-tries)
            tries += 1
    #if this line executes, the scrape has failed
    raise Exception(f"Max retries exceeded: {retries}")

if __name__ == "__main__":
    MAX_RETRIES = 5
    QUERIES = ["cool stuff"]
    for query in QUERIES:
        results = google_search(query, retries=MAX_RETRIES)
        for result in results:
            print(result)
In the code above:

- We create a google_search() function that takes our query as a parameter
- BeautifulSoup(response.text, 'html.parser') creates a BeautifulSoup instance to parse through the HTML
- soup.find_all("div") finds all the <div> objects
- result.find("h3") is used to find the header element of each result
- link = result.find('a', href=True) extracts the link from the result
- urlparse(link) parses our link
- base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" reconstructs the base_url so we can save it
- We build a dict, site_info, from the data we've extracted
- If the link from site_info is different than last_link, we add our result to the results list
- Finally, we return the results list
Here is the script updated to scrape multiple pages:

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs

def google_search(query, pages=3, location="United States", retries=3):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
    results = []
    last_link = ""
    for page in range(0, pages):
        tries = 0
        success = False
        while tries <= retries and not success:
            try:
                url = f"https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}"
                response = requests.get(url, headers=headers)
                soup = BeautifulSoup(response.text, 'html.parser')
                index = 0
                for result in soup.find_all('div'):
                    title = result.find('h3')
                    if title:
                        title = title.text
                    else:
                        continue
                    base_url = ""
                    #pull the raw link from the result
                    link = result.find('a', href=True)
                    if link:
                        link = link['href']
                        parsed_url = urlparse(link)
                        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    else:
                        continue
                    #this is the full site info we wish to extract
                    site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index}
                    #if the link is different from the last link
                    if last_link != site_info["link"]:
                        results.append(site_info)
                        index += 1
                        last_link = link
                print(f"Scraped page {page} with {retries-tries} retries left")
                success = True
            except:
                print(f"Failed to scrape page {page}")
                print(f"Retries left: {retries-tries}")
                tries += 1
        if not success:
            raise Exception(f"Max retries exceeded: {retries}")
    return results

if __name__ == "__main__":
    MAX_RETRIES = 5
    QUERIES = ["cool stuff"]
    for query in QUERIES:
        results = google_search(query, retries=MAX_RETRIES)
        for result in results:
            print(result)
The main change here is the addition of &start={page * 10} to our url. This is the basis for how we try to batch our results. We also add in functionality for our geo_location, but by the time we add our proxy, this functionality is actually going to be moved elsewhere in our code.

Each result comes back as a dict with key-value pairs. Here is the first result so you can see how the data is laid out:

{'title': 'Cool Stuff', 'base_url': '://', 'link': '/search?sca_esv=3d5aec0ebbda9031&q=cool+stuff&uds=AMwkrPusHYa-Y5lqXPwpg8jJI99FKYz2zi9dec3bfM0lH-hil3eHKWSsmwBdtnNX2uzO7rvzH_UOAG-8W6q5RMgyj5EtPQRweAkj97b7yv-dxhFjVNmTpUmjIG8LX5BTVMn1i8RvhFDaroRDPKXSl9mGzRdmu5ujMGh35B6t9hZQe5OWf6qF9qyxdHJPailq0Was2Ti5R1Efg6G0TWkZl8Q0a4QgLEUcLEh8uM-Gr_AIA73YM8e13Y_Y5x_btmkZoDODrensXIErfUplY9wGJ9in8N6PV9WQjCg77wu2IOm5pmE8706LnWQ&udm=2&prmd=isvnmbtz&sa=X&ved=2ahUKEwi459DNvrOFAxXzh1YBHfFMDlsQtKgLegQIEhAB', 'page': 0, 'result_number': 0}

Each result holds a title, base_url, link, page, and result_number. Because we have uniform data stored in key-value pairs, we already have the makings of a DataFrame and therefore a CSV.
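Since the results are just a list of uniform dicts, you could, for instance, drop them straight into a pandas DataFrame (pandas isn't used anywhere else in this article; the sample rows below are made up):

import pandas as pd

# a couple of made-up results in the same shape as ours
results = [
    {"title": "Cool Stuff", "base_url": "https://www.example.com", "link": "https://www.example.com/cool", "page": 0, "result_number": 0},
    {"title": "More Cool Stuff", "base_url": "https://www.example.com", "link": "https://www.example.com/more", "page": 0, "result_number": 1},
]

df = pd.DataFrame(results)
df.to_csv("cool-stuff.csv", index=False)

In this tutorial, though, we'll stick with Python's built-in csv module.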
To our imports, add the following line:

import csv

Here is our updated script:
import requestsfrom bs4 import BeautifulSoupfrom urllib.parse import urlparse, parse_qsimport csvfrom os import path def write_page_to_csv(filename, object_array): path_to_csv = filename file_exists = path.exists(filename) with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file: #name the headers after our object keys writer = csv.DictWriter(file, fieldnames=object_array[0].keys()) if not file_exists: writer.writeheader() writer.writerows(object_array) def google_search(query, pages=3, location="United States", retries=3): headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'} results = [] last_link = "" for page in range(0, pages): tries = 0 success = False while tries <= retries and not success: try: url = f"https://www.google.com/search?q={query}&start={page * 10}" response = requests.get(url, headers=headers) print(f"Response Code: {response.status_code}") soup = BeautifulSoup(response.text, 'html.parser') index = 0 for result in soup.find_all('div'): title = result.find('h3') if title: title = title.text else: continue base_url = "" #pull the raw link from the result link = result.find('a', href=True) if link: link = link['href'] parsed_url = urlparse(link) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" else: continue #this is the full site info we wish to extract site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index} #if the link is different from the last link if last_link != site_info["link"]: results.append(site_info) index += 1 last_link = link print(f"Scraped page {page} with {retries} retries left") write_page_to_csv(f"{query}.csv", results) success = True except: print(f"Failed to scrape page {page}") print(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 5 QUERIES = ["cool stuff"] for query in QUERIES: google_search("cool stuff", retries=MAX_RETRIES)
The biggest difference in this version is our new write_page_to_csv()
function:

def write_page_to_csv(filename, object_array):
    path_to_csv = filename
    file_exists = path.exists(filename)
    with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file:
        #name the headers after our object keys
        writer = csv.DictWriter(file, fieldnames=object_array[0].keys())
        if not file_exists:
            writer.writeheader()
        writer.writerows(object_array)
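As a quick usage sketch (assuming the function above is already defined), calling it with a couple of made-up results looks like this; it writes headers on the first call and appends rows on every call after that:

sample_results = [
    {"title": "Boring Stuff", "base_url": "https://www.example.com", "link": "https://www.example.com/boring", "page": 0, "result_number": 0},
    {"title": "More Boring Stuff", "base_url": "https://www.example.com", "link": "https://www.example.com/more-boring", "page": 0, "result_number": 1},
]
write_page_to_csv("demo.csv", sample_results)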
write_page_to_csv() takes an object_array (in this case our page results) and writes it to our filename. If the file doesn't exist yet, we create it and write the headers. If it already exists, we simply append to it. We open the file in append mode so we don't overwrite any important data that we've scraped previously.

To speed things up, we're going to break our google_search()
function into two separate functions, search_page() and full_search(). search_page() will search a single page, and full_search() will create multiple threads that call search_page() concurrently.

Add the following import statement:

from concurrent.futures import ThreadPoolExecutor

Next, we'll convert our google_search() function into our search_page()
function.def search_page(query, page, location="United States", retries=3, num=100): headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'} results = [] last_link = "" tries = 0 success = False while tries <= retries and not success: try: url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}" response = requests.get(url, headers=headers) if response.status_code != 200: print("Failed server response", response.status_code) raise Exception("Failed server response!") print(f"Response Code: {response.status_code}") soup = BeautifulSoup(response.text, 'html.parser') index = 0 for result in soup.find_all('div'): title = result.find('h3') if title: title = title.text else: continue base_url = "" #pull the raw link from the result link = result.find('a', href=True) if link: link = link['href'] parsed_url = urlparse(link) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" else: continue #this is the full site info we wish to extract site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index} #if the link is different from the last link if last_link != site_info["link"]: results.append(site_info) index += 1 last_link = link write_page_to_csv(f"{query}.csv", results) success = True except: print(f"Failed to scrape page {page}") print(f"Retries left: {retries-tries}") tries += 1 if not success: print(f"Failed to scrape page {page}, no retries left") raise Exception(f"Max retries exceeded: {retries}") else: print(f"Scraped page {page} with {retries} retries left")
In the new version:

- We remove the pages argument and replace it with page
- Instead of running a for loop and iterating through pages, we simply execute our parsing logic on the single page we're searching

Next, here is our full_search() function:

def full_search(query, pages=3, location="United States", MAX_THREADS=5, MAX_RETRIES=4, num=100):
    page_numbers = list(range(pages))
    full_results = []
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        executor.map(search_page, [query]*pages, page_numbers, [location]*pages, [MAX_RETRIES]*pages, [num]*pages)
The only required argument is the query. Everything else is a kwarg used to tweak our settings.

The strangest-looking part is probably executor.map(). As bizarre as it looks, it's actually pretty simple: it takes search_page as the first argument, and the rest of the args are just lists of parameters that we wish to pass into search_page()
.import requestsfrom bs4 import BeautifulSoupfrom urllib.parse import urlparse, parse_qsimport csvfrom os import pathfrom concurrent.futures import ThreadPoolExecutor def write_page_to_csv(filename, object_array): path_to_csv = filename file_exists = path.exists(filename) with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file: #name the headers after our object keys writer = csv.DictWriter(file, fieldnames=object_array[0].keys()) if not file_exists: writer.writeheader() writer.writerows(object_array) def search_page(query, page, location="United States", retries=3, num=100): headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'} results = [] last_link = "" tries = 0 success = False while tries <= retries and not success: try: url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}" response = requests.get(url, headers=headers) if response.status_code != 200: print("Failed server response", response.status_code) raise Exception("Failed server response!") print(f"Response Code: {response.status_code}") soup = BeautifulSoup(response.text, 'html.parser') index = 0 for result in soup.find_all('div'): title = result.find('h3') if title: title = title.text else: continue base_url = "" #pull the raw link from the result link = result.find('a', href=True) if link: link = link['href'] parsed_url = urlparse(link) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" else: continue #this is the full site info we wish to extract site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index} #if the link is different from the last link if last_link != site_info["link"]: results.append(site_info) index += 1 last_link = link write_page_to_csv(f"{query}.csv", results) success = True except: print(f"Failed to scrape page {page}") print(f"Retries left: {retries-tries}") tries += 1 if not success: print(f"Failed to scrape page {page}, no retries left") raise Exception(f"Max retries exceeded: {retries}") else: print(f"Scraped page {page} with {retries} retries left") def full_search(query, pages=3, location="United States", MAX_THREADS=5, MAX_RETRIES=4, num=100): page_numbers = list(range(pages)) full_results = [] with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: executor.map(search_page, [query]*pages, page_numbers, [location]*pages, [MAX_RETRIES], [num]) if __name__ == "__main__": MAX_RETRIES = 5 QUERIES = ["cool stuff"] for query in QUERIES: full_search(query, pages=1)
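If executor.map() still looks odd, here is a tiny standalone sketch (nothing to do with scraping) showing how it lines the argument lists up, one element per call:

from concurrent.futures import ThreadPoolExecutor

def label_page(page, query, num):
    return f"{query}: page {page}, {num} results"

pages = [0, 1, 2]
with ThreadPoolExecutor(max_workers=3) as executor:
    # call 0 gets (0, "cool stuff", 10), call 1 gets (1, "cool stuff", 10), and so on
    for line in executor.map(label_page, pages, ["cool stuff"] * 3, [10] * 3):
        print(line)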
To use the ScrapeOps Proxy, we add a new function, get_scrapeops_url()
. This is a really simply function that just performs some basic string formatting for us, but this is vital to our scraper. We now have the ability to convert any url into a proxied url with very minimal impact on our overall code. With this function, we can now run our Python script, without getting blocked!import requestsfrom bs4 import BeautifulSoupfrom urllib.parse import urlparse, parse_qs, urlencodeimport csvfrom os import pathfrom concurrent.futures import ThreadPoolExecutor #our default user agentheaders = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRET-API-KEY" def get_scrapeops_url(url, location='us'): payload = {'api_key': API_KEY, 'url': url, 'country': location} proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload) return proxy_url def write_page_to_csv(filename, object_array): path_to_csv = filename file_exists = path.exists(filename) with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file: #name the headers after our object keys writer = csv.DictWriter(file, fieldnames=object_array[0].keys()) if not file_exists: writer.writeheader() writer.writerows(object_array) def search_page(query, page, location="United States", retries=3, num=100): headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'} results = [] last_link = "" tries = 0 success = False while tries <= retries and not success: try: url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}" response = requests.get(get_scrapeops_url(url), headers=headers) if response.status_code != 200: print("Failed server response", response.status_code) raise Exception("Failed server response!") print(f"Response Code: {response.status_code}") soup = BeautifulSoup(response.text, 'html.parser') index = 0 for result in soup.find_all('div'): title = result.find('h3') if title: title = title.text else: continue base_url = "" #pull the raw link from the result link = result.find('a', href=True) if link: link = link['href'] parsed_url = urlparse(link) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" else: continue #this is the full site info we wish to extract site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index} #if the link is different from the last link if last_link != site_info["link"]: results.append(site_info) index += 1 last_link = link write_page_to_csv(f"{query}.csv", results) success = True except: print(f"Failed to scrape page {page}") print(f"Retries left: {retries-tries}") tries += 1 if not success: print(f"Failed to scrape page {page}, no retries left") raise Exception(f"Max retries exceeded: {retries}") else: print(f"Scraped page {page} with {retries} retries left") def full_search(query, pages=3, location="us", MAX_THREADS=5, MAX_RETRIES=4, num=100): page_numbers = list(range(pages)) full_results = [] with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: executor.map(search_page, [query]*pages, page_numbers, [location]*pages, [MAX_RETRIES]*pages, [num]*pages) if __name__ == "__main__": MAX_RETRIES = 5 RESULTS_PER_PAGE = 10 QUERIES = ["cool stuff"] for query in QUERIES: full_search(query, pages=3, num=RESULTS_PER_PAGE)
In the code above:

- We create a proxy_url
- We use get_scrapeops_url() to convert regular urls into proxied ones

In our production code below, we keep the search_page() function and we run our multithreading from a full_search() function called in the main block at the bottom of the script. We also added basic logging and file handling to prevent overwriting results.

Take note of the following classes: SearchData and DataPipeline. SearchData is a simpler class that basically just holds the data we're choosing to scrape. DataPipeline
is where the real heavy lifting gets done.import requestsfrom bs4 import BeautifulSoupfrom urllib.parse import urlparse, parse_qs, urlencodeimport csvimport concurrentfrom concurrent.futures import ThreadPoolExecutorimport osimport loggingimport timefrom dataclasses import dataclass, field, fields, asdict#our default user agentheaders = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRET-API-KEY" ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str base_url: str link: str page: int result_number: int def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_open = False def save_to_csv(self): self.csv_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) with open(self.csv_filename, mode="a", encoding="UTF-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate Item Found: {input_data.name}. Item dropped") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url): payload = {'api_key': API_KEY, 'url': url, 'country': 'us'} proxy_url = 'https://proxy.scrapeops.io/v1/?' 
+ urlencode(payload) return proxy_url def search_page(query, page, location="United States", headers=headers, pipeline=None, num=100, retries=3): url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}" payload = { "api_key": API_KEY, "url": url, } tries = 0 success = False while tries <= retries and not success: try: response = requests.get(get_scrapeops_url(url)) soup = BeautifulSoup(response.text, 'html.parser') divs = soup.find_all("div") index = 0 last_link = "" for div in divs: h3s = div.find_all("h3") if len(h3s) > 0: link = div.find("a", href=True) parsed_url = urlparse(link["href"]) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" site_info = {'title': h3s[0].text, "base_url": base_url, 'link': link["href"], "page": page, "result_number": index} search_data = SearchData( name = site_info["title"], base_url = site_info["base_url"], link = site_info["link"], page = site_info["page"], result_number = site_info["result_number"] ) if site_info["link"] != last_link: index += 1 last_link = site_info["link"] if pipeline: pipeline.add_data(search_data) success = True except: print(f"Failed to scrape page {page}") print(f"Retries left: {retries-tries}") tries += 1 if not success: print(f"Failed to scrape page {page}, no retries left") raise Exception(f"Max retries exceeded: {retries}") else: print(f"Scraped page {page} with {retries-tries} retries left") def full_search(query, pages=3, location="us", MAX_THREADS=5, MAX_RETRIES=3, num=10): with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: pipeline = DataPipeline(csv_filename=f"{query.replace(' ', '-')}.csv") tasks = [executor.submit(search_page, query, page, location, None, pipeline, num, MAX_RETRIES) for page in range(pages)] for future in tasks: future.result() pipeline.close_pipeline() if __name__ == "__main__": MAX_THREADS = 5 MAX_RETRIES = 5 queries = ["cool stuff"] logger.info("Starting full search...") for query in queries: full_search(query, pages=3, num=10) logger.info("Search complete.")
In the code above:

- SearchData is a class that simply holds our data
- DataPipeline does all the heavy lifting of removing duplicates and writing the data to our csv file

Whenever you scrape a site, always check its robots.txt file to see what they allow. Generally, if you are scraping as a guest (not logged in), the information is considered to be public and scraping is usually alright. You can look at Google's robots.txt here. In addition, if you're unclear about whether or not you can scrape a site, check their Terms and Conditions. You can view Google's Terms and Conditions here.

Similar to many other companies, Google reserves the right to suspend, terminate or delete your account if they have reason to believe that you are connected to suspicious or malicious activity.

Also, do not collect and release anyone's personal data when scraping. In many countries this is illegal, and even if it is legal in your country, it's a pretty immoral thing to do. Always consider how your scraped data will be used as well. When you scrape a site from Google, some of the information you find might fall under the Terms and Conditions of that site as well.

Then check out ScrapeOps, the complete toolkit for web scraping.
from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom time import sleepimport csvfrom concurrent.futures import ThreadPoolExecutorfrom urllib.parse import urlencodeimport osimport loggingfrom dataclasses import dataclass, field, fields, asdict #create a custom options instanceoptions = webdriver.ChromeOptions()#add headless mode to our optionsoptions.add_argument("--headless") API_KEY = "YOUR-SUPER-SECRET-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str link: str result_number: int page_number: int def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True self.data_to_save = [] self.data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not self.data_to_save: return keys = [field.name for field in fields(self.data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="UTF-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in self.data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url): payload = {'api_key': API_KEY, 'url': url, 'country': 'us'} proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload) return proxy_url #this function performs a search and parses the resultsdef search_page(query, page, location): #start Chrome with our custom options driver = webdriver.Chrome(options=options) #go to the page driver.get(get_scrapeops_url(f"https://www.google.com/search?q={query}&start={page * 10}")) #find each div containing site info...THEY'RE SUPER NESTED!!! 
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div") #list to hold our results results = [] #index, this will be used to number the results index = 0 #last link last_link = "" #iterate through our divs for div in divs: #find the title element title = div.find_elements(By.CSS_SELECTOR, "h3") link = div.find_elements(By.CSS_SELECTOR, "a") if len(title) > 0 and len(link) > 0: #result number on the page result_number = index #site info object site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page} if site_info["link"] != last_link: #add the object to our list results.append(site_info) #increment the index index += 1 #update the last link last_link = site_info["link"] #the scrape has finished, close the browser driver.quit() #return the result list return results#function to search multiple pages, calls search_page() on eachdef full_search(query, pages=3, location="United States"): #list for our full results full_results = [] #list of page numbers page_numbers = list(range(0, pages)) #open with a max of 5 threads with ThreadPoolExecutor(max_workers=5) as executor: #call search page, pass all the following aruments into it future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages) #for each thread result for page_result in future_results: #add it to the full_results full_results.extend(page_result) #return the finalized list return full_results if __name__ == "__main__": logger.info("Starting scrape") data_pipeline = DataPipeline(csv_filename="production-search.csv") search_results = full_search("cool stuff") for result in search_results: search_data = SearchData(name=result["title"], link=result["link"], result_number=result["result_number"] , page_number=result["page"]) data_pipeline.add_data(search_data) data_pipeline.close_pipeline() logger.info("Scrape Complete")
To run the script, enter the following command:

python your-script.py

Feel free to change "cool stuff" to whatever you'd like to query. If you want to scrape more pages, change the pages kwarg, for example: full_search("boring stuff", pages=100)
https://www.google.com/search?q=cool+stuff

Let's break this url down into its parts:

- https://www.google.com is the domain we're visiting
- /search is the endpoint
- ?q=cool+stuff represents the query we're making:
  - ? denotes the query
  - q is the parameter that we're querying
  - cool+stuff is equivalent to the string, "cool stuff"... the + denotes a space in the words

Each result title comes in an <h3>
tag, so this is a good place to look. If you choose to inspect the page further, you'll come to notice that each of these headers is deeply nested inside a number of <div> tags.

To find our results, we need to find all the div elements containing these h3 elements. If we properly identify and parse each div, we can extract all of the relevant information from it.

As mentioned above, ? denotes a query. We can actually add other query parameters using &. Google typically gives us results in batches of 10. With this in mind, we can actually request multiple "pages" by passing in a start query. After the start parameter is added, our formatted url looks like this:
'https://www.google.com/search?q={query}&start={page * 10}'
We use the page number multiplied by 10 because of the way our results get delivered. If we want to start at 0, our start would be {0 * 10}. The next batch of results would be {1 * 10}, then {2 * 10}, and so on and so forth.

By adding a geo_location parameter to our query, we can actually get results based on that individual location. Now, our formatted url would look like this:
'https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}'
Create a new project folder:

mkdir google-search

Then create a new virtual environment:

Linux/Mac

python3 -m venv google-search

Windows

python -m venv google-search

Activate the environment:

Linux/Mac

source google-search/bin/activate

Windows

.\google-search\Scripts\Activate.ps1

Next, install Selenium with pip:

pip install selenium

Make sure you have Chrome installed as well. You can check your version of Chrome with the following command:

google-chrome --version

Google Chrome 123.0.6312.105
'https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}'
from selenium import webdriverfrom selenium.webdriver.common.by import By#create a custom options instanceoptions = webdriver.ChromeOptions()#add headless mode to our optionsoptions.add_argument("--headless")#this function performs a search and parses the resultsdef search_page(query): #start Chrome with our custom options driver = webdriver.Chrome(options=options) #go to the page driver.get(f"https://www.google.com/search?q={query}") #find each div containing site info...THEY'RE SUPER NESTED!!! divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div") #list to hold our results results = [] #index, this will be used to number the results index = 0 #iterate through our divs for div in divs: #find the title element title = div.find_elements(By.CSS_SELECTOR, "h3") #find the link element link = div.find_elements(By.CSS_SELECTOR, "a") #result number on the page result_number = index #if we have a result if len(title) > 0: #site info object site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number} #add the object to our list results.append(site_info) #increment the index index += 1 #the scrape has finished, close the browser driver.quit() #return the result list return results ####this is our main program down here####search_results = search_page("cool stuff")#print our resultsfor result in search_results: print(result)
In the code above:

- We create an instance of ChromeOptions and add the "--headless" argument to it
- We write a search_page() function that takes a query as a parameter
- webdriver.Chrome(options=options) opens our browser in headless mode
- We use driver.get() to go to our site
- We find our div elements using their CSS Selector... They are SUPER NESTED!
- We create an index variable so that we can give each result a number
- We use find_elements() to get the title and link for each object (see the short sketch after this list)
- If the result of find_elements() is not empty, we save the following:
  - title.text
  - link.get_attribute("href")
  - result_number
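If find_elements() and CSS selectors are new to you, here is a small standalone sketch of the same pattern (example.com is just a stand-in page, not Google):

from selenium import webdriver
from selenium.webdriver.common.by import By

options = webdriver.ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
driver.get("https://www.example.com")

# find_elements() returns a (possibly empty) list, so we can safely check its length
headers = driver.find_elements(By.CSS_SELECTOR, "h1")
links = driver.find_elements(By.CSS_SELECTOR, "a")

if len(headers) > 0:
    print("First header:", headers[0].text)
if len(links) > 0:
    print("First link:", links[0].get_attribute("href"))

driver.quit()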
Remember the format of our paginated url:

'https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}'

Now, we'll add pagination and location support to our search_page()
function.from selenium import webdriverfrom selenium.webdriver.common.by import By#create a custom options instanceoptions = webdriver.ChromeOptions()#add headless mode to our optionsoptions.add_argument("--headless")#this function performs a search and parses the resultsdef search_page(query, page, location): #start Chrome with our custom options driver = webdriver.Chrome(options=options) #go to the page driver.get(f"https://www.google.com/search?q={query}&start={page * 10}&location={location}") #find each div containing site info...THEY'RE SUPER NESTED!!! divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div") #list to hold our results results = [] #index, this will be used to number the results index = 0 #last link last_link = "" #iterate through our divs for div in divs: #find the title element title = div.find_elements(By.CSS_SELECTOR, "h3") #find the link element link = div.find_elements(By.CSS_SELECTOR, "a") #result number on the page result_number = index #if we have a result if len(title) > 0: #site info object site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page} if site_info["link"] != last_link: #add the object to our list results.append(site_info) #increment the index index += 1 #update the last link last_link = site_info["link"] #the scrape has finished, close the browser driver.quit() #return the result list return results#function to search multiple pages, calls search_page() on eachdef full_search(query, pages=3, location="United States"): #list for our full results full_results = [] #iterate through our pages for page in range(0, pages): #get the results of the page page_results = search_page(query, page, location) #add them to the full_results list full_results.extend(page_results) #return the finalized list return full_results####this is our main program down here####search_results = full_search("cool stuff")#print our resultsfor result in search_results: print(result)
In the updated code:

- search_page() now takes three arguments: query, page, and location
- page and location have been added into the formatted url
- We create a last_link variable and use it to prevent doubles from getting into our results
- We add a full_search() function
- full_search() simply runs search_page() on a list of pages and returns a full list of results

You have probably noticed that we return dict objects from each of our functions. The reason for using these dictionaries is simple: when you hold object data in a dict of key-value pairs, it's really easy to transform it into something else. Not all libraries are built to handle all data formats, but almost all of them support JSON or dictionaries (both of these formats are key-value pairs).

Now, we'll remove the following code from the bottom of the script:

#print our results
for result in search_results:
    print(result)
Then, add the following import:

import csv

At the bottom of the script, we'll write our results to a CSV file:
#path to the csv file
path_to_csv = "search-results.csv"
#open the file in write mode
with open(path_to_csv, "w") as file:
    #format the file based on the keys of the first result
    writer = csv.DictWriter(file, search_results[0].keys())
    #write the headers
    writer.writeheader()
    #write each object as a row in the file
    writer.writerows(search_results)
In this snippet, we:

- Create a path_to_csv variable
- Pass path_to_csv and "w" as arguments to open the file in write mode
- csv.DictWriter(file, search_results[0].keys()) tells the writer object to format our file based on the keys of the first dict object in our list
- writer.writeheader() writes the actual headers to the document
- writer.writerows(search_results) writes our actual search results to the csv file

Next, we'll refactor our full_search() function so that things are done concurrently. Here is our modified full_search()
function:

#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
    #list for our full results
    full_results = []
    #list of page numbers
    page_numbers = list(range(0, pages))
    #open with a max of 5 threads
    with ThreadPoolExecutor(max_workers=5) as executor:
        #call search_page, pass all the following arguments into it
        future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages)
        #for each thread result
        for page_result in future_results:
            #add it to the full_results
            full_results.extend(page_result)
    #return the finalized list
    return full_results
In the refactored function:

- We open a ThreadPoolExecutor instance with a max of 5 workers
- executor.map(search_page, [query] * pages, page_numbers, [location] * pages) calls search_page() and passes in lists of arguments to it
- We take each page_result and use extend() to add it to the full_results
listfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport csvfrom concurrent.futures import ThreadPoolExecutor#create a custom options instanceoptions = webdriver.ChromeOptions()#add headless mode to our optionsoptions.add_argument("--headless")#this function performs a search and parses the resultsdef search_page(query, page, location): #start Chrome with our custom options driver = webdriver.Chrome(options=options) #go to the page driver.get(f"https://www.google.com/search?q={query}&start={page * 10}&location={location}") #find each div containing site info...THEY'RE SUPER NESTED!!! divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div") #list to hold our results results = [] #index, this will be used to number the results index = 0 #last link last_link = "" #iterate through our divs for div in divs: #find the title element title = div.find_elements(By.CSS_SELECTOR, "h3") #find the link element link = div.find_elements(By.CSS_SELECTOR, "a") #result number on the page result_number = index #if we have a result if len(title) > 0: #site info object site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page} if site_info["link"] != last_link: #add the object to our list results.append(site_info) #increment the index index += 1 #update the last link last_link = site_info["link"] #the scrape has finished, close the browser driver.quit() #return the result list return results#function to search multiple pages, calls search_page() on eachdef full_search(query, pages=3, location="United States"): #list for our full results full_results = [] #list of page numbers page_numbers = list(range(0, pages)) #open with a max of 5 threads with ThreadPoolExecutor(max_workers=5) as executor: #call search page, pass all the following aruments into it future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages) #for each thread result for page_result in future_results: #add it to the full_results full_results.extend(page_result) #return the finalized list return full_results####this is our main program down here#####results from the searchsearch_results = full_search("cool stuff")#path to the csv filepath_to_csv = "concurrency.csv"#open the file in write modewith open(path_to_csv, "w") as file: #format the file based on the keys of the first result writer = csv.DictWriter(file, search_results[0].keys()) #write the headers writer.writeheader() #write each object as a row in the file writer.writerows(search_results)
To keep from getting blocked, we'll once again route our requests through the ScrapeOps Proxy. Here is our get_scrapeops_url() function:

def get_scrapeops_url(url):
    payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
    proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
    return proxy_url
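For example, wrapping one of our search urls produces something like this (the function is repeated here with a placeholder API key so the sketch runs on its own):

from urllib.parse import urlencode

API_KEY = "YOUR-SUPER-SECRET-API-KEY"  # placeholder

def get_scrapeops_url(url):
    payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
    return 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)

print(get_scrapeops_url("https://www.google.com/search?q=cool+stuff&start=0"))
# https://proxy.scrapeops.io/v1/?api_key=YOUR-SUPER-SECRET-API-KEY&url=https%3A%2F%2Fwww.google.com%2Fsearch%3Fq%3Dcool%2Bstuff%26start%3D0&country=us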
After converting our url, we can simply driver.get()
this new proxied url just like we would with a non-proxied url. When scraping at scale, we need to use proxies consistently.The ScrapeOps Proxy rotates IP addresses and always uses the best proxy available for each request. This actually allows each of our requests to show up as a different user with potentially a different browser, OS and often a different location as well.When using a proxy, no one can block you based on your location, because your location changes whenever you make a new request to the site.Here is a proxied version of our script:from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom time import sleepimport csvfrom concurrent.futures import ThreadPoolExecutorfrom urllib.parse import urlencode#create a custom options instanceoptions = webdriver.ChromeOptions()#add headless mode to our optionsoptions.add_argument("--headless") API_KEY = "YOUR-SUPER-SECRET-API-KEY"def get_scrapeops_url(url): payload = {'api_key': API_KEY, 'url': url, 'country': 'us'} proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload) return proxy_url #this function performs a search and parses the resultsdef search_page(query, page, location): #start Chrome with our custom options driver = webdriver.Chrome(options=options) #go to the page driver.get(get_scrapeops_url(f"https://www.google.com/search?q={query}&start={page * 10}")) #find each div containing site info...THEY'RE SUPER NESTED!!! divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div") #list to hold our results results = [] #index, this will be used to number the results index = 0 #last link last_link = "" #iterate through our divs for div in divs: #find the title element title = div.find_elements(By.CSS_SELECTOR, "h3") link = div.find_elements(By.CSS_SELECTOR, "a") if len(title) > 0 and len(link) > 0: #result number on the page result_number = index #site info object site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page} if site_info["link"] != last_link: #add the object to our list results.append(site_info) #increment the index index += 1 #update the last link last_link = site_info["link"] #the scrape has finished, close the browser driver.quit() #return the result list return results#function to search multiple pages, calls search_page() on eachdef full_search(query, pages=3, location="United States"): #list for our full results full_results = [] #list of page numbers page_numbers = list(range(0, pages)) #open with a max of 5 threads with ThreadPoolExecutor(max_workers=5) as executor: #call search page, pass all the following aruments into it future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages) #for each thread result for page_result in future_results: #add it to the full_results full_results.extend(page_result) #return the finalized list return full_results if __name__ == "__main__": search_results = full_search("cool stuff") #path to the csv file path_to_csv = "proxied.csv" #open the file in write mode with open(path_to_csv, "w") as file: #format the file based on the keys of the first result writer = csv.DictWriter(file, search_results[0].keys()) #write the headers writer.writeheader() #write each object as a row in the file writer.writerows(search_results)
"YOUR-SUPER-SECRET-API-KEY"
should be replaced by your API keyget_scrapeops_url()
converts normal urls into proxied onesmain
code block at the end of the script, this is because we're closer to productionSearchData
class and a DataPipeline
class. SearchData
doesn't do much other than hold and format individual results. The DataPipeline
is where the real heavy lifting gets done as far as our production storage.Here is our production scraper:from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom time import sleepimport csvfrom concurrent.futures import ThreadPoolExecutorfrom urllib.parse import urlencodeimport osimport loggingfrom dataclasses import dataclass, field, fields, asdict #create a custom options instanceoptions = webdriver.ChromeOptions()#add headless mode to our optionsoptions.add_argument("--headless") API_KEY = "YOUR-SUPER-SECRET-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str link: str result_number: int page_number: int def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True self.data_to_save = [] self.data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not self.data_to_save: return keys = [field.name for field in fields(self.data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="UTF-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in self.data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url): payload = {'api_key': API_KEY, 'url': url, 'country': 'us'} proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload) return proxy_url #this function performs a search and parses the resultsdef search_page(query, page, location): #start Chrome with our custom options driver = webdriver.Chrome(options=options) #go to the page driver.get(get_scrapeops_url(f"https://www.google.com/search?q={query}&start={page * 10}")) #find each div containing site info...THEY'RE SUPER NESTED!!! 
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div") #list to hold our results results = [] #index, this will be used to number the results index = 0 #last link last_link = "" #iterate through our divs for div in divs: #find the title element title = div.find_elements(By.CSS_SELECTOR, "h3") link = div.find_elements(By.CSS_SELECTOR, "a") if len(title) > 0 and len(link) > 0: #result number on the page result_number = index #site info object site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page} if site_info["link"] != last_link: #add the object to our list results.append(site_info) #increment the index index += 1 #update the last link last_link = site_info["link"] #the scrape has finished, close the browser driver.quit() #return the result list return results#function to search multiple pages, calls search_page() on eachdef full_search(query, pages=3, location="United States"): #list for our full results full_results = [] #list of page numbers page_numbers = list(range(0, pages)) #open with a max of 5 threads with ThreadPoolExecutor(max_workers=5) as executor: #call search page, pass all the following aruments into it future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages) #for each thread result for page_result in future_results: #add it to the full_results full_results.extend(page_result) #return the finalized list return full_results if __name__ == "__main__": logger.info("Starting scrape") data_pipeline = DataPipeline(csv_filename="production-search.csv") search_results = full_search("cool stuff") for result in search_results: search_data = SearchData(name=result["title"], link=result["link"], result_number=result["result_number"] , page_number=result["page"]) data_pipeline.add_data(search_data) data_pipeline.close_pipeline() logger.info("Scrape Complete")
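To see how these two classes fit together in isolation, here is a trimmed-down sketch; the field names mirror SearchData above, the sample rows are made up, and pipeline-demo.csv is just a placeholder filename:

from dataclasses import dataclass, fields, asdict
import csv
import os

@dataclass
class SearchData:
    name: str
    link: str
    result_number: int
    page_number: int

# made-up results; the second one is a duplicate by name and gets dropped
scraped = [
    SearchData("Cool Stuff", "https://www.example.com/cool", 0, 0),
    SearchData("Cool Stuff", "https://www.example.com/cool", 1, 0),
]

# minimal stand-in for the duplicate check done by DataPipeline.is_duplicate()
names_seen = []
rows = []
for item in scraped:
    if item.name in names_seen:
        continue
    names_seen.append(item.name)
    rows.append(asdict(item))

# append to the csv, writing headers only if the file is new or empty
filename = "pipeline-demo.csv"
file_exists = os.path.isfile(filename) and os.path.getsize(filename) > 0
with open(filename, mode="a", newline="", encoding="UTF-8") as output_file:
    writer = csv.DictWriter(output_file, fieldnames=[field.name for field in fields(SearchData)])
    if not file_exists:
        writer.writeheader()
    writer.writerows(rows)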
"production-search.csv"
to your desired filename"cool stuff"
to whatever query you'd like to performpages
kwarg in the full_search()
function:full_search("boring stuff", pages=1000)
Whenever you scrape, pay attention to the site's robots.txt. You can view Google's robots.txt here.

Another thing to consider is the terms and conditions (T&C) policies of the websites you scrape. Unauthorized scraping or violating terms of service may result in legal action or being blocked from accessing services. According to Google's T&C policy, Google reserves the right to suspend or terminate your access to the services or delete your Google Account if they reasonably believe that your conduct causes harm or liability to a user, third party, or Google, for example by hacking, phishing, harassing, spamming, misleading others, or scraping content that doesn't belong to you.

It's crucial to consider not only the legality of scraping data but also how the scraped data will be used. Data scraped from Google or other websites may be subject to copyright laws or regulations governing personal data, depending on the jurisdiction and intended use.

Then check out ScrapeOps, the complete toolkit for web scraping.
From each search result, we extract a name and a link:
const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const fs = require('fs'); const API_KEY = 'YOUR-SUPER-SECRET-API-KEY';const outputFile = 'production.csv';const fileExists = fs.existsSync(outputFile); //set up the csv writerconst csvWriter = createCsvWriter({ path: outputFile, header: [ { id: 'name', title: 'Name' }, { id: 'link', title: 'Link' }, { id: 'result_number', title: 'Result Number' }, { id: 'page', title: 'Page Number' }, ], append: fileExists,});//convert regular urls into proxied onesfunction getScrapeOpsURL(url, location) { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;}//scrape page, this is our main logicasync function scrapePage( browser, query, pageNumber, location, retries = 3, num = 100) { let tries = 0; while (tries <= retries) { const page = await browser.newPage(); try { const url = `https://www.google.com/search?q=${query}&start=${pageNumber * num}&num=${num}`; const proxyUrl = getScrapeOpsURL(url, location); //set a long timeout, sometimes the server take awhile await page.goto(proxyUrl, { timeout: 300000 }); //find the nested divs const divs = await page.$$( 'div > div > div > div > div > div > div > div' ); const scrapeContent = []; seenLinks = []; let index = 0; for (const div of divs) { const h3s = await div.$('h3'); const links = await div.$('a'); //if we have the required info if (h3s && links) { //pull the name const name = await div.$eval('h3', (h3) => h3.textContent); //pull the link const linkHref = await div.$eval('a', (a) => a.href); //filter out bad links if ( !linkHref.includes('https://proxy.scrapeops.io/') && !seenLinks.includes(linkHref) ) { scrapeContent.push({ name: name, link: linkHref, page: pageNumber, result_number: index, }); seenLinks.push(linkHref); index++; } } } //we failed to get a result, throw an error and attempt a retry if (scrapeContent.length === 0) { throw new Error(`Failed to scrape page ${pageNumber}`); //we have a page result, write it to the csv } else { await csvWriter.writeRecords(scrapeContent); //exit the function return; } } catch (err) { console.log(`ERROR: ${err}`); console.log(`Retries left: ${retries - tries}`); tries++; } finally { await page.close(); } } throw new Error(`Max retries reached: ${tries}`);}//function to launch a browser and scrape each page concurrentlyasync function concurrentScrape( query, totalPages, location, num = 10, retries = 3) { const browser = await puppeteer.launch(); const tasks = []; for (let i = 0; i < totalPages; i++) { tasks.push(scrapePage(browser, query, i, location, retries, num)); } await Promise.all(tasks); await browser.close();}//main functionasync function main() { const queries = ['cool stuff']; const location = 'us'; const totalPages = 3; const batchSize = 20; const retries = 5; console.log('Starting scrape...'); for (const query of queries) { await concurrentScrape( query, totalPages, location, (num = batchSize), retries ); console.log(`Scrape complete, results saved to: ${outputFile}`); }}//run the main functionmain();
To run a different search, change the queries array inside the main function. Feel free to change the location and totalPages variables (or any of the other constants in the main function) to change your results as well. Just remember to replace "YOUR-SUPER-SECRET-API-KEY" with your ScrapeOps API key.
We'll use puppeteer to perform our searches and interpret the results, and we'll use csv-writer and fs for handling the filesystem and storing our data. These dependencies give us the power to not only extract page data, but also filter and store our data safely and efficiently.

Early in development we'll try passing geo_location inside the url, but later on we remove this and let the ScrapeOps Proxy handle our location for us.

Our base search url looks like this:

https://www.google.com/search?q=${query}
If we want to look up cool stuff, our url would be:

https://www.google.com/search?q=cool+stuff
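Notice that the space in the query becomes a + in the url. You don't have to encode this by hand; here's a minimal sketch (this helper is not part of the scraper, just an illustration) showing URLSearchParams doing it for us:

```javascript
// Minimal sketch: let URLSearchParams handle the query-string encoding.
function buildSearchUrl(query) {
  const params = new URLSearchParams({ q: query });
  return `https://www.google.com/search?${params.toString()}`;
}

console.log(buildSearchUrl('cool stuff'));
// -> https://www.google.com/search?q=cool+stuff
```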
To control how many results come back per request, we can use the num query. The num query tends to get mixed results, since most normal users are on default settings with approximately 10 results. If you choose to use the num query, exercise caution: Google does block suspicious traffic, and the num query does make you look less human.

To add more parameters to the url, we simply append & followed by the query name and value. We'll explore these additional queries in the coming sections.

In the olden days, Google gave us actual pages. In the modern day, Google gives us all of our results on a single page. At first glance, this would make our scrape much more difficult; however, our results come in batches, which makes it incredibly simple to simulate pages. To control which result we start at, we can use the start parameter. If we want to start at result 0, our url would be:

https://www.google.com/search?q=cool+stuff&start=0
If we want to start at result 10 (the second batch of results), we would GET:

https://www.google.com/search?q=cool+stuff&start=10
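To make the relationship between the page number, the batch size, and the start offset concrete, here's a small sketch; the buildPagedUrl helper is purely illustrative and not part of the article's scraper:

```javascript
// Minimal sketch: start is just pageNumber multiplied by the batch size (num).
function buildPagedUrl(query, pageNumber, num = 10) {
  const params = new URLSearchParams({
    q: query,
    start: pageNumber * num,
    num: num,
  });
  return `https://www.google.com/search?${params.toString()}`;
}

for (let page = 0; page < 3; page++) {
  console.log(buildPagedUrl('cool stuff', page));
}
// prints urls with start=0, start=10, start=20
```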
To control our location, we can use the geo_location parameter. If we want to look up cool stuff and use a location of Japan, our url would look like this:

https://www.google.com/search?q=cool+stuff&geo_location=japan
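If you'd like to experiment with geo_location before we hand location handling over to the ScrapeOps Proxy later on, a quick sketch (again, the helper name is just for illustration) shows where the parameter slots in:

```javascript
// Sketch only: append geo_location alongside q. Later in the article we drop this
// and pass a country code to the ScrapeOps Proxy instead.
function buildLocatedUrl(query, geoLocation) {
  const params = new URLSearchParams({ q: query, geo_location: geoLocation });
  return `https://www.google.com/search?${params.toString()}`;
}

console.log(buildLocatedUrl('cool stuff', 'japan'));
// -> https://www.google.com/search?q=cool+stuff&geo_location=japan
```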
We'll be using puppeteer for web browsing and parsing HTML, csv-writer to store our data, and fs for basic file operations.

You can start by making a new folder in your file explorer, or you can create one from the command line with the commands below:

```
mkdir puppeteer-google-search
cd puppeteer-google-search
```

Then initialize the project and install the dependencies:

```
npm init --y
npm install puppeteer
npm install csv-writer
```

There's no need to install fs, because it comes with NodeJS. In our scraper, we simply require it.
```javascript
const puppeteer = require('puppeteer');
const createCsvWriter = require('csv-writer').createObjectCsvWriter;
const fs = require('fs');

async function scrapePage(query) {
  //set up our page and browser
  const url = `https://www.google.com/search?q=${query}`;
  const browser = await puppeteer.launch();
  const page = await browser.newPage();
  //go to the site
  await page.goto(url);
  //extract the nested divs
  const divs = await page.$$('div > div > div > div > div > div > div > div');
  const scrapeContent = [];
  const seenLinks = [];
  let index = 0;
  for (const div of divs) {
    const h3s = await div.$('h3');
    const links = await div.$('a');
    //if we have the required info
    if (h3s && links) {
      //pull the name
      const name = await div.$eval('h3', (h3) => h3.textContent);
      //pull the link
      const linkHref = await div.$eval('a', (a) => a.href);
      //filter out bad links
      if (
        !linkHref.includes('https://proxy.scrapeops.io/') &&
        !seenLinks.includes(linkHref)
      ) {
        scrapeContent.push({
          name: name,
          link: linkHref,
          result_number: index,
        });
        //add the link to our list of seen links
        seenLinks.push(linkHref);
        index++;
      }
    }
  }
  await browser.close();
  return scrapeContent;
}

//main function
async function main() {
  const results = await scrapePage('cool stuff');
  for (const result of results) {
    console.log(result);
  }
}

//run the main function
main();
```
- scrapeContent is an array that holds the results we return
- the seenLinks array is strictly for holding links we've already scraped
- index holds our index on the page
- const divs = await page.$$("div > div > div > div > div > div > div > div"); finds all of our super nested divs
- for each div, we:
  - use div.$() to check for the presence of h3 and a elements (see the short sketch after this list)
  - use div.$eval() to pull the name and the link
  - push the result into scrapeContent
  - add the link to seenLinks so we don't scrape it again
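If the difference between div.$() and div.$eval() is new to you, here's a tiny standalone sketch, separate from our scraper and pointed at example.com, that shows what each call gives back:

```javascript
// Standalone sketch: $() returns an ElementHandle (or null), while $eval() runs a
// function against the matched element inside the page and returns the result.
const puppeteer = require('puppeteer');

(async () => {
  const browser = await puppeteer.launch();
  const page = await browser.newPage();
  await page.goto('https://example.com');

  const heading = await page.$('h1'); // presence check
  console.log('h1 present:', heading !== null);

  const headingText = await page.$eval('h1', (h1) => h1.textContent); // extract text
  console.log('h1 text:', headingText);

  await browser.close();
})();
```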
Remember the start parameter from earlier? Here's the url that starts at result 0:

https://www.google.com/search?q=cool+stuff&start=0
To fetch a specific page, we multiply our pageNumber by 10. Taking pagination into account, our url will now look like this:

https://www.google.com/search?q=${query}&start=${pageNumber * 10}
```javascript
const puppeteer = require('puppeteer');
const createCsvWriter = require('csv-writer').createObjectCsvWriter;
const fs = require('fs');

async function scrapePage(query, pageNumber) {
  //set up our page and browser
  const url = `https://www.google.com/search?q=${query}&start=${pageNumber * 10}`;
  const browser = await puppeteer.launch();
  const page = await browser.newPage();
  //go to the site
  await page.goto(url);
  //extract the nested divs
  const divs = await page.$$('div > div > div > div > div > div > div > div');
  const scrapeContent = [];
  const seenLinks = [];
  let index = 0;
  for (const div of divs) {
    const h3s = await div.$('h3');
    const links = await div.$('a');
    //if we have the required info
    if (h3s && links) {
      //pull the name
      const name = await div.$eval('h3', (h3) => h3.textContent);
      //pull the link
      const linkHref = await div.$eval('a', (a) => a.href);
      //filter out bad links
      if (
        !linkHref.includes('https://proxy.scrapeops.io/') &&
        !seenLinks.includes(linkHref)
      ) {
        scrapeContent.push({
          name: name,
          link: linkHref,
          pageNumber: pageNumber,
          result_number: index,
        });
        //add the link to our list of seen links
        seenLinks.push(linkHref);
        index++;
      }
    }
  }
  await browser.close();
  return scrapeContent;
}

//main function
async function main() {
  const results = await scrapePage('cool stuff', 0);
  for (const result of results) {
    console.log(result);
  }
}

//run the main function
main();
```
- scrapePage() now takes two arguments, query and pageNumber
- our start parameter is pageNumber multiplied by our typical batch size (10)
- const results = await scrapePage("cool stuff", 0) says we want our results to start at zero

The pageNumber argument is the foundation for everything we'll add in the coming sections. It's really hard for your scraper to organize its tasks and data if it has no idea which page it's on.
Earlier, we installed csv-writer and fs. Now it's time to use them. We'll use fs to check the existence of our outputFile and csv-writer to write the results to the actual CSV file.

Pay close attention to fileExists in this section. If our file already exists, we do not want to overwrite it. If it doesn't exist, we need to create a new file. The csvWriter in the code below does exactly this. Here's our adjusted code:
```javascript
const puppeteer = require('puppeteer');
const createCsvWriter = require('csv-writer').createObjectCsvWriter;
const fs = require('fs');

const outputFile = 'add-storage.csv';
const fileExists = fs.existsSync(outputFile);

//set up the csv writer
const csvWriter = createCsvWriter({
  path: outputFile,
  header: [
    { id: 'name', title: 'Name' },
    { id: 'link', title: 'Link' },
    { id: 'result_number', title: 'Result Number' },
    { id: 'page', title: 'Page Number' },
  ],
  append: fileExists,
});

async function scrapePage(query, pageNumber) {
  //set up our page and browser
  const url = `https://www.google.com/search?q=${query}&start=${pageNumber * 10}`;
  const browser = await puppeteer.launch();
  const page = await browser.newPage();
  //go to the site
  await page.goto(url);
  //extract the nested divs
  const divs = await page.$$('div > div > div > div > div > div > div > div');
  const scrapeContent = [];
  const seenLinks = [];
  let index = 0;
  for (const div of divs) {
    const h3s = await div.$('h3');
    const links = await div.$('a');
    //if we have the required info
    if (h3s && links) {
      //pull the name
      const name = await div.$eval('h3', (h3) => h3.textContent);
      //pull the link
      const linkHref = await div.$eval('a', (a) => a.href);
      //filter out bad links
      if (
        !linkHref.includes('https://proxy.scrapeops.io/') &&
        !seenLinks.includes(linkHref)
      ) {
        scrapeContent.push({
          name: name,
          link: linkHref,
          page: pageNumber,
          result_number: index,
        });
        //add the link to our list of seen links
        seenLinks.push(linkHref);
        index++;
      }
    }
  }
  await browser.close();
  //write this page's results to the csv
  await csvWriter.writeRecords(scrapeContent);
}

//main function
async function main() {
  console.log('Starting scrape...');
  await scrapePage('cool stuff', 0);
  console.log(`Scrape complete, results saved to: ${outputFile}`);
}

//run the main function
main();
```
- fileExists is a boolean: true if our file exists and false if it doesn't
- csvWriter opens the file in append mode if the file exists; otherwise it creates a new file

Our scraper writes each page to the outputFile as soon as it has been processed. This helps us save everything we possibly can, even in the event of a crash. Once we're scraping multiple pages at once, if our scraper succeeds on page 1 but fails on page 2 or page 0, we will still have some results that we can review!
To speed things up, we want to scrape multiple pages at the same time, and NodeJS's async support makes this completely doable. In this section, let's add a concurrentScrape() function. The goal of this function is simple: run the scrapePage() function on multiple pages at the same time.

Since we're dealing with Promise objects, it's a good idea to add some error handling in scrapePage(). We don't want a Promise to resolve with bad results. The code below adds concurrency and error handling to ensure our scrape completes properly.

```javascript
const puppeteer = require('puppeteer');
const createCsvWriter = require('csv-writer').createObjectCsvWriter;
const fs = require('fs');

const outputFile = 'add-concurrency.csv';
const fileExists = fs.existsSync(outputFile);

//set up the csv writer
const csvWriter = createCsvWriter({
  path: outputFile,
  header: [
    { id: 'name', title: 'Name' },
    { id: 'link', title: 'Link' },
    { id: 'result_number', title: 'Result Number' },
    { id: 'page', title: 'Page Number' },
  ],
  append: fileExists,
});

async function scrapePage(browser, query, pageNumber, location, retries = 3) {
  let tries = 0;
  while (tries <= retries) {
    const page = await browser.newPage();
    try {
      const url = `https://www.google.com/search?q=${query}&start=${pageNumber * 10}`;
      //set a long timeout, sometimes the server takes a while
      await page.goto(url, { timeout: 300000 });
      //find the nested divs
      const divs = await page.$$('div > div > div > div > div > div > div > div');
      const scrapeContent = [];
      const seenLinks = [];
      let index = 0;
      for (const div of divs) {
        const h3s = await div.$('h3');
        const links = await div.$('a');
        //if we have the required info
        if (h3s && links) {
          //pull the name
          const name = await div.$eval('h3', (h3) => h3.textContent);
          //pull the link
          const linkHref = await div.$eval('a', (a) => a.href);
          //filter out bad links
          if (
            !linkHref.includes('https://proxy.scrapeops.io/') &&
            !seenLinks.includes(linkHref)
          ) {
            scrapeContent.push({
              name: name,
              link: linkHref,
              page: pageNumber,
              result_number: index,
            });
            seenLinks.push(linkHref);
            index++;
          }
        }
      }
      //we failed to get a result, throw an error and attempt a retry
      if (scrapeContent.length === 0) {
        throw new Error(`Failed to scrape page ${pageNumber}`);
        //we have a page result, write it to the csv
      } else {
        await csvWriter.writeRecords(scrapeContent);
        //exit the function
        return;
      }
    } catch (err) {
      console.log(`ERROR: ${err}`);
      console.log(`Retries left: ${retries - tries}`);
      tries++;
    } finally {
      await page.close();
    }
  }
  throw new Error(`Max retries reached: ${tries}`);
}

//scrape multiple pages at once
async function concurrentScrape(query, totalPages) {
  const browser = await puppeteer.launch();
  const tasks = [];
  for (let i = 0; i < totalPages; i++) {
    tasks.push(scrapePage(browser, query, i));
  }
  await Promise.all(tasks);
  await browser.close();
}

//main function
async function main() {
  console.log('Starting scrape...');
  await concurrentScrape('cool stuff', 3);
  console.log(`Scrape complete, results saved to: ${outputFile}`);
}

//run the main function
main();
```
- scrapePage() now takes our browser as an argument, and instead of opening and closing a browser, it opens and closes a page
- if we fail to pull any results, we throw an error and retry the scrape
- once our try/catch logic has completed, we use finally to close the page and free up some memory
- concurrentScrape() runs scrapePage() on a bunch of separate pages asynchronously to speed up our results (a hedged alternative using Promise.allSettled() follows this list)
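One caveat worth knowing about, and this is an alternative rather than what the article's code does: Promise.all() rejects as soon as any single page exhausts its retries, so browser.close() above never runs for that batch. If you'd rather log the failures and still close the browser, Promise.allSettled() is a small change. This sketch assumes the same scrapePage() shown above:

```javascript
// Alternative sketch: wait for every page, report failures, and always close the browser.
async function concurrentScrapeSettled(query, totalPages) {
  const browser = await puppeteer.launch();
  const tasks = [];
  for (let i = 0; i < totalPages; i++) {
    tasks.push(scrapePage(browser, query, i));
  }
  const outcomes = await Promise.allSettled(tasks);
  outcomes.forEach((outcome, pageNumber) => {
    if (outcome.status === 'rejected') {
      console.log(`Page ${pageNumber} failed: ${outcome.reason}`);
    }
  });
  await browser.close();
}
```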
To use the ScrapeOps Proxy, all we really need to change is the url we pass into page.goto(url). In this section, we'll bring our scraper up to production quality and integrate it with the ScrapeOps proxy. To convert our urls, we'll write a small function called getScrapeOpsURL(). We also add a location parameter to scrapePage() and concurrentScrape() as well. We pass location to the ScrapeOps Proxy because they can then route us through an actual server in that location.

```javascript
const puppeteer = require('puppeteer');
const createCsvWriter = require('csv-writer').createObjectCsvWriter;
const fs = require('fs');

const API_KEY = 'YOUR-SUPER-SECRET-API-KEY';
const outputFile = 'production.csv';
const fileExists = fs.existsSync(outputFile);

//set up the csv writer
const csvWriter = createCsvWriter({
  path: outputFile,
  header: [
    { id: 'name', title: 'Name' },
    { id: 'link', title: 'Link' },
    { id: 'result_number', title: 'Result Number' },
    { id: 'page', title: 'Page Number' },
  ],
  append: fileExists,
});

//convert regular urls into proxied ones
function getScrapeOpsURL(url, location) {
  const params = new URLSearchParams({
    api_key: API_KEY,
    url: url,
    country: location,
  });
  return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

//scrape a page, this is our main logic
async function scrapePage(browser, query, pageNumber, location, retries = 3, num = 100) {
  let tries = 0;
  while (tries <= retries) {
    const page = await browser.newPage();
    try {
      const url = `https://www.google.com/search?q=${query}&start=${pageNumber * num}&num=${num}`;
      const proxyUrl = getScrapeOpsURL(url, location);
      //set a long timeout, sometimes the server takes a while
      await page.goto(proxyUrl, { timeout: 300000 });
      //find the nested divs
      const divs = await page.$$('div > div > div > div > div > div > div > div');
      const scrapeContent = [];
      const seenLinks = [];
      let index = 0;
      for (const div of divs) {
        const h3s = await div.$('h3');
        const links = await div.$('a');
        //if we have the required info
        if (h3s && links) {
          //pull the name
          const name = await div.$eval('h3', (h3) => h3.textContent);
          //pull the link
          const linkHref = await div.$eval('a', (a) => a.href);
          //filter out bad links
          if (
            !linkHref.includes('https://proxy.scrapeops.io/') &&
            !seenLinks.includes(linkHref)
          ) {
            scrapeContent.push({
              name: name,
              link: linkHref,
              page: pageNumber,
              result_number: index,
            });
            seenLinks.push(linkHref);
            index++;
          }
        }
      }
      //we failed to get a result, throw an error and attempt a retry
      if (scrapeContent.length === 0) {
        throw new Error(`Failed to scrape page ${pageNumber}`);
        //we have a page result, write it to the csv
      } else {
        await csvWriter.writeRecords(scrapeContent);
        //exit the function
        return;
      }
    } catch (err) {
      console.log(`ERROR: ${err}`);
      console.log(`Retries left: ${retries - tries}`);
      tries++;
    } finally {
      await page.close();
    }
  }
  throw new Error(`Max retries reached: ${tries}`);
}

//launch a browser and scrape each page concurrently
async function concurrentScrape(query, totalPages, location, num = 10, retries = 3) {
  const browser = await puppeteer.launch();
  const tasks = [];
  for (let i = 0; i < totalPages; i++) {
    tasks.push(scrapePage(browser, query, i, location, retries, num));
  }
  await Promise.all(tasks);
  await browser.close();
}

//main function
async function main() {
  const queries = ['cool stuff'];
  const location = 'us';
  const totalPages = 3;
  const batchSize = 20;
  const retries = 5;

  console.log('Starting scrape...');
  for (const query of queries) {
    await concurrentScrape(query, totalPages, location, batchSize, retries);
    console.log(`Scrape complete, results saved to: ${outputFile}`);
  }
}

//run the main function
main();
```
- getScrapeOpsURL() converts our regular urls into proxied ones
- we pass location into concurrentScrape(), scrapePage(), and getScrapeOpsURL()
- instead of passing page.goto() a site directly, we pass the url into getScrapeOpsURL() and pass the result into page.goto()
- we also added the num parameter so we can tell Google how many results we want

Use num with caution. Google sometimes bans suspicious traffic, and the num query can make your scraper look abnormal. Even if they choose not to ban you, they hold the right to send you fewer than 100 results, causing your scraper to miss important data!
Everything you'd want to tweak is declared in the main() function. Take a look at our main():

```javascript
async function main() {
  const queries = ['cool stuff'];
  const location = 'us';
  const totalPages = 3;
  const batchSize = 20;
  const retries = 5;

  console.log('Starting scrape...');
  for (const query of queries) {
    await concurrentScrape(query, totalPages, location, batchSize, retries);
    console.log(`Scrape complete, results saved to: ${outputFile}`);
  }
}
```
If we wanted to scrape 100 pages of boring stuff, we'd change query to 'boring stuff' and totalPages to 100. To change the location, simply change the location variable from 'us' to whatever you'd like.

I named my production scraper production.js, and I can run it with the node command. The image below shows both the command to run it and the console output. In fact, feel free to change any of the constants declared in main(); that's exactly why they're there. These constants make it easy to tweak our results.
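For example, a tweaked main() for that 100-page boring stuff run might look like the sketch below; the rest of the script stays exactly the same, only the constants change:

```javascript
// Hypothetical tweak of main(): same structure as the production code above,
// just different constants. Swap location for any country code your proxy plan supports.
async function main() {
  const queries = ['boring stuff'];
  const location = 'us';
  const totalPages = 100;
  const batchSize = 20;
  const retries = 5;

  console.log('Starting scrape...');
  for (const query of queries) {
    await concurrentScrape(query, totalPages, location, batchSize, retries);
    console.log(`Scrape complete, results saved to: ${outputFile}`);
  }
}
```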
. That's exactly why they're there! These constants make it easy to tweak our results.robots.txt
file if you're not sure about something. You can view Google's robot.txt
here. If you're scraping as a guest (not logged into any site), the information your scraper sees is public and therefore fair game. If a site requires you to login, the information you see afterward is considered private. Don't log in with scrapers!!!Also, always pay attention to the Terms and Conditions of the site you're scraping. You can view Google's Terms here.Google does reserve the right to suspend, block, and/or delete your account if you violate their terms. Always check a site's Terms before you attempt to scrape it.Also, if you turn your Google Scraper into a crawler that scrapes the sites in your results, remember, you are subject to the Terms and Conditions of those sites as well!async
You now know how to build a Google Search scraper with Puppeteer and how to use async and Promise to improve speed and concurrency. Go build something!

If you'd like to learn more about the tech stack used in this article, you can find some links below: