X webautomatisering en -schrapen met Selenium
Voor kleine Selenium projecten voorkomen we detectie door automatisering niet, maar proberen we menselijk gedrag na te bootsen.
Als je gegevens van het web wilt schrapen, moet je weten wat je doet. Je wilt een doelserver niet overladen met verzoeken. Als je dit doet vanaf een enkele locatie, een IP address, kun je een (tijdelijke) ban krijgen.
Als je groot wilt scrapen, overweeg dan het gebruik van een speciale service zoals ZenRows, ScrapFly, WebScrapingAPI, ScrapingAnt, enz. Ze dist verdelen je verzoeken over een heleboel systemen, elk met een unieke IP address, wat betekent dat de doelserver denkt dat hij wordt benaderd door veel verschillende (menselijke) cliënten.
Maar hey, soms willen we gewoon een klein beetje gegevens scrapen, laten we zeggen dat we elke dag de nieuwe berichten van een paar mensen willen scrapen op een social media platform zoals X .
In deze post gebruiken we Selenium. Er zijn oplossingen zoals Puppeteer of Playwright, maar Selenium bestaat al sinds 2004, het is de meest gebruikte tool voor geautomatiseerd testen en heeft een grote community.
Ik zal beginnen met wat overwegingen en eindigen met wat werkende code. De code automatiseert het inloggen in X, en vervolgens het zoeken en schrappen van een aantal berichten. Ik weet dat alles morgen kan veranderen, maar vandaag werkt dit.
Zoals altijd draai ik Ubuntu 22.04.
Schrapen van X met Selenium
Onlangs heeft X besloten dat je moet inloggen om berichten te bekijken. Ik vind dit niet leuk. Meestal lees ik alleen berichten van een paar mensen, met behulp van de browserextensie 'Breakthrough Twitter Login Wall'. Dit werkt niet meer.
En soms scraap ik wat gegevens, maar geen honderdduizenden tweets. Ik denk dat veel ontwikkelaars dit doen. Er zijn bijna 30 miljoen software engineers in de wereld. Laten we nu eens aannemen dat één op de duizend soms eens per jaar wat X posts scraapt, die elk gemiddeld 1000 posts scrapen, voor kleine projecten, testen en plezier (leren). Dat zijn 30.000 scrapers per jaar, 100 scrapers per dag, is 100.000 posts per dag. Dit is niet echt veel gezien de miljoenen berichten per dag. X heeft nog steeds een gratis API account, maar dit is write-only. Als we gegevens willen scrapen, kunnen we alleen de X website gebruiken en deze openen met tools zoals Selenium.
Selenium is een zeer krachtig hulpmiddel voor geautomatiseerd testen, maar het kan ook worden gebruikt voor web scraping. Het is uitgebracht in 2004 en heeft een grote community. Andere populaire tools die je zou kunnen overwegen zijn Puppeteer en Playwright.
Menselijk gedrag nabootsen
Ik zou zeggen dat het niet illegaal is om een webpagina te scrapen zolang je de geschraapte gegevens niet in een herkenbare vorm vrijgeeft. Dat gezegd hebbende, kan ik me voorstellen dat sommige bedrijven de toegang tot hun webpagina's en diensten blokkeren voor geautomatiseerde software.
Wat is geautomatiseerde software precies en hoe kan het worden gedetecteerd? De meest voor de hand liggende controle is om te bepalen of het gedrag van een bezoeker afwijkt van menselijk gedrag. Dit betekent meestal dat je geautomatiseerde software zich als een mens moet gedragen. Typ iets in en wacht een paar seconden, klik op een knop en wacht een paar seconden, scroll naar beneden en wacht een paar seconden, enz. Een mens kan geen formulier invullen in 50 milliseconden ...
Beschouw een ander scenario, waarbij Selenium wordt gebruikt om pagina's van een website te openen met behulp van spraakbesturing. Zelfs als de website detecteert dat we Selenium gebruiken, lijkt dit volkomen legitiem omdat het heel duidelijk is dat de acties door een mens worden uitgevoerd.
Een website aflopen en een goedkoop product kopen is menselijk gedrag, maar alle goedkope producten in grote hoeveelheden kopen is verdacht. Vooral als dit dag na dag gebeurt.
Dit betekent dat onze software vooral menselijk gedrag moet nabootsen. Doe iets en wacht een willekeurig aantal seconden, doe iets anders en wacht een willekeurig aantal seconden, enz.
Browserselectie
Selenium ondersteunt veel browsers. Voor dit project gebruik ik Chrome. De reden is duidelijk: Chrome is de meest gebruikte browser, niet alleen door internet user's, maar ook door ontwikkelaars. Op mijn ontwikkelmachine gebruik ik Firefox, Chromium en (soms) Brave. Dit betekent dat ik Chrome uitsluitend voor dit project kan gebruiken. Als je Chrome ook gebruikt voor surfen op het web, dan kun je een apart profiel maken voor de scraping-applicatie.
Scraper-botdetectie
Een scraperbot is een hulpprogramma of stuk code dat gebruikt wordt om gegevens van webpagina's te halen. Web scraping is niet nieuw en veel organisaties hebben maatregelen genomen om te voorkomen dat geautomatiseerde tools toegang krijgen tot hun websites. In het artikel 'How to Avoid Bot Detection with Selenium', zie onderstaande links, staan verschillende manieren om botdetectie te voorkomen.
Als we niets doen, is het heel gemakkelijk om onze Selenium bot te detecteren. Zolang we ons als een mens gedragen, kan dit geen kwaad. We laten de doelwebsite zien dat we scriptkiddies zijn of dat we willen dat ze merken dat we automatisering gebruiken.
Toch kunnen sommige bedrijven ons om hun eigen legitieme redenen blokkeren. Laten we eerlijk zijn, een goede bot kan plotseling een hele slechte bot worden.
Er zijn enkele sites die we kunnen gebruiken om te bepalen of onze bot wordt herkend als geautomatiseerde software:
- https://fingerprint.com/products/bot-detection
- https://pixelscan.net
- https://nusecure.nl/#relax
Hier is wat voorbeeldcode die je kunt gebruiken om dit uit te proberen:
# bot_detect_test.py
import os
import time
# selenium
from selenium import webdriver
# chromedriver_binary, see pypi.org
import chromedriver_binary
# configure webdriver
from selenium.webdriver.chrome.options import Options
webdriver_options = Options()
# use a common screen format
webdriver_options.add_argument('window-size=1920,1080')
# profile
user_data_dir = os.path.join(os.getcwd(), 'selenium_data')
webdriver_options.add_argument('user-data-dir=' + user_data_dir)
# allow popups
webdriver_options.add_experimental_option('excludeSwitches', ['disable-popup-blocking']);
def main():
with webdriver.Chrome(options=webdriver_options) as wd:
wd.get('https://fingerprint.com/products/bot-detection')
time.sleep(8)
wd.save_screenshot('fingerprint.png')
print(f'ready')
if __name__ == '__main__':
main()
Worden we geblokkeerd?
Hoe meer we proberen om detectie van onze Selenium bot te voorkomen, hoe verdachter we lijken voor botdetectors.
Er is een project 'undetected_chromedriver', zie onderstaande links, dat een bot probeert te leveren, gebaseerd op Selenium, die niet gedetecteerd kan worden. Maar als het gedetecteerd wordt, dan kan de doelwebsite beslissen om u te blokkeren, want ... waarom zou iemand ondetecteerbaar willen zijn?
Bedrijven die botdetectors ontwikkelen of botdetectorservices aanbieden, volgen projecten zoals 'undetected_chromedriver' op de voet en proberen elke detectiepreventiemethode die wordt geïmplementeerd te verslaan.
Updates zijn ook een probleem. Een nieuwe versie van Selenium of Chrome kan de detectiepreventiemaatregelen doorbreken. Maar Chrome lange tijd niet bijwerken kan ook detecteerbaar en verdacht zijn.
Dus wat moeten we doen? Niets, een klein beetje of bijvoorbeeld 'undetected_chromedriver' gebruiken? Als je groots wilt scrapen, kun je het beste een scraping service gebruiken zoals de hierboven genoemde. Voor kleine projecten stel ik voor dat u niets doet om detectie te voorkomen en kijkt wat er gebeurt.
Moeten we het inloggen echt automatiseren?
In plaats van het inloggen en inloggen via ons script te automatiseren, kunnen we ook handmatig inloggen via de browser, en dan de gegevens van de browser (profielgegevens, sessie) gebruiken bij het verbinden met X. Dit bespaart een hoop code. Maar ... onze applicatie is in dit geval niet volledig geautomatiseerd. En hé, we gebruiken Selenium, de webautomatiseringstool (!).
Over de code
De onderstaande code automatiseert het inloggen op X en schraapt vervolgens een aantal berichten van een geselecteerde user. Zodra dit is gebeurd, wordt het script afgesloten. Het gebruikt een apart profiel voor de webbrowser, wat betekent dat het niet zou moeten interfereren met normaal browsergebruik. Als het aanmelden succesvol was en het script bestaat, dan wordt de volgende keer dat het script wordt gestart niet meer aangemeld, maar wordt direct begonnen met het selecteren van de user en het scrapen van de berichten.
De berichten van de geselecteerde user worden gedumpt in een bestand 'posts.txt'. Dit betekent dat de code de delen van de berichten niet extraheert. Je kunt hiervoor beter BeautifulSoup gebruiken.
Pagina's herkennen
Ons script herkent de volgende pagina's:
Wanneer niet ingelogd:
- Startpagina
- Inloggen - voer e-mail in
- Inloggen - wachtwoord invoeren
- Inloggen - voer usernaam in (bij ongebruikelijke activiteit)
Wanneer ingelogd:
- Ingelogde pagina
De pagina 'Ongebruikelijke activiteit' wordt soms getoond, bijvoorbeeld als je al bent ingelogd via een andere browser, vóór de pagina 'Wachtwoord invoeren'.
Wat we in een lus doen:
- De taal extraheren
- Detecteren welke pagina wordt getoond
- De actie voor de pagina uitvoeren
Op de startpagina verwijzen we gewoon door naar de aanmeldpagina. Alle andere niet-ingelogde pagina's hebben een titel, een 'formulierveld' en een 'formulierknop'. Dit betekent dat we ze op dezelfde manier kunnen behandelen.
En als we het over pagina's hebben, hebben we het niet over URL's, maar over wat er op het scherm wordt getoond. X is alles Javascript.
De 'op welke pagina zijn we' detector
Dit is waarschijnlijk het meest 'spannende' stukje code. We maken eerst een lijst van elkaar uitsluitende 'presence_of_element_located' items:
- Profiel-knop => ingelogd
- Inlogknop met tekst 'Aanmelden' => 'home_page'.
- <h1> tekst 'Aanmelden bij X' => 'login_page_1_email'
- <h1> tekst 'Voer uw wachtwoord in' => 'login_page_2_password'
- <h1> tekst 'Voer uw telefoonnummer of usernaam in' => 'login_page_3_username'.
Vervolgens wachten we tot ten minste een van de elementen zich bevindt:
wait = WebDriverWait(
....
)
elem = wait.until(EC.any_of(*tuple(or_conditions)))
Zodra we het element hebben, kunnen we de pagina bepalen. En zodra we de pagina hebben, kunnen we de juiste actie uitvoeren.
Oeps: 'Er ging iets mis. Probeer opnieuw te laden.
Bij het inloggen verschijnt deze melding soms. Ik zie het op willekeurige momenten, dus ik vraag me af of het expres is ingebouwd. De code genereert een time-outfout wanneer dit gebeurt. U moet het script opnieuw starten of uw eigen functie voor opnieuw proberen implementeren.
Talen
Er zijn twee talen waar we mee te maken hebben, de niet-ingelogde taal en de accounttaal. Vaak zijn ze hetzelfde. De onderstaande code draait voor de talen Engels en Nederlands, zelfs gemengd. De taal wordt uit de pagina gehaald.
Om een nieuwe taal toe te voegen, voegt u deze toe aan:
# languages
self.available_langs = ['en', 'nl']
en voeg je de teksten toe aan de pagina's:
# pages
self.pages = [
...
]
Natuurlijk moet je eerst de teksten opzoeken door handmatig in te loggen.
Om de browser in een andere taal te starten, verwijder je eerst de map met het browserprofiel:
rm -R browser_profile
en start dan het script na (!) het instellen van de nieuwe locale:
bash -c 'LANGUAGE=nl_NL.UTF-8 python x_get_posts.py'
XPath zoeken
Als je vanaf de root van het document wilt zoeken, begin je XPath met '//'.
Als je relatief ten opzichte van een bepaald element wilt zoeken, begin je XPath met './/'.
Berichten
Op dit moment worden de zichtbare berichten als HTML in het bestand 'posts.txt' geschreven. Om meer berichten te krijgen, implementeer je je eigen scroll-down functie.
Gegevens uit berichten halen
Dit is jouw keuze. Ik stel voor dat je BeautifulSoup gebruikt.
De code
Voor het geval je het wilt proberen, is hier de code. Voordat je de code uitvoert, moet je ervoor zorgen dat je je accountgegevens en zoeknaam toevoegt.
# x_get_posts.py
import logging
import os
import random
import sys
import time
# selenium
from selenium import webdriver
# using chromedriver_binary
import chromedriver_binary # Adds chromedriver binary to path
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import selenium.common.exceptions as selenium_exceptions
from selenium.webdriver.chrome.options import Options
# configure webdriver: hide gui, window size, full-screen
webdriver_options = Options()
#webdriver_options.add_argument('--headless')
# use a common screen format
webdriver_options.add_argument('window-size=1920,1080')
# use a separate profile
user_data_dir = os.path.join(os.getcwd(), 'browser_profile')
webdriver_options.add_argument('user-data-dir=' + user_data_dir)
# allow popups
webdriver_options.add_experimental_option('excludeSwitches', ['disable-popup-blocking']);
# force language
webdriver_options.add_argument('--lang=nl-NL');
def get_logger(
console_log_level=logging.DEBUG,
file_log_level=logging.DEBUG,
log_file=os.path.splitext(__file__)[0] + '.log',
):
logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
if console_log_level:
# console
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(console_log_level)
console_handler.setFormatter(logging.Formatter(logger_format))
logger.addHandler(console_handler)
if file_log_level:
# file
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(file_log_level)
file_handler.setFormatter(logging.Formatter(logger_format))
logger.addHandler(file_handler)
return logger
logger = get_logger(
file_log_level=None,
)
logger.debug('START')
logger.debug(f'user_data_dir = {user_data_dir}')
class SeleniumBase:
def __init__(
self,
logger=None,
wd=None,
):
self.logger = logger
self.wd = wd
self.web_driver_wait_timeout = 10
self.web_driver_wait_poll_frequency = 1
self.min_wait_for_next_page = 3.
def get_elem_attrs(self, elem, dump=False, dump_header=None):
elem_attrs = self.wd.execute_script('var items = {}; for (index = 0; index < arguments[0].attributes.length; ++index) { items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value }; return items;', elem)
if dump:
if dump_header is not None:
self.logger.debug(f'{dump_header}')
for k, v in elem_attrs.items():
self.logger.debug(f'attrs {k}: {v}')
return elem_attrs
def random_wait(self, min_wait_secs=None):
self.logger.debug(f'(min_wait_secs = {min_wait_secs})')
min_wait_secs = min_wait_secs or 2.
wait_secs = min_wait_secs + 2 * random.uniform(0, 1)
self.logger.debug(f'wait_secs = {wait_secs:.1f}')
time.sleep(wait_secs)
def elem_click_wait(self, elem, before_min_wait=None, after_min_wait=None):
self.logger.debug(f'(, before_min_wait = {before_min_wait}, after_min_wait = {after_min_wait})')
if before_min_wait is not None:
self.random_wait(min_wait_secs=before_min_wait)
elem.click()
if after_min_wait is not None:
self.random_wait(min_wait_secs=after_min_wait)
def elem_send_keys_wait(self, elem, value, before_min_wait=None, after_min_wait=None):
self.logger.debug(f'(, value = {value}, before_min_wait = {before_min_wait}, after_min_wait = {after_min_wait})')
if before_min_wait is not None:
self.random_wait(min_wait_secs=before_min_wait)
elem.send_keys(value)
if after_min_wait is not None:
self.random_wait(min_wait_secs=after_min_wait)
def set_form_field_value_and_click(self, form_field_elem=None, form_field_value=None, form_button_elem=None):
self.logger.debug(f'(form_field_elem = {form_field_elem}, form_field_value = {form_field_value}, form_button_elem = {form_button_elem})')
if form_field_elem is not None and form_field_value is not None:
self.random_wait(min_wait_secs=1.)
form_field_elem.send_keys(form_field_value)
if form_button_elem is not None:
self.random_wait(min_wait_secs=.5)
form_button_elem.click()
self.random_wait(min_wait_secs=self.min_wait_for_next_page)
def wait_for_element(
self,
xpath,
parent_xpath=None,
parent_attr_check=None,
visible=True,
):
self.logger.debug(f'xpath = {xpath}, parent_xpath = {parent_xpath}, parent_attr_check = {parent_attr_check}, visible = {visible}')
wait = WebDriverWait(
self.wd,
timeout=self.web_driver_wait_timeout,
poll_frequency=self.web_driver_wait_poll_frequency,
ignored_exceptions=[
selenium_exceptions.ElementNotVisibleException,
selenium_exceptions.ElementNotSelectableException,
],
)
try:
if visible:
elem = wait.until(EC.visibility_of_element_located((By.XPATH, xpath)))
else:
elem = wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
except selenium_exceptions.TimeoutException as e:
self.logger.exception(f'TimeoutException, e = {e}, e.args = {e.args}')
raise
except Exception as e:
self.logger.exception(f'other exception, e = {e}')
raise
self.get_elem_attrs(elem, dump=True, dump_header='elem')
if parent_xpath is None:
self.logger.debug(f'elem.tag_name = {elem.tag_name}')
return elem
parent_elem = elem.find_element(By.XPATH, parent_xpath)
self.get_elem_attrs(parent_elem, dump=True, dump_header='parent_elem')
if parent_attr_check is not None:
for k, v_expected in parent_attr_check.items():
v = parent_elem.get_attribute(k)
if v != v_expected:
raise Exception(f'parent_elem = {parent_elem}, attr k = {k}, v = {v} != v_expected = {v_expected}')
self.logger.debug(f'parent_elem.tag_name = {parent_elem.tag_name}')
return parent_elem
def wait_for_elements(self, xpath):
self.logger.debug(f'xpath = {xpath}')
wait = WebDriverWait(
self.wd,
timeout=self.web_driver_wait_timeout,
poll_frequency=self.web_driver_wait_poll_frequency,
ignored_exceptions=[
selenium_exceptions.ElementNotVisibleException,
selenium_exceptions.ElementNotSelectableException,
],
)
try:
elems = wait.until(EC.presence_of_all_elements_located((By.XPATH, xpath)))
except selenium_exceptions.TimeoutException as e:
self.logger.exception(f'TimeoutException, e = {e}, e.args = {e.args}')
raise
except Exception as e:
self.logger.exception(f'other exception, e = {e}')
raise
elems_len = len(elems)
self.logger.debug(f'elems_len = {elems_len}')
return elems
class XGetPosts(SeleniumBase):
def __init__(
self,
logger=None,
wd=None,
account=None,
search_for=None,
):
super().__init__(
logger=logger,
wd=wd,
)
self.account = account
self.search_for = search_for
# langs are in the <html> tag
self.available_langs = ['en', 'nl']
# pages
self.pages = [
{
'name': 'home_page',
'texts': {
'en': {
'login_button': 'Sign in',
},
'nl': {
'login_button': 'Inloggen',
},
},
'page_processor': self.home_page,
'login_button_text_xpath': '//a[@role="link"]/div/span/span[text()[normalize-space()="' + '{{login_button_text}}' + '"]]'
},
{
'name': 'login_page_1_email',
'texts': {
'en': {
'title': 'Sign in to X',
'form_button': 'Next',
},
'nl': {
'title': 'Registreren bij X',
'form_button': 'Volgende',
},
},
'page_processor': self.login_page_1_email,
'form_field_xpath': '//input[@type="text" and @name="text" and @autocomplete="username"]',
'form_field_value': self.account['email'],
'form_button_text_xpath': '//div[@role="button"]/div/span/span[text()[normalize-space()="' + '{{form_button_text}}' + '"]]'
},
{
'name': 'login_page_2_password',
'texts': {
'en': {
'title': 'Enter your password',
'form_button': 'Log in',
},
'nl': {
'title': 'Voer je wachtwoord in',
'form_button': 'Inloggen',
},
},
'page_processor': self.login_page_2_password,
'form_field_xpath': '//input[@type="password" and @name="password" and @autocomplete="current-password"]',
'form_field_value': self.account['password'],
'form_button_text_xpath': '//div[@role="button"]/div/span/span[text()[normalize-space()="' + '{{form_button_text}}' + '"]]',
},
{
'name': 'login_page_3_username',
'texts': {
'en': {
'title': 'Enter your phone number or username',
'form_button': 'Next',
},
'nl': {
'title': 'Voer je telefoonnummer of gebruikersnaam',
'form_button': 'Volgende',
},
},
'page_processor': self.login_page_3_username,
'form_field_xpath': '//input[@type="text" and @name="text" and @autocomplete="on"]',
'form_field_value': self.account['username'],
'form_button_text_xpath': '//div[@role="button"]/div/span/span[text()[normalize-space()="' + '{{form_button_text}}' + '"]]',
},
{
'name': 'loggedin_page',
'texts': {
'en': {
},
'nl': {
},
},
'page_processor': self.loggedin_page,
},
]
self.page_name_pages = {}
for page in self.pages:
self.page_name_pages[page['name']] = page
def get_page_lang(self, url):
self.logger.debug(f'(url = {url})')
# get lang from html tag
html_tag_xpath = '//html[@dir="ltr"]'
html_tag_elem = self.wait_for_element(
xpath=html_tag_xpath,
)
# lang must be present and available
lang = html_tag_elem.get_attribute('lang')
if lang not in self.available_langs:
raise Exception(f'lang = {lang} not in available_langs = {self.available_langs}')
self.logger.debug(f'lang = {lang}')
return lang
def which_page(self, url, lang):
self.logger.debug(f'(url = {url}, lang = {lang})')
# construct list of items that identify:
# - not logged in pages
# - logged in
or_conditions = []
for page in self.pages:
page_name = page['name']
if page_name == 'home_page':
# check for 'Sign in' button
login_button_text = page['texts'][lang]['login_button']
xpath = '//a[@href="/login" and @role="link" and @data-testid="loginButton"]/div[@dir="ltr"]/span/span[text()[normalize-space()="' + login_button_text + '"]]'
self.logger.debug(f'xpath = {xpath}')
or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )
elif page_name == 'login_page_1_email':
# check for <h1> title
title_text = page['texts'][lang]['title']
xpath = '//h1/span/span[text()[normalize-space()="' + title_text + '"]]'
self.logger.debug(f'xpath = {xpath}')
or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )
elif page_name == 'login_page_2_password':
# check for <h1> title
title_text = page['texts'][lang]['title']
xpath = '//h1/span/span[text()[normalize-space()="' + title_text + '"]]'
self.logger.debug(f'xpath = {xpath}')
or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )
elif page_name == 'login_page_3_username':
# check for <h1> title
title_text = page['texts'][lang]['title']
xpath = '//h1/span/span[text()[normalize-space()="' + title_text + '"]]'
self.logger.debug(f'xpath = {xpath}')
or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )
# check if logged in using profile button
xpath = '//a[@href="/' + self.account['screen_name'] + '" and @role="link" and @data-testid="AppTabBar_Profile_Link"]'
self.logger.debug(f'xpath = {xpath}')
or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )
# or_conditions
self.logger.debug(f'or_conditions = {or_conditions}')
wait = WebDriverWait(
self.wd,
timeout=self.web_driver_wait_timeout,
poll_frequency=self.web_driver_wait_poll_frequency,
ignored_exceptions=[
selenium_exceptions.ElementNotVisibleException,
selenium_exceptions.ElementNotSelectableException,
],
)
try:
elem = wait.until(EC.any_of(*tuple(or_conditions)))
except selenium_exceptions.TimeoutException as e:
self.logger.exception(f'selenium_exceptions.TimeoutException, e = {e}, e.args = {e.args}')
raise
except Exception as e:
self.logger.exception(f'other exception, e = {e}')
raise
# not logged in and a known page, ... or .... logged in
self.logger.debug(f'elem.tag_name = {elem.tag_name}')
self.get_elem_attrs(elem, dump=True)
page = None
if elem.tag_name == 'a':
href = elem.get_attribute('href')
if href == '/login':
page = self.page_name_pages['home_page']
else:
data_testid = elem.get_attribute('data-testid')
if data_testid == 'AppTabBar_Profile_Link':
page = self.page_name_pages['loggedin_page']
elif elem.tag_name == 'span':
elem_text = elem.text
self.logger.debug(f'elem_text = {elem_text}')
if elem_text == self.page_name_pages['home_page']['texts'][lang]['login_button']:
page = self.page_name_pages['home_page']
elif elem_text == self.page_name_pages['login_page_1_email']['texts'][lang]['title']:
page = self.page_name_pages['login_page_1_email']
elif elem_text == self.page_name_pages['login_page_2_password']['texts'][lang]['title']:
page = self.page_name_pages['login_page_2_password']
elif elem_text == self.page_name_pages['login_page_3_username']['texts'][lang]['title']:
page = self.page_name_pages['login_page_3_username']
elif elem.tag_name == 'h1':
pass
self.logger.debug(f'page = {page}')
if page is None:
raise Exception(f'page is None')
return page
def process_page(self, url):
self.logger.debug(f'(url = {url})')
lang = self.get_page_lang(url)
page = self.which_page(url, lang)
page_processor = page['page_processor']
if page_processor is None:
raise Exception(f'page_processor is None')
return page_processor(page, url, lang)
def login_page_123_processor(self, page, url, lang):
self.logger.debug(f'(page = {page}, url = {url}, lang = {lang}')
# get (optional) form_field and form_button
form_field_xpath = page.get('form_field_xpath')
form_field_value = page.get('form_field_value')
form_field_elem = None
if form_field_xpath is not None:
form_field_elem = self.wait_for_element(
xpath=form_field_xpath,
)
form_button_elem = self.wait_for_element(
xpath=page['form_button_text_xpath'].replace('{{form_button_text}}', page['texts'][lang]['form_button']),
parent_xpath='../../..',
parent_attr_check={
'role': 'button',
},
)
# enter form_field_value and click
self.set_form_field_value_and_click(
form_field_elem=form_field_elem,
form_field_value=form_field_value,
form_button_elem=form_button_elem,
)
# return current_url
current_url = self.wd.current_url
self.logger.debug(f'current_url = {current_url}')
return current_url
def home_page(self, page, url, lang):
self.logger.debug(f'page = {page}, url = {url}, lang = {lang}')
login_button_elem = self.wait_for_element(
xpath=page['login_button_text_xpath'].replace('{{login_button_text}}', page['texts'][lang]['login_button']),
parent_xpath='../../..',
parent_attr_check={
'role': 'link',
},
)
href = login_button_elem.get_attribute('href')
self.logger.debug(f'href = {href}')
# redirect
self.wd.get(href)
self.random_wait(min_wait_secs=self.min_wait_for_next_page)
# return current_url
current_url = self.wd.current_url
self.logger.debug(f'current_url = {current_url}')
return current_url
def login_page_1_email(self, page, url, lang):
return self.login_page_123_processor(page, url, lang=lang)
def login_page_2_password(self, page, url, lang):
return self.login_page_123_processor(page, url, lang=lang)
def login_page_3_username(self, page, url, lang):
return self.login_page_123_processor(page, url, lang=lang)
def loggedin_page(self, page, url, lang):
self.logger.debug(f'page = {page}, url = {url}, lang = {lang}')
# locate search box
xpath = '//form[@role="search"]/div/div/div/div/label/div/div/input[@type="text" and @data-testid="SearchBox_Search_Input"]'
search_field_elem = self.wait_for_element(xpath)
# type the search item
self.elem_send_keys_wait(
search_field_elem,
self.search_for,
before_min_wait=2,
after_min_wait=self.min_wait_for_next_page,
)
# locate search result option buttons
xpath = '//div[@role="listbox" and starts-with(@id, "typeaheadDropdown-")]/div[@role="option" and @data-testid="typeaheadResult"]/div[@role="button"]'
button_elems = self.wait_for_elements(xpath)
# find search_for in options
xpath = './/span[text()[normalize-space()="' + self.search_for + '"]]'
found = False
for button_elem in button_elems:
try:
elem = button_elem.find_element(By.XPATH, xpath)
found = True
break
except selenium_exceptions.NoSuchElementException:
continue
if not found:
raise Exception(f'search_for = {search_for} not found in typeaheadDropdown')
# click the found item
self.elem_click_wait(
button_elem,
before_min_wait=2,
after_min_wait=self.min_wait_for_next_page,
)
# slurp posts visibility_of_element_located
xpath = '//article[@data-testid="tweet"]'
posts = self.wait_for_elements(xpath)
# dump posts
post_html_items = []
posts_len = len(posts)
visible_posts_len = 0
for post in posts:
self.logger.debug(f'tag = {post.tag_name}')
self.logger.debug(f'aria-labelledby = {post.get_attribute("aria-labelledby")}')
self.logger.debug(f'data-testid = {post.get_attribute("data-testid")}')
self.logger.debug(f'post.is_displayed() = {post.is_displayed()}')
if not post.is_displayed():
continue
visible_posts_len += 1
# expand to html
post_html = post.get_attribute('outerHTML')
post_html_items.append(post_html)
if posts_len > 0:
with open('posts.txt', 'w') as fo:
fo.write('\n'.join(post_html_items))
self.logger.debug(f'posts_len = {posts_len}')
self.logger.debug(f'visible_posts_len = {visible_posts_len}')
self.random_wait(min_wait_secs=5)
sys.exit()
def main():
# your account
account = {
'email': 'johndoe@example.com',
'password': 'secret',
'username': 'johndoe',
'screen_name': 'JohnDoe',
}
# your search
search_for = '@elonmusk'
with webdriver.Chrome(options=webdriver_options) as wd:
x = XGetPosts(
logger=logger,
wd=wd,
account=account,
search_for=search_for,
)
url = 'https://www.x.com'
wd.get(url)
x.random_wait(min_wait_secs=x.min_wait_for_next_page)
url = wd.current_url
while True:
logger.debug(f'url = {url}')
url = x.process_page(url)
logger.debug(f'ready ')
if __name__ == '__main__':
main()
Samenvatting
In deze post hebben we Selenium gebruikt om het inloggen op X te automatiseren en een aantal berichten te extraheren. Om elementen te vinden, gebruiken we meestal (relatief) XPaths. We proberen niet te verbergen dat we automatisering gebruiken, maar proberen ons zoveel mogelijk als een mens te gedragen. De code ondersteunt meerdere talen. De code werkt vandaag, maar kan morgen falen door veranderingen in teksten en naamgeving.
Links / credits
ChromeDriver - WebDriver for Chrome
https://sites.google.com/a/chromium.org/chromedriver/capabilities
Playwright
https://playwright.dev/python
Puppeteer
https://pptr.dev
ScrapingAnt - Puppeteer vs. Selenium - Which Is Better? + Bonus
https://scrapingant.com/blog/puppeteer-vs-selenium
Selenium with Python
https://selenium-python.readthedocs.io/index.html
Selenium with Python - 5. Waits
https://selenium-python.readthedocs.io/waits.html
Tracking Modified Selenium ChromeDriver
https://datadome.co/bot-management-protection/tracking-modified-selenium-chromedriver
undetected_chromedriver
https://github.com/ultrafunkamsterdam/undetected-chromedriver
WebScrapingAPI
https://www.webscrapingapi.com
WebScrapingAPI
https://www.webscrapingapi.com
ZenRows - How to Avoid Bot Detection with Selenium
https://www.zenrows.com/blog/selenium-avoid-bot-detection
Lees meer
Scraping Selenium Web automation
Recent
- Database UUID primaire sleutels van je webapplicatie verbergen
- Don't Repeat Yourself (DRY) met Jinja2
- SQLAlchemy, PostgreSQL, maximum aantal rijen per user
- Toon de waarden in SQLAlchemy dynamische filters
- Veilige gegevensoverdracht met Public Key versleuteling en pyNaCl
- rqlite: een alternatief voor SQLite met hoge beschikbaarheid en distributed
Meest bekeken
- Met behulp van Python's pyOpenSSL om SSL-certificaten die van een host zijn gedownload te controleren
- Gebruik van UUIDs in plaats van Integer Autoincrement Primary Keys met SQLAlchemy en MariaDb
- Maak verbinding met een dienst op een Docker host vanaf een Docker container
- PyInstaller en Cython gebruiken om een Python executable te maken
- SQLAlchemy: Gebruik van Cascade Deletes om verwante objecten te verwijderen
- Flask RESTful API verzoekparametervalidatie met Marshmallow-schema's