Then check out ScrapeOps, the complete toolkit for web scraping.
'https://www.amazon.com/2021-Apple-10-2-inch-iPad-Wi-Fi/dp/B09G9FPHY6/ref=sr_1_1'
## URL Structure
'https://www.amazon.com/dp/ASIN'
## Example
'https://www.amazon.com/dp/B09G9FPHY6'
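As a quick illustration (this tiny helper is ours, not from the guide), building that clean product URL from an ASIN is a one-liner:

```python
def product_url_from_asin(asin):
    # Canonical short-form product URL described above: https://www.amazon.com/dp/ASIN
    return f"https://www.amazon.com/dp/{asin}"

print(product_url_from_asin("B09G9FPHY6"))
# https://www.amazon.com/dp/B09G9FPHY6
```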
'https://www.amazon.com/s?k=iPads&page=1'
- `k` stands for the search keyword. In our case, `k=ipad`. Note: If you want to search for a keyword that contains spaces or special characters, remember that you need to encode this value (see the short sketch after this list).
- `page` stands for the page number. In our case, we've requested `page=1`.
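For example, here's a minimal sketch (standard library only) of building a safely encoded search URL for a keyword that contains spaces, as mentioned in the note above:

```python
from urllib.parse import urlencode

keyword = "ipad pro 11 inch"
page = 1

# urlencode handles spaces and special characters in the keyword for us.
url = "https://www.amazon.com/s?" + urlencode({"k": keyword, "page": page})
print(url)
# https://www.amazon.com/s?k=ipad+pro+11+inch&page=1
```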
```python
import requests
from parsel import Selector
from urllib.parse import urljoin

product_urls = []

keyword_list = ['ipad']
for keyword in keyword_list:
    url = f'https://www.amazon.com/s?k={keyword}&page=1'
    try:
        response = requests.get(url)
        if response.status_code == 200:
            sel = Selector(text=response.text)

            ## Extract Product Page URLs
            search_products = sel.css("div.s-result-item[data-component-type=s-search-result]")
            for product in search_products:
                relative_url = product.css("h2>a::attr(href)").get()
                product_url = urljoin('https://www.amazon.com/', relative_url).split("?")[0]
                product_urls.append(product_url)

    except Exception as e:
        print("Error", e)
```
```python
import requests
from parsel import Selector

product_asins = []

keyword_list = ['ipad']
for keyword in keyword_list:
    url = f'https://www.amazon.com/s?k={keyword}&page=1'
    try:
        response = requests.get(url)
        if response.status_code == 200:
            sel = Selector(text=response.text)

            ## Extract Product ASINS
            search_products = sel.css("div.s-result-item[data-component-type=s-search-result]")
            for product in search_products:
                relative_url = product.css("h2>a::attr(href)").get()
                asin = relative_url.split('/')[3] if len(relative_url.split('/')) >= 4 else None
                product_asins.append(asin)

    except Exception as e:
        print("Error", e)
```
To scrape every page of results, we extract the available page numbers from the first results page and add each paginated search URL to our `url_list`.

```python
import requests
from parsel import Selector
from urllib.parse import urljoin

product_urls = []

keyword_list = ['ipad']
for keyword in keyword_list:
    url_list = [f'https://www.amazon.com/s?k={keyword}&page=1']
    for url in url_list:
        try:
            response = requests.get(url)
            if response.status_code == 200:
                sel = Selector(text=response.text)

                ## Extract Product Page URLs
                search_products = sel.css("div.s-result-item[data-component-type=s-search-result]")
                for product in search_products:
                    relative_url = product.css("h2>a::attr(href)").get()
                    product_url = urljoin('https://www.amazon.com/', relative_url).split("?")[0]
                    product_urls.append(product_url)

                ## Get All Pages
                if "&page=1" in url:
                    available_pages = sel.xpath(
                        '//a[has-class("s-pagination-item")][not(has-class("s-pagination-separator"))]/text()'
                    ).getall()

                    for page in available_pages:
                        search_url_paginated = f'https://www.amazon.com/s?k={keyword}&page={page}'
                        url_list.append(search_url_paginated)

        except Exception as e:
            print("Error", e)
```
```python
import requests
from parsel import Selector
from urllib.parse import urljoin

keyword_list = ['ipad']
product_overview_data = []

for keyword in keyword_list:
    url_list = [f'https://www.amazon.com/s?k={keyword}&page=1']
    for url in url_list:
        try:
            response = requests.get(url)
            if response.status_code == 200:
                sel = Selector(text=response.text)

                ## Extract Product Page
                search_products = sel.css("div.s-result-item[data-component-type=s-search-result]")
                for product in search_products:
                    relative_url = product.css("h2>a::attr(href)").get()
                    #print(relative_url.split('/'))
                    asin = relative_url.split('/')[3] if len(relative_url.split('/')) >= 4 else None
                    product_url = urljoin('https://www.amazon.com/', relative_url).split("?")[0]
                    product_overview_data.append(
                        {
                            "keyword": keyword,
                            "asin": asin,
                            "url": product_url,
                            "ad": True if "/slredirect/" in product_url else False,
                            "title": product.css("h2>a>span::text").get(),
                            "price": product.css(".a-price[data-a-size=xl] .a-offscreen::text").get(),
                            "real_price": product.css(".a-price[data-a-size=b] .a-offscreen::text").get(),
                            "rating": (product.css("span[aria-label~=stars]::attr(aria-label)").re(r"(\d+\.*\d*) out") or [None])[0],
                            "rating_count": product.css("span[aria-label~=stars] + span::attr(aria-label)").get(),
                            "thumbnail_url": product.xpath("//img[has-class('s-image')]/@src").get(),
                        }
                    )

                ## Get All Pages
                if "&page=1" in url:
                    available_pages = sel.xpath(
                        '//a[has-class("s-pagination-item")][not(has-class("s-pagination-separator"))]/text()'
                    ).getall()

                    for page in available_pages:
                        search_url_paginated = f'https://www.amazon.com/s?k={keyword}&page={page}'
                        url_list.append(search_url_paginated)

        except Exception as e:
            print("Error", e)
```
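If you want to hold onto these results, here's a minimal sketch (our addition, not part of the guide's code) that dumps `product_overview_data` to a JSON file:

```python
import json

# Persist the collected search data so it can be re-used without re-scraping.
with open("product_overview_data.json", "w", encoding="utf-8") as file:
    json.dump(product_overview_data, file, indent=2)
```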
'https://www.amazon.com/2021-Apple-10-2-inch-iPad-Wi-Fi/dp/B09G9FPHY6/ref=sr_1_1'
'https://www.amazon.com/dp/B09G9FPHY6'
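If you're starting from the long form of the URL, a small helper like this (the name `asin_from_url` is ours; the idea mirrors the `split('/')` trick used in the search scraper above) pulls the ASIN back out:

```python
def asin_from_url(product_url):
    # The ASIN sits in the path segment right after '/dp/'.
    parts = product_url.split("/")
    if "dp" in parts and parts.index("dp") + 1 < len(parts):
        return parts[parts.index("dp") + 1]
    return None

url = 'https://www.amazon.com/2021-Apple-10-2-inch-iPad-Wi-Fi/dp/B09G9FPHY6/ref=sr_1_1'
print(asin_from_url(url))  # B09G9FPHY6
```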
```python
import re
import json  # needed for json.loads below
import requests
from parsel import Selector
from urllib.parse import urljoin

product_urls = [
    'https://www.amazon.com/2021-Apple-10-2-inch-iPad-Wi-Fi/dp/B09G9FPHY6/ref=sr_1_1',
]

product_data_list = []

for product_url in product_urls:
    try:
        response = requests.get(product_url)
        if response.status_code == 200:
            sel = Selector(text=response.text)
            image_data = json.loads(re.findall(r"colorImages':.*'initial':\s*(\[.+?\])},\n", response.text)[0])
            variant_data = re.findall(r'dimensionValuesDisplayData"\s*:\s* ({.+?}),\n', response.text)
            feature_bullets = [bullet.strip() for bullet in sel.css("#feature-bullets li ::text").getall()]
            price = sel.css('.a-price span[aria-hidden="true"] ::text').get("")
            if not price:
                price = sel.css('.a-price .a-offscreen ::text').get("")
            product_data_list.append({
                "name": sel.css("#productTitle::text").get("").strip(),
                "price": price,
                "stars": sel.css("i[data-hook=average-star-rating] ::text").get("").strip(),
                "rating_count": sel.css("div[data-hook=total-review-count] ::text").get("").strip(),
                "feature_bullets": feature_bullets,
                "images": image_data,
                "variant_data": variant_data,
            })
    except Exception as e:
        print("Error", e)
```
{"name": "Apple iPad 9.7inch with WiFi 32GB- Space Gray (2017 Model) (Renewed)", "price": "$137.00", "stars": "4.6 out of 5 stars", "rating_count": "8,532 global ratings", "feature_bullets": [ "Make sure this fits by entering your model number.", "9.7-Inch Retina Display, wide Color and True Tone", "A9 third-generation chip with 64-bit architecture", "M9 motion coprocessor, 1.2MP FaceTime HD Camera", "8MP insight Camera, touch ID, Apple Pay"], "images": [{"hiRes": "https://m.media-amazon.com/images/I/51dBcW+NXPL._AC_SL1000_.jpg", "thumb": "https://m.media-amazon.com/images/I/51pGtRLfaZL._AC_US40_.jpg", "large": "https://m.media-amazon.com/images/I/51pGtRLfaZL._AC_.jpg", "main": {...}, "variant": "MAIN", "lowRes": None, "shoppableScene": None}, {"hiRes": "https://m.media-amazon.com/images/I/51c43obovcL._AC_SL1000_.jpg", "thumb": "https://m.media-amazon.com/images/I/415--n36L8L._AC_US40_.jpg", "large": "https://m.media-amazon.com/images/I/415--n36L8L._AC_.jpg", "main": {...}, "variant": "PT01", "lowRes": None, "shoppableScene": None}, "variant_data": ["{`B074PXZ5GC`:[`9.7 inches`,`Wi-Fi`,`Silver`],`B00TJGN4NG`:[`16GB`,`Wi-Fi`,`White`],`B07F93611L`:[`5 Pack`,`Wi-Fi`,`Space grey`],`B074PWW6NS`:[`Refurbished`,`Wi-Fi`,`Black`],`B0725LCLYQ`:[`9.7`,`Wi-Fi`,`Space Gray`],`B07D3DDJ4L`:[`32GB`,`Wi-Fi`,`Space Gray`],`B07G9N7J3S`:[`32GB`,`Wi-Fi`,`Gold`]}"]}
'https://www.amazon.com/product-reviews/B09G9FPHY6/'
```python
import requests
from parsel import Selector
from urllib.parse import urljoin

reviews = []

product_review_url_list = ['https://www.amazon.com/product-reviews/B09G9FPHY6/']

for product_review_url in product_review_url_list:
    try:
        response = requests.get(product_review_url)
        if response.status_code == 200:
            sel = Selector(text=response.text)

            ## Parse Product Reviews
            review_elements = sel.css("#cm_cr-review_list div.review")
            for review_element in review_elements:
                reviews.append({
                    "text": "".join(review_element.css("span[data-hook=review-body] ::text").getall()).strip(),
                    "title": review_element.css("*[data-hook=review-title]>span::text").get(),
                    "location_and_date": review_element.css("span[data-hook=review-date] ::text").get(),
                    "verified": bool(review_element.css("span[data-hook=avp-badge] ::text").get()),
                    "rating": review_element.css("*[data-hook*=review-star-rating] ::text").re(r"(\d+\.*\d*) out")[0],
                })

    except Exception as e:
        print("Error", e)
```
[ { "text": "Ok..little old lady here, whose working life consisted of nothing but years and years of Windows, android phones, etc. Just in last several years jumped hesitantly into Apple (phone, Ipad mini, etc.)LOVE LOVE LOVE my iPad mini but, thought..might be time to think about replacement..so, I saw the great price on this 10 inch tablet and thought Id take a chance. I am much more partial to the mini sized tablets, but thought Id go for it...soooo, even after reading all the bu.......t comments here, thought Id try, if i didnt like it., Id return it. 1. Delivered on time, yayyy! 2. Package well protected, sealed, unblemished...perfect condition (and yeah..no fingerprints on screen) 3. Ipad fired right up...70% charged 4. Ipad immediately began transferring info from iPhone that was sitting nearby. Yayyyyy!! No need for reams of books, booklets, warnings, etc., etc.!! 5. EVERYTHING transferred from iPhone and IPad Mini...and I still had some 15 gig storage left on new 64 gig iPad (just remember ...this is for my entertainment...not for work with diagrams, idiotic work related emails about cleaning up my workspace, or 20 specs for items no one will ever use) 6. Did a test run...everything worked exactly as I required, expected. 7. Ultimate test...watched old Morse/Poirot shows I have in Prime..excellent quality! love love love 8. After 8 full hours...I had to recharge for a bit before I went to bed. (charged fairly fast!)sooooooo...Im keeping this jewel!!!!!Risk is there...evidently, if you believe the nutso crowd and their comments here. Its a GREAT item, its a fabulous deal, Christmas is coming...or if you need to have a worthy backup..,...DO IT!!!!", "title": "EXCELLENT buy!", "location_and_date": "Reviewed in the United States 🇺🇸 on October 10, 2022", "verified": true, "rating": "5.0" }, { "text": "If you’re anything like me you want something to watch shows on in you living room or bed, but you don’t like the TV, and your phone is too small. Well this is the perfect thing for you, the screen is just the right size and very crisp and clear(maybe better then my iPhone X), the responsiveness is excellent, and all of the streaming sites work with this perfectly. On top of that, my AirPods automatically switch between this and my phone, so I don’t have to worry about messing with the settings every time. However, the camera is only OK. And it feels very delicate, so I would pick up a case and get AppleCare+. The battery isn’t the best either, but should be enough to get through the day. Overall I definitely recommend this, especially for the price.", "title": "Perfect", "location_and_date": "Reviewed in the United States 🇺🇸 on October 13, 2022", "verified": true, "rating": "5.0" }, { "text": "My old IPad was acting up, wouldn’t hold a charge etc. This iPad arrived the very next day after I ordered it. What a great surprise. The one corner of the outer box it arrived in was damaged, but the inner box containing the iPad was in perfect condition. It was so simple to transfer everything from my old iPad to this one, just laid the new one on the old (iPad 2019) and it did pretty much everything on its own. I am very pleased with my purchase, I hope it lasts longer than my 2019 model.", "title": "Great purchase", "location_and_date": "Reviewed in the United States 🇺🇸 on October 15, 2022", "verified": true, "rating": "5.0" }, { "text": "Im not much of an apple product person but I do buy them for people I dont want to provide tech support to. 
(Parents, In-laws, Wife, and Kids)I used to use the fire tablets because they were cheap and I thought that would keep the kids entertained, especially on road trips. This worked for movies and some games but there were always problems with how slow they become with updates, loss of battery life, etc.This ipad was a game changer. I always knew they were the best tablets but I was also a bit in denial as well as just being somewhat anti-apple. With this on sale during prime day 2022 (July) I took a chance and bought one for the kids.This does everything as well or better (usually better) than previous tablets I had purchased because they were cheaper.I also didnt buy a case for it and my kids are brutal with these types of devices. To date, it is still in one piece, operational, and has no cracks in the screen.Sometimes it is worth paying a bit more for the name brand product and in this case Im a believer.", "title": "Kids love it", "location_and_date": "Reviewed in the United States 🇺🇸 on October 2, 2022", "verified": true, "rating": "5.0" }, { "text": "For those who wonder, this is brand new in the box, 2021 9th generation. It is NOT refurbed or an exchange. It is never opened and shrink wrapped by Apple. (See my photos.) The reason it is so much cheaper than the other 2021 iPads is the 64gb storage. But with iCloud so ridiculously cheap for cloud storage, I just cannot see this 64gb as not getting the job done. I myself was curious about this low price buying me a refurb/exchange, but that is simply not the case here. I do, however, recommend you not go with 32gb. I believe even with an iCloud account, you will be sorry you didn’t go 64gb.And the ease of setting this up cannot be understated. I simply sat my iPhone 13 Pro Max next to it and all relevant files and Wi-Fi passwords were transferred over with no input from me. It looked to me that it will do that with Android and most laptops also, though I did not test that out. All photos also came over, and the ones I took after that transfer, I simply Air-Dropped them into this iPad. All in all, this is as simple as it gets for transferring files and photos. Apple has this stuff down to a science, believe me.This screen is incredible. If you are looking at a pre-Retina screen, you will be amazed at this 2021 version. This thing is very fast, the on screen keyboard is fast, accurate and very concise. Dealing with apps is easy, and Apple doesn’t load you down with bloat you’ll never use. It is claimed this has about 12 hours on a charge; what I’ve seen thus far leads me to believe that is accurate.All in all, I am extremely pleased with this purchase. You can’t always say you got what you paid for. But I can definitely say that with this. This is the entry level 2021 9th generation iPad, and it is exactly what I need. Go and get you one…", "title": "Incredible deal on incredible machine", "location_and_date": "Reviewed in the United States 🇺🇸 on September 21, 2022", "verified": true, "rating": "5.0" }, { "text": "I bought this for my husband. He loves it! It is the gift that really does keep on giving. It arrived quickly, well packaged and I didn’t have to leave my house to get it. It was great to use my iPad to purchase this one as a gift and have it arrive safe.y. Thank you, Amazon!", "title": "Best gift 🎁", "location_and_date": "Reviewed in the United States 🇺🇸 on October 15, 2022", "verified": true, "rating": "5.0" }, { "text": "I have had an ipad air since they came out. 
I used hotel points to get it and its served me well as a book and simple internet use. Recently I noticed that it was no updating and some of my favorite apps were telling me they were using an old version because my IOS was outdated. Without being able to update it I decided to pass my old one on and get a new one. Then I thought Id get a mini 6 but after comparing the prices and the ability I could not justify a double price for it. I ordered this Ipad 9 and it came quicker than expected. Out of the box it performs much better than my old one, screen appears clearer and I like the new IOS it uses. My old one will live on as a small tv for my wife when shes in the kitchen and for that it does very well. I have no complaints about my new one. Its easy to talk yourself into the top of the curve, but sometimes being a bit behind it makes better fiscal sense", "title": "My old Ipad was too old to update, so it was passed down,", "location_and_date": "Reviewed in the United States 🇺🇸 on September 25, 2022", "verified": true, "rating": "5.0" }, { "text": "I have always been an android user. I finally dipped my toe into Apple. There is a learning curve, I do not speak Apple. Thankfully I have grandchildren and they have taught me a lot. Dont snooze on this one, I love it, fast, images clearer, pics, videos, pen, everything about this one is great. I now get the Apple craze.", "title": "Perfect size and performance", "location_and_date": "Reviewed in the United States 🇺🇸 on October 13, 2022", "verified": true, "rating": "5.0" }, { "text": "El iPad es una tableta muy fácil de usar y muy práctica puedes hacer casi todo lo que necesitas en el día a día, oficina, escuela, entretenimiento, productividad, y con 256gb tengo para almacenar mucha información.", "title": "El iPad es la mejor tableta que existe", "location_and_date": "Reviewed in the United States 🇺🇸 on October 14, 2022", "verified": true, "rating": "5.0" }, { "text": "Thought I was gonna get a knock off for the price but came brand new, no problems what so ever. Amazing battery life I charge it every two days and use it constantly at school and work for studying and job demands.", "title": "Excellent product", "location_and_date": "Reviewed in the United States 🇺🇸 on October 14, 2022", "verified": true, "rating": "5.0" }]
```python
import requests
from parsel import Selector
from urllib.parse import urljoin

reviews = []

product_review_url_list = ['https://www.amazon.com/product-reviews/B09G9FPHY6/']

for product_review_url in product_review_url_list:
    try:
        response = requests.get(product_review_url)
        if response.status_code == 200:
            sel = Selector(text=response.text)

            ## Get Next Page Url
            next_page_relative_url = sel.css(".a-pagination .a-last>a::attr(href)").get()
            if next_page_relative_url is not None:
                next_page = urljoin('https://www.amazon.com/', next_page_relative_url)
                product_review_url_list.append(next_page)

            ## Parse Product Reviews
            review_elements = sel.css("#cm_cr-review_list div.review")
            for review_element in review_elements:
                reviews.append({
                    "text": "".join(review_element.css("span[data-hook=review-body] ::text").getall()).strip(),
                    "title": review_element.css("*[data-hook=review-title]>span::text").get(),
                    "location_and_date": review_element.css("span[data-hook=review-date] ::text").get(),
                    "verified": bool(review_element.css("span[data-hook=avp-badge] ::text").get()),
                    "rating": review_element.css("*[data-hook*=review-star-rating] ::text").re(r"(\d+\.*\d*) out")[0],
                })

    except Exception as e:
        print("Error", e)
```
'User-Agent': 'python-requests/2.26.0',
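For illustration only (the guide's actual fix is the ScrapeOps proxy introduced below), you could override this header with a browser-style User-Agent; the UA string here is just an example:

```python
import requests

# Example browser-style User-Agent string (illustrative; any current browser UA works).
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36"
}

response = requests.get("https://www.amazon.com/s?k=ipad&page=1", headers=headers)
print(response.status_code)
```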
```python
import requests
from urllib.parse import urlencode

SCRAPEOPS_API_KEY = 'YOUR_API_KEY'

def scrapeops_url(url):
    payload = {'api_key': SCRAPEOPS_API_KEY, 'url': url, 'country': 'us'}
    proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
    return proxy_url

amazon_url = 'https://www.amazon.com/s?k=iPads&page=1'

## Send URL To ScrapeOps Instead of Amazon
response = requests.get(scrapeops_url(amazon_url))
```
import requestsfrom parsel import Selectorfrom urllib.parse import urlencode, urljoin API_KEY = 'YOUR_API_KEY' def scrapeops_url(url): payload = {'api_key': API_KEY, 'url': url, 'country': 'us'} proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload) return proxy_url keyword_list = ['ipad']product_overview_data = [] for keyword in keyword_list: url_list = [f'https://www.amazon.com/s?k={keyword}&page=1'] for url in url_list: try: response = requests.get(scrapeops_url(url)) if response.status_code == 200: sel = Selector(text=response.text) ## Extract Product Data From Search Page search_products = sel.css("div.s-result-item[data-component-type=s-search-result]") for product in search_products: relative_url = product.css("h2>a::attr(href)").get() asin = relative_url.split('/')[3] if len(relative_url.split('/')) >= 4 else None product_url = urljoin('https://www.amazon.com/', relative_url).split("?")[0] product_overview_data.append( { "keyword": keyword, "asin": asin, "url": product_url, "ad": True if "/slredirect/" in product_url else False, "title": product.css("h2>a>span::text").get(), "price": product.css(".a-price[data-a-size=xl] .a-offscreen::text").get(), "real_price": product.css(".a-price[data-a-size=b] .a-offscreen::text").get(), "rating": (product.css("span[aria-label~=stars]::attr(aria-label)").re(r"(\d+\.*\d*) out") or [None])[0], "rating_count": product.css("span[aria-label~=stars] + span::attr(aria-label)").get(), "thumbnail_url": product.xpath("//img[has-class('s-image')]/@src").get(), } ) ## Get All Pages if "&page=1" in url: available_pages = sel.xpath( '//a[has-class("s-pagination-item")][not(has-class("s-pagination-separator"))]/text()' ).getall() for page in available_pages: search_url_paginated = f'https://www.amazon.com/s?k={keyword}&page={page}' url_list.append(search_url_paginated) except Exception as e: print("Error", e) print(product_overview_data)
Then check out ScrapeOps, the complete toolkit for web scraping.
import requestsfrom bs4 import BeautifulSoupimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: float = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ProductPageData: name: str = "" title: str = "" url: str = "", pricing_unit: str = "", price: float = None, feature_1: str = "", feature_2: str = "", feature_3: str = "", feature_4: str = "", images_1: str = "", images_2: str = "", images_3: str = "", images_4: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location) resp = requests.get(url) if resp.status_code == 200: logger.info("Successfully fetched page") soup = BeautifulSoup(resp.text, "html.parser") bad_divs = soup.find_all("div", class_="AdHolder") for bad_div in bad_divs: bad_div.decompose() divs = soup.find_all("div") last_title = "" for div in divs: parsable = True if div is not None else False h2 = div.find("h2") if h2 and h2.text.strip() and h2.text.strip() and parsable: title = h2.text.strip() a = h2.find("a") product_url = a.get("href") if a else "" ad_status = False if "sspa" in product_url: ad_status = True asin = div.get("data-asin") symbol_element = div.find("span", class_="a-price-symbol") symbol_presence = symbol_element.text if symbol_element else None if symbol_presence is not None: pricing_unit = symbol_presence prices = div.find_all("span", class_="a-offscreen") rating_element = div.find("span", class_="a-icon-alt") rating_present = rating_element.text[0:3] if rating_element else "0.0" rating = float(rating_present) price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0" price = float(price_present) if price_present else 0.0 real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price if symbol_presence and rating_present and price_present: product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True else: raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") print(f"Exited scrape_products for :{product_name}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() def parse_product(product_object, location="us", retries=3): url = product_object["url"] tries = 0 success = False product_url = f"https://www.amazon.com/{url}" url_array = product_url.split("/") title = url_array[-4] product_pipeline = DataPipeline(csv_filename=f"{title}.csv") asin = url_array[-2] while tries <= retries and not success: try: resp = requests.get(get_scrapeops_url(product_url, location=location)) if resp.status_code == 200: soup = BeautifulSoup(resp.text, "html.parser") #find all the images spans = soup.find_all("span") images_to_save = [] for span in spans: image_array = span.find_all("span") for item in image_array: image_span = item.find("span") if image_span is not None: images = image_span.find_all("img") for image in images: image_link = image.get("src") if "https://m.media-amazon.com/images/" in image_link not in images_to_save: images_to_save.append(image_link) features = [] 
feature_bullets = soup.find_all("li", class_="a-spacing-mini") for feature in feature_bullets: text = feature.find("span").text if text not in features: features.append(text) price_symbol = soup.find("span", class_="a-price-symbol").text whole_number = soup.find("span", class_="a-price-whole").text.replace(",", "").replace(".", "") decimal = soup.find("span", class_="a-price-fraction").text price = float(f"{whole_number}.{decimal}") item_data = ProductPageData( name=asin, title=title, url=product_url, pricing_unit=price_symbol, price=price, feature_1=features[0] if len(features) > 0 else "n/a", feature_2=features[1] if len(features) > 1 else "n/a", feature_3=features[2] if len(features) > 2 else "n/a", feature_4=features[3] if len(features) > 3 else "n/a", images_1=images_to_save[0] if len(images_to_save) > 0 else "n/a", images_2=images_to_save[1] if len(images_to_save) > 1 else "n/a", images_3=images_to_save[2] if len(images_to_save) > 2 else "n/a", images_4=images_to_save[3] if len(images_to_save) > 3 else "n/a" ) product_pipeline.add_data(item_data) product_pipeline.close_pipeline() success = True else: raise Exception(f"Failed response from server, status code: {resp.status_code}") except Exception as e: logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}") tries += 1 return None def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3): with open(csv_filename) as csvfile: reader = list(csv.DictReader(csvfile)) print(len(reader)) with ThreadPoolExecutor(max_workers=threads) as executor: executor.map(parse_product, reader, [location] * len(reader), [retries] * len(reader)) if __name__ == "__main__": PRODUCTS = ["phone"] AGGREGATE_PRODUCTS = [] MAX_RETRIES = 2 PAGES = 20 MAX_THREADS = 3 LOCATION = "us" for product in PRODUCTS: threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION) filename = f"{product}.csv" AGGREGATE_PRODUCTS.append(filename) for product in AGGREGATE_PRODUCTS: threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
If you only want to scrape a single page, set `PAGES` to 1. If you wish to run with 10 threads, change `MAX_THREADS` to 10... use caution with this one, each thread opens up another page in the proxy and the ScrapeOps proxy does have a concurrency limit.

Our search URLs look like this:

`https://www.amazon.com/s?k=phone`

- `https://www.amazon.com/` is our base url.
- `s?` shows that we're performing a search query.
- `k=phone` tells the Amazon server that we want to look at phones.

On the page itself, most of the data we want is held in a `span` element, and the feature bullets are `span` elements nested within `li` (list) elements.

To paginate our results, we use URLs in this format:

`https://www.amazon.com/s?k={product_name}&page={page_number}`

If we want page 1 of phones, this would be our URL: `https://www.amazon.com/s?k=phone&page=1`
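As a small sketch (the variable names here are illustrative), generating one URL per results page is just a loop over that format:

```python
from urllib.parse import quote_plus

product_name = "phone"
PAGES = 3

# One URL per results page, with the keyword safely encoded.
search_urls = [
    f"https://www.amazon.com/s?k={quote_plus(product_name)}&page={page_number}"
    for page_number in range(1, PAGES + 1)
]
print(search_urls)
```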
When we're shopping from the US, Amazon gives us our prices in the dollar, `$`. If we're in the UK, Amazon will give us our prices in the pound, `GBP`.

To control our location effectively, we'll be using the ScrapeOps Proxy API. The ScrapeOps API will route our traffic through servers in whichever country we ask for. If we want to be in the UK, ScrapeOps will put us in the UK. If we want to be from the US, ScrapeOps will route us through servers in the US.

Create a new project folder, set up a virtual environment, and install the dependencies we need:

```
mkdir amazon-scraper
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
```
First, we find all of the `div` elements on the page. We then check whether each `div` is parsable. If the `div` is parsable, we find the `h2` element and strip out the whitespace and newlines. As long as the `h2` doesn't come back as a `None` value, we move on and extract the following from each listing (a condensed sketch of this extraction follows the list):

- `asin`
- `title`
- `url`
- `is_ad`
- `pricing_unit`
- `price`
- `real_price`
- `rating`
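To make that logic easier to follow, here is a condensed sketch of the per-listing extraction pulled out into a helper. The helper name `extract_listing` is ours and the guard clauses are slightly simplified, but the selectors and fields come straight from the full script below; `div` is assumed to be one result container from `soup.find_all("div")`:

```python
def extract_listing(div):
    """Condensed, simplified version of the per-listing extraction in the full script below."""
    h2 = div.find("h2")
    if not (h2 and h2.text.strip()):
        return None

    a = h2.find("a")
    product_url = a.get("href") if a else ""

    symbol_element = div.find("span", class_="a-price-symbol")
    prices = div.find_all("span", class_="a-offscreen")
    rating_element = div.find("span", class_="a-icon-alt")
    if not (symbol_element and prices and rating_element):
        return None

    pricing_unit = symbol_element.text
    price = float(prices[0].text.replace(pricing_unit, "").replace(",", ""))
    real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price

    return {
        "asin": div.get("data-asin"),            # Amazon's unique product ID
        "title": h2.text.strip(),
        "url": product_url,
        "is_ad": "sspa" in product_url,          # sponsored listings link through /sspa/
        "pricing_unit": pricing_unit,
        "price": price,
        "real_price": real_price,
        "rating": float(rating_element.text[0:3]),  # assumes the "X.Y out of 5 stars" format
    }
```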
import requestsfrom bs4 import BeautifulSoupimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) API_KEY = "YOUR-SUPER-SECRET-API-KEY" def search_products(product_name: str, retries=3): tries = 0 success = False while tries < retries and not success: try: url = f"https://www.amazon.com/s?k={product_name}" resp = requests.get(url) if resp.status_code == 200: logger.info("Successfully fetched page") soup = BeautifulSoup(resp.text, "html.parser") divs = soup.find_all("div") last_title = "" for div in divs: parsable = True if div is not None else False h2 = div.find("h2") if h2 and h2.text.strip() and h2.text.strip() != last_title and parsable: title = h2.text.strip() a = h2.find("a") product_url = a.get("href") if a else "" ad_status = False if "sspa" in product_url: ad_status = True asin = div.get("data-asin") symbol_element = div.find("span", class_="a-price-symbol") symbol_presence = symbol_element.text if symbol_element else None if symbol_presence is not None: pricing_unit = symbol_presence prices = div.find_all("span", class_="a-offscreen") rating_element = div.find("span", class_="a-icon-alt") rating_present = rating_element.text[0:3] if rating_element else "0.0" rating = float(rating_present) price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0" price = float(price_present) if price_present else 0.0 real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price if symbol_presence and rating_present and price_present: product = { "name": asin, "title": title, "url": product_url, "is_ad": ad_status, "pricing_unit": pricing_unit, "price": price, "real_price": real_price, "rating": rating } print(product) last_title = title else: continue success = True else: raise Exception(f"Failed to scrape the page, Status Code {resp.status_code}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") print(f"Exited scrape_products for :{product_name}") if __name__ == "__main__": PRODUCTS = ["phone"] MAX_RETRIES = 2 for product in PRODUCTS: search_products(product, retries=MAX_RETRIES)
page_number
added to both our function arguments and our URL.import requestsfrom bs4 import BeautifulSoupimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) API_KEY = "YOUR-SUPER-SECRET-API-KEY" def search_products(product_name: str, page_number=1, retries=3): tries = 0 success = False while tries < retries and not success: try: url = f"https://www.amazon.com/s?k={product_name}&page={page_number}" resp = requests.get(url) if resp.status_code == 200: logger.info("Successfully fetched page") soup = BeautifulSoup(resp.text, "html.parser") divs = soup.find_all("div") last_title = "" for div in divs: parsable = True if div is not None else False h2 = div.find("h2") if h2 and h2.text.strip() and h2.text.strip() != last_title and parsable: title = h2.text.strip() a = h2.find("a") product_url = a.get("href") if a else "" ad_status = False if "sspa" in product_url: ad_status = True asin = div.get("data-asin") symbol_element = div.find("span", class_="a-price-symbol") symbol_presence = symbol_element.text if symbol_element else None if symbol_presence is not None: pricing_unit = symbol_presence prices = div.find_all("span", class_="a-offscreen") rating_element = div.find("span", class_="a-icon-alt") rating_present = rating_element.text[0:3] if rating_element else "0.0" rating = float(rating_present) price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0" price = float(price_present) if price_present else 0.0 real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price if symbol_presence and rating_present and price_present: product = { "name": asin, "title": title, "url": product_url, "is_ad": ad_status, "pricing_unit": pricing_unit, "price": price, "real_price": real_price, "rating": rating } print(product) last_title = title else: continue success = True else: raise Exception(f"Failed to scrape the page, Status Code {resp.status_code}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") print(f"Exited scrape_products for :{product_name}") if __name__ == "__main__": PRODUCTS = ["phone"] MAX_RETRIES = 2 for product in PRODUCTS: search_products(product, retries=MAX_RETRIES)
Our updated function takes a `page_number` and inserts it into our url. Next, we add two new classes to store our data: `ProductData` and `DataPipeline`
.Here is our updated code example.import requestsfrom bs4 import BeautifulSoupimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: float = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def search_products(product_name: str, page_number=1, retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: url = f"https://www.amazon.com/s?k={product_name}&page={page_number}" print(url) resp = requests.get(url) if resp.status_code == 200: logger.info("Successfully fetched page") soup = BeautifulSoup(resp.text, "html.parser") bad_divs = soup.find_all("div", class_="AdHolder") for bad_div in bad_divs: bad_div.decompose() divs = soup.find_all("div") last_title = "" for div in divs: parsable = True if div is not None else False h2 = div.find("h2") if h2 and h2.text.strip() and h2.text.strip() and parsable: title = h2.text.strip() a = h2.find("a") product_url = a.get("href") if a else "" ad_status = False if "sspa" in product_url: ad_status = True asin = div.get("data-asin") symbol_element = div.find("span", class_="a-price-symbol") symbol_presence = symbol_element.text if symbol_element else None if symbol_presence is not None: pricing_unit = symbol_presence prices = div.find_all("span", class_="a-offscreen") rating_element = div.find("span", class_="a-icon-alt") rating_present = rating_element.text[0:3] if rating_element else "0.0" print(rating_present) print(title) rating = float(rating_present) price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0" price = float(price_present) if price_present else 0.0 real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price if symbol_presence and rating_present and price_present: product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True else: raise Exception(f"Failed to scrape the page, Status Code {resp.status_code}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") print(f"Exited scrape_products for :{product_name}") if __name__ == "__main__": PRODUCTS = ["phone"] MAX_RETRIES = 2 for product in PRODUCTS: product_pipeline = DataPipeline(csv_filename=f"{product}.csv") search_products(product, retries=MAX_RETRIES, data_pipeline=product_pipeline) product_pipeline.close_pipeline()
In the code above, we create a `ProductData` class to hold individual product data. We add a `DataPipeline` as well. Our `DataPipeline` does all the heavy lifting of removing duplicates and saving our information to a CSV file.

Next, we add concurrency with a `threaded_search()` function.

```python
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
    pages = list(range(1, pages+1))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(
            search_products,
            [product_name] * len(pages),
            pages,
            [location] * len(pages),
            [retries] * len(pages),
            [search_pipeline] * len(pages)
        )

    search_pipeline.close_pipeline()
```
We use `ThreadPoolExecutor` to manage our threads. This function will use 5 threads to perform our searches by default, so we'll have a maximum of 5 searches going simultaneously.

Here is our updated code. We also added a `location` argument to `search_products()`
. While we don't use the location in this example, we'll be using it in the next section when we add proxy support.import requestsfrom bs4 import BeautifulSoupimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: float = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: url = f"https://www.amazon.com/s?k={product_name}&page={page_number}" print(url) resp = requests.get(url) if resp.status_code == 200: logger.info("Successfully fetched page") soup = BeautifulSoup(resp.text, "html.parser") bad_divs = soup.find_all("div", class_="AdHolder") for bad_div in bad_divs: bad_div.decompose() divs = soup.find_all("div") last_title = "" for div in divs: parsable = True if div is not None else False h2 = div.find("h2") if h2 and h2.text.strip() and h2.text.strip() and parsable: title = h2.text.strip() a = h2.find("a") product_url = a.get("href") if a else "" ad_status = False if "sspa" in product_url: ad_status = True asin = div.get("data-asin") symbol_element = div.find("span", class_="a-price-symbol") symbol_presence = symbol_element.text if symbol_element else None if symbol_presence is not None: pricing_unit = symbol_presence prices = div.find_all("span", class_="a-offscreen") rating_element = div.find("span", class_="a-icon-alt") rating_present = rating_element.text[0:3] if rating_element else "0.0" print(rating_present) print(title) rating = float(rating_present) price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0" price = float(price_present) if price_present else 0.0 real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price if symbol_presence and rating_present and price_present: product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True else: raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") print(f"Exited scrape_products for :{product_name}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() if __name__ == "__main__": PRODUCTS = ["phone"] MAX_RETRIES = 2 PAGES = 5 MAX_THREADS = 3 LOCATION = "us" for product in PRODUCTS: threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
The only new function here is `get_scrapeops_url()`. This function takes in a regular URL and uses basic string formatting to convert it into a URL that uses the ScrapeOps API. Take a look below:

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
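For example, here's a hypothetical call (assuming `API_KEY` is set, `requests` is imported, and "uk" is an available country code on your plan) that routes a search page through a UK server:

```python
# Fetch a search page through the ScrapeOps proxy from a UK exit node
# instead of hitting Amazon directly.
search_url = "https://www.amazon.com/s?k=phone&page=1"
response = requests.get(get_scrapeops_url(search_url, location="uk"))
print(response.status_code)
```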
import requestsfrom bs4 import BeautifulSoupimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: float = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location) print(url) resp = requests.get(url) if resp.status_code == 200: logger.info("Successfully fetched page") soup = BeautifulSoup(resp.text, "html.parser") bad_divs = soup.find_all("div", class_="AdHolder") for bad_div in bad_divs: bad_div.decompose() divs = soup.find_all("div") last_title = "" for div in divs: parsable = True if div is not None else False h2 = div.find("h2") if h2 and h2.text.strip() and h2.text.strip() and parsable: title = h2.text.strip() a = h2.find("a") product_url = a.get("href") if a else "" ad_status = False if "sspa" in product_url: ad_status = True asin = div.get("data-asin") symbol_element = div.find("span", class_="a-price-symbol") symbol_presence = symbol_element.text if symbol_element else None if symbol_presence is not None: pricing_unit = symbol_presence prices = div.find_all("span", class_="a-offscreen") rating_element = div.find("span", class_="a-icon-alt") rating_present = rating_element.text[0:3] if rating_element else "0.0" print(rating_present) print(title) rating = float(rating_present) price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0" price = float(price_present) if price_present else 0.0 real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price if symbol_presence and rating_present and price_present: product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True else: raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") print(f"Exited scrape_products for :{product_name}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() if __name__ == "__main__": PRODUCTS = ["phone"] MAX_RETRIES = 2 PAGES = 5 MAX_THREADS = 3 LOCATION = "us" for product in PRODUCTS: threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
```python
if __name__ == "__main__":
    PRODUCTS = ["phone"]
    MAX_RETRIES = 2
    PAGES = 10
    MAX_THREADS = 3
    LOCATION = "us"

    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
```
Feel free to change any of the following constants to tweak your results:

- `PRODUCTS`
- `MAX_RETRIES`
- `PAGES`
- `MAX_THREADS`
- `LOCATION`

To run the crawler, use this command:

```
python crawler-proxy.py
```
def parse_product(product_object, location="us", retries=3): url = product_object["url"] tries = 0 success = False product_url = f"https://www.amazon.com/{url}" url_array = product_url.split("/") title = url_array[-4] asin = url_array[-2] print("asin", asin, title) while tries <= retries and not success: try: resp = requests.get(url) if resp.status_code == 200: print("Content Fetched") soup = BeautifulSoup(resp.text, "html.parser") #find all the images spans = soup.find_all("span") images_to_save = [] for span in spans: image_array = span.find_all("span") for item in image_array: image_span = item.find("span") if image_span is not None: images = image_span.find_all("img") for image in images: image_link = image.get("src") if "https://m.media-amazon.com/images/" in image_link not in images_to_save: images_to_save.append(image_link) features = [] feature_bullets = soup.find_all("li", class_="a-spacing-mini") for feature in feature_bullets: text = feature.find("span").text if text not in features: features.append(text) price_symbol = soup.find("span", class_="a-price-symbol").text whole_number = soup.find("span", class_="a-price-whole").text.replace(",", ".") decimal = soup.find("span", class_="a-price-fraction").text price = float(f"{whole_number}{decimal}") item_data = { "name": asin, "title": title, "url": product_url, "pricing_unit": price_symbol, "price": price, "feature_1": features[0] if len(features) > 0 else "n/a", "feature_2": features[1] if len(features) > 1 else "n/a", "feature_3": features[2] if len(features) > 2 else "n/a", "feature_4": features[3] if len(features) > 3 else "n/a", "images_1": images_to_save[0] if len(images_to_save) > 0 else "n/a", "images_2": images_to_save[1] if len(images_to_save) > 1 else "n/a", "images_3": images_to_save[2] if len(images_to_save) > 2 else "n/a", "images_4": images_to_save[3] if len(images_to_save) > 3 else "n/a" } print("Product Page Data:", item_data) success = True else: raise Exception(f"Failed response from server, status code: {resp.status_code}") except Exception as e: logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}") tries += 1
We then feed those results into a `threaded_item_lookup()` function. At the moment, this function does not use threading. We just have a `for` loop as a placeholder. This function reads the CSV file and then passes each object from the file into `parse_product()`.
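As a rough sketch of what that placeholder looks like (reconstructed from the description above and the final code below, so treat it as illustrative), it simply reads the crawl CSV with `csv.DictReader` and hands each row to `parse_product()`:

```python
import csv

def threaded_item_lookup(csv_filename, location="us", retries=3):
    # Placeholder version: no threading yet, just a plain for loop.
    with open(csv_filename) as csvfile:
        rows = list(csv.DictReader(csvfile))

    # Hand each row (one product from the crawl) to the parser.
    for row in rows:
        parse_product(row, location=location, retries=retries)
```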
.import requestsfrom bs4 import BeautifulSoupimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: float = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location) resp = requests.get(url) if resp.status_code == 200: logger.info("Successfully fetched page") soup = BeautifulSoup(resp.text, "html.parser") bad_divs = soup.find_all("div", class_="AdHolder") for bad_div in bad_divs: bad_div.decompose() divs = soup.find_all("div") last_title = "" for div in divs: parsable = True if div is not None else False h2 = div.find("h2") if h2 and h2.text.strip() and h2.text.strip() and parsable: title = h2.text.strip() a = h2.find("a") product_url = a.get("href") if a else "" ad_status = False if "sspa" in product_url: ad_status = True asin = div.get("data-asin") symbol_element = div.find("span", class_="a-price-symbol") symbol_presence = symbol_element.text if symbol_element else None if symbol_presence is not None: pricing_unit = symbol_presence prices = div.find_all("span", class_="a-offscreen") rating_element = div.find("span", class_="a-icon-alt") rating_present = rating_element.text[0:3] if rating_element else "0.0" print(rating_present) print(title) rating = float(rating_present) price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0" price = float(price_present) if price_present else 0.0 real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price if symbol_presence and rating_present and price_present: product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True else: raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") print(f"Exited scrape_products for :{product_name}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() def parse_product(product_object, location="us", retries=3): url = product_object["url"] tries = 0 success = False product_url = f"https://www.amazon.com/{url}" url_array = product_url.split("/") title = url_array[-4] asin = url_array[-2] print("asin", asin, title) while tries <= retries and not success: try: resp = requests.get(product_url) if resp.status_code == 200: print("Content Fetched") soup = BeautifulSoup(resp.text, "html.parser") #find all the images spans = soup.find_all("span") images_to_save = [] for span in spans: image_array = span.find_all("span") for item in image_array: image_span = item.find("span") if image_span is not None: images = image_span.find_all("img") for image in images: image_link = image.get("src") if "https://m.media-amazon.com/images/" in image_link not in images_to_save: images_to_save.append(image_link) features = [] feature_bullets = 
soup.find_all("li", class_="a-spacing-mini") for feature in feature_bullets: text = feature.find("span").text if text not in features: features.append(text) price_symbol = soup.find("span", class_="a-price-symbol").text whole_number = soup.find("span", class_="a-price-whole").text.replace(",", ".") decimal = soup.find("span", class_="a-price-fraction").text price = float(f"{whole_number}{decimal}") item_data = { "name": asin, "title": title, "url": product_url, "pricing_unit": price_symbol, "price": price, "feature_1": features[0] if len(features) > 0 else "n/a", "feature_2": features[1] if len(features) > 1 else "n/a", "feature_3": features[2] if len(features) > 2 else "n/a", "feature_4": features[3] if len(features) > 3 else "n/a", "images_1": images_to_save[0] if len(images_to_save) > 0 else "n/a", "images_2": images_to_save[1] if len(images_to_save) > 1 else "n/a", "images_3": images_to_save[2] if len(images_to_save) > 2 else "n/a", "images_4": images_to_save[3] if len(images_to_save) > 3 else "n/a" } print("Product Page Data:", item_data) success = True else: raise Exception(f"Failed response from server, status code: {resp.status_code}") except Exception as e: logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}") tries += 1 def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3): with open(csv_filename) as csvfile: reader = list(csv.DictReader(csvfile)) for product_object in reader: parse_product(product_object, location=location, retries=retries) if __name__ == "__main__": PRODUCTS = ["phone"] AGGREGATE_PRODUCTS = [] MAX_RETRIES = 2 PAGES = 1 MAX_THREADS = 3 LOCATION = "us" for product in PRODUCTS: threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION) filename = f"{product}.csv" AGGREGATE_PRODUCTS.append(filename) for product in AGGREGATE_PRODUCTS: threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS)
Because we store results through a @dataclass, our fields can't hold a variable-length list; Python lists are mutable, and dataclasses (and fixed CSV columns) don't handle mutable, variable-sized fields cleanly, which is why features and images get flattened into numbered fields (see the short aside after the full code below). We once again use our DataPipeline to both filter and store our data. The code below adds a ProductPageData
class and passes it into the pipeline.import requestsfrom bs4 import BeautifulSoupimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: float = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ProductPageData: name: str = "" title: str = "" url: str = "", pricing_unit: str = "", price: float = None, feature_1: str = "", feature_2: str = "", feature_3: str = "", feature_4: str = "", images_1: str = "", images_2: str = "", images_3: str = "", images_4: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location) resp = requests.get(url) if resp.status_code == 200: logger.info("Successfully fetched page") soup = BeautifulSoup(resp.text, "html.parser") bad_divs = soup.find_all("div", class_="AdHolder") for bad_div in bad_divs: bad_div.decompose() divs = soup.find_all("div") last_title = "" for div in divs: parsable = True if div is not None else False h2 = div.find("h2") if h2 and h2.text.strip() and h2.text.strip() and parsable: title = h2.text.strip() a = h2.find("a") product_url = a.get("href") if a else "" ad_status = False if "sspa" in product_url: ad_status = True asin = div.get("data-asin") symbol_element = div.find("span", class_="a-price-symbol") symbol_presence = symbol_element.text if symbol_element else None if symbol_presence is not None: pricing_unit = symbol_presence prices = div.find_all("span", class_="a-offscreen") rating_element = div.find("span", class_="a-icon-alt") rating_present = rating_element.text[0:3] if rating_element else "0.0" print(rating_present) print(title) rating = float(rating_present) price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0" price = float(price_present) if price_present else 0.0 real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price if symbol_presence and rating_present and price_present: product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True else: raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") print(f"Exited scrape_products for :{product_name}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() def parse_product(product_object, location="us", retries=3): url = product_object["url"] tries = 0 success = False product_url = f"https://www.amazon.com/{url}" url_array = product_url.split("/") title = url_array[-4] product_pipeline = DataPipeline(csv_filename=f"{title}.csv") asin = url_array[-2] while tries <= retries and not success: try: resp = requests.get(product_url) if resp.status_code == 200: soup = BeautifulSoup(resp.text, "html.parser") #find all the images spans = soup.find_all("span") images_to_save = [] for span in spans: image_array = span.find_all("span") for item in image_array: image_span = item.find("span") if image_span is not None: images = image_span.find_all("img") for image in images: image_link = image.get("src") if "https://m.media-amazon.com/images/" in image_link not in images_to_save: images_to_save.append(image_link) features = [] 
feature_bullets = soup.find_all("li", class_="a-spacing-mini") for feature in feature_bullets: text = feature.find("span").text if text not in features: features.append(text) price_symbol = soup.find("span", class_="a-price-symbol").text whole_number = soup.find("span", class_="a-price-whole").text.replace(",", ".") decimal = soup.find("span", class_="a-price-fraction").text price = float(f"{whole_number}{decimal}") item_data = ProductPageData( name=asin, title=title, url=product_url, pricing_unit=price_symbol, price=price, feature_1=features[0] if len(features) > 0 else "n/a", feature_2=features[1] if len(features) > 1 else "n/a", feature_3=features[2] if len(features) > 2 else "n/a", feature_4=features[3] if len(features) > 3 else "n/a", images_1=images_to_save[0] if len(images_to_save) > 0 else "n/a", images_2=images_to_save[1] if len(images_to_save) > 1 else "n/a", images_3=images_to_save[2] if len(images_to_save) > 2 else "n/a", images_4=images_to_save[3] if len(images_to_save) > 3 else "n/a" ) product_pipeline.add_data(item_data) product_pipeline.close_pipeline() success = True else: raise Exception(f"Failed response from server, status code: {resp.status_code}") except Exception as e: logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}") tries += 1 def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3): with open(csv_filename) as csvfile: reader = list(csv.DictReader(csvfile)) for product_object in reader: parse_product(product_object, location=location, retries=retries) if __name__ == "__main__": PRODUCTS = ["phone"] AGGREGATE_PRODUCTS = [] MAX_RETRIES = 2 PAGES = 1 MAX_THREADS = 3 LOCATION = "us" for product in PRODUCTS: threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION) filename = f"{product}.csv" AGGREGATE_PRODUCTS.append(filename) for product in AGGREGATE_PRODUCTS: threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
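As a quick aside on the dataclass point above: Python's dataclasses reject a plain mutable default outright, and even with default_factory a variable-length list has no clean home in a fixed set of CSV columns. A tiny standalone demonstration:

```python
from dataclasses import dataclass, field

try:
    @dataclass
    class BadProduct:
        features: list = []  # a mutable default like this is rejected at class-definition time
except ValueError as e:
    print(e)

@dataclass
class WorkableProduct:
    # Allowed, but a variable-length list still doesn't map onto fixed CSV columns,
    # which is why the scraper flattens into feature_1 ... feature_4 instead.
    features: list = field(default_factory=list)
```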
Inside parse_product(), we now open an individual pipeline for each product. This way, we generate an individual report for each one of the products we scraped earlier with the crawler.

We then add real multithreading to threaded_item_lookup():

```python
def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    with open(csv_filename) as csvfile:
        reader = list(csv.DictReader(csvfile))

        with ThreadPoolExecutor(max_workers=threads) as executor:
            executor.map(
                parse_product,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
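If the executor.map() call looks unusual, remember that map() takes one iterable per positional argument of the target function and pairs them up element by element, so every CSV row gets matched with the same location and retries values. A toy sketch of the same pattern (the greet function is purely for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

def greet(name, greeting, punctuation):
    print(f"{greeting}, {name}{punctuation}")

names = ["ipad", "phone", "laptop"]
with ThreadPoolExecutor(max_workers=3) as executor:
    # one iterable per argument; the repeated lists supply the same value to every call
    executor.map(greet, names, ["Scraping"] * len(names), ["!"] * len(names))
```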
resp = requests.get(get_scrapeops_url(product_url, location=location))
parse_product()
we simply convert our url in to a proxied one.Here is the full code:import requestsfrom bs4 import BeautifulSoupimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: float = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ProductPageData: name: str = "" title: str = "" url: str = "", pricing_unit: str = "", price: float = None, feature_1: str = "", feature_2: str = "", feature_3: str = "", feature_4: str = "", images_1: str = "", images_2: str = "", images_3: str = "", images_4: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location) resp = requests.get(url) if resp.status_code == 200: logger.info("Successfully fetched page") soup = BeautifulSoup(resp.text, "html.parser") bad_divs = soup.find_all("div", class_="AdHolder") for bad_div in bad_divs: bad_div.decompose() divs = soup.find_all("div") last_title = "" for div in divs: parsable = True if div is not None else False h2 = div.find("h2") if h2 and h2.text.strip() and h2.text.strip() and parsable: title = h2.text.strip() a = h2.find("a") product_url = a.get("href") if a else "" ad_status = False if "sspa" in product_url: ad_status = True asin = div.get("data-asin") symbol_element = div.find("span", class_="a-price-symbol") symbol_presence = symbol_element.text if symbol_element else None if symbol_presence is not None: pricing_unit = symbol_presence prices = div.find_all("span", class_="a-offscreen") rating_element = div.find("span", class_="a-icon-alt") rating_present = rating_element.text[0:3] if rating_element else "0.0" print(rating_present) print(title) rating = float(rating_present) price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0" price = float(price_present) if price_present else 0.0 real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price if symbol_presence and rating_present and price_present: product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True else: raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") print(f"Exited scrape_products for :{product_name}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() def parse_product(product_object, location="us", retries=3): url = product_object["url"] tries = 0 success = False product_url = f"https://www.amazon.com/{url}" url_array = product_url.split("/") title = url_array[-4] product_pipeline = DataPipeline(csv_filename=f"{title}.csv") asin = url_array[-2] while tries <= retries and not success: try: resp = requests.get(get_scrapeops_url(product_url, location=location)) if resp.status_code == 200: soup = BeautifulSoup(resp.text, "html.parser") #find all the images spans = soup.find_all("span") images_to_save = [] for span in spans: image_array = span.find_all("span") for item in image_array: image_span = item.find("span") if image_span is not None: images = image_span.find_all("img") for image in images: image_link = image.get("src") if "https://m.media-amazon.com/images/" in image_link not in images_to_save: 
images_to_save.append(image_link) features = [] feature_bullets = soup.find_all("li", class_="a-spacing-mini") for feature in feature_bullets: text = feature.find("span").text if text not in features: features.append(text) price_symbol = soup.find("span", class_="a-price-symbol").text whole_number = soup.find("span", class_="a-price-whole").text.replace(",", "").replace(".", "") decimal = soup.find("span", class_="a-price-fraction").text price = float(f"{whole_number}.{decimal}") item_data = ProductPageData( name=asin, title=title, url=product_url, pricing_unit=price_symbol, price=price, feature_1=features[0] if len(features) > 0 else "n/a", feature_2=features[1] if len(features) > 1 else "n/a", feature_3=features[2] if len(features) > 2 else "n/a", feature_4=features[3] if len(features) > 3 else "n/a", images_1=images_to_save[0] if len(images_to_save) > 0 else "n/a", images_2=images_to_save[1] if len(images_to_save) > 1 else "n/a", images_3=images_to_save[2] if len(images_to_save) > 2 else "n/a", images_4=images_to_save[3] if len(images_to_save) > 3 else "n/a" ) product_pipeline.add_data(item_data) product_pipeline.close_pipeline() success = True else: raise Exception(f"Failed response from server, status code: {resp.status_code}") except Exception as e: logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}") tries += 1 return None def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3): with open(csv_filename) as csvfile: reader = list(csv.DictReader(csvfile)) print(len(reader)) with ThreadPoolExecutor(max_workers=threads) as executor: executor.map(parse_product, reader, [location] * len(reader), [retries] * len(reader)) if __name__ == "__main__": PRODUCTS = ["phone"] AGGREGATE_PRODUCTS = [] MAX_RETRIES = 2 PAGES = 1 MAX_THREADS = 3 LOCATION = "us" for product in PRODUCTS: threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION) filename = f"{product}.csv" AGGREGATE_PRODUCTS.append(filename) for product in AGGREGATE_PRODUCTS: threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
To test everything out in production, we'll set PAGES to 20 and time the operation from start to finish. If you'd like to change your results, feel free to change any of the constants in the main block.

```python
if __name__ == "__main__":
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []
    MAX_RETRIES = 2
    PAGES = 20
    MAX_THREADS = 3
    LOCATION = "us"

    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
        filename = f"{product}.csv"
        AGGREGATE_PRODUCTS.append(filename)

    for product in AGGREGATE_PRODUCTS:
        threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
```
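If you want a rough benchmark, you can wrap the main block with the standard library's time module. A minimal sketch of the idea (the print message is just illustrative):

```python
import time

if __name__ == "__main__":
    start_time = time.time()

    # ... run threaded_search() and threaded_item_lookup() exactly as shown above ...

    elapsed = time.time() - start_time
    print(f"Crawl and product scrape finished in {elapsed:.2f} seconds")
```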
from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) OPTIONS = ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: float = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ProductPageData: name: str = "" title: str = "" url: str = "", pricing_unit: str = "", price: float = None, feature_1: str = "", feature_2: str = "", feature_3: str = "", feature_4: str = "", images_1: str = "", images_2: str = "", images_3: str = "", images_4: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) url = f"https://www.amazon.com/s?k={product_name}&page={page_number}" proxy_url = get_scrapeops_url(url, location) driver.get(proxy_url) logger.info("Successfully fetched page") bad_divs = driver.find_elements(By.CSS_SELECTOR, "div.AdHolder") last_title = "" for bad_div in bad_divs: driver.execute_script(""" var element = arguments[0]; element.parentNode.removeChild(element); """, bad_div) divs = driver.find_elements(By.TAG_NAME, "div") copied_divs = divs last_title = "" for div in copied_divs: h2s = div.find_elements(By.TAG_NAME, "h2") parsable = len(h2s) > 0 if parsable: h2 = div.find_element(By.TAG_NAME, "h2") if h2 and parsable: title = h2.text if title == last_title: continue a = h2.find_element(By.TAG_NAME, "a") product_url = (a.get_attribute("href") if a else "").replace("proxy.scrapeops.io", "www.amazon.com") ad_status = False if "sspa" in product_url: ad_status = True url_array = product_url.split("/") asin = url_array[5] price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol") has_price = len(price_symbols_array) > 0 if not has_price: continue symbol_element = div.find_element(By.CSS_SELECTOR, "span.a-price-symbol") pricing_unit = symbol_element.text price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole") price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction") price_str = f"{price_whole.text}.{price_decimal.text}" rating_element = div.find_element(By.CLASS_NAME, "a-icon-alt") rating = rating_element.get_attribute("innerHTML") price = float(price_str) real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price") real_price = 0.0 if len(real_price_array) > 0: real_price_str = real_price_array[0].text.replace(pricing_unit, "") real_price = float(real_price_str) else: real_price = price product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True if not success: raise Exception(f"Failed to scrape the page {page_number}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 finally: driver.quit() if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() def parse_product(product_object, location="us", retries=3): product_url = product_object["url"] proxy_url = get_scrapeops_url(product_url, location=location) tries = 0 success = False url_array = product_url.split("/") title = url_array[-4] print(title) product_pipeline = DataPipeline(csv_filename=f"{title}.csv") asin = url_array[-2] while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(proxy_url) images_to_save = [] features = [] images = driver.find_elements(By.CSS_SELECTOR, "li img") for 
image in images: image_link = image.get_attribute("src") if "https://m.media-amazon.com/images/I/" in image_link not in images_to_save: images_to_save.append(image_link) feature_bullets = driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini") for feature in feature_bullets: text = feature.find_element(By.TAG_NAME, "span").text if text not in features: features.append(text) price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "") decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text price = float(f"{whole_number}.{decimal}") if len(images_to_save) > 0 and len(features) > 0: item_data = ProductPageData( name=asin, title=title, url=product_url, pricing_unit=price_symbol, price=price, feature_1=features[0] if len(features) > 0 else "n/a", feature_2=features[1] if len(features) > 1 else "n/a", feature_3=features[2] if len(features) > 2 else "n/a", feature_4=features[3] if len(features) > 3 else "n/a", images_1=images_to_save[0] if len(images_to_save) > 0 else "n/a", images_2=images_to_save[1] if len(images_to_save) > 1 else "n/a", images_3=images_to_save[2] if len(images_to_save) > 2 else "n/a", images_4=images_to_save[3] if len(images_to_save) > 3 else "n/a" ) product_pipeline.add_data(item_data) product_pipeline.close_pipeline() success = True except Exception as e: driver.save_screenshot("PARSE_ERROR.png") logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}") tries += 1 finally: driver.quit() return None def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3): with open(csv_filename) as csvfile: reader = list(csv.DictReader(csvfile)) with ThreadPoolExecutor(max_workers=threads) as executor: executor.map(parse_product, reader, [location] * len(reader), [retries] * len(reader)) if __name__ == "__main__": PRODUCTS = ["phone"] AGGREGATE_PRODUCTS = [] MAX_RETRIES = 2 PAGES = 1 MAX_THREADS = 3 LOCATION = "us" for product in PRODUCTS: threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION) filename = f"{product}.csv" AGGREGATE_PRODUCTS.append(filename) for product in AGGREGATE_PRODUCTS: threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
If you'd like a shorter test run, change PAGES to 1. You can also raise MAX_THREADS to 10... use caution with this one: each thread opens up another page through the proxy, and the ScrapeOps proxy does have a concurrency limit.

When we search Amazon, https://www.amazon.com/s?k=phone is the portion of the URL you really need to pay attention to:
- https://www.amazon.com/ is our base URL.
- s? shows that we're performing a search query.
- k=phone tells the Amazon server that we want to look at phones.

On each listing, our pricing information sits inside a span element, while our images and feature bullets come from span elements nested within li (list) elements.

To paginate our results, we add a page parameter to the search URL: https://www.amazon.com/s?k={product_name}&page={page_number}. If we want page 1 of phones, this would be our URL: https://www.amazon.com/s?k=phone&page=1
When we're in the US, Amazon gives us our prices in dollars, $. If we're in the UK, Amazon will give us our prices in the pound, GBP.

To control our location effectively, we'll be using the ScrapeOps Proxy API. The ScrapeOps API will route our traffic through servers in whichever country we ask for. If we want to be in the UK, ScrapeOps will put us in the UK. If we want to be from the US, ScrapeOps will route us through servers in the US. The ScrapeOps API is a perfect way to control your location because our requests are actually routed through the location we want.

To get started, create a new project folder with a virtual environment and install Selenium:

mkdir amazon-scraper
python -m venv venv
source venv/bin/activate
pip install selenium
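If you'd like to verify the install before writing the scraper, a quick headless smoke test like the one below (not part of the final code) should print a page title:

```python
from selenium import webdriver
from selenium.webdriver import ChromeOptions

options = ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
driver.get("https://www.amazon.com")
print(driver.title)  # a page title here means Selenium and Chrome are wired up correctly
driver.quit()
```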
Our extraction logic works like this: we find all of the div elements on the page, then decide whether each div is parsable by checking if it contains an h2. If the div is parsable, we use its h2 text as our title. From each parsable result we pull the following fields:
- asin
- title
- url
- is_ad
- pricing_unit
- price
- real_price
- rating
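Before the full script, here is a minimal sketch of just that parsability check, assuming a driver already pointed at a results page (the helper name is ours, not part of the final code); the full version below adds URLs, pricing, ratings, and retries:

```python
from selenium.webdriver.common.by import By

def extract_titles(driver):
    # A div is "parsable" only if it contains an h2, i.e. it is a search result card.
    titles = []
    for div in driver.find_elements(By.TAG_NAME, "div"):
        h2s = div.find_elements(By.TAG_NAME, "h2")
        if len(h2s) > 0:
            title = h2s[0].text
            if title and title not in titles:
                titles.append(title)
    return titles
```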
from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) OPTIONS = ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "YOUR-SUPER-SECRET-API-KEY" def search_products(product_name: str, retries=3): tries = 0 success = False while tries < retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) url = f"https://www.amazon.com/s?k={product_name}" driver.get(url) logger.info("Successfully fetched page") #remove the bad divs bad_divs = driver.find_elements(By.CSS_SELECTOR, "div.AdHolder") for bad_div in bad_divs: driver.execute_script(""" var element = arguments[0]; element.parentNode.removeChild(element); """, bad_div) #find the regular divs divs = driver.find_elements(By.TAG_NAME, "div") #copy them to help with stale elements copied_divs = divs last_title = "" for div in copied_divs: h2s = div.find_elements(By.TAG_NAME, "h2") parsable = len(h2s) > 0 if parsable: h2 = div.find_element(By.TAG_NAME, "h2") if h2 and parsable: title = h2.text if title == last_title: continue a = h2.find_element(By.TAG_NAME, "a") product_url = (a.get_attribute("href") if a else "").replace("proxy.scrapeops.io", "www.amazon.com") ad_status = False if "sspa" in product_url: ad_status = True url_array = product_url.split("/") asin = url_array[5] price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol") has_price = len(price_symbols_array) > 0 if not has_price: continue symbol_element = div.find_element(By.CSS_SELECTOR, "span.a-price-symbol") pricing_unit = symbol_element.text price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole") price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction") price_str = f"{price_whole.text}.{price_decimal.text}" rating_element = div.find_element(By.CLASS_NAME, "a-icon-alt") rating = rating_element.get_attribute("innerHTML") price = float(price_str) real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price") real_price = 0.0 if len(real_price_array) > 0: real_price_str = real_price_array[0].text.replace(pricing_unit, "") real_price = float(real_price_str) else: real_price = price product = { "name": asin, "title": title, "url": product_url, "is_ad": ad_status, "pricing_unit": pricing_unit, "price": price, "real_price": real_price, "rating": rating } print(product) last_title = title else: continue success = True if not success: raise Exception(f"Failed to scrape the page {page_number}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 finally: driver.quit() if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") if __name__ == "__main__": PRODUCTS = ["phone"] MAX_RETRIES = 2 for product in PRODUCTS: search_products(product)
The only difference in this next version is page_number
added to both our function arguments and our url.from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) OPTIONS = ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "YOUR-SUPER-SECRET-API-KEY" def search_products(product_name: str, page_number=1, retries=3): tries = 0 success = False while tries < retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) url = f"https://www.amazon.com/s?k={product_name}&page={page_number}" driver.get(url) logger.info("Successfully fetched page") #remove the bad divs bad_divs = driver.find_elements(By.CSS_SELECTOR, "div.AdHolder") for bad_div in bad_divs: driver.execute_script(""" var element = arguments[0]; element.parentNode.removeChild(element); """, bad_div) #find the regular divs divs = driver.find_elements(By.TAG_NAME, "div") #copy them to help with stale elements copied_divs = divs last_title = "" for div in copied_divs: h2s = div.find_elements(By.TAG_NAME, "h2") parsable = len(h2s) > 0 if parsable: h2 = div.find_element(By.TAG_NAME, "h2") if h2 and parsable: title = h2.text if title == last_title: continue a = h2.find_element(By.TAG_NAME, "a") product_url = (a.get_attribute("href") if a else "").replace("proxy.scrapeops.io", "www.amazon.com") ad_status = False if "sspa" in product_url: ad_status = True url_array = product_url.split("/") asin = url_array[5] price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol") has_price = len(price_symbols_array) > 0 if not has_price: continue symbol_element = div.find_element(By.CSS_SELECTOR, "span.a-price-symbol") pricing_unit = symbol_element.text price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole") price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction") price_str = f"{price_whole.text}.{price_decimal.text}" rating_element = div.find_element(By.CLASS_NAME, "a-icon-alt") rating = rating_element.get_attribute("innerHTML") price = float(price_str) real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price") real_price = 0.0 if len(real_price_array) > 0: real_price_str = real_price_array[0].text.replace(pricing_unit, "") real_price = float(real_price_str) else: real_price = price product = { "name": asin, "title": title, "url": product_url, "is_ad": ad_status, "pricing_unit": pricing_unit, "price": price, "real_price": real_price, "rating": rating } print(product) last_title = title else: continue success = True if not success: raise Exception(f"Failed to scrape the page {page_number}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 finally: driver.quit() if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") if __name__ == "__main__": PRODUCTS = ["phone"] MAX_RETRIES = 2 PAGE = 2 for product in PRODUCTS: search_products(product, page_number=PAGE)
Our function now takes a page_number and inserts it into our url.

Next, we add two classes: ProductData and DataPipeline. ProductData simply holds information from the objects we scrape, while DataPipeline
does the job of filtering out duplicates and safely storing our data.from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) OPTIONS = ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def search_products(product_name: str, page_number=1, retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) url = f"https://www.amazon.com/s?k={product_name}&page={page_number}" driver.get(url) logger.info("Successfully fetched page") #remove the bad divs bad_divs = driver.find_elements(By.CSS_SELECTOR, "div.AdHolder") for bad_div in bad_divs: driver.execute_script(""" var element = arguments[0]; element.parentNode.removeChild(element); """, bad_div) #find the regular divs divs = driver.find_elements(By.TAG_NAME, "div") #copy them to help with stale elements copied_divs = divs last_title = "" for div in copied_divs: h2s = div.find_elements(By.TAG_NAME, "h2") parsable = len(h2s) > 0 if parsable: h2 = div.find_element(By.TAG_NAME, "h2") if h2 and parsable: title = h2.text if title == last_title: continue a = h2.find_element(By.TAG_NAME, "a") product_url = (a.get_attribute("href") if a else "").replace("proxy.scrapeops.io", "www.amazon.com") ad_status = False if "sspa" in product_url: ad_status = True url_array = product_url.split("/") asin = url_array[5] price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol") has_price = len(price_symbols_array) > 0 if not has_price: continue symbol_element = div.find_element(By.CSS_SELECTOR, "span.a-price-symbol") pricing_unit = symbol_element.text price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole") price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction") price_str = f"{price_whole.text}.{price_decimal.text}" rating_element = div.find_element(By.CLASS_NAME, "a-icon-alt") rating = rating_element.get_attribute("innerHTML") price = float(price_str) real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price") real_price = 0.0 if len(real_price_array) > 0: real_price_str = real_price_array[0].text.replace(pricing_unit, "") real_price = float(real_price_str) else: real_price = price product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True if not success: raise Exception(f"Failed to scrape the page {page_number}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 finally: driver.quit() if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") if __name__ == "__main__": PRODUCTS = ["phone"] MAX_RETRIES = 2 for product in PRODUCTS: product_pipeline = DataPipeline(csv_filename=f"{product}.csv") search_products(product, retries=MAX_RETRIES, data_pipeline=product_pipeline) product_pipeline.close_pipeline()
In the code above, we add a ProductData class to hold individual product data. We add a DataPipeline as well. Our DataPipeline does all the heavy lifting of removing duplicates and saving our information to a CSV file.

To crawl multiple result pages concurrently, we add a threaded_search() function:

```python
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
    pages = list(range(1, pages+1))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(
            search_products,
            [product_name] * len(pages),
            pages,
            [location] * len(pages),
            [retries] * len(pages),
            [search_pipeline] * len(pages)
        )

    search_pipeline.close_pipeline()
```
This function uses ThreadPoolExecutor to manage our threads. By default, it runs 5 threads when performing searches, so we'll have a maximum of 5 searches going simultaneously. Be mindful when choosing how many threads to use: not only does your machine have limits, but your ScrapeOps API key will likely also have a concurrency limit. You don't want to run threads past your limit... you'd just be wasting resources!

Here is our updated code. We also added a location
argument to search_products()
. While we don't use the location in this example, we'll be using it in the next section when we add proxy support.from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) OPTIONS = ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) url = f"https://www.amazon.com/s?k={product_name}&page={page_number}" driver.get(url) logger.info("Successfully fetched page") #remove the bad divs bad_divs = driver.find_elements(By.CSS_SELECTOR, "div.AdHolder") for bad_div in bad_divs: driver.execute_script(""" var element = arguments[0]; element.parentNode.removeChild(element); """, bad_div) #find the regular divs divs = driver.find_elements(By.TAG_NAME, "div") #copy them to help with stale elements copied_divs = divs last_title = "" for div in copied_divs: h2s = div.find_elements(By.TAG_NAME, "h2") parsable = len(h2s) > 0 if parsable: h2 = div.find_element(By.TAG_NAME, "h2") if h2 and parsable: title = h2.text if title == last_title: continue a = h2.find_element(By.TAG_NAME, "a") product_url = (a.get_attribute("href") if a else "").replace("proxy.scrapeops.io", "www.amazon.com") ad_status = False if "sspa" in product_url: ad_status = True url_array = product_url.split("/") asin = url_array[5] price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol") has_price = len(price_symbols_array) > 0 if not has_price: continue symbol_element = div.find_element(By.CSS_SELECTOR, "span.a-price-symbol") pricing_unit = symbol_element.text price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole") price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction") price_str = f"{price_whole.text}.{price_decimal.text}" rating_element = div.find_element(By.CLASS_NAME, "a-icon-alt") rating = rating_element.get_attribute("innerHTML") price = float(price_str) real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price") real_price = 0.0 if len(real_price_array) > 0: real_price_str = real_price_array[0].text.replace(pricing_unit, "") real_price = float(real_price_str) else: real_price = price product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True if not success: raise Exception(f"Failed to scrape the page {page_number}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 finally: driver.quit() if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() if __name__ == "__main__": PRODUCTS = ["phone"] MAX_RETRIES = 2 PAGES = 2 MAX_THREADS = 3 LOCATION = "us" for product in PRODUCTS: 
threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION) filename = f"{product}.csv"
To add proxy support, we write a function called get_scrapeops_url(). This function takes in a regular URL and uses basic string formatting to convert it into a URL that routes through the ScrapeOps Proxy API. Take a look below:

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
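As a quick illustration of what this produces (assuming API_KEY is set as above, and with a search term chosen just for the example), you can print the wrapped URL; inside the scraper, this wrapped URL is what gets passed to driver.get():

```python
url = "https://www.amazon.com/s?k=phone&page=1"
print(get_scrapeops_url(url, location="us"))
```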
from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) OPTIONS = ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "YOUR-SUPER-SECRET-API-KEY" def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) url = f"https://www.amazon.com/s?k={product_name}&page={page_number}" driver.get(get_scrapeops_url(url)) logger.info("Successfully fetched page") #remove the bad divs bad_divs = driver.find_elements(By.CSS_SELECTOR, "div.AdHolder") for bad_div in bad_divs: driver.execute_script(""" var element = arguments[0]; element.parentNode.removeChild(element); """, bad_div) #find the regular divs divs = driver.find_elements(By.TAG_NAME, "div") #copy them to help with stale elements copied_divs = divs last_title = "" for div in copied_divs: h2s = div.find_elements(By.TAG_NAME, "h2") parsable = len(h2s) > 0 if parsable: h2 = div.find_element(By.TAG_NAME, "h2") if h2 and parsable: title = h2.text if title == last_title: continue a = h2.find_element(By.TAG_NAME, "a") product_url = (a.get_attribute("href") if a else "").replace("proxy.scrapeops.io", "www.amazon.com") ad_status = False if "sspa" in product_url: ad_status = True url_array = product_url.split("/") asin = url_array[5] price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol") has_price = len(price_symbols_array) > 0 if not has_price: continue symbol_element = div.find_element(By.CSS_SELECTOR, "span.a-price-symbol") pricing_unit = symbol_element.text price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole") price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction") price_str = f"{price_whole.text}.{price_decimal.text}".replace(",", "") rating_element = div.find_element(By.CLASS_NAME, "a-icon-alt") rating = rating_element.get_attribute("innerHTML") price = float(price_str) real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price") real_price = 0.0 if len(real_price_array) > 0: real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "") real_price = float(real_price_str) else: real_price = price product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True if not success: raise Exception(f"Failed to scrape the page {page_number}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 finally: driver.quit() if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() if __name__ == "__main__": PRODUCTS = ["phone"] MAX_RETRIES = 2 PAGES = 2 MAX_THREADS = 3 
LOCATION = "us" for product in PRODUCTS: threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION) filename = f"{product}.csv"
```python
if __name__ == "__main__":
    PRODUCTS = ["phone"]
    MAX_RETRIES = 4
    PAGES = 3
    MAX_THREADS = 3
    LOCATION = "us"

    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
        filename = f"{product}.csv"
```
Feel free to change any of the following constants to tweak your results:

- PRODUCTS
- MAX_RETRIES
- PAGES
- MAX_THREADS
- LOCATION

To run the script:

```
python crawler-proxy.py
```

A note on MAX_THREADS: Selenium can be vulnerable to both thread locking and "stale elements" (a small retry sketch follows the code below). If you are noticing stale element errors, decrease your MAX_THREADS
. Each thread is running its own browser and this can get resource intensive.def parse_product(product_object, location="us", retries=3): product_url = product_object["url"] tries = 0 success = False url_array = product_url.split("/") title = url_array[-4] asin = url_array[-2] while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(product_url) images_to_save = [] features = [] images = driver.find_elements(By.CSS_SELECTOR, "li img") for image in images: image_link = image.get_attribute("src") if "https://m.media-amazon.com/images/I/" in image_link not in images_to_save: images_to_save.append(image_link) feature_bullets = driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini") for feature in feature_bullets: text = feature.find_element(By.TAG_NAME, "span").text if text not in features: features.append(text) price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "") decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text price = float(f"{whole_number}.{decimal}") if len(images_to_save) > 0 and len(features) > 0: item_data = { "name": asin, "title": title, "url": product_url, "pricing_unit": price_symbol, "price": price, "feature_1": features[0] if len(features) > 0 else "n/a", "feature_2": features[1] if len(features) > 1 else "n/a", "feature_3": features[2] if len(features) > 2 else "n/a", "feature_4": features[3] if len(features) > 3 else "n/a", "images_1": images_to_save[0] if len(images_to_save) > 0 else "n/a", "images_2": images_to_save[1] if len(images_to_save) > 1 else "n/a", "images_3": images_to_save[2] if len(images_to_save) > 2 else "n/a", "images_4": images_to_save[3] if len(images_to_save) > 3 else "n/a" } print(item_data) success = True except Exception as e: driver.save_screenshot("PARSE_ERROR.png") logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}") tries += 1 finally: driver.quit() return None
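If stale elements keep showing up even after lowering MAX_THREADS, another common mitigation is to re-locate the element and retry the read. Here is a hedged sketch of that idea; text_with_retry is hypothetical and is not used by the scraper in this article.

```python
# Hypothetical helper: re-locate an element and read its text,
# retrying if Selenium raises StaleElementReferenceException.
from selenium.common.exceptions import StaleElementReferenceException

def text_with_retry(driver, by, selector, attempts=3):
    for _ in range(attempts):
        try:
            return driver.find_element(by, selector).text
        except StaleElementReferenceException:
            # The DOM changed between the lookup and the read; look the element up again.
            continue
    return ""
```

Usage would look something like text_with_retry(driver, By.CSS_SELECTOR, "span.a-price-whole").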
Next, we add a threaded_item_lookup() function. At the moment, this function does not use threading; we just have a for loop as a placeholder. It reads the CSV file and then passes each object from the file into parse_product().
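Before the full script, here is a minimal, self-contained sketch of just that CSV-reading step. The read_rows helper and the "phone.csv" filename are examples for illustration, not part of the article's code.

```python
# Minimal sketch: read the crawler's CSV and hand each row to a parser.
import csv

def read_rows(csv_filename):
    with open(csv_filename) as csvfile:
        return list(csv.DictReader(csvfile))

if __name__ == "__main__":
    for row in read_rows("phone.csv"):
        # Each row is a dict of the fields the crawler wrote, including "url".
        print(row["url"])
```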
.from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) OPTIONS = ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) url = f"https://www.amazon.com/s?k={product_name}&page={page_number}" driver.get(url) logger.info("Successfully fetched page") bad_divs = driver.find_elements(By.CSS_SELECTOR, "div.AdHolder") last_title = "" for bad_div in bad_divs: driver.execute_script(""" var element = arguments[0]; element.parentNode.removeChild(element); """, bad_div) divs = driver.find_elements(By.TAG_NAME, "div") copied_divs = divs last_title = "" for div in copied_divs: h2s = div.find_elements(By.TAG_NAME, "h2") parsable = len(h2s) > 0 if parsable: h2 = div.find_element(By.TAG_NAME, "h2") if h2 and parsable: title = h2.text if title == last_title: continue a = h2.find_element(By.TAG_NAME, "a") product_url = (a.get_attribute("href") if a else "").replace("proxy.scrapeops.io", "www.amazon.com") ad_status = False if "sspa" in product_url: ad_status = True url_array = product_url.split("/") asin = url_array[5] price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol") has_price = len(price_symbols_array) > 0 if not has_price: continue symbol_element = div.find_element(By.CSS_SELECTOR, "span.a-price-symbol") pricing_unit = symbol_element.text price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole") price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction") price_str = f"{price_whole.text}.{price_decimal.text}" rating_element = div.find_element(By.CLASS_NAME, "a-icon-alt") rating = rating_element.get_attribute("innerHTML") price = float(price_str) real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price") real_price = 0.0 if len(real_price_array) > 0: real_price_str = real_price_array[0].text.replace(pricing_unit, "") real_price = float(real_price_str) else: real_price = price product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True if not success: raise Exception(f"Failed to scrape the page {page_number}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 finally: driver.quit() if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() def parse_product(product_object, location="us", retries=3): product_url = product_object["url"] tries = 0 success = False url_array = product_url.split("/") title = url_array[-4] asin = url_array[-2] while 
tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(product_url) images_to_save = [] features = [] images = driver.find_elements(By.CSS_SELECTOR, "li img") for image in images: image_link = image.get_attribute("src") if "https://m.media-amazon.com/images/I/" in image_link not in images_to_save: images_to_save.append(image_link) feature_bullets = driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini") for feature in feature_bullets: text = feature.find_element(By.TAG_NAME, "span").text if text not in features: features.append(text) price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "") decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text price = float(f"{whole_number}.{decimal}") if len(images_to_save) > 0 and len(features) > 0: item_data = { "name": asin, "title": title, "url": product_url, "pricing_unit": price_symbol, "price": price, "feature_1": features[0] if len(features) > 0 else "n/a", "feature_2": features[1] if len(features) > 1 else "n/a", "feature_3": features[2] if len(features) > 2 else "n/a", "feature_4": features[3] if len(features) > 3 else "n/a", "images_1": images_to_save[0] if len(images_to_save) > 0 else "n/a", "images_2": images_to_save[1] if len(images_to_save) > 1 else "n/a", "images_3": images_to_save[2] if len(images_to_save) > 2 else "n/a", "images_4": images_to_save[3] if len(images_to_save) > 3 else "n/a" } print(item_data) success = True except Exception as e: driver.save_screenshot("PARSE_ERROR.png") logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}") tries += 1 finally: driver.quit() return None def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3): with open(csv_filename) as csvfile: reader = list(csv.DictReader(csvfile)) for row in reader: parse_product(row) if __name__ == "__main__": PRODUCTS = ["phone"] AGGREGATE_PRODUCTS = [] MAX_RETRIES = 2 PAGES = 1 MAX_THREADS = 3 LOCATION = "us" for product in PRODUCTS: threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION) filename = f"{product}.csv" AGGREGATE_PRODUCTS.append(filename) for product in AGGREGATE_PRODUCTS: threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
Because we're using @dataclass, our fields can't hold mutable defaults such as lists (in Python, arrays are mutable by default), so the product page data is stored in flat, fixed fields; a short sketch of this limitation appears after the full code below. We reuse our DataPipeline to both filter and store our data. The code below adds a ProductPageData
class and passes it into our new pipeline for safe storage.from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) OPTIONS = ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "YOUR-SUPER-SECRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ProductPageData: name: str = "" title: str = "" url: str = "", pricing_unit: str = "", price: float = None, feature_1: str = "", feature_2: str = "", feature_3: str = "", feature_4: str = "", images_1: str = "", images_2: str = "", images_3: str = "", images_4: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) url = f"https://www.amazon.com/s?k={product_name}&page={page_number}" driver.get(url) logger.info("Successfully fetched page") bad_divs = driver.find_elements(By.CSS_SELECTOR, "div.AdHolder") last_title = "" for bad_div in bad_divs: driver.execute_script(""" var element = arguments[0]; element.parentNode.removeChild(element); """, bad_div) divs = driver.find_elements(By.TAG_NAME, "div") copied_divs = divs last_title = "" for div in copied_divs: h2s = div.find_elements(By.TAG_NAME, "h2") parsable = len(h2s) > 0 if parsable: h2 = div.find_element(By.TAG_NAME, "h2") if h2 and parsable: title = h2.text if title == last_title: continue a = h2.find_element(By.TAG_NAME, "a") product_url = (a.get_attribute("href") if a else "").replace("proxy.scrapeops.io", "www.amazon.com") ad_status = False if "sspa" in product_url: ad_status = True url_array = product_url.split("/") asin = url_array[5] price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol") has_price = len(price_symbols_array) > 0 if not has_price: continue symbol_element = div.find_element(By.CSS_SELECTOR, "span.a-price-symbol") pricing_unit = symbol_element.text price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole") price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction") price_str = f"{price_whole.text}.{price_decimal.text}" rating_element = div.find_element(By.CLASS_NAME, "a-icon-alt") rating = rating_element.get_attribute("innerHTML") price = float(price_str) real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price") real_price = 0.0 if len(real_price_array) > 0: real_price_str = real_price_array[0].text.replace(pricing_unit, "") real_price = float(real_price_str) else: real_price = price product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True if not success: raise Exception(f"Failed to scrape the page {page_number}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 finally: driver.quit() if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() def parse_product(product_object, location="us", retries=3): product_url = product_object["url"] tries = 0 success = False url_array = product_url.split("/") title = url_array[-4] print(title) product_pipeline 
= DataPipeline(csv_filename=f"{title}.csv") asin = url_array[-2] while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(product_url) images_to_save = [] features = [] images = driver.find_elements(By.CSS_SELECTOR, "li img") for image in images: image_link = image.get_attribute("src") if "https://m.media-amazon.com/images/I/" in image_link not in images_to_save: images_to_save.append(image_link) feature_bullets = driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini") for feature in feature_bullets: text = feature.find_element(By.TAG_NAME, "span").text if text not in features: features.append(text) price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "") decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text price = float(f"{whole_number}.{decimal}") if len(images_to_save) > 0 and len(features) > 0: item_data = ProductPageData( name=asin, title=title, url=product_url, pricing_unit=price_symbol, price=price, feature_1=features[0] if len(features) > 0 else "n/a", feature_2=features[1] if len(features) > 1 else "n/a", feature_3=features[2] if len(features) > 2 else "n/a", feature_4=features[3] if len(features) > 3 else "n/a", images_1=images_to_save[0] if len(images_to_save) > 0 else "n/a", images_2=images_to_save[1] if len(images_to_save) > 1 else "n/a", images_3=images_to_save[2] if len(images_to_save) > 2 else "n/a", images_4=images_to_save[3] if len(images_to_save) > 3 else "n/a" ) product_pipeline.add_data(item_data) product_pipeline.close_pipeline() success = True except Exception as e: driver.save_screenshot("PARSE_ERROR.png") logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}") tries += 1 finally: driver.quit() return None def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3): with open(csv_filename) as csvfile: reader = list(csv.DictReader(csvfile)) for row in reader: parse_product(row) if __name__ == "__main__": PRODUCTS = ["phone"] AGGREGATE_PRODUCTS = [] MAX_RETRIES = 2 PAGES = 1 MAX_THREADS = 3 LOCATION = "us" for product in PRODUCTS: threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION) filename = f"{product}.csv" AGGREGATE_PRODUCTS.append(filename) for product in AGGREGATE_PRODUCTS: threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
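As promised above, here is a short look at the dataclass limitation. Giving a dataclass field a mutable default such as a list raises an error at class-creation time; the supported workaround is field(default_factory=...). The FeatureList class below is purely hypothetical and is not part of the scraper.

```python
# Hypothetical demo of why the dataclasses in this article avoid list-typed fields.
from dataclasses import dataclass, field

# A mutable default raises immediately:
#   @dataclass
#   class Broken:
#       images: list = []   # ValueError: mutable default <class 'list'> for field images is not allowed

# The supported pattern uses default_factory instead:
@dataclass
class FeatureList:
    images: list = field(default_factory=list)

item = FeatureList()
item.images.append("https://m.media-amazon.com/images/I/example.jpg")
print(item.images)
```

Keeping ProductPageData to plain string fields (feature_1 through feature_4, images_1 through images_4) sidesteps this entirely and keeps each CSV row flat.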
Inside parse_product(), we open up an individual pipeline for each product. This way, we generate an individual report for each one of the products we scraped earlier with the crawler. If you want to see details about a specific item, you can just open the report for that item. We also give threaded_item_lookup() real multithreading with ThreadPoolExecutor:

```python
def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    with open(csv_filename) as csvfile:
        reader = list(csv.DictReader(csvfile))

        with ThreadPoolExecutor(max_workers=threads) as executor:
            executor.map(
                parse_product,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
resp = requests.get(get_scrapeops_url(product_url, location=location))
In parse_product(), we driver.get()
the proxied url instead of the normal one.Here is the full code:from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byimport logging, osimport json, csvfrom dataclasses import dataclass, field, fields, asdictfrom urllib.parse import urlencodefrom concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) OPTIONS = ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "YOUR-SUPER-SCRET-API-KEY" @dataclassclass ProductData: name: str = "" title: str = "" url: str = "", is_ad: bool = False, pricing_unit: str = "", price: float = None, real_price: float = None, rating: float = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ProductPageData: name: str = "" title: str = "" url: str = "", pricing_unit: str = "", price: float = None, feature_1: str = "", feature_2: str = "", feature_3: str = "", feature_4: str = "", images_1: str = "", images_2: str = "", images_3: str = "", images_4: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None): tries = 0 success = False while tries < retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) url = f"https://www.amazon.com/s?k={product_name}&page={page_number}" proxy_url = get_scrapeops_url(url, location) driver.get(proxy_url) logger.info("Successfully fetched page") bad_divs = driver.find_elements(By.CSS_SELECTOR, "div.AdHolder") last_title = "" for bad_div in bad_divs: driver.execute_script(""" var element = arguments[0]; element.parentNode.removeChild(element); """, bad_div) divs = driver.find_elements(By.TAG_NAME, "div") copied_divs = divs last_title = "" for div in copied_divs: h2s = div.find_elements(By.TAG_NAME, "h2") parsable = len(h2s) > 0 if parsable: h2 = div.find_element(By.TAG_NAME, "h2") if h2 and parsable: title = h2.text if title == last_title: continue a = h2.find_element(By.TAG_NAME, "a") product_url = (a.get_attribute("href") if a else "").replace("proxy.scrapeops.io", "www.amazon.com") ad_status = False if "sspa" in product_url: ad_status = True url_array = product_url.split("/") asin = url_array[5] price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol") has_price = len(price_symbols_array) > 0 if not has_price: continue symbol_element = div.find_element(By.CSS_SELECTOR, "span.a-price-symbol") pricing_unit = symbol_element.text price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole") price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction") price_str = f"{price_whole.text}.{price_decimal.text}" rating_element = div.find_element(By.CLASS_NAME, "a-icon-alt") rating = rating_element.get_attribute("innerHTML") price = float(price_str) real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price") real_price = 0.0 if len(real_price_array) > 0: real_price_str = real_price_array[0].text.replace(pricing_unit, "") real_price = float(real_price_str) else: real_price = price product = ProductData( name=asin, title=title, url=product_url, is_ad=ad_status, pricing_unit=pricing_unit, price=price, real_price=real_price, rating=rating ) data_pipeline.add_data(product) last_title = title else: continue success = True if not success: raise Exception(f"Failed to scrape the page {page_number}, tries left: {retries-tries}") except Exception as e: logger.warning(f"Failed to scrape page, {e}") tries += 1 finally: driver.quit() if not success: logger.warning(f"Failed to scrape page, retries exceeded: {retries}") def threaded_search(product_name, pages, max_workers=5, location="us", retries=3): search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv") pages = list(range(1, pages+1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map( search_products, [product_name] * len(pages), pages, [location] * len(pages), [retries] * len(pages), [search_pipeline] * len(pages) ) search_pipeline.close_pipeline() def parse_product(product_object, location="us", retries=3): product_url = product_object["url"] proxy_url = get_scrapeops_url(product_url, location=location) tries = 0 success = False url_array = product_url.split("/") title = url_array[-4] print(title) product_pipeline = DataPipeline(csv_filename=f"{title}.csv") asin = url_array[-2] while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(proxy_url) images_to_save = [] features = [] images = driver.find_elements(By.CSS_SELECTOR, "li img") for 
image in images: image_link = image.get_attribute("src") if "https://m.media-amazon.com/images/I/" in image_link not in images_to_save: images_to_save.append(image_link) feature_bullets = driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini") for feature in feature_bullets: text = feature.find_element(By.TAG_NAME, "span").text if text not in features: features.append(text) price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "") decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text price = float(f"{whole_number}.{decimal}") if len(images_to_save) > 0 and len(features) > 0: item_data = ProductPageData( name=asin, title=title, url=product_url, pricing_unit=price_symbol, price=price, feature_1=features[0] if len(features) > 0 else "n/a", feature_2=features[1] if len(features) > 1 else "n/a", feature_3=features[2] if len(features) > 2 else "n/a", feature_4=features[3] if len(features) > 3 else "n/a", images_1=images_to_save[0] if len(images_to_save) > 0 else "n/a", images_2=images_to_save[1] if len(images_to_save) > 1 else "n/a", images_3=images_to_save[2] if len(images_to_save) > 2 else "n/a", images_4=images_to_save[3] if len(images_to_save) > 3 else "n/a" ) product_pipeline.add_data(item_data) product_pipeline.close_pipeline() success = True except Exception as e: driver.save_screenshot("PARSE_ERROR.png") logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}") tries += 1 finally: driver.quit() return None def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3): with open(csv_filename) as csvfile: reader = list(csv.DictReader(csvfile)) with ThreadPoolExecutor(max_workers=threads) as executor: executor.map(parse_product, reader, [location] * len(reader), [retries] * len(reader)) if __name__ == "__main__": PRODUCTS = ["phone"] AGGREGATE_PRODUCTS = [] MAX_RETRIES = 2 PAGES = 3 MAX_THREADS = 3 LOCATION = "us" for product in PRODUCTS: threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION) filename = f"{product}.csv" AGGREGATE_PRODUCTS.append(filename) for product in AGGREGATE_PRODUCTS: threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
For this production run, we set PAGES to 20 and time the operation from start to finish. If you'd like to change your results, feel free to change any of the constants in the main block:

```python
if __name__ == "__main__":
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []
    MAX_RETRIES = 4
    PAGES = 3
    MAX_THREADS = 3
    LOCATION = "us"

    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
        filename = f"{product}.csv"
        AGGREGATE_PRODUCTS.append(filename)

    for product in AGGREGATE_PRODUCTS:
        threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
```
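If you want to time the run yourself, one simple approach is to record time.time() before and after the main block. This wrapper is hypothetical and is not part of the original script.

```python
# Hypothetical timing wrapper around the crawl/scrape run.
import time

start = time.time()

# ... run the crawl and the scrape here, e.g. the __main__ block shown above ...

print(f"Total runtime: {time.time() - start:.2f} seconds")
```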
You also got plenty of practice with ThreadPoolExecutor for powerful multithreading. Take all these new skills and go build something cool! Check the links below to learn more about the tech stack used in this article. Then check out ScrapeOps, the complete toolkit for web scraping.
const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = 'YOUR-SUPER-SECRET-API-KEY'; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe( csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true, }) ); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function resultCrawl( browser, productName, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries < retries && !success) { const page = await browser.newPage(); try { const url = `https://www.amazon.com/s?k=${productName}&page=${pageNumber}`; const proxyUrl = getScrapeOpsUrl(url, location); console.log(proxyUrl); await page.goto(proxyUrl); console.log(`Successfully fetched page: ${pageNumber}`); const badDivs = await page.$$('div.AdHolder'); for (const div of badDivs) { await page.evaluate((element) => { element.parentNode.removeChild(element); }, div); } const divs = await page.$$('div > span > div'); console.log(`Div count: ${divs.length}`); let lastTitle = ''; for (const div of divs) { const h2 = await div.$('h2'); if (h2 === null) { continue; } const a = await h2.$('a'); const parsable = h2 !== null && a !== null; if (parsable) { const title = await page.evaluate( (element) => element.textContent, h2 ); if (title === lastTitle) { continue; } console.log(`Title: ${title}`); const productUrl = await page.evaluate((a) => { const url = a.getAttribute('href'); if (url.includes('https')) { return url; } else { return `https://www.amazon.com${url}`; } }, a); console.log(`Product url: ${productUrl}`); const adStatus = productUrl.includes('sspa'); console.log(`Ad Status: ${adStatus}`); const urlArray = productUrl.split('/'); const asin = urlArray[urlArray.length - 2]; console.log(`Asin: ${asin}`); const pricingUnit = await div.$('span.a-price-symbol'); const wholePrice = await div.$('span.a-price-whole'); const decimalPrice = await div.$('span.a-price-fraction'); if ( pricingUnit === null || wholePrice === null || decimalPrice === null ) { console.log('Failed to find price!'); continue; } const priceSymbol = await page.evaluate( (pricingUnit) => pricingUnit.textContent, pricingUnit ); const wholeNumber = await page.evaluate( (wholePrice) => wholePrice.textContent, wholePrice ); const decimalNumber = await page.evaluate( (decimalPrice) => decimalPrice.textContent, decimalPrice ); const formattedWholeNumber = wholeNumber .replace(',', '') .replace('.', ''); const price = Number(`${formattedWholeNumber}.${decimalNumber}`); const realPricePresence = await div.$( 'span.a-price.a-text-price span' ); let realPrice = 0.0; if (realPricePresence !== null) { const realPriceStr = await page.evaluate( (realPricePresence) => 
realPricePresence.textContent, realPricePresence ); realPrice = Number(realPriceStr.replace(priceSymbol, '')); } else { realPrice = price; } let rating = 'n/a'; ratingPresence = await div.$('span.a-icon-alt'); if (ratingPresence !== null) { rating = await page.evaluate( (ratingPresence) => ratingPresence.textContent, ratingPresence ); } const item = { asin: asin, title: title, url: productUrl, is_ad: adStatus, pricing_unit: priceSymbol, price: price, real_price: realPrice, rating: rating, }; await writeToCsv([item], `${productName}.csv`); console.log('Item:', item); lastTitle = title; } } success = true; } catch (err) { console.log(`ERROR: ${err}, PAGE ${pageNumber}`); tries++; } finally { await page.close(); if (success) { console.log(`Finished scraping page: ${pageNumber}`); } } }} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} async function concurrentCrawl( browser, query, pages, concurrencyLimit, location = 'us', retries = 3) { console.log('Concurrent crawl started'); const pageList = range(1, pages + 1); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => resultCrawl(browser, query, page, location, retries) ); try { await Promise.all(tasks); } catch (e) { console.log(`Failed to process batch: ${e}`); } } console.log('Concurrent crawl finished');} async function parseProduct( browser, productObject, location = 'us', retries = 3) { const productUrl = productObject.url; const proxyUrl = getScrapeOpsUrl(productUrl, location); console.log('Proxy url:', proxyUrl); let tries = 0; let success = false; const urlArray = productUrl.split('/'); const title = urlArray[urlArray.length - 4]; const asin = urlArray[urlArray.length - 2]; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(proxyUrl, { timeout: 60000 }); const imagesToSave = []; const features = []; const images = await page.$$('li img'); for (const image of images) { const imageLink = await page.evaluate( (element) => element.getAttribute('src'), image ); if (imageLink.includes('https://m.media-amazon.com/images/I/')) { imagesToSave.push(imageLink); } } const featureBullets = await page.$$('li.a-spacing-mini'); for (const feature of featureBullets) { const span = await feature.$('span'); const text = await page.evaluate((span) => span.textContent, span); if (!features.includes(text)) { features.push(text); } } const priceSymbolElement = await page.$('span.a-price-symbol'); const priceWholeElement = await page.$('span.a-price-whole'); const priceDecimalElement = await page.$('span.a-price-fraction'); const priceSymbol = await page.evaluate( (element) => element.textContent, priceSymbolElement ); const priceWhole = ( await page.evaluate((element) => element.textContent, priceWholeElement) ) .replace(',', '') .replace('.', ''); const priceDecimal = await page.evaluate( (element) => element.textContent, priceDecimalElement ); const price = Number(`${priceWhole}.${priceDecimal}`); if (imagesToSave.length > 0) { const item = { asin: asin, title: title, url: productUrl, pricing_unit: priceSymbol, price: price, feature_1: features[0], feature_2: features[1], feature_3: features[2], feature_4: features[3], images_1: imagesToSave[0], images_2: imagesToSave[1], images_3: imagesToSave[2], images_4: imagesToSave[3], }; await writeToCsv([item], `${item.title}.csv`); console.log('Wrote to csv'); success = true; } else { await page.screenshot({ path: 
`ERROR-${title}.png` }); throw new Error('Failed to find item details!'); } } catch (e) { console.log('ERROR:', e); await page.screenshot({ path: 'error.png', fullPage: true }); console.log(`Failed page, Tries left: ${retries - tries}`); tries++; } finally { await page.close(); } } return;} async function concurrentProductScrape( browser, inputFile, concurrencyLimit, location = 'us', retries = 3) { const productObjects = await readCsv(inputFile); while (productObjects.length > 0) { const currentBatch = productObjects.splice(0, concurrencyLimit); const tasks = currentBatch.map((productObject) => parseProduct(browser, productObject, location, retries) ); try { await Promise.all(tasks); } catch (e) { console.log('Failed to process batch'); } }} async function main() { const PRODUCTS = ['phone']; const MAX_RETRIES = 4; const PAGES = 5; const CONCURRENCY_LIMIT = 4; const LOCATION = 'us'; for (const product of PRODUCTS) { const browser = await puppeteer.launch(); const fileName = `./${product}.csv`; await concurrentCrawl( browser, product, PAGES, CONCURRENCY_LIMIT, LOCATION, MAX_RETRIES ); await concurrentProductScrape( browser, fileName, CONCURRENCY_LIMIT, LOCATION, MAX_RETRIES ); await browser.close(); }} main();
PAGES
to 1. If you wish to run with 10 threads, change CONCURRENCY_LIMIT
to 10. Use caution with this one: each thread opens up another page through the proxy, and the ScrapeOps proxy does have a concurrency limit.

https://www.amazon.com/s?k=phone
is the portion you really need to pay attention to.

- https://www.amazon.com/ is our base url.
- s? shows that we're performing a search query.
- k=phone tells the Amazon server that we want to look at phones.

Their server takes all this information from the URL and sends us back a page of phones. On the results page itself, each product is held inside a span element, and details such as images and feature bullets show up as span elements nested within li (list) elements.

Paginated search URLs follow the layout https://www.amazon.com/s?k={product_name}&page={page_number}. If we want page 1 of phones, this would be our URL: https://www.amazon.com/s?k=phone&page=1
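If you want to build these search URLs programmatically, a small sketch might look like the following; buildSearchUrl is a hypothetical helper, not part of the scraper in this article.

```javascript
// Hypothetical helper for building the paginated search URL described above.
function buildSearchUrl(productName, pageNumber) {
  return `https://www.amazon.com/s?k=${encodeURIComponent(productName)}&page=${pageNumber}`;
}

console.log(buildSearchUrl('phone', 1));
// -> https://www.amazon.com/s?k=phone&page=1
```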
If we're in the US, Amazon gives us our prices in dollars, $. If we're in the UK, Amazon will give us our prices in pounds, GBP.

To control our location effectively, we'll be using the ScrapeOps Proxy API. The ScrapeOps API will route our traffic through servers in whichever country we ask for. If we want to be in the UK, ScrapeOps will put us in the UK. If we want to be from the US, ScrapeOps will route us through servers in the US. The ScrapeOps API is a perfect way to control your location because our requests are actually routed through the location we want.

To set up the project, run the following commands:

mkdir amazon-scraper
npm init -y
npm install puppeteer
npm install csv-writer
npm install csv-parse
You do not need to run npm install fs; the fs module is built into Node.js.
In the crawl below, we first find all of the div elements on the page. We then check whether each div is parsable. If the div is parsable, we find its h2 and use its textContent as our title. From each parsable result, we extract the following fields:

- asin
- title
- url
- is_ad
- pricing_unit
- price
- real_price
- rating
const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = 'YOUR-SUPER-SECRET-API-KEY'; async function resultCrawl(browser, productName, retries = 3) { let tries = 0; let success = false; while (tries < retries && !success) { const page = await browser.newPage(); try { const url = `https://www.amazon.com/s?k=${productName}`; await page.goto(url); console.log(`Successfully fetched page: ${pageNumber}`); const badDivs = await page.$$('div.AdHolder'); for (const div of badDivs) { await page.evaluate((element) => { element.parentNode.removeChild(element); }, div); } const divs = await page.$$('div > span > div'); console.log(`Div count: ${divs.length}`); let lastTitle = ''; for (const div of divs) { const h2 = await div.$('h2'); if (h2 === null) { continue; } const a = await h2.$('a'); const parsable = h2 !== null && a !== null; if (parsable) { const title = await page.evaluate( (element) => element.textContent, h2 ); if (title === lastTitle) { continue; } console.log(`Title: ${title}`); const productUrl = await page.evaluate((a) => { const url = a.getAttribute('href'); if (url.includes('https')) { return url; } else { return `https://www.amazon.com${url}`; } }, a); console.log(`Product url: ${productUrl}`); const adStatus = productUrl.includes('sspa'); console.log(`Ad Status: ${adStatus}`); const urlArray = productUrl.split('/'); const asin = urlArray[urlArray.length - 2]; console.log(`Asin: ${asin}`); const pricingUnit = await div.$('span.a-price-symbol'); const wholePrice = await div.$('span.a-price-whole'); const decimalPrice = await div.$('span.a-price-fraction'); if ( pricingUnit === null || wholePrice === null || decimalPrice === null ) { console.log('Failed to find price!'); continue; } const priceSymbol = await page.evaluate( (pricingUnit) => pricingUnit.textContent, pricingUnit ); const wholeNumber = await page.evaluate( (wholePrice) => wholePrice.textContent, wholePrice ); const decimalNumber = await page.evaluate( (decimalPrice) => decimalPrice.textContent, decimalPrice ); const formattedWholeNumber = wholeNumber .replace(',', '') .replace('.', ''); const price = Number(`${formattedWholeNumber}.${decimalNumber}`); const realPricePresence = await div.$( 'span.a-price.a-text-price span' ); let realPrice = 0.0; if (realPricePresence !== null) { const realPriceStr = await page.evaluate( (realPricePresence) => realPricePresence.textContent, realPricePresence ); realPrice = Number(realPriceStr.replace(priceSymbol, '')); } else { realPrice = price; } let rating = 'n/a'; ratingPresence = await div.$('span.a-icon-alt'); if (ratingPresence !== null) { rating = await page.evaluate( (ratingPresence) => ratingPresence.textContent, ratingPresence ); } const item = { asin: asin, title: title, url: productUrl, is_ad: adStatus, pricing_unit: priceSymbol, price: price, real_price: realPrice, rating: rating, }; console.log('Item:', item); lastTitle = title; } } success = true; } catch (err) { console.log(`ERROR: ${err}, PAGE ${pageNumber}`); tries++; } finally { await page.close(); if (success) { console.log(`Finished scraping page: ${pageNumber}`); } } }} async function main() { const PRODUCTS = ['phone']; const MAX_RETRIES = 4; for (const product of PRODUCTS) { const browser = await puppeteer.launch(); await resultCrawl(browser, product, MAX_RETRIES); await browser.close(); }} main();
The big change in this version is the new pageNumber
added to both our function arguments and our url.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = 'YOUR-SUPER-SECRET-API-KEY'; async function resultCrawl(browser, productName, pageNumber, retries = 3) { let tries = 0; let success = false; while (tries < retries && !success) { const page = await browser.newPage(); try { const url = `https://www.amazon.com/s?k=${productName}&page=${pageNumber}`; await page.goto(url); console.log(`Successfully fetched page: ${pageNumber}`); const badDivs = await page.$$('div.AdHolder'); for (const div of badDivs) { await page.evaluate((element) => { element.parentNode.removeChild(element); }, div); } const divs = await page.$$('div > span > div'); console.log(`Div count: ${divs.length}`); let lastTitle = ''; for (const div of divs) { const h2 = await div.$('h2'); if (h2 === null) { continue; } const a = await h2.$('a'); const parsable = h2 !== null && a !== null; if (parsable) { const title = await page.evaluate( (element) => element.textContent, h2 ); if (title === lastTitle) { continue; } console.log(`Title: ${title}`); const productUrl = await page.evaluate((a) => { const url = a.getAttribute('href'); if (url.includes('https')) { return url; } else { return `https://www.amazon.com${url}`; } }, a); console.log(`Product url: ${productUrl}`); const adStatus = productUrl.includes('sspa'); console.log(`Ad Status: ${adStatus}`); const urlArray = productUrl.split('/'); const asin = urlArray[urlArray.length - 2]; console.log(`Asin: ${asin}`); const pricingUnit = await div.$('span.a-price-symbol'); const wholePrice = await div.$('span.a-price-whole'); const decimalPrice = await div.$('span.a-price-fraction'); if ( pricingUnit === null || wholePrice === null || decimalPrice === null ) { console.log('Failed to find price!'); continue; } const priceSymbol = await page.evaluate( (pricingUnit) => pricingUnit.textContent, pricingUnit ); const wholeNumber = await page.evaluate( (wholePrice) => wholePrice.textContent, wholePrice ); const decimalNumber = await page.evaluate( (decimalPrice) => decimalPrice.textContent, decimalPrice ); const formattedWholeNumber = wholeNumber .replace(',', '') .replace('.', ''); const price = Number(`${formattedWholeNumber}.${decimalNumber}`); const realPricePresence = await div.$( 'span.a-price.a-text-price span' ); let realPrice = 0.0; if (realPricePresence !== null) { const realPriceStr = await page.evaluate( (realPricePresence) => realPricePresence.textContent, realPricePresence ); realPrice = Number(realPriceStr.replace(priceSymbol, '')); } else { realPrice = price; } let rating = 'n/a'; ratingPresence = await div.$('span.a-icon-alt'); if (ratingPresence !== null) { rating = await page.evaluate( (ratingPresence) => ratingPresence.textContent, ratingPresence ); } const item = { asin: asin, title: title, url: productUrl, is_ad: adStatus, pricing_unit: priceSymbol, price: price, real_price: realPrice, rating: rating, }; console.log('Item:', item); lastTitle = title; } } success = true; } catch (err) { console.log(`ERROR: ${err}, PAGE ${pageNumber}`); tries++; } finally { await page.close(); if (success) { console.log(`Finished scraping page: ${pageNumber}`); } } }} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} async function concurrentCrawl( browser, query, pages, location = 'us', retries = 3) { console.log('Concurrent crawl started'); 
const pageList = range(1, pages + 1); for (const page of pageList) { await resultCrawl(browser, query, location, retries); } console.log('Concurrent crawl finished');} async function main() { const PRODUCTS = ['phone']; const MAX_RETRIES = 4; const PAGES = 1; const LOCATION = 'us'; for (const product of PRODUCTS) { const browser = await puppeteer.launch(); await concurrentCrawl(browser, product, PAGES, LOCATION, MAX_RETRIES); await browser.close(); }} main();
Our resultCrawl() function now takes a pageNumber and inserts it into our URL. We also added a concurrentCrawl() function which acts as a placeholder; we'll add real concurrency later on. range() (demoed on its own after the code below) simply allows us to create a range
element similar to the one commonly used in Python.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = 'YOUR-SUPER-SECRET-API-KEY'; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function resultCrawl( browser, productName, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries < retries && !success) { const page = await browser.newPage(); try { const url = `https://www.amazon.com/s?k=${productName}&page=${pageNumber}`; await page.goto(url); console.log(`Successfully fetched page: ${pageNumber}`); const badDivs = await page.$$('div.AdHolder'); for (const div of badDivs) { await page.evaluate((element) => { element.parentNode.removeChild(element); }, div); } const divs = await page.$$('div > span > div'); console.log(`Div count: ${divs.length}`); let lastTitle = ''; for (const div of divs) { const h2 = await div.$('h2'); if (h2 === null) { continue; } const a = await h2.$('a'); const parsable = h2 !== null && a !== null; if (parsable) { const title = await page.evaluate( (element) => element.textContent, h2 ); if (title === lastTitle) { continue; } console.log(`Title: ${title}`); const productUrl = await page.evaluate((a) => { const url = a.getAttribute('href'); if (url.includes('https')) { return url; } else { return `https://www.amazon.com${url}`; } }, a); console.log(`Product url: ${productUrl}`); const adStatus = productUrl.includes('sspa'); console.log(`Ad Status: ${adStatus}`); const urlArray = productUrl.split('/'); const asin = urlArray[urlArray.length - 2]; console.log(`Asin: ${asin}`); const pricingUnit = await div.$('span.a-price-symbol'); const wholePrice = await div.$('span.a-price-whole'); const decimalPrice = await div.$('span.a-price-fraction'); if ( pricingUnit === null || wholePrice === null || decimalPrice === null ) { console.log('Failed to find price!'); continue; } const priceSymbol = await page.evaluate( (pricingUnit) => pricingUnit.textContent, pricingUnit ); const wholeNumber = await page.evaluate( (wholePrice) => wholePrice.textContent, wholePrice ); const decimalNumber = await page.evaluate( (decimalPrice) => decimalPrice.textContent, decimalPrice ); const formattedWholeNumber = wholeNumber .replace(',', '') .replace('.', ''); const price = Number(`${formattedWholeNumber}.${decimalNumber}`); const realPricePresence = await div.$( 'span.a-price.a-text-price span' ); let realPrice = 0.0; if (realPricePresence !== null) { const realPriceStr = await page.evaluate( (realPricePresence) => realPricePresence.textContent, realPricePresence ); realPrice = Number(realPriceStr.replace(priceSymbol, '')); } else { realPrice = price; } let rating = 'n/a'; ratingPresence = await div.$('span.a-icon-alt'); if (ratingPresence !== null) { rating = await page.evaluate( (ratingPresence) => ratingPresence.textContent, ratingPresence ); } const item = { asin: asin, title: title, url: productUrl, is_ad: adStatus, pricing_unit: priceSymbol, price: price, real_price: realPrice, rating: rating, }; await writeToCsv([item], 
`${productName}.csv`); console.log('Item:', item); lastTitle = title; } } success = true; } catch (err) { console.log(`ERROR: ${err}, PAGE ${pageNumber}`); tries++; } finally { await page.close(); if (success) { console.log(`Finished scraping page: ${pageNumber}`); } } }} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} async function concurrentCrawl( browser, query, pages, location = 'us', retries = 3) { console.log('Concurrent crawl started'); const pageList = range(1, pages + 1); for (const page of pageList) { await resultCrawl(browser, query, location, retries); } console.log('Concurrent crawl finished');} async function main() { const PRODUCTS = ['phone']; const MAX_RETRIES = 4; const PAGES = 1; const LOCATION = 'us'; for (const product of PRODUCTS) { const browser = await puppeteer.launch(); await concurrentCrawl(browser, product, PAGES, LOCATION, MAX_RETRIES); await browser.close(); }} main();
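Before moving on, the range() helper can be tried on its own; the function body below is copied from the script above.

```javascript
// Standalone demo of the range() helper used throughout this section.
function range(start, end) {
  const array = [];
  for (let i = start; i < end; i++) {
    array.push(i);
  }
  return array;
}

console.log(range(1, 6)); // [1, 2, 3, 4, 5]
```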
This version creates an item object to hold individual product data. We add a writeToCsv() function as well. We call the write function on each object individually for the safest and best results. If the CSV file doesn't exist, a new one is created. If it does exist, we append to the CSV file so we don't overwrite important data.

Next, we add real concurrency to our concurrentCrawl() function:

```javascript
async function concurrentCrawl(
  browser,
  query,
  pages,
  concurrencyLimit,
  location = 'us',
  retries = 3
) {
  console.log('Concurrent crawl started');
  const pageList = range(1, pages + 1);

  while (pageList.length > 0) {
    const currentBatch = pageList.splice(0, concurrencyLimit);
    const tasks = currentBatch.map((page) =>
      resultCrawl(browser, query, page, location, retries)
    );

    try {
      await Promise.all(tasks);
    } catch (e) {
      console.log(`Failed to process batch: ${e}`);
    }
  }
  console.log('Concurrent crawl finished');
}
```
We use a concurrencyLimit to manage our pages. If our concurrencyLimit is set to 5, we'll scrape 5 pages at once. Be mindful of this limit. Not only does your machine have limits, but your ScrapeOps API key will likely also have a concurrency limit. You don't want to run past your limit... you'd just be wasting resources and you'll probably crash Puppeteer! (A standalone sketch of this batching pattern follows the full code below.)

Here is our updated code. We also added a location argument to resultCrawl()
. While we don't use the location in this example, we'll be using it in the next section when we add proxy support.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = 'YOUR-SUPER-SECRET-API-KEY'; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function resultCrawl( browser, productName, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries < retries && !success) { const page = await browser.newPage(); try { const url = `https://www.amazon.com/s?k=${productName}&page=${pageNumber}`; await page.goto(url); console.log(`Successfully fetched page: ${pageNumber}`); const badDivs = await page.$$('div.AdHolder'); for (const div of badDivs) { await page.evaluate((element) => { element.parentNode.removeChild(element); }, div); } const divs = await page.$$('div > span > div'); console.log(`Div count: ${divs.length}`); let lastTitle = ''; for (const div of divs) { const h2 = await div.$('h2'); if (h2 === null) { continue; } const a = await h2.$('a'); const parsable = h2 !== null && a !== null; if (parsable) { const title = await page.evaluate( (element) => element.textContent, h2 ); if (title === lastTitle) { continue; } console.log(`Title: ${title}`); const productUrl = await page.evaluate((a) => { const url = a.getAttribute('href'); if (url.includes('https')) { return url; } else { return `https://www.amazon.com${url}`; } }, a); console.log(`Product url: ${productUrl}`); const adStatus = productUrl.includes('sspa'); console.log(`Ad Status: ${adStatus}`); const urlArray = productUrl.split('/'); const asin = urlArray[urlArray.length - 2]; console.log(`Asin: ${asin}`); const pricingUnit = await div.$('span.a-price-symbol'); const wholePrice = await div.$('span.a-price-whole'); const decimalPrice = await div.$('span.a-price-fraction'); if ( pricingUnit === null || wholePrice === null || decimalPrice === null ) { console.log('Failed to find price!'); continue; } const priceSymbol = await page.evaluate( (pricingUnit) => pricingUnit.textContent, pricingUnit ); const wholeNumber = await page.evaluate( (wholePrice) => wholePrice.textContent, wholePrice ); const decimalNumber = await page.evaluate( (decimalPrice) => decimalPrice.textContent, decimalPrice ); const formattedWholeNumber = wholeNumber .replace(',', '') .replace('.', ''); const price = Number(`${formattedWholeNumber}.${decimalNumber}`); const realPricePresence = await div.$( 'span.a-price.a-text-price span' ); let realPrice = 0.0; if (realPricePresence !== null) { const realPriceStr = await page.evaluate( (realPricePresence) => realPricePresence.textContent, realPricePresence ); realPrice = Number(realPriceStr.replace(priceSymbol, '')); } else { realPrice = price; } let rating = 'n/a'; ratingPresence = await div.$('span.a-icon-alt'); if (ratingPresence !== null) { rating = await page.evaluate( (ratingPresence) => ratingPresence.textContent, ratingPresence ); } const item = { asin: asin, title: title, url: productUrl, is_ad: adStatus, pricing_unit: priceSymbol, price: price, real_price: 
realPrice, rating: rating, }; await writeToCsv([item], `${productName}.csv`); console.log('Item:', item); lastTitle = title; } } success = true; } catch (err) { console.log(`ERROR: ${err}, PAGE ${pageNumber}`); tries++; } finally { await page.close(); if (success) { console.log(`Finished scraping page: ${pageNumber}`); } } }} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} async function concurrentCrawl( browser, query, pages, concurrencyLimit, location = 'us', retries = 3) { console.log('Concurrent crawl started'); const pageList = range(1, pages + 1); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => resultCrawl(browser, query, page, location, retries) ); try { await Promise.all(tasks); } catch (e) { console.log(`Failed to process batch: ${e}`); } } console.log('Concurrent crawl finished');} async function main() { const PRODUCTS = ['phone']; const MAX_RETRIES = 4; const PAGES = 1; const CONCURRENCY_LIMIT = 4; const LOCATION = 'us'; for (const product of PRODUCTS) { const browser = await puppeteer.launch(); await concurrentCrawl( browser, product, PAGES, CONCURRENCY_LIMIT, LOCATION, MAX_RETRIES ); await browser.close(); }} main();
To add proxy support, we write a new function, getScrapeOpsUrl(). This function takes in a regular URL and uses URLSearchParams plus a template string to convert it into a URL that routes the request through the ScrapeOps Proxy API. Take a look below:

function getScrapeOpsUrl(url, location = 'us') {
  const params = new URLSearchParams({
    api_key: API_KEY,
    url: url,
    country: location,
  });
  return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}
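As a quick illustration (the API key below is just the placeholder from the script, and the search URL is an example), calling the function produces a proxied URL with the target URL percent-encoded as a query parameter:

```javascript
// Example only: what getScrapeOpsUrl() returns for a sample search URL.
const searchUrl = 'https://www.amazon.com/s?k=phone&page=1';
console.log(getScrapeOpsUrl(searchUrl, 'us'));
// https://proxy.scrapeops.io/v1/?api_key=YOUR-SUPER-SECRET-API-KEY&url=https%3A%2F%2Fwww.amazon.com%2Fs%3Fk%3Dphone%26page%3D1&country=us
```

Here is the full crawler with proxy support wired in: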
const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = 'YOUR-SUPER-SECRET-API-KEY'; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function resultCrawl( browser, productName, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries < retries && !success) { const page = await browser.newPage(); try { const url = `https://www.amazon.com/s?k=${productName}&page=${pageNumber}`; const proxyUrl = getScrapeOpsUrl(url, location); console.log(proxyUrl); await page.goto(proxyUrl); console.log(`Successfully fetched page: ${pageNumber}`); const badDivs = await page.$$('div.AdHolder'); for (const div of badDivs) { await page.evaluate((element) => { element.parentNode.removeChild(element); }, div); } const divs = await page.$$('div > span > div'); console.log(`Div count: ${divs.length}`); let lastTitle = ''; for (const div of divs) { const h2 = await div.$('h2'); if (h2 === null) { continue; } const a = await h2.$('a'); const parsable = h2 !== null && a !== null; if (parsable) { const title = await page.evaluate( (element) => element.textContent, h2 ); if (title === lastTitle) { continue; } console.log(`Title: ${title}`); const productUrl = await page.evaluate((a) => { const url = a.getAttribute('href'); if (url.includes('https')) { return url; } else { return `https://www.amazon.com${url}`; } }, a); console.log(`Product url: ${productUrl}`); const adStatus = productUrl.includes('sspa'); console.log(`Ad Status: ${adStatus}`); const urlArray = productUrl.split('/'); const asin = urlArray[urlArray.length - 2]; console.log(`Asin: ${asin}`); const pricingUnit = await div.$('span.a-price-symbol'); const wholePrice = await div.$('span.a-price-whole'); const decimalPrice = await div.$('span.a-price-fraction'); if ( pricingUnit === null || wholePrice === null || decimalPrice === null ) { console.log('Failed to find price!'); continue; } const priceSymbol = await page.evaluate( (pricingUnit) => pricingUnit.textContent, pricingUnit ); const wholeNumber = await page.evaluate( (wholePrice) => wholePrice.textContent, wholePrice ); const decimalNumber = await page.evaluate( (decimalPrice) => decimalPrice.textContent, decimalPrice ); const formattedWholeNumber = wholeNumber .replace(',', '') .replace('.', ''); const price = Number(`${formattedWholeNumber}.${decimalNumber}`); const realPricePresence = await div.$( 'span.a-price.a-text-price span' ); let realPrice = 0.0; if (realPricePresence !== null) { const realPriceStr = await page.evaluate( (realPricePresence) => realPricePresence.textContent, realPricePresence ); realPrice = Number(realPriceStr.replace(priceSymbol, '')); } else { realPrice = price; } let rating = 'n/a'; ratingPresence = await div.$('span.a-icon-alt'); if (ratingPresence !== null) { rating = await page.evaluate( (ratingPresence) => 
ratingPresence.textContent, ratingPresence ); } const item = { asin: asin, title: title, url: productUrl, is_ad: adStatus, pricing_unit: priceSymbol, price: price, real_price: realPrice, rating: rating, }; await writeToCsv([item], `${productName}.csv`); console.log('Item:', item); lastTitle = title; } } success = true; } catch (err) { console.log(`ERROR: ${err}, PAGE ${pageNumber}`); tries++; } finally { await page.close(); if (success) { console.log(`Finished scraping page: ${pageNumber}`); } } }} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} async function concurrentCrawl( browser, query, pages, concurrencyLimit, location = 'us', retries = 3) { console.log('Concurrent crawl started'); const pageList = range(1, pages + 1); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => resultCrawl(browser, query, page, location, retries) ); try { await Promise.all(tasks); } catch (e) { console.log(`Failed to process batch: ${e}`); } } console.log('Concurrent crawl finished');} async function main() { const PRODUCTS = ['phone']; const MAX_RETRIES = 4; const PAGES = 1; const CONCURRENCY_LIMIT = 4; const LOCATION = 'us'; for (const product of PRODUCTS) { const browser = await puppeteer.launch(); await concurrentCrawl( browser, product, PAGES, CONCURRENCY_LIMIT, LOCATION, MAX_RETRIES ); await browser.close(); }} main();
async function main() {
  const PRODUCTS = ['phone'];
  const MAX_RETRIES = 4;
  const PAGES = 5;
  const CONCURRENCY_LIMIT = 4;
  const LOCATION = 'us';

  for (const product of PRODUCTS) {
    const browser = await puppeteer.launch();
    await concurrentCrawl(
      browser,
      product,
      PAGES,
      CONCURRENCY_LIMIT,
      LOCATION,
      MAX_RETRIES
    );
    await browser.close();
  }
}
Inside main(), you can tune the following constants:

- `PRODUCTS`: the list of search keywords to crawl.
- `MAX_RETRIES`: how many times to retry a page that fails or gets blocked.
- `PAGES`: how many search result pages to crawl for each keyword.
- `CONCURRENCY_LIMIT`: how many pages get scraped at the same time in each batch.
- `LOCATION`: the country passed to the ScrapeOps proxy as the `country` parameter.
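For example, a hypothetical multi-keyword run might look like this; only the values change, and each keyword still gets its own browser, crawl, and CSV file:

```javascript
// Hypothetical configuration: three keywords, two pages each, and a lower
// concurrency limit to go easier on your machine and your API plan.
const PRODUCTS = ['phone', 'laptop', 'tablet'];
const MAX_RETRIES = 4;
const PAGES = 2;
const CONCURRENCY_LIMIT = 2;
const LOCATION = 'us';
```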
You can run the proxied crawler with:

node crawler-proxy

Feel free to tweak CONCURRENCY_LIMIT. If you begin running into errors, decrease your concurrency limit. The higher your limit is, the more resources you're using, both on your machine and on your API key.

Now that the crawl is handled, we need a function that parses an individual product page. Here is our first version of parseProduct(); it doesn't store anything yet, it just logs each item:

async function parseProduct(
  browser,
  productObject,
  location = 'us',
  retries = 3
) {
  const productUrl = productObject.url;
  let tries = 0;
  let success = false;

  // Pull a rough title and the ASIN out of the product URL itself
  const urlArray = productUrl.split('/');
  const title = urlArray[urlArray.length - 4];
  const asin = urlArray[urlArray.length - 2];

  while (tries <= retries && !success) {
    const page = await browser.newPage();
    try {
      await page.goto(productUrl, { timeout: 60000 });

      const imagesToSave = [];
      const features = [];

      // Collect full-size product images
      const images = await page.$$('li img');
      for (const image of images) {
        const imageLink = await page.evaluate(
          (element) => element.getAttribute('src'),
          image
        );
        if (imageLink.includes('https://m.media-amazon.com/images/I/')) {
          imagesToSave.push(imageLink);
        }
      }

      // Collect the feature bullets
      const featureBullets = await page.$$('li.a-spacing-mini');
      for (const feature of featureBullets) {
        const span = await feature.$('span');
        const text = await page.evaluate((span) => span.textContent, span);
        if (!features.includes(text)) {
          features.push(text);
        }
      }

      // Rebuild the price from its symbol, whole, and fraction elements
      const priceSymbolElement = await page.$('span.a-price-symbol');
      const priceWholeElement = await page.$('span.a-price-whole');
      const priceDecimalElement = await page.$('span.a-price-fraction');

      const priceSymbol = await page.evaluate(
        (element) => element.textContent,
        priceSymbolElement
      );
      const priceWhole = (
        await page.evaluate((element) => element.textContent, priceWholeElement)
      )
        .replace(',', '')
        .replace('.', '');
      const priceDecimal = await page.evaluate(
        (element) => element.textContent,
        priceDecimalElement
      );
      const price = Number(`${priceWhole}.${priceDecimal}`);

      if (imagesToSave.length > 0) {
        const item = {
          asin: asin,
          title: title,
          url: productUrl,
          pricing_unit: priceSymbol,
          price: price,
          feature_1: features[0],
          feature_2: features[1],
          feature_3: features[2],
          feature_4: features[3],
          images_1: imagesToSave[0],
          images_2: imagesToSave[1],
          images_3: imagesToSave[2],
          images_4: imagesToSave[3],
        };
        console.log(`Item: ${JSON.stringify(item)}`);
        success = true;
      } else {
        await page.screenshot({ path: `ERROR-${title}.png` });
        throw new Error('Failed to find item details!');
      }
    } catch (e) {
      console.log('ERROR:', e);
      await page.screenshot({ path: 'error.png', fullPage: true });
      console.log(`Failed page, Tries left: ${retries - tries}`);
      tries++;
    } finally {
      await page.close();
    }
  }
  return;
}
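To make the URL-splitting at the top of parseProduct() concrete, here is what it yields for an illustrative product URL of the form /Title/dp/ASIN/ (the URL and values are made up):

```javascript
// Illustrative only: splitting a product URL into segments.
const productUrl = 'https://www.amazon.com/Example-Phone-128GB/dp/B0EXAMPLE12/';
const urlArray = productUrl.split('/');
// ['https:', '', 'www.amazon.com', 'Example-Phone-128GB', 'dp', 'B0EXAMPLE12', '']

const title = urlArray[urlArray.length - 4]; // 'Example-Phone-128GB'
const asin = urlArray[urlArray.length - 2]; // 'B0EXAMPLE12'
console.log(title, asin);
```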
Next, we add a concurrentProductScrape() function. At the moment, this function does not use concurrency; we just have a for loop as a placeholder. It reads the CSV file and then passes each object from the file into parseProduct() (see the example after the code below for the shape of each record)
.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = 'YOUR-SUPER-SECRET-API-KEY'; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe( csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true, }) ); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function resultCrawl( browser, productName, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries < retries && !success) { const page = await browser.newPage(); try { const url = `https://www.amazon.com/s?k=${productName}&page=${pageNumber}`; const proxyUrl = getScrapeOpsUrl(url, location); console.log(proxyUrl); await page.goto(proxyUrl); console.log(`Successfully fetched page: ${pageNumber}`); const badDivs = await page.$$('div.AdHolder'); for (const div of badDivs) { await page.evaluate((element) => { element.parentNode.removeChild(element); }, div); } const divs = await page.$$('div > span > div'); console.log(`Div count: ${divs.length}`); let lastTitle = ''; for (const div of divs) { const h2 = await div.$('h2'); if (h2 === null) { continue; } const a = await h2.$('a'); const parsable = h2 !== null && a !== null; if (parsable) { const title = await page.evaluate( (element) => element.textContent, h2 ); if (title === lastTitle) { continue; } console.log(`Title: ${title}`); const productUrl = await page.evaluate((a) => { const url = a.getAttribute('href'); if (url.includes('https')) { return url; } else { return `https://www.amazon.com${url}`; } }, a); console.log(`Product url: ${productUrl}`); const adStatus = productUrl.includes('sspa'); console.log(`Ad Status: ${adStatus}`); const urlArray = productUrl.split('/'); const asin = urlArray[urlArray.length - 2]; console.log(`Asin: ${asin}`); const pricingUnit = await div.$('span.a-price-symbol'); const wholePrice = await div.$('span.a-price-whole'); const decimalPrice = await div.$('span.a-price-fraction'); if ( pricingUnit === null || wholePrice === null || decimalPrice === null ) { console.log('Failed to find price!'); continue; } const priceSymbol = await page.evaluate( (pricingUnit) => pricingUnit.textContent, pricingUnit ); const wholeNumber = await page.evaluate( (wholePrice) => wholePrice.textContent, wholePrice ); const decimalNumber = await page.evaluate( (decimalPrice) => decimalPrice.textContent, decimalPrice ); const formattedWholeNumber = wholeNumber .replace(',', '') .replace('.', ''); const price = Number(`${formattedWholeNumber}.${decimalNumber}`); const realPricePresence = await div.$( 'span.a-price.a-text-price span' ); let realPrice = 0.0; if (realPricePresence !== null) { const realPriceStr = await page.evaluate( (realPricePresence) => 
realPricePresence.textContent, realPricePresence ); realPrice = Number(realPriceStr.replace(priceSymbol, '')); } else { realPrice = price; } let rating = 'n/a'; ratingPresence = await div.$('span.a-icon-alt'); if (ratingPresence !== null) { rating = await page.evaluate( (ratingPresence) => ratingPresence.textContent, ratingPresence ); } const item = { asin: asin, title: title, url: productUrl, is_ad: adStatus, pricing_unit: priceSymbol, price: price, real_price: realPrice, rating: rating, }; await writeToCsv([item], `${productName}.csv`); console.log('Item:', item); lastTitle = title; } } success = true; } catch (err) { console.log(`ERROR: ${err}, PAGE ${pageNumber}`); tries++; } finally { await page.close(); if (success) { console.log(`Finished scraping page: ${pageNumber}`); } } }} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} async function concurrentCrawl( browser, query, pages, concurrencyLimit, location = 'us', retries = 3) { console.log('Concurrent crawl started'); const pageList = range(1, pages + 1); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => resultCrawl(browser, query, page, location, retries) ); try { await Promise.all(tasks); } catch (e) { console.log(`Failed to process batch: ${e}`); } } console.log('Concurrent crawl finished');} async function parseProduct( browser, productObject, location = 'us', retries = 3) { const productUrl = productObject.url; let tries = 0; let success = false; const urlArray = productUrl.split('/'); const title = urlArray[urlArray.length - 4]; const asin = urlArray[urlArray.length - 2]; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(productUrl, { timeout: 60000 }); const imagesToSave = []; const features = []; const images = await page.$$('li img'); for (const image of images) { const imageLink = await page.evaluate( (element) => element.getAttribute('src'), image ); if (imageLink.includes('https://m.media-amazon.com/images/I/')) { imagesToSave.push(imageLink); } } const featureBullets = await page.$$('li.a-spacing-mini'); for (const feature of featureBullets) { const span = await feature.$('span'); const text = await page.evaluate((span) => span.textContent, span); if (!features.includes(text)) { features.push(text); } } const priceSymbolElement = await page.$('span.a-price-symbol'); const priceWholeElement = await page.$('span.a-price-whole'); const priceDecimalElement = await page.$('span.a-price-fraction'); const priceSymbol = await page.evaluate( (element) => element.textContent, priceSymbolElement ); const priceWhole = ( await page.evaluate((element) => element.textContent, priceWholeElement) ) .replace(',', '') .replace('.', ''); const priceDecimal = await page.evaluate( (element) => element.textContent, priceDecimalElement ); const price = Number(`${priceWhole}.${priceDecimal}`); if (imagesToSave.length > 0) { const item = { asin: asin, title: title, url: productUrl, pricing_unit: priceSymbol, price: price, feature_1: features[0], feature_2: features[1], feature_3: features[2], feature_4: features[3], images_1: imagesToSave[0], images_2: imagesToSave[1], images_3: imagesToSave[2], images_4: imagesToSave[3], }; console.log(`Item: ${JSON.stringify(item)}`); success = true; } else { await page.screenshot({ path: `ERROR-${title}.png` }); throw new Error('Failed to find item details!'); } } catch (e) { console.log('ERROR:', e); await 
page.screenshot({ path: 'error.png', fullPage: true }); console.log(`Failed page, Tries left: ${retries - tries}`); tries++; } finally { await page.close(); } } return;} async function concurrentProductScrape( browser, inputFile, concurrencyLimit, location = 'us', retries = 3) { const productObjects = await readCsv(inputFile); for (const productObject of productObjects) { await parseProduct(browser, productObject, location, retries); }} async function main() { const PRODUCTS = ['phone']; const MAX_RETRIES = 4; const PAGES = 1; const CONCURRENCY_LIMIT = 4; const LOCATION = 'us'; for (const product of PRODUCTS) { const browser = await puppeteer.launch(); const fileName = `./${product}.csv`; await concurrentCrawl( browser, product, PAGES, CONCURRENCY_LIMIT, LOCATION, MAX_RETRIES ); await concurrentProductScrape( browser, fileName, CONCURRENCY_LIMIT, LOCATION, MAX_RETRIES ); await browser.close(); }} main();
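As a reminder of what parseProduct() receives, each record returned by readCsv() is a plain object keyed by the CSV headers our crawler wrote; the values below are invented, and csv-parse hands everything back as strings:

```javascript
// Hypothetical shape of one record from readCsv('./phone.csv').
const productObject = {
  asin: 'B0EXAMPLE12',
  title: 'Example Phone, 128GB, Black',
  url: 'https://www.amazon.com/Example-Phone-128GB/dp/B0EXAMPLE12/',
  is_ad: 'false',
  pricing_unit: '$',
  price: '199.99',
  real_price: '249.99',
  rating: '4.5 out of 5 stars',
};

// parseProduct() only needs the url field in order to load the page.
console.log(productObject.url);
```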
Next, we use the writeToCsv()
function to store our data.We can safely pass each object into this write function just like we did earlier.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = 'YOUR-SUPER-SECRET-API-KEY'; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe( csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true, }) ); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function resultCrawl( browser, productName, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries < retries && !success) { const page = await browser.newPage(); try { const url = `https://www.amazon.com/s?k=${productName}&page=${pageNumber}`; const proxyUrl = getScrapeOpsUrl(url, location); console.log(proxyUrl); await page.goto(proxyUrl); console.log(`Successfully fetched page: ${pageNumber}`); const badDivs = await page.$$('div.AdHolder'); for (const div of badDivs) { await page.evaluate((element) => { element.parentNode.removeChild(element); }, div); } const divs = await page.$$('div > span > div'); console.log(`Div count: ${divs.length}`); let lastTitle = ''; for (const div of divs) { const h2 = await div.$('h2'); if (h2 === null) { continue; } const a = await h2.$('a'); const parsable = h2 !== null && a !== null; if (parsable) { const title = await page.evaluate( (element) => element.textContent, h2 ); if (title === lastTitle) { continue; } console.log(`Title: ${title}`); const productUrl = await page.evaluate((a) => { const url = a.getAttribute('href'); if (url.includes('https')) { return url; } else { return `https://www.amazon.com${url}`; } }, a); console.log(`Product url: ${productUrl}`); const adStatus = productUrl.includes('sspa'); console.log(`Ad Status: ${adStatus}`); const urlArray = productUrl.split('/'); const asin = urlArray[urlArray.length - 2]; console.log(`Asin: ${asin}`); const pricingUnit = await div.$('span.a-price-symbol'); const wholePrice = await div.$('span.a-price-whole'); const decimalPrice = await div.$('span.a-price-fraction'); if ( pricingUnit === null || wholePrice === null || decimalPrice === null ) { console.log('Failed to find price!'); continue; } const priceSymbol = await page.evaluate( (pricingUnit) => pricingUnit.textContent, pricingUnit ); const wholeNumber = await page.evaluate( (wholePrice) => wholePrice.textContent, wholePrice ); const decimalNumber = await page.evaluate( (decimalPrice) => decimalPrice.textContent, decimalPrice ); const formattedWholeNumber = wholeNumber .replace(',', '') .replace('.', ''); const price = Number(`${formattedWholeNumber}.${decimalNumber}`); const realPricePresence = await div.$( 'span.a-price.a-text-price span' ); let realPrice = 0.0; if 
(realPricePresence !== null) { const realPriceStr = await page.evaluate( (realPricePresence) => realPricePresence.textContent, realPricePresence ); realPrice = Number(realPriceStr.replace(priceSymbol, '')); } else { realPrice = price; } let rating = 'n/a'; ratingPresence = await div.$('span.a-icon-alt'); if (ratingPresence !== null) { rating = await page.evaluate( (ratingPresence) => ratingPresence.textContent, ratingPresence ); } const item = { asin: asin, title: title, url: productUrl, is_ad: adStatus, pricing_unit: priceSymbol, price: price, real_price: realPrice, rating: rating, }; await writeToCsv([item], `${productName}.csv`); console.log('Item:', item); lastTitle = title; } } success = true; } catch (err) { console.log(`ERROR: ${err}, PAGE ${pageNumber}`); tries++; } finally { await page.close(); if (success) { console.log(`Finished scraping page: ${pageNumber}`); } } }} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} async function concurrentCrawl( browser, query, pages, concurrencyLimit, location = 'us', retries = 3) { console.log('Concurrent crawl started'); const pageList = range(1, pages + 1); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => resultCrawl(browser, query, page, location, retries) ); try { await Promise.all(tasks); } catch (e) { console.log(`Failed to process batch: ${e}`); } } console.log('Concurrent crawl finished');} async function parseProduct( browser, productObject, location = 'us', retries = 3) { const productUrl = productObject.url; let tries = 0; let success = false; const urlArray = productUrl.split('/'); const title = urlArray[urlArray.length - 4]; const asin = urlArray[urlArray.length - 2]; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(productUrl, { timeout: 60000 }); const imagesToSave = []; const features = []; const images = await page.$$('li img'); for (const image of images) { const imageLink = await page.evaluate( (element) => element.getAttribute('src'), image ); if (imageLink.includes('https://m.media-amazon.com/images/I/')) { imagesToSave.push(imageLink); } } const featureBullets = await page.$$('li.a-spacing-mini'); for (const feature of featureBullets) { const span = await feature.$('span'); const text = await page.evaluate((span) => span.textContent, span); if (!features.includes(text)) { features.push(text); } } const priceSymbolElement = await page.$('span.a-price-symbol'); const priceWholeElement = await page.$('span.a-price-whole'); const priceDecimalElement = await page.$('span.a-price-fraction'); const priceSymbol = await page.evaluate( (element) => element.textContent, priceSymbolElement ); const priceWhole = ( await page.evaluate((element) => element.textContent, priceWholeElement) ) .replace(',', '') .replace('.', ''); const priceDecimal = await page.evaluate( (element) => element.textContent, priceDecimalElement ); const price = Number(`${priceWhole}.${priceDecimal}`); if (imagesToSave.length > 0) { const item = { asin: asin, title: title, url: productUrl, pricing_unit: priceSymbol, price: price, feature_1: features[0], feature_2: features[1], feature_3: features[2], feature_4: features[3], images_1: imagesToSave[0], images_2: imagesToSave[1], images_3: imagesToSave[2], images_4: imagesToSave[3], }; await writeToCsv([item], `${item.title}.csv`); console.log('Wrote to csv'); success = true; } else { await page.screenshot({ path: 
`ERROR-${title}.png` }); throw new Error('Failed to find item details!'); } } catch (e) { console.log('ERROR:', e); await page.screenshot({ path: 'error.png', fullPage: true }); console.log(`Failed page, Tries left: ${retries - tries}`); tries++; } finally { await page.close(); } } return;} async function concurrentProductScrape( browser, inputFile, concurrencyLimit, location = 'us', retries = 3) { const productObjects = await readCsv(inputFile); for (const productObject of productObjects) { await parseProduct(browser, productObject, location, retries); }} async function main() { const PRODUCTS = ['phone']; const MAX_RETRIES = 4; const PAGES = 5; const CONCURRENCY_LIMIT = 4; const LOCATION = 'us'; for (const product of PRODUCTS) { const browser = await puppeteer.launch(); const fileName = `./${product}.csv`; await concurrentCrawl( browser, product, PAGES, CONCURRENCY_LIMIT, LOCATION, MAX_RETRIES ); await concurrentProductScrape( browser, fileName, CONCURRENCY_LIMIT, LOCATION, MAX_RETRIES ); await browser.close(); }} main();
Inside parseProduct(), we now open an individual CSV file for each product. This way, we generate an individual report for each of the products we scraped earlier with the crawler. If you want to see details about a specific item, you can just open the report for that item.

We also add real concurrency to concurrentProductScrape(). In this function, we read our CSV file into an array. We then continually shrink the array with splice().
Each time a batch is scraped, the array shrinks and more memory is freed up, which helps the scraper stay fast as the job goes on.

async function concurrentProductScrape(
  browser,
  inputFile,
  concurrencyLimit,
  location = 'us',
  retries = 3
) {
  const productObjects = await readCsv(inputFile);

  // Drain the array in batches of concurrencyLimit and scrape each batch concurrently
  while (productObjects.length > 0) {
    const currentBatch = productObjects.splice(0, concurrencyLimit);
    const tasks = currentBatch.map((productObject) =>
      parseProduct(browser, productObject, location, retries)
    );
    try {
      await Promise.all(tasks);
    } catch (e) {
      console.log('Failed to process batch');
    }
  }
}
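If the splice() pattern is new to you, here is a minimal, standalone sketch of how it drains an array in fixed-size batches; there is nothing Amazon-specific in it:

```javascript
// splice(0, n) removes up to n items from the front of the array and returns them,
// so the source array shrinks on every pass until nothing is left.
const items = [1, 2, 3, 4, 5, 6, 7];
const concurrencyLimit = 3;

while (items.length > 0) {
  const batch = items.splice(0, concurrencyLimit);
  console.log('Processing batch:', batch);
}
// Logs: [ 1, 2, 3 ], then [ 4, 5, 6 ], then [ 7 ]
```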
To send the scraper's requests through the ScrapeOps proxy as well, we only need one new line inside parseProduct():

const proxyUrl = getScrapeOpsUrl(productUrl, location);

Then we page.goto()
the proxied url instead of the normal one.Here is the full code:const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = 'YOUR-SUPER-SECRET-API-KEY'; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe( csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true, }) ); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function resultCrawl( browser, productName, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries < retries && !success) { const page = await browser.newPage(); try { const url = `https://www.amazon.com/s?k=${productName}&page=${pageNumber}`; const proxyUrl = getScrapeOpsUrl(url, location); console.log(proxyUrl); await page.goto(proxyUrl); console.log(`Successfully fetched page: ${pageNumber}`); const badDivs = await page.$$('div.AdHolder'); for (const div of badDivs) { await page.evaluate((element) => { element.parentNode.removeChild(element); }, div); } const divs = await page.$$('div > span > div'); console.log(`Div count: ${divs.length}`); let lastTitle = ''; for (const div of divs) { const h2 = await div.$('h2'); if (h2 === null) { continue; } const a = await h2.$('a'); const parsable = h2 !== null && a !== null; if (parsable) { const title = await page.evaluate( (element) => element.textContent, h2 ); if (title === lastTitle) { continue; } console.log(`Title: ${title}`); const productUrl = await page.evaluate((a) => { const url = a.getAttribute('href'); if (url.includes('https')) { return url; } else { return `https://www.amazon.com${url}`; } }, a); console.log(`Product url: ${productUrl}`); const adStatus = productUrl.includes('sspa'); console.log(`Ad Status: ${adStatus}`); const urlArray = productUrl.split('/'); const asin = urlArray[urlArray.length - 2]; console.log(`Asin: ${asin}`); const pricingUnit = await div.$('span.a-price-symbol'); const wholePrice = await div.$('span.a-price-whole'); const decimalPrice = await div.$('span.a-price-fraction'); if ( pricingUnit === null || wholePrice === null || decimalPrice === null ) { console.log('Failed to find price!'); continue; } const priceSymbol = await page.evaluate( (pricingUnit) => pricingUnit.textContent, pricingUnit ); const wholeNumber = await page.evaluate( (wholePrice) => wholePrice.textContent, wholePrice ); const decimalNumber = await page.evaluate( (decimalPrice) => decimalPrice.textContent, decimalPrice ); const formattedWholeNumber = wholeNumber .replace(',', '') .replace('.', ''); const price = Number(`${formattedWholeNumber}.${decimalNumber}`); const realPricePresence = await div.$( 'span.a-price.a-text-price span' ); let realPrice = 0.0; if (realPricePresence !== null) { const realPriceStr = await 
page.evaluate( (realPricePresence) => realPricePresence.textContent, realPricePresence ); realPrice = Number(realPriceStr.replace(priceSymbol, '')); } else { realPrice = price; } let rating = 'n/a'; ratingPresence = await div.$('span.a-icon-alt'); if (ratingPresence !== null) { rating = await page.evaluate( (ratingPresence) => ratingPresence.textContent, ratingPresence ); } const item = { asin: asin, title: title, url: productUrl, is_ad: adStatus, pricing_unit: priceSymbol, price: price, real_price: realPrice, rating: rating, }; await writeToCsv([item], `${productName}.csv`); console.log('Item:', item); lastTitle = title; } } success = true; } catch (err) { console.log(`ERROR: ${err}, PAGE ${pageNumber}`); tries++; } finally { await page.close(); if (success) { console.log(`Finished scraping page: ${pageNumber}`); } } }} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} async function concurrentCrawl( browser, query, pages, concurrencyLimit, location = 'us', retries = 3) { console.log('Concurrent crawl started'); const pageList = range(1, pages + 1); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => resultCrawl(browser, query, page, location, retries) ); try { await Promise.all(tasks); } catch (e) { console.log(`Failed to process batch: ${e}`); } } console.log('Concurrent crawl finished');} async function parseProduct( browser, productObject, location = 'us', retries = 3) { const productUrl = productObject.url; const proxyUrl = getScrapeOpsUrl(productUrl, location); console.log('Proxy url:', proxyUrl); let tries = 0; let success = false; const urlArray = productUrl.split('/'); const title = urlArray[urlArray.length - 4]; const asin = urlArray[urlArray.length - 2]; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(proxyUrl, { timeout: 60000 }); const imagesToSave = []; const features = []; const images = await page.$$('li img'); for (const image of images) { const imageLink = await page.evaluate( (element) => element.getAttribute('src'), image ); if (imageLink.includes('https://m.media-amazon.com/images/I/')) { imagesToSave.push(imageLink); } } const featureBullets = await page.$$('li.a-spacing-mini'); for (const feature of featureBullets) { const span = await feature.$('span'); const text = await page.evaluate((span) => span.textContent, span); if (!features.includes(text)) { features.push(text); } } const priceSymbolElement = await page.$('span.a-price-symbol'); const priceWholeElement = await page.$('span.a-price-whole'); const priceDecimalElement = await page.$('span.a-price-fraction'); const priceSymbol = await page.evaluate( (element) => element.textContent, priceSymbolElement ); const priceWhole = ( await page.evaluate((element) => element.textContent, priceWholeElement) ) .replace(',', '') .replace('.', ''); const priceDecimal = await page.evaluate( (element) => element.textContent, priceDecimalElement ); const price = Number(`${priceWhole}.${priceDecimal}`); if (imagesToSave.length > 0) { const item = { asin: asin, title: title, url: productUrl, pricing_unit: priceSymbol, price: price, feature_1: features[0], feature_2: features[1], feature_3: features[2], feature_4: features[3], images_1: imagesToSave[0], images_2: imagesToSave[1], images_3: imagesToSave[2], images_4: imagesToSave[3], }; await writeToCsv([item], `${item.title}.csv`); console.log('Wrote to csv'); success = true; } else { 
await page.screenshot({ path: `ERROR-${title}.png` }); throw new Error('Failed to find item details!'); } } catch (e) { console.log('ERROR:', e); await page.screenshot({ path: 'error.png', fullPage: true }); console.log(`Failed page, Tries left: ${retries - tries}`); tries++; } finally { await page.close(); } } return;} async function concurrentProductScrape( browser, inputFile, concurrencyLimit, location = 'us', retries = 3) { const productObjects = await readCsv(inputFile); while (productObjects.length > 0) { const currentBatch = productObjects.splice(0, concurrencyLimit); const tasks = currentBatch.map((productObject) => parseProduct(browser, productObject, location, retries) ); try { await Promise.all(tasks); } catch (e) { console.log('Failed to process batch'); } }} async function main() { const PRODUCTS = ['phone']; const MAX_RETRIES = 4; const PAGES = 5; const CONCURRENCY_LIMIT = 4; const LOCATION = 'us'; for (const product of PRODUCTS) { const browser = await puppeteer.launch(); const fileName = `./${product}.csv`; await concurrentCrawl( browser, product, PAGES, CONCURRENCY_LIMIT, LOCATION, MAX_RETRIES ); await concurrentProductScrape( browser, fileName, CONCURRENCY_LIMIT, LOCATION, MAX_RETRIES ); await browser.close(); }} main();
For a production run, we set PAGES to 5 and time the operation from start to finish. If you'd like different results, feel free to change any of the constants in main():

async function main() {
  const PRODUCTS = ['phone'];
  const MAX_RETRIES = 4;
  const PAGES = 5;
  const CONCURRENCY_LIMIT = 4;
  const LOCATION = 'us';

  for (const product of PRODUCTS) {
    const browser = await puppeteer.launch();
    const fileName = `./${product}.csv`;
    await concurrentCrawl(
      browser,
      product,
      PAGES,
      CONCURRENCY_LIMIT,
      LOCATION,
      MAX_RETRIES
    );
    await concurrentProductScrape(
      browser,
      fileName,
      CONCURRENCY_LIMIT,
      LOCATION,
      MAX_RETRIES
    );
    await browser.close();
  }
}
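The script already logs its progress, but if you want to time the whole run yourself, one simple option (not part of the original script) is to wrap the call to main():

```javascript
// Hypothetical timing wrapper: use this in place of the bare main() call at the
// bottom of the script to measure the full crawl + scrape.
const start = Date.now();
main().then(() => {
  console.log(`Total runtime: ${((Date.now() - start) / 1000).toFixed(1)}s`);
});
```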
You also learned how to combine async and Promise for powerful concurrency. Take all these new skills and go build something cool!

Check the links below to learn more about the tech stack used in this article.