# How to Use Airflow, the Task Scheduling Powerhouse
## 1. What Is Airflow
Apache Airflow is an open-source workflow management platform written in Python, originally developed at Airbnb and brought into the Apache Incubator in 2016. It expresses task dependencies as **directed acyclic graphs (DAGs)**, provides a web UI for monitoring execution status, and has become one of the core tools of modern data engineering.
### Core Features
1. **Visual workflows**: the Web UI shows task dependencies and execution status at a glance
2. **Flexible scheduling**: runs can be triggered on a time schedule, by external events, and more
3. **Extensibility**: a rich catalogue of Operators and Hooks integrates with all kinds of external systems
4. **Workflows as code**: every workflow is defined in Python, which makes version control straightforward
## 2. Core Concepts
### 1. DAG (Directed Acyclic Graph)
A DAG describes a collection of tasks and the dependencies between them; a minimal definition looks like this:

```python
from airflow import DAG
from datetime import datetime

dag = DAG(
    'my_first_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
)
```
### 2. Operator

An Operator defines a single unit of work inside a DAG. Common types:

- `BashOperator`: runs a shell command
- `PythonOperator`: calls a Python function
- `EmailOperator`: sends a notification email
- Database operators: `PostgresOperator`, `MySqlOperator`, and others

For example:
```python
from airflow.operators.bash import BashOperator

task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)
```
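A `PythonOperator` task is defined the same way; the snippet below is a minimal hedged sketch (the `greet` callable and its argument are made up for illustration):

```python
from airflow.operators.python import PythonOperator

def greet(name):
    # Placeholder callable used only for illustration
    print(f"Hello, {name}")

greet_task = PythonOperator(
    task_id='greet_user',
    python_callable=greet,
    op_kwargs={'name': 'Airflow'},  # keyword arguments passed to the callable
    dag=dag
)
```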
### 3. Task Dependencies

```python
task1 >> task2              # task2 depends on task1
# Equivalent to:
task2.set_upstream(task1)
```
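Dependencies can also be written with the reverse operator or fanned out against a list; the task names below are placeholders:

```python
# Reverse form: task3 runs after task1 (same as task1 >> task3)
task3 << task1

# Fan-out: task2 and task3 both run once task1 has succeeded
task1 >> [task2, task3]
```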
## 3. Installation and Quick Start

```bash
# Requires a Python 3.8+ environment
sudo apt-get install python3 python3-pip

# A virtual environment is recommended
python3 -m venv airflow_env
source airflow_env/bin/activate

# Install Airflow against the official constraints file
pip install "apache-airflow==2.6.1" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.1/constraints-3.8.txt"
```

```bash
# Initialize the metadata database
airflow db init

# Create an admin user for the Web UI
airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@example.com

# Start the Web UI (port 8080) and the scheduler, each in its own terminal
airflow webserver -p 8080
airflow scheduler
```
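For a quick local experiment, Airflow 2.x also ships an `airflow standalone` command that initializes the database, creates a user, and runs the webserver and scheduler together; treat it as a development convenience rather than a production setup:

```bash
# Development only: database init + admin user + webserver + scheduler in one command
airflow standalone
```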
## 4. Hands-On Example: A Daily E-Commerce Pipeline

Build a daily e-commerce data pipeline that:

1. Downloads the previous day's order data
2. Cleans and transforms the data
3. Loads it into the data warehouse
4. Generates a sales report
5. Sends a notification email
```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

default_args = {
    'owner': 'ecom_team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

def process_data(**context):
    import pandas as pd
    execution_date = context['execution_date']
    # Data-cleaning logic goes here
    print(f"Processing data for {execution_date}")

with DAG(
    'ecommerce_pipeline',
    default_args=default_args,
    start_date=datetime(2023, 6, 1),
    schedule_interval='0 3 * * *',  # runs daily at 03:00
    catchup=False
) as dag:
    download = BashOperator(
        task_id='download_orders',
        bash_command='wget https://example.com/orders/{{ ds }}.csv -O /tmp/orders.csv'
    )

    process = PythonOperator(
        task_id='process_data',
        python_callable=process_data
        # In Airflow 2.x the task context is passed automatically; provide_context is no longer needed
    )

    load = BashOperator(
        task_id='load_to_dw',
        bash_command='psql -c "COPY orders FROM \'/tmp/orders.csv\' CSV HEADER"'
    )

    report = BashOperator(
        task_id='generate_report',
        bash_command='python /scripts/generate_report.py {{ ds }}'
    )

    notify = EmailOperator(
        task_id='send_email',
        to='team@example.com',
        subject='Daily Report {{ ds }}',
        html_content="<h1>Daily ETL Completed</h1>"
    )

    download >> process >> load >> [report, notify]
```
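Before letting the scheduler pick this up, the pipeline can be exercised from the CLI; a hedged example for one logical date:

```bash
# Confirm the DAG file parses and list its tasks
airflow tasks list ecommerce_pipeline

# Run every task of the DAG once for a single logical date
airflow dags test ecommerce_pipeline 2023-06-01
```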
## 5. Advanced Usage

### Variables and Connections

```python
from airflow.models import Variable
from airflow.hooks.base import BaseHook

# Read a variable stored in the Airflow metadata database
api_key = Variable.get("MY_API_KEY")

# Look up a database connection by its connection id
conn = BaseHook.get_connection("postgres_default")
conn_string = f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}"
```
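Variables and Connections are usually created in the Web UI under the Admin menu, but they can also be managed from the CLI; the values below are placeholders:

```bash
# Store the variable read by Variable.get("MY_API_KEY")
airflow variables set MY_API_KEY "replace-me"

# Register the connection resolved by BaseHook.get_connection("postgres_default")
airflow connections add postgres_default \
    --conn-type postgres \
    --conn-host localhost \
    --conn-port 5432 \
    --conn-login airflow \
    --conn-password airflow \
    --conn-schema airflow
```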
### XCom: Passing Small Amounts of Data Between Tasks

```python
# Producer task: push a value into XCom
def push_data(**context):
    context['ti'].xcom_push(key='my_key', value='secret_data')

# Consumer task: pull the value back out
def pull_data(**context):
    value = context['ti'].xcom_pull(key='my_key')
```
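The two callables only exchange data once they run as tasks in the same DAG; a minimal hedged wiring (the DAG id and schedule are placeholders):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG('xcom_demo', start_date=datetime(2023, 1, 1), schedule_interval=None) as xcom_dag:
    push = PythonOperator(task_id='push_data', python_callable=push_data)
    pull = PythonOperator(task_id='pull_data', python_callable=pull_data)
    push >> pull  # pull_data runs after push_data and reads the pushed value
```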
### Dynamic DAG Generation

A plain Python loop at module level can generate a family of similar DAGs:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

def process_region(region):
    print(f"Processing data for region {region}")  # placeholder processing logic

for region in ['north', 'south', 'east', 'west']:
    dag_id = f'process_{region}_data'
    with DAG(dag_id, start_date=datetime(2023, 1, 1), schedule_interval='@daily') as region_dag:
        start = DummyOperator(task_id='start')
        process = PythonOperator(
            task_id=f'process_{region}',
            python_callable=process_region,
            op_kwargs={'region': region}
        )
        start >> process
    # Each DAG must end up in the module's global namespace so the scheduler can discover it
    globals()[dag_id] = region_dag
```
### Custom Operators

```python
from airflow.models import BaseOperator

class MyCustomOperator(BaseOperator):
    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        # execute() is called when the task instance runs
        print(f"Running with param: {self.my_param}")
```
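Once the class is importable (for example from a module in the DAGs folder or a plugin), it is used like any built-in operator; a hedged usage sketch:

```python
custom_task = MyCustomOperator(
    task_id='run_custom_logic',
    my_param='hello',   # forwarded to MyCustomOperator.__init__
    dag=dag             # attach the task to an existing DAG object
)
```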
## 6. Running Airflow in Production

### Performance Tuning and Debugging

- Use `pool` and `priority_weight` to control how tasks compete for worker slots (see the sketch after the commands below)
- Adjust the `parallelism` setting to cap how many task instances the scheduler runs concurrently

Task logs for each attempt are written under `$AIRFLOW_HOME/logs/` and can be browsed per attempt in the Web UI; a single task can also be exercised from the CLI:

```bash
# Test a single task in isolation (logs print straight to the terminal)
airflow tasks test my_dag my_task 2023-07-01
```
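`pool` and `priority_weight` are ordinary task arguments; a minimal hedged sketch (the pool name and script path are placeholders, and the pool itself must first be created under Admin → Pools or with `airflow pools set`):

```python
heavy_task = BashOperator(
    task_id='heavy_query',
    bash_command='python /scripts/heavy_query.py',
    pool='etl_pool',        # at most this pool's slot count of such tasks run at once
    priority_weight=10,     # higher weight is scheduled first when slots are scarce
    dag=dag
)
```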
### Security

- Encrypt Connections and other secrets stored in the metadata database with a `Fernet` key
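One common way to supply the key is through Airflow's standard environment-variable configuration; a hedged sketch of the procedure (never reuse a key printed in documentation):

```bash
# Generate a Fernet key and hand it to Airflow via the [core] fernet_key setting
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
export AIRFLOW__CORE__FERNET_KEY="<paste-the-generated-key-here>"
```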
### Choosing an Executor

| Executor | Best suited for | Trade-offs |
| --- | --- | --- |
| SequentialExecutor | Development and testing | Simple, but no parallelism |
| LocalExecutor | Small production deployments | Parallel, but a single point of failure |
| CeleryExecutor | Medium to large production deployments | Distributed, but requires maintaining Redis (or another broker) |
| KubernetesExecutor | Cloud-native environments | Elastic scaling, but more complex to configure |
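Which executor runs is a plain configuration choice; a hedged sketch using the environment-variable form (the broker URL is a placeholder, and CeleryExecutor additionally needs the `celery` extra installed):

```bash
# Equivalent to setting executor under [core] in airflow.cfg
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
# CeleryExecutor needs a message broker, e.g. Redis
export AIRFLOW__CELERY__BROKER_URL=redis://localhost:6379/0
```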
### High-Availability Architecture

A typical highly available deployment:

```
              +-----------------+
              |  Load Balancer  |
              +--------+--------+
                       |
          +------------+------------+
          |                         |
+---------+----------+   +----------+---------+
|    Web Server 1    |   |    Web Server 2    |
+---------+----------+   +----------+---------+
          |                         |
+---------+----------+   +----------+---------+
|    Scheduler 1     |   |    Scheduler 2     |
+---------+----------+   +----------+---------+
          |                         |
          +------------+------------+
                       |
              +--------+--------+
              |  PostgreSQL HA  |
              +--------+--------+
                       |
              +--------+--------+
              |  Redis Cluster  |
              +-----------------+
```
## 7. Ecosystem Integrations

Cloud platforms are supported through separate provider packages:

```bash
pip install apache-airflow-providers-google
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-microsoft-azure
```

Big data systems are covered by operators such as `SparkSubmitOperator`, `HiveOperator`, and `KafkaProducerOperator`, while `airflow-exporter` can expose metrics for external monitoring.
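As one illustration, a hedged sketch of a Spark submission task via the Spark provider (the application path and connection id are placeholders, and `apache-airflow-providers-apache-spark` must be installed):

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_job = SparkSubmitOperator(
    task_id='spark_daily_aggregation',
    application='/jobs/daily_aggregation.py',  # placeholder path to the Spark application
    conn_id='spark_default',                   # Spark connection defined in Airflow
    dag=dag
)
```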
## 8. Wrapping Up

With the walkthrough above you should now have a grip on Airflow's core concepts and hands-on techniques. Start by practicing with a simple DAG and work your way up to more complex data pipelines. Airflow's real strength is its flexibility: the deeper you go, the more automation workflows you will find it can handle.