angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

X Автоматизация и скраппинг веб-сайтов с помощью Selenium

Для небольших проектов Selenium мы не препятствуем автоматизации обнаружения, но стараемся имитировать поведение человека.

11 августа 2023 Обновленный 11 августа 2023
post main image
https://unsplash.com/@nixcreative

Когда вы хотите получить данные из Web, вы должны знать, что вы делаете. Не стоит перегружать целевой сервер запросами. Если вы делаете это из одного места, например IP address, вы можете получить (временный) бан.

Если вы хотите скрести большие объемы, рассмотрите возможность использования специализированных сервисов, таких как ZenRows, ScrapFly, WebScrapingAPI, ScrapingAnt и т.д. Они dist распределяют ваши запросы по множеству систем, каждая из которых имеет уникальный IP address, что означает, что целевой сервер будет думать, что к нему обращается множество различных (человеческих) клиентов.

Но иногда мы просто хотим соскрести немного данных, скажем, каждый день просматривать новые сообщения нескольких человек в социальных сетях, например X .

В этом посте мы будем использовать Selenium. Существуют такие решения, как Puppeteer или Playwright, но Selenium существует с 2004 года, это наиболее широко используемый инструмент для автоматизированного тестирования и имеет большое сообщество.

Я начну с некоторых соображений и закончу рабочим кодом. Код автоматизирует вход в X, , а затем поиск и отбраковку некоторых сообщений. Я знаю, что завтра все может измениться, но сегодня это работает.

Как обычно, я использую Ubuntu 22.04.

Скраппинг из X с помощью Selenium

Недавно X решил, что для просмотра сообщений необходимо войти в систему. Мне это не нравится. Большую часть времени я читаю сообщения только нескольких человек, используя расширение для браузера 'Breakthrough Twitter Login Wall'. Это больше не работает.

И иногда я скребу некоторые данные, но не сотни тысяч твитов. Думаю, так поступают многие разработчики. В мире насчитывается почти 30 миллионов инженеров-программистов. Теперь предположим, что каждый тысячный иногда раз в год скребет несколько X постов, которые скребут в среднем по 1000 постов каждый, для небольших проектов, тестирования и развлечения (обучения). То есть 30.000 скреперов в год, 100 скреперов в день, это 100.000 постов в день. Это не так уж и много, учитывая миллионы сообщений каждый день. X все еще имеет бесплатный аккаунт API , но он доступен только для записи. Если мы хотим получить данные, мы можем использовать только веб-сайт X и получить доступ к нему с помощью таких инструментов, как Selenium.

Selenium - это очень мощный инструмент для автоматизированного тестирования, но его также можно использовать для веб-скрепинга. Он был выпущен в 2004 году и имеет большое сообщество. Другие популярные инструменты, которые вы можете рассмотреть, - Puppeteer и Playwright.

Имитация человеческого поведения

Я бы сказал, что соскабливание веб-страниц не является противозаконным, если вы не выпускаете соскобленные данные в узнаваемом виде. Тем не менее, я могу себе представить, что некоторые компании блокируют доступ к своим веб-страницам и сервисам для автоматизированного программного обеспечения.

Что именно представляет собой автоматизированное программное обеспечение и как его можно обнаружить? Самая очевидная проверка - определить, отличается ли поведение посетителя от поведения человека. Обычно это означает, что ваше автоматизированное программное обеспечение должно вести себя как человек. Ввести что-то и подождать несколько секунд, нажать на кнопку и подождать несколько секунд, прокрутить страницу вниз и подождать несколько секунд и т.д. Человек не может заполнить форму за 50 миллисекунд...

Рассмотрим другой сценарий, когда Selenium используется для открытия страниц Web-сайта с помощью голосового управления. Даже если сайт обнаружит, что мы используем Selenium, это выглядит вполне законно, поскольку совершенно очевидно, что действия выполняет человек.

Зайти на сайт и купить дешевый товар - это человеческое поведение, но покупать все дешевые товары в больших количествах - это подозрительно. Особенно если это происходит изо дня в день.

Это означает, что главное, что должно делать наше программное обеспечение, - имитировать поведение человека. Сделать что-то и подождать случайное количество секунд, сделать что-то еще и подождать случайное количество секунд и т.д.

Выбор браузера

Selenium поддерживает множество браузеров. Для данного проекта я использую Chrome. Причина этого не вызывает сомнений: Chrome - самый используемый браузер не только среди пользователей Интернета user, но и среди разработчиков. На своей машине для разработчиков я использую Firefox, Chromium и (иногда) Brave. Это означает, что я могу использовать Chrome исключительно для этого проекта. Если вы используете Chrome также для просмотра веб-страниц, то вы можете создать отдельный профиль для приложения скраппинга.

Обнаружение скрепер-бота

Скрепер-бот - это инструмент или фрагмент кода, используемый для извлечения данных с веб-страниц. Веб-скреппинг - явление не новое, и многие организации предпринимают меры по предотвращению доступа автоматизированных инструментов к своим сайтам. В статье 'How to Avoid Bot Detection with Selenium', см. ссылки ниже, перечислены несколько способов избежать обнаружения бота.

Если мы ничего не будем делать, то обнаружить нашего бота Selenium будет очень легко. Пока мы ведем себя как человек, это может быть не так уж плохо. Мы показываем целевому сайту, что мы либо скрипт-кидди, либо хотим, чтобы они заметили, что мы используем какую-то автоматизацию.

Тем не менее, некоторые компании могут блокировать нас по своим законным причинам. Давайте посмотрим правде в глаза: хороший бот может внезапно превратиться в очень плохого.

Есть несколько сайтов, которые можно использовать для определения того, признан ли наш бот автоматизированным программным обеспечением:

  • https://fingerprint.com/products/bot-detection
  • https://pixelscan.net
  • https://nowsecure.nl/#relax

Ниже приведен пример кода, который можно использовать для тестирования:

# bot_detect_test.py
import os
import time

# selenium
from selenium import webdriver
# chromedriver_binary, see pypi.org
import chromedriver_binary

# configure webdriver
from selenium.webdriver.chrome.options import Options
webdriver_options = Options()
# use a common screen format
webdriver_options.add_argument('window-size=1920,1080')
# profile
user_data_dir = os.path.join(os.getcwd(), 'selenium_data')
webdriver_options.add_argument('user-data-dir=' + user_data_dir)
# allow popups
webdriver_options.add_experimental_option('excludeSwitches', ['disable-popup-blocking']);


def main():
    with webdriver.Chrome(options=webdriver_options) as wd:
        wd.get('https://fingerprint.com/products/bot-detection')
        time.sleep(8)
        wd.save_screenshot('fingerprint.png')
    print(f'ready')

if __name__ == '__main__':
    main()

Будем ли мы заблокированы?

Чем больше мы пытаемся предотвратить обнаружение нашего бота Selenium , тем более подозрительными мы можем показаться бот-детекторам.

Существует проект 'undetected_chromedriver', см. ссылки ниже, который пытается поставить бота, основанного на Selenium, который не может быть обнаружен. Но если он будет обнаружен, то целевой сайт может решить заблокировать вас, потому что... зачем кому-то быть необнаруживаемым?

Компании, разрабатывающие детекторы ботов или предлагающие услуги по их обнаружению, внимательно следят за проектами, подобными 'undetected_chromedriver', и стараются опередить все реализуемые методы предотвращения обнаружения.

Проблему представляют и обновления. Новая версия Selenium или Chrome может нарушить меры по предотвращению обнаружения. Но и длительное отсутствие обновлений Chrome также может стать причиной обнаружения и вызвать подозрения.

Так что же делать? Ничего, немного, или использовать, например, 'undetected_chromedriver'? Если вы хотите наскрести много, то лучше всего воспользоваться сервисами, подобными вышеупомянутым. Для небольших проектов я предлагаю ничего не делать, чтобы предотвратить обнаружение, и посмотреть, что получится.

Действительно ли нам нужно автоматизировать вход в систему?

Вместо того чтобы автоматизировать вход в систему и регистрироваться с помощью нашего скрипта, мы можем войти в систему вручную с помощью браузера, а затем использовать данные из браузера (данные профиля, сессии) при подключении к X. Это сэкономит много кода. Но ... наше приложение в этом случае не будет полностью автоматизировано. А ведь мы используем Selenium, инструмент для автоматизации работы в Интернете (!).

О коде

Приведенный ниже код автоматизирует вход в X , а затем отбирает несколько постов из выбранного user. После этого скрипт завершает работу. Он использует отдельный профиль для веб-браузера, поэтому не должен мешать обычному использованию браузера. Если вход в систему прошел успешно и скрипт существует, то при следующем запуске скрипта он уже не входит в систему, а сразу начинает выбирать user и перебирать сообщения.

Посты выбранного user сбрасываются в файл 'posts.txt'. Это означает, что код не извлекает части сообщений. Для этого можно использовать BeautifulSoup .

Распознавание страниц

Наш скрипт работает со следующими страницами:

Когда вы не вошли в систему:

  • Главная страница
  • Вход - введите e-mail
  • Вход - ввести пароль
  • Логин - введите username (при необычной активности)

При входе в систему:

  • Страница входа в систему

Страница "Необычная активность" показывается иногда, например, когда вы уже вошли в систему через другой браузер, перед страницей "Введите пароль".

Что мы делаем в цикле:

  • Извлекаем язык
  • Определить, какая страница показана
  • Выполняем действие для страницы

Если мы находимся на главной странице, то просто перенаправляем на страницу входа в систему. Все остальные страницы без входа в систему имеют заголовок, "поле формы" и "кнопку формы". Это означает, что мы можем работать с ними аналогичным образом.

Причем, говоря о страницах, мы имеем в виду не URL, а то, что отображается на экране. X - это все Javascript.

Детектор "на какой странице мы находимся

Это, пожалуй, самый "интересный" фрагмент кода. Сначала мы создаем список взаимоисключающих элементов 'presence_of_element_located':

  • Profile-button => logged in
  • Кнопка входа в систему с текстом 'Sign in' => 'home_page'
  • <h1> текст 'Войти в X' => 'login_page_1_email'
  • <h1> text 'Введите пароль' => 'login_page_2_password'
  • <h1> text 'Введите номер телефона или username' => 'login_page_3_username'

Затем мы ждем, пока хотя бы один из элементов не станет расположенным:

    wait = WebDriverWait(
        ....
    )
    elem = wait.until(EC.any_of(*tuple(or_conditions)))

Получив элемент, мы можем определить страницу. А получив страницу, мы можем выполнить соответствующее действие.

Oops: 'Что-то пошло не так. Попробуйте перезагрузить".

При входе в систему иногда появляется это сообщение. Я вижу его в случайное время, поэтому думаю, не было ли оно встроено специально. При этом код выдает ошибку таймаута. Необходимо перезапустить скрипт или реализовать собственную функцию повторной попытки.

Языки

Есть два языка, с которыми нам приходится иметь дело: язык незарегистрированных пользователей и язык учетных записей. Часто они совпадают. Приведенный ниже код работает для английского и голландского языков, даже смешанных. Язык извлекается из страницы.

Чтобы добавить новый язык, добавьте его в:

    # languages
    self.available_langs = ['en', 'nl']

, и добавьте тексты на страницы:

    # pages
    self.pages = [
        ...
    ]

Конечно, сначала необходимо найти тексты, используя ручной вход в систему.

Чтобы запустить браузер на другом языке, сначала удалите каталог профиля браузера:

rm -R browser_profile

, а затем запустить скрипт после (!) установки новой локали:

bash -c 'LANGUAGE=nl_NL.UTF-8 python x_get_posts.py'

XPath поиск

Если вы хотите искать от корня документа, начните XPath с '//'.
Если вы хотите искать относительно определенного элемента, начните XPath с './/'.

Посты

На данный момент видимые посты записываются в файл HTML в виде posts.txt. Чтобы получить больше постов, реализуйте собственную функцию прокрутки вниз.

Извлечение данных из поста

Здесь все зависит от вас. Я предлагаю использовать BeautifulSoup.

Код

Если вы хотите попробовать, то вот код. Перед запуском убедитесь, что вы добавили данные своего аккаунта и имя поиска.

# x_get_posts.py
import logging
import os
import random
import sys
import time

# selenium
from selenium import webdriver
# using chromedriver_binary
import chromedriver_binary  # Adds chromedriver binary to path
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import selenium.common.exceptions as selenium_exceptions

from selenium.webdriver.chrome.options import Options

# configure webdriver: hide gui, window size, full-screen
webdriver_options = Options()
#webdriver_options.add_argument('--headless')
# use a common screen format
webdriver_options.add_argument('window-size=1920,1080')
# use a separate profile
user_data_dir = os.path.join(os.getcwd(), 'browser_profile')
webdriver_options.add_argument('user-data-dir=' + user_data_dir)
# allow popups
webdriver_options.add_experimental_option('excludeSwitches', ['disable-popup-blocking']);
# force language
webdriver_options.add_argument('--lang=nl-NL');

def get_logger(
    console_log_level=logging.DEBUG,
    file_log_level=logging.DEBUG,
    log_file=os.path.splitext(__file__)[0] + '.log',
):
    logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    if console_log_level:
        # console
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(console_log_level)
        console_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(console_handler)
    if file_log_level:
        # file
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(file_log_level)
        file_handler.setFormatter(logging.Formatter(logger_format))
        logger.addHandler(file_handler)
    return logger

logger = get_logger(
    file_log_level=None,
)
logger.debug('START')
logger.debug(f'user_data_dir = {user_data_dir}')


class SeleniumBase:
    def __init__(
        self,
        logger=None,
        wd=None,
    ):
        self.logger = logger
        self.wd = wd

        self.web_driver_wait_timeout = 10
        self.web_driver_wait_poll_frequency = 1
        self.min_wait_for_next_page = 3.
        
    def get_elem_attrs(self, elem, dump=False, dump_header=None):
        elem_attrs = self.wd.execute_script('var items = {}; for (index = 0; index < arguments[0].attributes.length; ++index) { items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value }; return items;', elem)
        if dump:
            if dump_header is not None:
                self.logger.debug(f'{dump_header}')
            for k, v in elem_attrs.items():
                self.logger.debug(f'attrs {k}: {v}')
        return elem_attrs

    def random_wait(self, min_wait_secs=None):
        self.logger.debug(f'(min_wait_secs = {min_wait_secs})')
        min_wait_secs = min_wait_secs or 2.
        wait_secs = min_wait_secs + 2 * random.uniform(0, 1)
        self.logger.debug(f'wait_secs = {wait_secs:.1f}')
        time.sleep(wait_secs)

    def elem_click_wait(self, elem, before_min_wait=None, after_min_wait=None):
        self.logger.debug(f'(, before_min_wait = {before_min_wait}, after_min_wait = {after_min_wait})')
        if before_min_wait is not None:
            self.random_wait(min_wait_secs=before_min_wait)
        elem.click()
        if after_min_wait is not None:
            self.random_wait(min_wait_secs=after_min_wait)

    def elem_send_keys_wait(self, elem, value, before_min_wait=None, after_min_wait=None):
        self.logger.debug(f'(, value = {value}, before_min_wait = {before_min_wait}, after_min_wait = {after_min_wait})')
        if before_min_wait is not None:
            self.random_wait(min_wait_secs=before_min_wait)
        elem.send_keys(value)
        if after_min_wait is not None:
            self.random_wait(min_wait_secs=after_min_wait)

    def set_form_field_value_and_click(self, form_field_elem=None, form_field_value=None, form_button_elem=None):
        self.logger.debug(f'(form_field_elem = {form_field_elem}, form_field_value = {form_field_value}, form_button_elem = {form_button_elem})')
        if form_field_elem is not None and form_field_value is not None:
            self.random_wait(min_wait_secs=1.)
            form_field_elem.send_keys(form_field_value)
        if form_button_elem is not None:
            self.random_wait(min_wait_secs=.5)
            form_button_elem.click()
            self.random_wait(min_wait_secs=self.min_wait_for_next_page)

    def wait_for_element(
        self, 
        xpath,
        parent_xpath=None,
        parent_attr_check=None,
        visible=True,
    ):
        self.logger.debug(f'xpath = {xpath}, parent_xpath = {parent_xpath}, parent_attr_check = {parent_attr_check}, visible = {visible}')

        wait = WebDriverWait(
            self.wd,
            timeout=self.web_driver_wait_timeout,
            poll_frequency=self.web_driver_wait_poll_frequency,
            ignored_exceptions=[
                selenium_exceptions.ElementNotVisibleException,
                selenium_exceptions.ElementNotSelectableException,
            ],
        )
        try:
            if visible:
                elem = wait.until(EC.visibility_of_element_located((By.XPATH, xpath)))
            else:
                elem = wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
        except selenium_exceptions.TimeoutException as e:
            self.logger.exception(f'TimeoutException, e = {e}, e.args = {e.args}')
            raise
        except Exception as e:
            self.logger.exception(f'other exception, e = {e}')
            raise

        self.get_elem_attrs(elem, dump=True, dump_header='elem')

        if parent_xpath is None:
            self.logger.debug(f'elem.tag_name = {elem.tag_name}')
            return elem
            
        parent_elem = elem.find_element(By.XPATH, parent_xpath)
        self.get_elem_attrs(parent_elem, dump=True, dump_header='parent_elem')
        if parent_attr_check is not None:
            for k, v_expected in parent_attr_check.items():
                v = parent_elem.get_attribute(k)
                if v != v_expected:
                    raise Exception(f'parent_elem = {parent_elem}, attr k = {k}, v = {v} != v_expected = {v_expected}')
        self.logger.debug(f'parent_elem.tag_name = {parent_elem.tag_name}')
        return parent_elem
    
    def wait_for_elements(self, xpath):
        self.logger.debug(f'xpath = {xpath}')

        wait = WebDriverWait(
            self.wd,
            timeout=self.web_driver_wait_timeout,
            poll_frequency=self.web_driver_wait_poll_frequency,
            ignored_exceptions=[
                selenium_exceptions.ElementNotVisibleException,
                selenium_exceptions.ElementNotSelectableException,
            ],
        )
        try:
            elems = wait.until(EC.presence_of_all_elements_located((By.XPATH, xpath)))
        except selenium_exceptions.TimeoutException as e:
            self.logger.exception(f'TimeoutException, e = {e}, e.args = {e.args}')
            raise
        except Exception as e:
            self.logger.exception(f'other exception, e = {e}')
            raise
        elems_len = len(elems)
        self.logger.debug(f'elems_len = {elems_len}')
        return elems


class XGetPosts(SeleniumBase):
    def __init__(
        self,
        logger=None,
        wd=None,
        account=None,
        search_for=None,
    ):
        super().__init__(
            logger=logger,
            wd=wd,
        )
        self.account = account
        self.search_for = search_for

        # langs are in the <html> tag
        self.available_langs = ['en', 'nl']

        # pages
        self.pages = [
            {
                'name': 'home_page',
                'texts': {
                    'en': {
                        'login_button': 'Sign in',
                    },
                    'nl': {
                        'login_button': 'Inloggen',
                    },
                },
                'page_processor': self.home_page,
                'login_button_text_xpath': '//a[@role="link"]/div/span/span[text()[normalize-space()="' + '{{login_button_text}}' + '"]]'
            },
            {
                'name': 'login_page_1_email',
                'texts': {
                    'en': {
                        'title': 'Sign in to X',
                        'form_button': 'Next',
                    },
                    'nl': {
                        'title': 'Registreren bij X',
                        'form_button': 'Volgende',
                    },
                },
                'page_processor': self.login_page_1_email,
                'form_field_xpath': '//input[@type="text" and @name="text" and @autocomplete="username"]',
                'form_field_value': self.account['email'],
                'form_button_text_xpath': '//div[@role="button"]/div/span/span[text()[normalize-space()="' + '{{form_button_text}}' + '"]]'

            },
            {
                'name': 'login_page_2_password',
                'texts': {
                    'en': {
                        'title': 'Enter your password',
                        'form_button': 'Log in',
                    },
                    'nl': {
                        'title': 'Voer je wachtwoord in',
                        'form_button': 'Inloggen',
                    },
                },
                'page_processor': self.login_page_2_password,
                'form_field_xpath': '//input[@type="password" and @name="password" and @autocomplete="current-password"]',
                'form_field_value': self.account['password'],
                'form_button_text_xpath': '//div[@role="button"]/div/span/span[text()[normalize-space()="' + '{{form_button_text}}' + '"]]',
            },
            {
                'name': 'login_page_3_username',
                'texts': {
                    'en': {
                        'title': 'Enter your phone number or username',
                        'form_button': 'Next',
                    },
                    'nl': {
                        'title': 'Voer je telefoonnummer of gebruikersnaam',
                        'form_button': 'Volgende',
                    },
                },
                'page_processor': self.login_page_3_username,
                'form_field_xpath': '//input[@type="text" and @name="text" and @autocomplete="on"]',
                'form_field_value': self.account['username'],
                'form_button_text_xpath': '//div[@role="button"]/div/span/span[text()[normalize-space()="' + '{{form_button_text}}' + '"]]',
            },
            {
                'name': 'loggedin_page',
                'texts': {
                    'en': {
                    },
                    'nl': {
                    },
                },
                'page_processor': self.loggedin_page,
            },
        ]

        self.page_name_pages = {}
        for page in self.pages:
            self.page_name_pages[page['name']] = page

    def get_page_lang(self, url):
        self.logger.debug(f'(url = {url})')

        # get lang from html tag
        html_tag_xpath = '//html[@dir="ltr"]'
        html_tag_elem = self.wait_for_element(
            xpath=html_tag_xpath,
        )

        # lang must be present and available
        lang = html_tag_elem.get_attribute('lang')
        if lang not in self.available_langs:
            raise Exception(f'lang = {lang} not in  available_langs = {self.available_langs}')
        self.logger.debug(f'lang = {lang}')
        return lang

    def which_page(self, url, lang):
        self.logger.debug(f'(url = {url}, lang = {lang})')

        # construct list of items that identify:
        # - not logged in pages
        # - logged in
        or_conditions = []
        for page in self.pages:
            page_name = page['name']
            if page_name == 'home_page':
                # check for 'Sign in' button
                login_button_text = page['texts'][lang]['login_button']
                xpath = '//a[@href="/login" and @role="link" and @data-testid="loginButton"]/div[@dir="ltr"]/span/span[text()[normalize-space()="' + login_button_text + '"]]'
                self.logger.debug(f'xpath = {xpath}')
                or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )

            elif page_name == 'login_page_1_email':
                # check for <h1> title
                title_text = page['texts'][lang]['title']
                xpath = '//h1/span/span[text()[normalize-space()="' + title_text + '"]]'
                self.logger.debug(f'xpath = {xpath}')
                or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )

            elif page_name == 'login_page_2_password':
                # check for <h1> title
                title_text = page['texts'][lang]['title']
                xpath = '//h1/span/span[text()[normalize-space()="' + title_text + '"]]'
                self.logger.debug(f'xpath = {xpath}')
                or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )

            elif page_name == 'login_page_3_username':
                # check for <h1> title
                title_text = page['texts'][lang]['title']
                xpath = '//h1/span/span[text()[normalize-space()="' + title_text + '"]]'
                self.logger.debug(f'xpath = {xpath}')
                or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )

        # check if logged in using profile button
        xpath = '//a[@href="/' + self.account['screen_name'] + '" and @role="link" and @data-testid="AppTabBar_Profile_Link"]'
        self.logger.debug(f'xpath = {xpath}')
        or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )

        # or_conditions
        self.logger.debug(f'or_conditions = {or_conditions}')

        wait = WebDriverWait(
            self.wd,
            timeout=self.web_driver_wait_timeout,
            poll_frequency=self.web_driver_wait_poll_frequency,
            ignored_exceptions=[
                selenium_exceptions.ElementNotVisibleException,
                selenium_exceptions.ElementNotSelectableException,
            ],
        )
        try:
            elem = wait.until(EC.any_of(*tuple(or_conditions)))
        except selenium_exceptions.TimeoutException as e:
            self.logger.exception(f'selenium_exceptions.TimeoutException, e = {e}, e.args = {e.args}')
            raise
        except Exception as e:
            self.logger.exception(f'other exception, e = {e}')
            raise

        # not logged in and a known page, ... or .... logged in
        self.logger.debug(f'elem.tag_name = {elem.tag_name}')
        self.get_elem_attrs(elem, dump=True)

        page = None
        if elem.tag_name == 'a':
            href = elem.get_attribute('href')
            if href == '/login':
                page = self.page_name_pages['home_page']
            else:
                data_testid = elem.get_attribute('data-testid')
                if data_testid == 'AppTabBar_Profile_Link':
                    page = self.page_name_pages['loggedin_page']
        elif elem.tag_name == 'span':
            elem_text = elem.text
            self.logger.debug(f'elem_text = {elem_text}')
            if elem_text == self.page_name_pages['home_page']['texts'][lang]['login_button']:
                page = self.page_name_pages['home_page']
            elif elem_text == self.page_name_pages['login_page_1_email']['texts'][lang]['title']:
                page = self.page_name_pages['login_page_1_email']
            elif elem_text == self.page_name_pages['login_page_2_password']['texts'][lang]['title']:
                page = self.page_name_pages['login_page_2_password']
            elif elem_text == self.page_name_pages['login_page_3_username']['texts'][lang]['title']:
                page = self.page_name_pages['login_page_3_username']
        elif elem.tag_name == 'h1':
            pass
        self.logger.debug(f'page = {page}')
        if page is None:
            raise Exception(f'page is None')
        return page

    def process_page(self, url):
        self.logger.debug(f'(url = {url})')

        lang = self.get_page_lang(url)
        page = self.which_page(url, lang)

        page_processor = page['page_processor']
        if page_processor is None:
            raise Exception(f'page_processor is None')

        return page_processor(page, url, lang)

    def login_page_123_processor(self, page, url, lang):
        self.logger.debug(f'(page = {page}, url = {url}, lang = {lang}')

        # get (optional) form_field and form_button
        form_field_xpath = page.get('form_field_xpath')
        form_field_value = page.get('form_field_value')
        form_field_elem = None
        if form_field_xpath is not None:
            form_field_elem = self.wait_for_element(
                xpath=form_field_xpath,
            )
        form_button_elem = self.wait_for_element(
            xpath=page['form_button_text_xpath'].replace('{{form_button_text}}', page['texts'][lang]['form_button']),
            parent_xpath='../../..',
            parent_attr_check={
                'role': 'button',
            },
        )
        # enter form_field_value and click
        self.set_form_field_value_and_click(
            form_field_elem=form_field_elem, 
            form_field_value=form_field_value,
            form_button_elem=form_button_elem,
        )
        # return current_url
        current_url = self.wd.current_url
        self.logger.debug(f'current_url = {current_url}')
        return current_url

    def home_page(self, page, url, lang):
        self.logger.debug(f'page = {page}, url = {url}, lang = {lang}')

        login_button_elem = self.wait_for_element(
            xpath=page['login_button_text_xpath'].replace('{{login_button_text}}', page['texts'][lang]['login_button']),
            parent_xpath='../../..',
            parent_attr_check={
                'role': 'link',
            },
        )
        href = login_button_elem.get_attribute('href')
        self.logger.debug(f'href = {href}')

        # redirect
        self.wd.get(href)
        self.random_wait(min_wait_secs=self.min_wait_for_next_page)

        # return current_url
        current_url = self.wd.current_url
        self.logger.debug(f'current_url = {current_url}')
        return current_url

    def login_page_1_email(self, page, url, lang):
        return self.login_page_123_processor(page, url, lang=lang)

    def login_page_2_password(self, page, url, lang):
        return self.login_page_123_processor(page, url, lang=lang)

    def login_page_3_username(self, page, url, lang):
        return self.login_page_123_processor(page, url, lang=lang)

    def loggedin_page(self, page, url, lang):
        self.logger.debug(f'page = {page}, url = {url}, lang = {lang}')

        # locate search box
        xpath = '//form[@role="search"]/div/div/div/div/label/div/div/input[@type="text" and @data-testid="SearchBox_Search_Input"]'
        search_field_elem = self.wait_for_element(xpath)

        # type the search item
        self.elem_send_keys_wait(
            search_field_elem,
            self.search_for,
            before_min_wait=2,
            after_min_wait=self.min_wait_for_next_page,
        )

        # locate search result option buttons
        xpath = '//div[@role="listbox" and starts-with(@id, "typeaheadDropdown-")]/div[@role="option" and @data-testid="typeaheadResult"]/div[@role="button"]'
        button_elems = self.wait_for_elements(xpath)

        # find search_for in options
        xpath = './/span[text()[normalize-space()="' + self.search_for + '"]]'
        found = False
        for button_elem in button_elems:
            try:
                elem = button_elem.find_element(By.XPATH, xpath)
                found = True
                break
            except selenium_exceptions.NoSuchElementException:
                continue
        if not found:
            raise Exception(f'search_for = {search_for} not found in typeaheadDropdown')

        # click the found item
        self.elem_click_wait(
            button_elem,
            before_min_wait=2,
            after_min_wait=self.min_wait_for_next_page,
        )

        # slurp posts visibility_of_element_located
        xpath = '//article[@data-testid="tweet"]'
        posts = self.wait_for_elements(xpath)

        # dump posts
        post_html_items = []
        posts_len = len(posts)
        visible_posts_len = 0
        for post in posts:
            self.logger.debug(f'tag = {post.tag_name}')
            self.logger.debug(f'aria-labelledby = {post.get_attribute("aria-labelledby")}')
            self.logger.debug(f'data-testid = {post.get_attribute("data-testid")}')
            self.logger.debug(f'post.is_displayed() = {post.is_displayed()}')
            if not post.is_displayed():
                continue
            visible_posts_len += 1

            # expand to html
            post_html = post.get_attribute('outerHTML')
            post_html_items.append(post_html)

        if posts_len > 0:
            with open('posts.txt', 'w') as fo:
                fo.write('\n'.join(post_html_items))

        self.logger.debug(f'posts_len = {posts_len}')
        self.logger.debug(f'visible_posts_len = {visible_posts_len}')
        self.random_wait(min_wait_secs=5)
        sys.exit()

def main():

    # your account
    account = {
        'email': 'johndoe@example.com',
        'password': 'secret',
        'username': 'johndoe',
        'screen_name': 'JohnDoe',
    }
    # your search
    search_for = '@elonmusk'

    with webdriver.Chrome(options=webdriver_options) as wd:
        x = XGetPosts(
            logger=logger,
            wd=wd,
            account=account,
            search_for=search_for,
        )
        url = 'https://www.x.com'
        wd.get(url)
        x.random_wait(min_wait_secs=x.min_wait_for_next_page)
        url = wd.current_url

        while True:
            logger.debug(f'url = {url}')
            url = x.process_page(url)
        logger.debug(f'ready ')

if __name__ == '__main__':
    main()

Резюме

В этой заметке мы использовали Selenium для автоматизации входа в X и извлечения ряда сообщений. Для поиска элементов мы используем в основном (относительно) XPaths. Мы не пытаемся скрыть, что используем автоматизацию, но стараемся вести себя не хуже человека. Код поддерживает несколько языков. Код работает сегодня, но может не работать завтра из-за изменений в текстах и именовании.

Ссылки / кредиты

ChromeDriver - WebDriver for Chrome
https://sites.google.com/a/chromium.org/chromedriver/capabilities

Playwright
https://playwright.dev/python

Puppeteer
https://pptr.dev

ScrapingAnt - Puppeteer vs. Selenium - Which Is Better? + Bonus
https://scrapingant.com/blog/puppeteer-vs-selenium

Selenium with Python
https://selenium-python.readthedocs.io/index.html

Selenium with Python - 5. Waits
https://selenium-python.readthedocs.io/waits.html

Tracking Modified Selenium ChromeDriver
https://datadome.co/bot-management-protection/tracking-modified-selenium-chromedriver

undetected_chromedriver
https://github.com/ultrafunkamsterdam/undetected-chromedriver

WebScrapingAPI
https://www.webscrapingapi.com

WebScrapingAPI
https://www.webscrapingapi.com

ZenRows - How to Avoid Bot Detection with Selenium
https://www.zenrows.com/blog/selenium-avoid-bot-detection

Оставить комментарий

Комментируйте анонимно или войдите в систему, чтобы прокомментировать.

Комментарии

Оставьте ответ

Ответьте анонимно или войдите в систему, чтобы ответить.