Основы ETL на примере работы с Superset, Airflow и ClickHouse

from airflow import DAG from airflow.decorators import dag, task from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook from datetime import datetime, timedelta import pandas as pd import numpy as np import re from time import sleep from selenium import webdriver from selenium.webdriver.common.keys import Keys from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from webdriver_manager.chrome import ChromeDriverManager from bs4 import BeautifulSoup # Настройка опций Chrome options = webdriver.ChromeOptions() options.add_argument('--headless') # Работать в фоновом режиме options.add_argument('window-size=1920x1080') # Задаем большой размер окна options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') options.add_argument('--disable-gpu') # Отключить GPU для совместимости options.add_argument('--incognito') # Режим инкогнито default_args = { 'depends_on_past': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), 'start_date': datetime(2024, 10, 29)} schedule_interval = '0 9 * * *' def get_page_content(articles): ''' Функция, которая получает на вход список содежимого тегов article, в которых содежится информация из карточек товара при быстром просмотре товара в каталоге Возвращает датафрейм со следующими полями: - id - артикул товара, string - name - название товара, string - price - цена, int - old_price - старая цена, float или nan - brand - название бренда, string - rate - рейтинг товарв, float или nan - estimate - количество оценок, float или nan - href - ссылка на карточку товара для получения полной информации, string ''' idx_lst, name_lst, price_lst, old_price_lst, brand_lst, rate_lst, estimate_lst, href_lst = [], [], [], [], [], [], [], [] for i in range(len(articles)): try: idx_lst.append(articles[i].attrs['data-nm-id']) except Exception as e: idx_lst.append(None) try: name_lst.append(articles[i].a.get('aria-label')) except Exception as e: name_lst.append(None) try: brand_lst.append(articles[i].find('span', class_='product-card__brand').get_text(strip=True)) except Exception as e: brand_lst.append(None) try: price_lst.append(int(''.join(filter(str.isdigit, articles[i].find('ins', class_='price__lower-price') .get_text(strip=True))))) except Exception as e: price_lst.append(None) try: old_price_lst.append(int(''.join(filter(str.isdigit, articles[i].find('del').get_text(strip=True))))) except Exception as e: old_price_lst.append(None) try: rate_lst.append(float(articles[i].find('span', class_='address-rate-mini address-rate-mini--sm') .get_text(strip=True))) except Exception as e: rate_lst.append(None) try: estimate_lst.append(int(''.join(filter(str.isdigit, articles[i].find('span', class_='product-card__count') .get_text(strip=True))))) except Exception as e: estimate_lst.append(None) try: href_lst.append(articles[i].a.get('href')) except Exception as e: href_lst.append(None) # у тех, у которых нет id - это из рекомендательной ленты, их можно удалить df = pd.DataFrame({'id': idx_lst, 'name': name_lst, 'price': price_lst, 'old_price': old_price_lst, 'brand': brand_lst, 'rate': rate_lst, 'estimate': estimate_lst, 'href': href_lst})\ .query('id==id').reset_index(drop=True) return df # Функция медленного скролла def slow_scroll(driver, step, delay): """Функция для медленного скролла страницы.""" # Получаем высоту страницы scroll_height = driver.execute_script("return document.body.scrollHeight") current_scroll_position = 0 while current_scroll_position < scroll_height: # Прокрутка вниз на заданное количество пикселей driver.execute_script(f"window.scrollBy(0, {step});") current_scroll_position += step sleep(delay) # Обновляем высоту страницы после скролла scroll_height = driver.execute_script("return document.body.scrollHeight") def map_to_base_brand(brand): if pd.isna(brand): return brand brand = brand.lower() for base_brand, pattern in brand_mapping.items(): if re.search(pattern, brand): return base_brand return brand # Вернуть оригинальное значение, если не найдено соответствие def map_to_base_color(color): if pd.isna(color): return color color = color.lower() for base_color, pattern in color_mapping.items(): if re.search(pattern, color): return base_color return 'другой' # Вернуть оригинальное значение, если не найдено соответствие def dict_to_records(data_dict): keys = list(data_dict.keys()) records = [tuple(data_dict[key][str(i)] for key in keys) for i in range(len(data_dict[keys[0]]))] return records columns_russian = ['Цвет', 'Объем пылесборника', 'Режимы уборки', 'Объем резервуара для воды', 'Модель', 'Гарантийный срок', 'Время работы (мин)', 'Емкость аккумулятора', 'Питание', 'Время автономной работы', 'Тип управления', 'Тип уборки', 'Тип пылесборника', 'Ориентация в пространстве', 'Мощность устройства', 'Выходной фильтр', 'Максимальный уровень звука/шума', 'Доп. опции робота пылесоса', 'Материал изделия', 'Индикация робота пылесоса', 'Допустимая высота препятствия', 'Вес товара без упаковки (г)', 'Комплектация', 'Страна производства', 'Вес товара с упаковкой (г)', 'Установка на зарядное устройство', 'Высота предмета', 'Ширина предмета', 'Глубина предмета', 'Длина упаковки', 'Высота упаковки', 'Ширина упаковки', 'Артикул', 'Объем пылесборника', 'Режимы уборки', 'Объем резервуара для воды', 'Модель', 'Мощность всасывания', 'Ограничитель зоны уборки', 'Charging_installation', 'Материал насадки', 'Количество предметов в упаковке', 'Тип насадки', 'Вес с упаковкой (кг)', 'Вес без упаковки (кг)', 'Мощность всасывания (Па)', 'Диаметр сопла'] # Новые названия колонок на латинице columns_latin = ['Color', 'Dustbin_capacity', 'Cleaning_modes', 'Water_tank_capacity', 'Model', 'Warranty_period', 'Runtime_min', 'Battery_capacity', 'Power_supply', 'Autonomy_time', 'Control_type', 'Cleaning_type', 'Dustbin_type', 'Spatial_orientation', 'Device_power', 'Output_filter', 'Max_noise_level', 'Additional_features', 'Material', 'Robot_vacuum_indication', 'Obstacle_height', 'Weight_g', 'Package_contents', 'Country_of_manufacture', 'Weight_with_packaging_g', 'Charging_installation', 'Item_height', 'Item_width', 'Item_depth', 'Package_length', 'Package_height', 'Package_width', 'Article', 'Dustbin_capacity', 'Cleaning_modes', 'Water_tank_capacity', 'Model', 'Suction_power', 'Cleaning_area_limiter', 'Charging_installation','Nozzle material', 'Number_of_items_in_package','Nozzle_type','Weight_with_packaging_kg', 'Weight_wo_packaging_kg','Suction_power_Pa','Nozzle diameter'] # Создание словаря для соответствия старых и новых названий колонок column_mapping = dict(zip(columns_russian, columns_latin)) brand_mapping = { 'xiaomi': r'(?i)\bксиоми\b|\bxiaomi robot\b|\bxioamii\b|\bс я о м и\b|\bx i a o m i\b|\bmi\b|\bxiaomi mi\b', 'samsung':r'(?i)\bsamsung\b|\bсамсун\b', 'honor': r'(?i)\bhonor\b|\bhonor choice\b|\bhonorchoice\b|\bхонор\b', 'polaris': r'(?i)\bpolaris\b|\bполярис\b', 'dreame': r'(?i)\bdreame\b', 'lydsto': r'(?i)\blydsto\b|\blidsto\b', 'roborock': r'(?i)\broborock\b', 'redmond': r'(?i)\bredmond\b', 'filterix': r'(?i)\bfilterix\b', 'hobot': r'(?i)\bhobot\b|\bхобот\b', 'mijia': r'(?i)\bmijia\b', 'futula': r'(?i)\bfutula\b', 'ilife': r'(?i)\bilife\b', } # Словарь основных цветов и соответствующих оттенков color_mapping = { 'белый': r'(?i)\bбелый\b|\bwhite\b', 'черный': r'(?i)\bчерный\b|\bграфит\b|\bblack\b', 'синий': r'(?i)\bсиний\b|\bголубой\b|\b蓝色\b|\b蓝色\b|\bdark blue\b', 'серый': r'(?i)\bсерый\b|\bсеребристый\b|\bсеребристо\b|\bсеро\b', 'красный': r'(?i)\bкрасный\b|\bярко-красный\b|\bкрасно\b', 'зеленый': r'(?i)\bзеленый\b|\bсалатовый\b', 'желтый': r'(?i)\bжелтый\b', 'бежевый': r'(?i)\bбежевый\b', 'оранжевый': r'(?i)\bоранжевый\b', 'розовый': r'(?i)\bрозовый\b', 'фиолетовый': r'(?i)\bфиолетовый\b', 'золотистый': r'(?i)\bзолотистый\b', 'металл': r'(?i)\bметалл\b|\bхром\b|\bсеребро\b|\bплатина\b', } # Устанавливаем параметры скрола для прогрузки элементов scroll_height = 15000 # Высота страницы step = 100 # количество пикселей за один шаг delay = 0.5 # задержка между шагами в секундах @dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False) def robot_vacuum__scraper_wb(): @task def scraping_data(): # Установка веб-драйвера и инициализация driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) print('driver created') sleep(2) try: # Очищаем кэш перед началом работы и обновляем страницу driver.delete_all_cookies() driver.refresh() driver.get('https://www.wildberries.ru/') # Открываем стартовую страницу sleep(5) ## Находим поисковую строку search_box = WebDriverWait(driver, 18).until(EC.presence_of_element_located((By.ID, 'searchInput'))) sleep(3) search_box.send_keys('робот-пылесос') # Вводим в поисковую строку запрос sleep(4) # Симулируем нажатие клавиши Enter search_box.send_keys(Keys.RETURN) sleep(10) slow_scroll(driver, step, delay) # Выполнение медленного скролла sleep(2) content = BeautifulSoup(driver.page_source, "html.parser") # Получаем содержимое первой страницы # Инициируем датафрейм, записывая в него содержимое первой страницы df = get_page_content(articles = content.find_all('article')).assign(number_page = 1) print(f"Получены данные со страницы 1 в {datetime.now().strftime('%H:%M:%S')}") # В цикле перелистываем страницы - оставляем 10 первых страниц в выдаче for page in range(2, 10): try: # Переходим на следующую страницу next_page = WebDriverWait(driver, 20).until(EC.element_to_be_clickable((By.LINK_TEXT, str(page)))) driver.execute_script("arguments[0].click();", next_page) sleep(2) slow_scroll(driver, step, delay) # Скроллим sleep(2) content = BeautifulSoup(driver.page_source, "html.parser") # Получаем содержимое page_content = get_page_content(articles = content.find_all('article')).assign(number_page = page) df = pd.concat([df, page_content]).reset_index(drop=True) print(f"Получены данные со страницы {page} в {datetime.now().strftime('%H:%M:%S')}") except Exception as e: print(f"Страницы {page} не существует, получена ошибка {repr(e)}") break # Удаляем дубликаты df = df.drop_duplicates('id').reset_index(drop=True) # Делаем обход каждой сохраненной страницы, получая подробные характеристики товара all_details = pd.DataFrame() # Последовательно проходим по всем ссылкам из списка пылесосов, открываем карточки и получаем детали for i, product_url in enumerate(df.href): try: driver.get(product_url) # Открытие страницы товара sleep(4) # Явное ожидание загрузки кнопки "Все характеристики и описание" button = WebDriverWait(driver, 14).until(EC.element_to_be_clickable((By.CLASS_NAME, 'product-page__btn-detail'))) print(f'goods {i}, waited load') driver.execute_script("arguments[0].scrollIntoView(true);", button) #Прокрутка к кнопке, чтобы она была в видимой области button.click() # Нажимаем на кнопку sleep(5) page_source = driver.page_source # Получение исходного кода страницы после загрузки soup = BeautifulSoup(page_source, 'html.parser') # Инициализация BeautifulSoup для парсинга страницы #print(f'goods {i}, start pardced') details = pd.DataFrame([[i.span.text for i in soup.find_all("td", class_="product-params__cell")]], columns=[i.span.text for i in soup.find_all("span", class_="product-params__cell-decor")]) select_index = pd.DataFrame({'col':details.rename(columns=column_mapping).columns}) .drop_duplicates('col').index.to_list() details = details.rename(columns=column_mapping).iloc[:, select_index] all_details = pd.concat([all_details, details]).reset_index(drop=True) except Exception as e: print(f'href {product_url}: {repr(e)}') # Закрытие веб-драйвера driver.quit() # Объединяем основной список с подробными характеристиками # Добавляем номер позиции в выдаче, удаляем дубликаты # Добавляем дату получения данных, преобразуем в строку, т.к. JSON не поддерживает сериализацию объектов Timestamp напрямую df = (df.drop_duplicates('id').rename(columns={'id':'Article'}) .merge(all_details.drop_duplicates('Article'), on='Article') .assign(number_position = lambda df: np.arange(len(df)))\ .assign(date = pd.to_datetime(datetime.now()).normalize().strftime('%Y-%m-%d'))) #df = df.where(pd.notnull(df), None) df = df.astype(object).where(pd.notnull(df), None) return df.to_dict() finally: print(f'quit driver') driver.quit() @task def preprocessing_data(records): print('Start prepocessing') df = pd.DataFrame(records) # Преобразование records в DataFrame #df['price'] = df['price'].astype('Int64') # Бренд df_brand = (df[['Article', 'brand']].rename(columns={'brand': 'old_brand'}) .assign(sup_brand=lambda df: df.old_brand.apply(map_to_base_brand), num_brand=lambda df: df.groupby('sup_brand')['sup_brand'].transform('count')) .assign(brand=lambda df: [brand if num_brand > 1 else "other" for brand, num_brand in zip(df.sup_brand, df.num_brand)])[['Article', 'brand']]) print('get brand') # Страна производства # Решила захардкодить значения, возможно это не самый лучший вариант country = pd.DataFrame({'Country_of_manufacture':['Китай', 'Россия', 'Тайвань', '中国', 'Вьетнам', 'Того', 'Республика Корея', 'Германия', 'China', 'Гонконг', 'Малайзия', 'КНДР', 'Южная Корея'], 'country_of_manufacture':['Китай', 'Россия', 'Тайвань', 'Китай', 'Вьетнам', 'Того', 'Республика Корея', 'Германия', 'Китай', 'Гонконг', 'Малайзия', 'КНДР', 'Республика Корея']}) df_country = df[['Article','Country_of_manufacture']].merge(country, on='Country_of_manufacture', how='left') .fillna('other')[['Article','country_of_manufacture']] print('get country') # Цвет корпуса df_color = df[['Article', 'Color']].rename(columns={'Color':'old_color'}) .assign(color = lambda df: df.old_color.apply(map_to_base_color))[['Article','color']] print('get color') # Специфика далее приведенных категориальных признаков в том, что каждый пылесос может иметь # несколько значений одновременно. # В качестве значений будем использовать флаги (значения 1 или 0) - есть опция или нет # Тип управления ('Control_type') # Механическое управление, вручную, кнопки df_manual_control = df[(df.Control_type.fillna('na').str.contains('кнопк|кнопочн|механич|на корпусе', case=False))] [['Article']].assign(manual_control = 1) print('get manual control') # Голосовой помощник - также используем значение поля Ориентация в пространстве ('Spatial_orientation') # и Доп. опции робота пылесоса ('Additional_features') pattern = 'умный дом|умная колонка|алис|голос|alex|марус|assistant' df_voice_assistant = df[(df.Control_type.fillna('na').str.contains(pattern, case=False))| (df.Spatial_orientation.fillna('na').str.contains(pattern, case=False))| (df.Additional_features.fillna('na').str.contains(pattern, case=False))]\ [['Article']].assign(voice_assistant = 1) print('get voice assistant') # Управление со смартфона - также берем из Доп. опции робота пылесоса ('Additional_features') df_app_control = df[(df.Control_type.fillna('na').str.contains('мобильн|смартфон|приложен|телефон', case=False))| (df.Additional_features.fillna('na').str.contains('мобильн|смартфон|приложен|телефон', case=False))]\ [['Article']].assign(app_control = 1) print('get app control') # Управление пультом - также берем из Доп. опции робота пылесоса ('Additional_features') df_remote_control = df[(df.Control_type.fillna('na').str.contains('пульт|ДУ', case=False))| (df.Additional_features.fillna('na').str.contains('пульт|ДУ', case=False))]\ [['Article']].assign(remote_control = 1) print('get remove control') # Тип уборки ('Cleaning_type') - также берем из Доп. опции робота пылесоса ('Additional_features') # Моющий df_wash_cleaning = df[(df.Cleaning_type.fillna('na').str.contains('мойк|моющ', case=False))| (df.Additional_features.fillna('na').str.contains('мойк|моющ', case=False))]\ [['Article']].assign(wash_cleaning = 1) print('get wash cleaning') # Сухая уборка df_dry_cleaning = df[(df.Cleaning_type.fillna('na').str.contains('сухая', case=False))| (df.Additional_features.fillna('na').str.contains('сухая', case=False))]\ [['Article']].assign(dry_cleaning = 1) print('get dry cleaning') # Влажная уборка df_wet_cleaning = df[(df.Cleaning_type.fillna('na').str.contains('влажная', case=False))| (df.Additional_features.fillna('na').str.contains('влажная', case=False))]\ [['Article']].assign(wet_cleaning = 1) print('get wet cleaning') # Сухая и влажная уборка одновременно df_dry_and_wet_cleaning = df[(df.Cleaning_type.fillna('na').str.contains('одновремен|Комбинирован|комплексн', case=False))] [['Article']].assign(dry_and_wet_cleaning = 1) print('get dry and wet cleaning') # Тип пылесборника ('Dustbin_type') # Мешок для мусора df_garbage_bag = df[(df.Dustbin_type.fillna('na').str.contains('мешок', case=False))] [['Article']].assign(garbage_bag = 1) print('get garbage bag') # Контейнер df_container = df[(df.Dustbin_type.fillna('na').str.contains(r'контейнер|емкость', case=False))] [['Article']].assign(container = 1) print('get container') # Аквафильтр df_aquafilter = df[(df.Dustbin_type.fillna('na').str.contains('аквафильтр', case=False))| (df.Dustbin_type.fillna('na').str.contains(r'(?=.*моющийся)(?=.*фильтр)', case=False))]\ [['Article']].assign(aquafilter = 1) print('get aquafilter') # Ориентация в пространстве ('Spatial_orientation') # также все признаки берем из Доп. опции робота пылесоса ('Additional_features'), т.к. они также могут быть там указаны # возврат на базу df_return_to_base = df[(df.Spatial_orientation.fillna('na').str.contains(r'(?=.*установк)(?=.*зарядн)', case=False))| (df.Additional_features.fillna('na').str.contains(r'(?=.*установк)(?=.*зарядн)', case=False))| (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*поиск)(?=.*зарядн)', case=False))| (df.Additional_features.fillna('na').str.contains(r'(?=.*поиск)(?=.*зарядн)', case=False))| (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*возврат)(?=.*баз)', case=False))| (df.Additional_features.fillna('na').str.contains(r'(?=.*возврат)(?=.*баз)', case=False))| (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*возврат)(?=.*зарядн)', case=False))| (df.Additional_features.fillna('na').str.contains(r'(?=.*возврат)(?=.*зарядн)', case=False))| (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*поиск)(?=.*баз)', case=False))| (df.Additional_features.fillna('na').str.contains(r'(?=.*поиск)(?=.*баз)', case=False))| (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*установк)(?=.*баз)', case=False))| (df.Additional_features.fillna('na').str.contains(r'(?=.*установк)(?=.*баз)', case=False))| (df.Spatial_orientation.fillna('na').str.contains(r'парковк', case=False))| (df.Additional_features.fillna('na').str.contains(r'парковк', case=False))]\ [['Article']].assign(return_to_base= 1) print('get return to base') # Виртуальная стена df_virtual_wall = df[(df.Spatial_orientation.fillna('na').str.contains('Ограничитель', case=False))| (df.Additional_features.fillna('na').str.contains('Ограничитель', case=False))| (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*запретн)(?=.*зон)', case=False))| (df.Additional_features.fillna('na').str.contains(r'(?=.*запретн)(?=.*зон)', case=False))| (df.Spatial_orientation.fillna('na').str.contains(r'(?=.*виртуальн)(?=.*стен)', case=False))| (df.Additional_features.fillna('na').str.contains(r'(?=.*виртуальн)(?=.*стен)', case=False))]\ [['Article']].assign(virtual_wall = 1) print('get virtual wall') # Составление карты df_space_map = df[(df.Spatial_orientation.fillna('na').str.contains('план|карт|зонирован', case=False))| (df.Additional_features.fillna('na').str.contains('план|карт|зонирован', case=False))]\ [['Article']].assign(space_map = 1) print('get space map') # Специальный режим для ковра, определение ковра df_carped_mode = df[(df.Spatial_orientation.fillna('na').str.contains('ковр|ковер', case=False))| (df.Additional_features.fillna('na').str.contains('ковр|ковер', case=False))]\ [['Article']].assign(carped_mode = 1) print('get carped mode') # Лазерные датчики df_laser_sensor = df[(df.Spatial_orientation.fillna('na').str.contains('лазер', case=False))| (df.Additional_features.fillna('na').str.contains('лазер', case=False))]\ [['Article']].assign(laser_sensor = 1) print('get laser sensor') # гироскоп df_giroscope = df[(df.Spatial_orientation.fillna('na').str.contains('гироскоп', case=False))| (df.Additional_features.fillna('na').str.contains('гироскоп', case=False))]\ [['Article']].assign(giroscope = 1) print('get giroscope') # лидар df_lidar = df[(df.Spatial_orientation.fillna('na').str.contains('лидар|LiDAR', case=False))| (df.Additional_features.fillna('na').str.contains('лидар|LiDAR', case=False))]\ [['Article']].assign(lidar = 1) print('get lidar') # Детекция края, ступенек, перепада высоты df_edge_detection = df [(df.Spatial_orientation.fillna('na').str.contains('ступен|высот|обрыв|края|край|перепад|паден', case=False))| (df.Additional_features.fillna('na').str.contains('ступен|высот|обрыв|края|край|перепад|паден', case=False))]\ [['Article']].assign(edge_detection = 1) print('get edge detection') # Определение препятствий df_obstancle_detection = df [(df.Spatial_orientation.fillna('na').str.contains('стен|столкновен|препятств|предмет|провод', case=False))| (df.Additional_features.fillna('na').str.contains('стен|столкновен|препятств|предмет|провод', case=False))]\ [['Article']].assign(obstancle_detection = 1) print('obstancle detection') # Доп. опции робота пылесоса ('Additional_features') # Планировщик df_scheduler = df[(df.Additional_features.fillna('na') .str.contains('таймер|расписан|график|автовключен|автоотключен|автовыключен|отложенный старт', case=False))]\ [['Article']].assign(scheduler = 1) print('get scheduler') # возобновление уборки после подзарядки df_continued_after_charging = df[(df.Additional_features.fillna('na') .str.contains('после зарядки|после подзарядки', case=False))]\ [['Article']].assign(continued_after_charging= 1) print('get continued after charging') # Турбощетка df_turbo_brush = df[(df.Additional_features.fillna('na').str.contains('турборежим', case=False))| (df.Additional_features.fillna('na').str.contains('Турбощетка', case=False))]\ [['Article']].assign(turbo_brush = 1) print('get turbo brush') # Бампер df_bamper = df[(df.Additional_features.fillna('na').str.contains('бампер', case=False))] [['Article']].assign(bamper = 1) print('get bamper') # уф лампа pattern = 'уф лампа|уфлампа|УФ стерилизация|лампа для стерилизации|УФ-обеззараживание|УФ-лампа|Уфампа'+ '|УФ дезинфекция|Ультрафиолетовая лампа' df_uv_lamp = df[(df.Additional_features.fillna('na').str.contains(pattern, case=False))] [['Article']].assign(uv_lamp = 1) print('get uv lamp') # видеокамера df_video_camera = df[(df.Additional_features.fillna('na').str.contains('видео|камер', case=False))] [['Article']].assign(video_camera = 1) print('get video camera') # самоочистка df_self_cleaning = df[(df.Additional_features.fillna('na').str.contains('самоочистк|Автоматическая очистка', case=False))] [['Article']].assign(self_cleaning = 1) print('get self cleaning') # Основные числовые признаки с фиксированными единицами измерения # распаршиваем одним циклом ''' Длина упаковки ('Package_length') Высота упаковки ('Package_height') Ширина упаковки ('Package_width') Ширина предмета ('Item_width',) Высота предмета ('Item_height') Вес товара с упаковкой (г) ('Weight_with_packaging_g') Максимальный уровень звука/шума ('Max_noise_level') Объем пылесборника ('Dustbin_capacity') ''' col_name=['Package_length', 'Package_height', 'Package_width','Item_width','Item_height', 'Weight_with_packaging_g', 'Dustbin_capacity', 'Max_noise_level'] new_col_name=['package_length_sm', 'package_height_sm', 'package_width_sm', 'item_width_sm','item_height_sm', 'weight_with_packaging_g','dustbin_capacity_l', 'max_noise_level_db'] measures = [' см', ' см', ' см', ' см', ' см', ' г', ' л', ' дБ'] low_borders = [5, 5, 5, 4, 4, 500, 0.05, 0] upper_borders = [200, 200, 200, 100, 100, 20000, 3, 100] df_fixed_measure = df[['Article']].copy() for col, new_col, meas, low, upper in zip(col_name, new_col_name, measures, low_borders, upper_borders): values = [] for val in df[col]: try: values.append(float(val.split(meas)[0])) except Exception as e: values.append(None) df_fixed_measure[new_col] = values df_fixed_measure.loc[(df_fixed_measure[new_col] < low)|(df_fixed_measure[new_col] > upper), new_col] = None print(f'get {col}') # Распаршиваем числовые значения с разными единицами измерений ''' Мощность устройства ('Device_power') - может быть выражена в Вт, Па, причем ед. изм-я могут пыть написаны разными регистрами, кириллицей или латиницей, с разными символами Гарантийный срок ('Warranty_period') - может быть в днях, годах, месяцах и вообще произвольным текстом Емкость аккумулятора ('Battery_capacity') - может быть в в мАч, Ач ''' # Мощность устройства ('Device_power') # Регулярные выражения для поиска значений watt_pattern = re.compile(r'(\d+(\.\d+)?)(?:\s*[WwВвтт]+|W|Вт)?') pa_pattern = re.compile(r'(\d+(\.\d+)?)\s*(?:Па|Pa|пА|pa|PA|па|ПА)') # Функция конвертации Па в Вт def pa_to_watt(pa): return pa / 71 # Результирующий список device_power_watt = [] Article = [] for article, item in zip(df.Article, df.Device_power): try: if pd.isna(item): device_power_watt.append(None) Article.append(article) else: # Поиск значений в паскалях pa_match = pa_pattern.search(item) if pa_match: pa_value = float(pa_match.group(1)) device_power_watt.append(pa_to_watt(pa_value)) Article.append(article) else: # Поиск значений в ваттах watt_match = watt_pattern.search(item) if watt_match: device_power_watt.append(float(watt_match.group(1))) Article.append(article) else: # Если не удалось распарсить, добавляем np.nan device_power_watt.append(None) Article.append(article) except Exception as e: device_power_watt.append(None) Article.append(article) df_device_power_watt = pd.DataFrame({'Article': Article, 'device_power_watt': device_power_watt}) print('get device power watt') # Гарантийный срок ('Warranty_period') warranty_days = [] pattern = re.compile(r'(\d+|\b[Оо]дин\b|\b[Оо]дного\b|\b[Пп]олгода\b|\b1\b|\( один\)|\(один\))[\s\-·/]*([гГ]од|[мM][еЕе]с(?:[еЕе]ц)?|д[еЕе]н|г|г\.|м|дней|день|дней)', re.IGNORECASE) for entry in df['Warranty_period']: try: if pd.isna(entry): warranty_days.append(None) continue match = pattern.search(entry) if match: value, unit = match.groups() if value.lower() in ['один', 'одного', '1', '( один)', '(один)']: value = 1 elif value.lower() == 'полгода': value = 0.5 * 12 else: value = int(value) if 'год' in unit.lower() or 'г' in unit.lower(): days = value * 365 elif 'мес' in unit.lower() or 'м' in unit.lower(): days = value * 30 elif 'ден' in unit.lower() or 'дней' in unit.lower() or 'день' in unit.lower(): days = value else: days = None warranty_days.append(days) else: warranty_days.append(None) except Exception as e: warranty_days.append(None) df_warranty_days = df[['Article']].assign(warranty_days = warranty_days) print('get warranty days') # Емкость аккумулятора ('Battery_capacity') battery_capacity_measure = [] battery_capacity = [] # Регулярные выражения для поиска единиц измерения mAh_pattern = re.compile(r'(?:м|m)[\s\-*·/]*[Aа][\s\-*·/]*[hч]', re.IGNORECASE) Ah_pattern = re.compile(r'[Aа][\s\-*·/]*[hч]', re.IGNORECASE) #V_pattern = re.compile(r'[Вv]', re.IGNORECASE) # Регулярное выражение для поиска числовых значений number_pattern = re.compile(r'\d+(?:[\.,]\d+)?') for item in df.Battery_capacity: try: if isinstance(item, str): # Поиск единиц измерения if mAh_pattern.search(item): battery_capacity_measure.append('mAh') elif Ah_pattern.search(item): battery_capacity_measure.append('Ah') else: battery_capacity_measure.append(None) # Поиск числовых значений numbers = number_pattern.findall(item) if numbers: battery_capacity.append(float(numbers[0].replace(',', '.'))) else: battery_capacity.append(None) else: battery_capacity_measure.append(None) battery_capacity.append(None) except Exception as e: print(repr(e)) print(item) battery_capacity_measure.append(None) battery_capacity.append(None) df_battery_capacity = df[['Article']].assign(battery_capacity=battery_capacity) .assign(battery_capacity_measure=battery_capacity_measure) df_battery_capacity['battery_capacity_mAh'] = np.where(df_battery_capacity.battery_capacity_measure=="Ah", df_battery_capacity.battery_capacity*1000, df_battery_capacity.battery_capacity) df_battery_capacity = df_battery_capacity[['Article', 'battery_capacity_mAh']] print('get battery capacity') new_df = (df[['Article','name','price','old_price','rate','estimate','number_page','number_position','date']] .merge(df_country, on='Article', how='left').merge(df_brand, on='Article', how='left') .merge(df_color, on='Article', how='left').merge(df_manual_control, on='Article', how='left') .merge(df_voice_assistant, on='Article', how='left').merge(df_app_control, on='Article', how='left') .merge(df_remote_control, on='Article', how='left').merge(df_wash_cleaning, on='Article', how='left') .merge(df_dry_cleaning, on='Article', how='left').merge(df_wet_cleaning, on='Article', how='left') .merge(df_dry_and_wet_cleaning, on='Article', how='left').merge(df_garbage_bag, on='Article', how='left') .merge(df_container, on='Article', how='left').merge(df_aquafilter, on='Article', how='left') .merge(df_return_to_base, on='Article', how='left').merge(df_virtual_wall, on='Article', how='left') .merge(df_space_map, on='Article', how='left').merge(df_carped_mode, on='Article', how='left') .merge(df_laser_sensor, on='Article', how='left').merge(df_giroscope, on='Article', how='left') .merge(df_lidar, on='Article', how='left').merge(df_edge_detection, on='Article', how='left') .merge(df_obstancle_detection, on='Article', how='left').merge(df_continued_after_charging, on='Article', how='left') .merge(df_turbo_brush, on='Article', how='left').merge(df_bamper, on='Article', how='left') .merge(df_uv_lamp, on='Article', how='left').merge(df_video_camera, on='Article', how='left') .merge(df_self_cleaning, on='Article', how='left').merge(df_fixed_measure, on='Article', how='left') .merge(df_device_power_watt, on='Article', how='left').merge(df_warranty_days, on='Article', how='left') .merge(df_battery_capacity, on='Article', how='left') # В перекодированных в бинарные признаках заменим пропуск на 0 .fillna({'manual_control': 0, 'voice_assistant': 0, 'app_control': 0, 'remote_control': 0, 'wash_cleaning': 0, 'dry_cleaning': 0, 'wet_cleaning': 0, 'dry_and_wet_cleaning': 0, 'garbage_bag': 0, 'container': 0, 'aquafilter': 0, 'return_to_base': 0, 'virtual_wall': 0, 'space_map': 0, 'carped_mode': 0, 'laser_sensor': 0, 'giroscope': 0, 'lidar': 0, 'edge_detection': 0, 'obstancle_detection': 0, 'continued_after_charging': 0, 'turbo_brush': 0, 'bamper': 0, 'uv_lamp': 0, 'video_camera': 0, 'self_cleaning': 0,'rate':0,'estimate':0}) ) print('union df') # Очистим от аксессуаров patern = 'Детский|фильтр|салфет|щетк|Тряпк|ручной|насадк|Аксессуар|Сменн|Фибр|щеток'+ '|Щётк|Комплект|Накладк|аксесуар|Мешок|датчик|Рампа|автомобил|Батарейка' new_df = new_df[~new_df.name.str.contains(patern, case=False)].query('price>700').reset_index(drop=True).replace({np.nan: None}) print('cleaned df') return new_df.to_dict() @task def load_data(records): # Создаем DataFrame из словаря records df = pd.DataFrame.from_dict(records) # Приведение типов для вставки в ClickHouse df['Article'] = df['Article'].astype(str) df['name'] = df['name'].astype(pd.StringDtype()).fillna('') df['price'] = df['price'].astype('Int64') df['old_price'] = df['old_price'].astype('float64') df['rate'] = df['rate'].astype('float64') df['estimate'] = df['estimate'].astype('float64') df['number_page'] = df['number_page'].astype('Int64') df['number_position'] = df['number_position'].astype('Int64') df['date'] = pd.to_datetime(df['date']).dt.date df['country_of_manufacture'] = df['country_of_manufacture'].astype(pd.StringDtype()).fillna('') df['brand'] = df['brand'].astype(pd.StringDtype()).fillna('') df['color'] = df['color'].astype(pd.StringDtype()).fillna('') # Для столбцов с типом Nullable(UInt8) - приведение к типу Int64 с замещением None на 0 uint8_columns = [ 'manual_control', 'voice_assistant', 'app_control', 'remote_control', 'wash_cleaning', 'dry_cleaning', 'wet_cleaning', 'dry_and_wet_cleaning', 'garbage_bag', 'container', 'aquafilter', 'return_to_base', 'virtual_wall', 'space_map', 'carped_mode', 'laser_sensor', 'giroscope', 'lidar', 'edge_detection', 'obstancle_detection', 'continued_after_charging', 'turbo_brush', 'bamper', 'uv_lamp', 'video_camera', 'self_cleaning' ] for col in uint8_columns: df[col] = df[col].astype('Int64') # Для столбцов с типом Nullable(Float64) float64_columns = [ 'package_length_sm', 'package_height_sm', 'package_width_sm', 'item_width_sm', 'item_height_sm', 'weight_with_packaging_g', 'dustbin_capacity_l', 'max_noise_level_db', 'device_power_watt', 'warranty_days', 'battery_capacity_mAh' ] for col in float64_columns: df[col] = df[col].astype('float64') # Преобразуем DataFrame в список кортежей records_tuples = [tuple(x) for x in df.to_numpy()] ch_hook = ClickHouseHook(clickhouse_conn_id='clickhouse_default') ch_hook.execute('''CREATE DATABASE IF NOT EXISTS scraper_wb''') ch_hook.execute('''CREATE TABLE IF NOT EXISTS scraper_wb.robot_vacuum ( Article String, name Nullable(String), price Nullable(Int64), old_price Nullable(Float64), rate Nullable(Float64), estimate Nullable(Float64), number_page Nullable(Int64), number_position Nullable(Int64), date Date, country_of_manufacture Nullable(String), brand Nullable(String), color Nullable(String), manual_control Nullable(UInt8), voice_assistant Nullable(UInt8), app_control Nullable(UInt8), remote_control Nullable(UInt8), wash_cleaning Nullable(UInt8), dry_cleaning Nullable(UInt8), wet_cleaning Nullable(UInt8), dry_and_wet_cleaning Nullable(UInt8), garbage_bag Nullable(UInt8), container Nullable(UInt8), aquafilter Nullable(UInt8), return_to_base Nullable(UInt8), virtual_wall Nullable(UInt8), space_map Nullable(UInt8), carped_mode Nullable(UInt8), laser_sensor Nullable(UInt8), giroscope Nullable(UInt8), lidar Nullable(UInt8), edge_detection Nullable(UInt8), obstancle_detection Nullable(UInt8), continued_after_charging Nullable(UInt8), turbo_brush Nullable(UInt8), bamper Nullable(UInt8), uv_lamp Nullable(UInt8), video_camera Nullable(UInt8), self_cleaning Nullable(UInt8), package_length_sm Nullable(Float64), package_height_sm Nullable(Float64), package_width_sm Nullable(Float64), item_width_sm Nullable(Float64), item_height_sm Nullable(Float64), weight_with_packaging_g Nullable(Float64), dustbin_capacity_l Nullable(Float64), max_noise_level_db Nullable(Float64), device_power_watt Nullable(Float64), warranty_days Nullable(Float64), battery_capacity_mAh Nullable(Float64) ) ENGINE = MergeTree() PRIMARY KEY (Article, date) ORDER BY (Article, date)''') ch_hook.execute('INSERT INTO scraper_wb.robot_vacuum VALUES', records_tuples) df = scraping_data() transformed_df = preprocessing_data(df) load_data(transformed_df) robot_vacuum__scraper_wb = robot_vacuum__scraper_wb()
Информация на этой странице взята из источника: https://habr.com/ru/companies/beget/articles/928712/