airflow

airflow
Фото: Pexels
```html

Airflow — платформа с открытым исходным кодом для оркестрации рабочих процессов, созданная в Airbnb. Она позволяет писать на Python определения сложных конвейеров обработки данных, управлять их расписанием и следить за ходом выполнения. Всё это даёт вам автоматизацию, надёжность и возможность масштабирования.

Что такое Airflow и зачем он нужен

По сути, Airflow решает одну ключевую задачу: управление зависимостями между задачами. Представьте типичный сценарий: нужно загрузить данные из источника, трансформировать их, проверить качество и загрузить в хранилище. Без инструмента это быстро становится кошмаром — ошибки, пропущенные шаги, сложные зависимости.

Вот почему Airflow полезен:

  • Описывайте сложные зависимости между задачами прямо в коде
  • Запускайте задачи по расписанию автоматически
  • Видите логи и статус каждой задачи в реальном времени
  • Забудьте о ручных повторных запусках — Airflow перезапустит задачу при сбое
  • Визуализируйте весь ваш конвейер в веб-интерфейсе
  • Масштабируйте от десятков до миллионов задач без проблем

Архитектура Airflow

Airflow состоит из нескольких компонентов, которые работают вместе как одно целое.

Webserver (Веб-сервер)

Это ваш главный пульт управления. Через веб-интерфейс вы видите статус всех задач, читаете логи, проверяете метрики производительности. Можно вручную запустить или остановить рабочий процесс, если нужно.

Scheduler (Планировщик)

Это сердце системы. Планировщик постоянно следит за вашими DAG, проверяет расписание и решает, какие задачи нужно запустить и когда. Работает в фоне, как незримый помощник.

Executor (Исполнитель)

Исполнитель — это рабочая лошадка, которая фактически запускает задачи. Airflow предлагает несколько вариантов: SequentialExecutor для разработки, LocalExecutor для одного сервера, CeleryExecutor для распределённого выполнения, KubernetesExecutor для контейнеризованных окружений и другие.

Metadata Database (База метаданных)

Здесь хранится вся информация: описания DAG, статусы задач, логи, истории запусков. Для начала подойдёт SQLite, но когда вырастете — переходите на PostgreSQL или MySQL.

Worker (Рабочий процесс)

Рабочие выполняют фактические задачи. В зависимости от конфигурации они могут работать на одной машине или распределяться по целому кластеру.

DAG в Airflow

DAG (направленный ациклический граф) — это сердце Airflow. По сути, это набор задач и правила того, как они связаны между собой. "Направленный" значит, что задачи выполняются в определённом порядке. "Ациклический" значит, что нет циклов — задача не может зависеть от самой себя.

Каждый DAG должен содержать:

  • Уникальный идентификатор (dag_id)
  • Расписание выполнения (schedule_interval)
  • Набор задач (tasks)
  • Правила зависимостей между задачами
  • Параметры конфигурации — timeout, количество повторов и прочее

Пример простого DAG на Python:

Вы пишете DAG как обычный Python-скрипт. Каждая задача — это объект оператора. Зависимости определяются просто: `задача_1 >> задача_2` означает, что задача_2 ждёт завершения задача_1. Такой синтаксис делает код читаемым даже для сложных конвейеров.

Операторы и задачи в Airflow

Operator — это класс, который определяет одну конкретную задачу. Airflow поставляется с огромной библиотекой готовых операторов для всего, что только может понадобиться.

Самые полезные из них:

  • BashOperator — запускает bash-команды
  • PythonOperator — выполняет Python-функции
  • EmailOperator — отправляет письма
  • SQLOperator — выполняет SQL-запросы
  • HttpOperator — делает HTTP-запросы
  • S3Operator — работает с Amazon S3
  • DockerOperator — запускает Docker-контейнеры
  • KubernetesPodOperator — запускает поды в Kubernetes

Отдельно стоит упомянуть Sensor — это специальный оператор, который ждёт, пока что-то произойдёт. Например, FileSensor ждёт появления файла, SqlSensor проверяет результат запроса к БД.

Расписание и триггеры

Хотите, чтобы DAG запускался каждый день в 2 утра? Или каждый час? Airflow может это сделать. Расписание задаётся параметром schedule_interval — это может быть cron-выражение или объект timedelta.

Несколько примеров:

  • @daily — каждый день
  • @hourly — каждый час
  • '0 2 * * *' — ровно в 2:00 по cron-синтаксису
  • timedelta(days=1) — каждые 24 часа

Но расписание — не единственный способ. Можно запустить DAG вручную через веб-интерфейс или API. Есть и внешние триггеры — они позволяют запустить DAG, когда произойдёт какое-то событие во внешней системе.

Мониторинг и логирование

Airflow записывает всё, что происходит. Каждая задача оставляет подробные логи, которые хранятся на диске или в облаке (S3, GCS) и легко доступны через веб-интерфейс.

Вы можете:

  • Видеть статус каждой задачи — успешно, ошибка, пропущена или не выполнена из-за проблемы с зависимостью
  • Анализировать, сколько времени каждая задача занимает
  • Получать алерты при сбоях
  • Интегрировать с Prometheus и Grafana для мониторинга
  • Отправлять уведомления в Slack или по email при проблемах

Обработка ошибок и повторные попытки

Жизнь непредсказуема, и задачи иногда падают. Airflow имеет встроенные инструменты для этого. Если задача упала, Airflow может автоматически повторить её несколько раз с задержкой между попытками.

Параметры для управления этим:

  • retries — сколько раз повторить
  • retry_delay — как долго ждать перед повтором
  • retry_exponential_backoff — увеличивать задержку с каждой попыткой
  • max_retry_delay — не ждать больше этого времени

Кроме того, можно написать callback-функции, которые сработают при успехе или ошибке: on_success_callback и on_failure_callback. Например, отправить письмо в Slack при ошибке.

Масштабируемость Airflow

Airflow спроектирован с прицелом на рост. Для маленького проекта достаточно LocalExecutor на одном сервере. Когда вырастаете — переходите на CeleryExecutor и добавляете worker-узлы.

Масштабирование работает за счёт:

  • Распределения задач между несколькими рабочими процессами
  • Очередей для приоритизации — срочные задачи выполняются первыми
  • Горизонтального масштабирования — просто добавляйте worker-узлы
  • Оптимизации базы данных, где хранятся метаданные

Интеграция с другими системами

Airflow легко работает с популярными инструментами. Есть готовые провайдеры (расширения) для интеграции с:

  • AWS — S3, EC2, RDS, EMR
  • Google Cloud — BigQuery, Cloud Storage, Dataflow
  • Azure — Data Lake, Synapse, Blob Storage
  • Apache Spark и Hadoop
  • Kubernetes
  • Базами данных — PostgreSQL, MySQL, Oracle, MongoDB
  • Системами очередей — Celery, RabbitMQ
  • Slack, Email и прочими сервисами уведомлений

Преимущества и недостатки Airflow

Airflow стал популярным выбором, но это не универсальное решение. Вот честная оценка.

Преимущества

  • Полностью на Python — знаете Python, сразу начнёте писать DAG
  • Открытый исходный код с активным сообществом
  • Огромная библиотека операторов и провайдеров
  • Мощный веб-интерфейс, который действительно удобен
  • Гибкий и масштабируемый под разные нужды
  • Встроенная обработка ошибок и автоматические повторы

Недостатки

  • Нужно знать Python, чтобы писать DAG
  • Для простых задач может быть избыточным
  • Требует развёртывания и поддержки собственной инфраструктуры
  • Не подходит для real-time обработки потоков данных
  • Кривая обучения может быть крутой для новичков

Альтернативы Airflow

На рынке есть и другие инструменты, если Airflow не совсем подходит:

  • Prefect — более свежая платформа с лучшим пользовательским опытом
  • Dagster — заточена под data-driven рабочие процессы с продвинутой обработкой ошибок
  • Apache Beam — для потоков и батч-обработки данных
  • Kubernetes CronJobs — встроённое решение для Kubernetes
  • Luigi — лёгкая альтернатива от Spotify
  • Oozie — для экосистемы Hadoop

Установка и начало работы с Airflow

Установка простая — используйте pip:

После этого инициализируйте базу метаданных, запустите scheduler и webserver. Потом создавайте DAG как Python-файлы и кладите их в папку dags_folder (по умолчанию ~/airflow/dags).

Airflow сам найдёт новые DAG и сделает их доступными в веб-интерфейсе.

Лучшие практики при работе с Airflow

Чтобы не наступать на грабли, следуйте этим советам:

  • Делайте каждую задачу независимой и самодостаточной
  • Версионируйте ваши DAG
  • Документируйте, что делает каждый DAG и каждая задача
  • Устанавливайте timeout, чтобы зависшие задачи не ломали конвейер
  • Для паролей и ключей используйте переменные и подключения Airflow
  • Тестируйте DAG локально перед развёртыванием на production
  • Следите за производительностью scheduler и worker
  • Передавайте данные между задачами через XCom осторожно — это может стать узким местом
```
airflow — иллюстрация 2
Фото: Pexels
airflow — иллюстрация 3
Фото: Pexels

Частые вопросы

  • Чем отличается Airflow от простого cron?

    Cron — это встроенная утилита Unix для планирования простых команд. Airflow предоставляет намного больше функциональности: управление сложными зависимостями, мониторинг, логирование, обработка ошибок, веб-интерфейс и масштабируемость на распределённые системы. Airflow лучше подходит для сложных конвейеров данных.

  • Можно ли использовать Airflow для real-time обработки данных?

    Airflow не оптимален для real-time обработки потоков данных. Он разработан для batch-обработки и периодического выполнения задач. Для real-time обработки лучше использовать Apache Kafka, Apache Flink или Apache Spark Streaming.

  • Какой исполнитель выбрать для продакшена?

    Для продакшена обычно используется CeleryExecutor для распределённого выполнения на нескольких машинах или KubernetesExecutor для контейнеризованных окружений. Выбор зависит от инфраструктуры и требований к масштабируемости.

  • Как передавать данные между задачами в Airflow?

    Данные между задачами можно передавать через XCom (cross-communication). Одна задача может push значение в XCom, а другая задача может его pull. Также можно использовать общее хранилище (S3, базу данных) или переменные Airflow.

  • Нужна ли база данных для Airflow?

    Да, Airflow требует базу данных для хранения метаданных о DAG, задачах и их статусах. По умолчанию используется SQLite, но для продакшена рекомендуется PostgreSQL или MySQL для лучшей производительности и надёжности.

Поделиться