#!/usr/bin/python
"""Scrape AliExpress product pages with Selenium and store price-history entries.

Relies on a Selenium Firefox instance (remote container on :4444 by default),
a ``cookies.json`` file next to this script holding the logged-in session, and
the project-local ``db`` module for persistence.
"""

import json
import os
import random
import re
import time
from time import sleep

import requests  # kept: may be used by other parts of the project
import yaml  # kept: may be used by other parts of the project

from db import *
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# Compiled once at module level instead of on every check_item() call.
_PUNISH_REGEX = re.compile(r'(pid: \'punish-page\')|(Deny from x5)|(FAIL_SYS_ILLEGAL_ACCESS)')
_NUMBER_REGEX = re.compile(r'[0-9]+')
# Comma-decimal prices, e.g. "12,34" (European formatting).
_PRICE_REGEX = re.compile(r'[0-9]*,[0-9]{0,2}')


def load_cookies_from_file_selenium(file_path):
    """Load cookies from a JSON export file into Selenium ``add_cookie`` dicts.

    Parameters
    ----------
    file_path : str
        Path to a JSON file containing a list of objects with "Name raw",
        "Content raw" and "Path raw" keys.

    Returns
    -------
    list[dict]
        Dicts with ``name``/``value``/``path`` keys. Entries whose
        "Content raw" is empty are skipped.
    """
    with open(file_path, 'r') as file:
        cookies_data = json.load(file)
    cookies_list = []
    for cookie_data in cookies_data:
        content_raw = cookie_data.get("Content raw", "")
        if content_raw:  # skip cookies without a value
            cookies_list.append({
                "name": cookie_data.get("Name raw", ""),
                "value": content_raw,
                "path": cookie_data.get("Path raw", ""),
            })
    return cookies_list


def _make_driver():
    """Create the Firefox WebDriver (remote selenium container on :4444)."""
    firefox_options = webdriver.FirefoxOptions()
    firefox_options.add_argument("--width=2560")
    firefox_options.add_argument("--height=1440")
    return webdriver.Remote(
        command_executor='http://127.0.0.1:4444',
        options=firefox_options
    )
    ##### for testing with local geckodriver
    # options = Options()
    # service = webdriver.FirefoxService(executable_path='/bin/geckodriver')
    # profile = webdriver.FirefoxProfile()
    # options.profile = profile
    # options.add_argument("--width=2560")
    # options.add_argument("--height=1440")
    # return webdriver.Firefox(service=service, options=options)


def _login(driver):
    """Install the saved login cookies into a fresh aliexpress.com session."""
    cookies_file_path = os.path.dirname(os.path.realpath(__file__)) + '/cookies.json'
    cookies = load_cookies_from_file_selenium(cookies_file_path)
    driver.get("https://aliexpress.com")
    sleep(random.uniform(3, 6))  # random waits to look less bot-like
    driver.delete_all_cookies()
    for cookie in cookies:
        driver.add_cookie(cookie)
    sleep(random.uniform(3, 6))


def _accept_gdpr(driver):
    """Click the GDPR cookie-consent banner if it appears (best effort)."""
    try:
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, "global-gdpr-btn-wrap"))
        )
        accept_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CLASS_NAME, "btn-accept"))
        )
        sleep(random.uniform(2, 4))
        accept_button.click()
        print("Cookies accepted")
    except Exception as e:
        print(f"An error occurred: {e}")


def _select_attributes(driver, filter_attributes):
    """Click each requested SKU attribute: image swatch first, text div as fallback."""
    for attribute in filter_attributes:
        if re.search(" ", attribute):
            # NOTE(review): this sub looks like a no-op (space -> space); the
            # replacement char was probably meant to be a non-breaking space
            # (\u00a0) as used in AliExpress alt texts — verify before changing.
            possible_attribute = [attribute, re.sub(" ", " ", attribute)]
        else:
            possible_attribute = [attribute]
        for attr in possible_attribute:
            try:
                # try to find an image with an alt text corresponding to the attribute
                img_element = WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, f"//img[@alt='{attr}']"))
                )
                location = img_element.location
                size = img_element.size
                # click on where the image is (images appear to always be 60x60px),
                # jittered so the click position varies between runs
                center_x = (location['x'] + size['width'] / 2) + random.uniform(-10, 10)
                center_y = (location['y'] + size['height'] / 2) + random.uniform(-10, 10)
                sleep(random.uniform(2, 4))
                actions = ActionChains(driver)
                actions.move_by_offset(center_x, center_y).click().perform()
                print(f"clicked on {attr}")
                break
            except Exception:
                try:
                    # try to find a div with corresponding text instead
                    div_element = WebDriverWait(driver, 10).until(
                        EC.presence_of_element_located(
                            (By.XPATH, f"//div[@title='{attr}']//span[text()='{attr}']"))
                    )
                    sleep(random.uniform(2, 4))
                    actions = ActionChains(driver)
                    actions.move_to_element(div_element).click().perform()
                    print(f"clicked on {attr}")
                    break
                except Exception as e:
                    print(f"Div or image {attr} not found: {e}")


def _scrape_price(driver):
    """Return ``(price: float, currency: str)`` from the page, or None if absent."""
    try:
        div_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(
                (By.XPATH, "//div[contains(@class, 'product-price-current')]"))
        )
        span_element = div_element.find_element(
            By.XPATH, ".//span[contains(@class, 'product-price-value')]")
        price_text = span_element.text
        # "12,34" -> 12.34; whatever is left after stripping the number is the currency
        price = float(re.sub(",", ".", re.findall(_PRICE_REGEX, price_text)[0]))
        currency = re.sub(" ", "", re.sub(_PRICE_REGEX, "", price_text))
        print(f"The extracted price is: {price}, the extracted currency is: {currency}")
        return price, currency
    except Exception as e:
        print(f"An error occurred: {e}")
        return None


def _scrape_discount(driver):
    """Return the discount percentage (str) or 0 when no discount badge is shown."""
    try:
        div_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(
                (By.XPATH, "//div[contains(@class, 'price--original--wEueRiZ')]"))
        )
        span_element = div_element.find_element(
            By.XPATH, ".//span[contains(@class, 'price--discount--Y9uG2LK')]")
        discount = re.findall(_NUMBER_REGEX, span_element.text)[0]
        print(f"The extracted discount is: {discount}")
        return discount
    except Exception as e:
        print(f"An error occurred: {e}")
        return 0


def _scrape_quantity(driver):
    """Return the available quantity (str) or '0' when not displayed."""
    try:
        div_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(
                (By.XPATH, "//div[contains(@class, 'quantity--info--jnoo_pD')]"))
        )
        span_element = div_element.find_element(By.XPATH, ".//span")
        quantity = re.findall(_NUMBER_REGEX, span_element.text)[0]
        print(f"The extracted quantity is: {quantity}")
        return quantity
    except Exception as e:
        print(f"An error occurred: {e}")
        return '0'


def _scrape_image(driver):
    """Return an 800x800 product image URL, or '' when no image/poster is found."""
    try:
        div_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(
                (By.XPATH, "//div[contains(@class, 'magnifier--wrap--cF4cafd')]"))
        )
        img_element = div_element.find_element(
            By.XPATH, ".//img[contains(@class, 'magnifier--image--EYYoSlr')]")
        img_src = img_element.get_attribute("src")
    except Exception:
        # if the image in the magnifier wrap doesn't exist it might be a
        # video instead (image is the video's poster frame)
        try:
            div_element = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located(
                    (By.XPATH, "//div[contains(@class, 'video--wrap--EhkqzuR')]"))
            )
            video_element = div_element.find_element(
                By.XPATH, ".//video[contains(@class, 'video--video--lsI7y97')]")
            img_src = video_element.get_attribute("poster")
        except Exception as e:
            print(f"An error occurred: {e}")
            # bug fix: the original left image_link undefined on this path,
            # causing a NameError when building the result dict
            return ""
    # get bigger image instead of preview
    image_link = re.sub(r'(jpg|png)_[0-9]+x[0-9]+', r'\1_800x800', img_src)
    print(f"The extracted image URL is: {image_link}")
    return image_link


def _is_choice(driver):
    """Return True when the 'Choice' delivery badge image is present on the page."""
    choice = False
    try:
        div_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(
                (By.XPATH, "//div[contains(@class, 'choice-mind--box--fJKH05M')]"))
        )
        img_element = div_element.find_element(By.XPATH, ".//img")
        if img_element.get_attribute("src") == "https://ae01.alicdn.com/kf/S2eef439ead604da69a385490d86118c97/98x42.png":
            choice = True
    except Exception as e:
        print(f"An error occurred: {e}")
    print(f"Choice delivery: {choice}")
    return choice


def check_item(settings_item):
    """Return a dict with item data extracted from AliExpress.

    Extracted data: skuId, quantity, discount_percentage, price, currency,
    choice_delivery, image.

    Parameters
    ----------
    settings_item : tuple
        ``(itemid, attributes)`` — ``itemid`` is the id found in the
        AliExpress item-page URL; ``attributes`` is a list of choice-value
        strings (e.g. which length or colour). If multiple items are on the
        same page, pass only one per category; order doesn't matter.

    Returns
    -------
    dict
        ``{(itemid, tuple(attributes)): {...}}``, or ``{}`` when the item's
        price could not be found.

    Raises
    ------
    ValueError
        With message "punish" when AliExpress serves its bot-punish page.
    """
    print(settings_item)
    item = settings_item[0]
    filter_attributes = settings_item[1]

    driver = _make_driver()
    try:  # finally-quit fixes a session leak on unexpected exceptions
        _login(driver)
        _accept_gdpr(driver)

        driver.get('https://aliexpress.com/item/' + str(item) + '.html')

        # check if punish page hit
        if _PUNISH_REGEX.search(driver.page_source):
            print("punish page")
            raise ValueError("punish")

        # refresh page to have the price in a single span
        driver.refresh()

        _select_attributes(driver, filter_attributes)

        ### scraping data
        price_currency = _scrape_price(driver)
        if price_currency is None:
            print("item not found")
            return {}
        price, currency = price_currency

        discount = _scrape_discount(driver)
        quantity = _scrape_quantity(driver)
        image_link = _scrape_image(driver)
        choice = _is_choice(driver)

        # return the data scraped, keyed by (itemid, attributes)
        key = (item, tuple(filter_attributes))
        extract = {
            key: {
                "skuid": '0',
                "quantity": quantity,
                "discount_percentage": discount,
                "price": price,
                "currency": currency,
                "choice_delivery": choice,
                "image": image_link,
            }
        }
        print(extract)
        return extract
    finally:
        driver.quit()


def get_attributes(attributes_raw):
    """Return a list of attributes parsed from *attributes_raw*.

    Attributes are the ``#``-prefixed alphanumeric segments of the raw string;
    runs of multiple spaces inside each attribute are collapsed to one.
    """
    attr_regex = re.compile(r'#([0-9a-zA-Z \.\-]*)')
    attributes = re.findall(attr_regex, attributes_raw)
    # collapse multiple spaces
    spaces_regex = re.compile(r' {2,}')
    return [re.sub(spaces_regex, " ", attr) for attr in attributes]


def fill_db(items_dict):
    """Add new history entries in the database from data extracted in *items_dict*."""
    for key, value in items_dict.items():
        add_history_entry(key[0], value["skuid"], value["choice_delivery"],
                          list(key[1]), value["image"], value["price"],
                          value["currency"], value["quantity"],
                          value["discount_percentage"])


def _update_entries(items):
    """Scrape every item in *items* and persist results.

    Returns the list of items that hit the punish page so they can be retried.
    """
    retry = []
    for item in items:
        time.sleep(2)  # pace requests between items
        try:
            fill_db(check_item(item))
        except ValueError:
            retry.append(item)
    return retry


def update_items():
    """Add new history entries for every item in the database; return the retry list."""
    return _update_entries(get_item_keys())


def retry_update(retry_list):
    """Update entries from the retry list only; return items that failed again."""
    return _update_entries(retry_list)