본문 바로가기

Apache Airflow

[Airflow] API기반 DAG Trigger 하기

- DAG예시 -
slack으로 메세지를 보내는 간단한 작업을 수행

 

사용 가능한 API 목록

 

Airflow REST API

 

airflow.apache.org


1. Auth 설정

1-1. 컨테이너 생성 시 설정 (docker-compose.yaml)

AIRFLOW__API__AUTH_BACKENDS 환경 변수 추가

  • Airflow 2 버전의 기본값은 외부에서 접근 불가
  • id, pw를 입력하여 접근 가능하도록 basic_auth로 변경

1-2. 컨테이너 내부에서 설정

  • 컨테이너 접속
docker exec -it <CONTAINER_ID> /bin/bash

 

  • airflow.cfg 파일 변경
vim airflow.cfg

auth_backends = airflow.api.auth.backend.basic_auth로 변경


2. DAG 작성

import json
import requests
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="slack",
    start_date=datetime.datetime(2023, 6, 19),
)


def alert_to_slack(url, **context):
    data = {"text": "hello"}
    req = requests.post(
        url=url,
        data=json.dumps(data)
    )


slack_alarm = PythonOperator(
    task_id='slack_alarm',
    python_callable=alert_to_slack,
    provide_context=True,
    op_args=["<SLACK_HOOK_URL>"],
    dag=dag
)

slack_alarm

 

<SLACK_HOOK_URL> 확인 방법

 

Slack Webhook(API) 사용하여 메세지 보내기

1. Slack에서 Incoming WebHooks 추가 2. 메시지를 보낼 채널 설정 3. 웹후크 URL 복사 URL 형식은 다음과 같다. 4. 메세지 전송을 위한 코드 작성 (python) import json import requests def send_message(url): data = {"text": "he

ssun-g.tistory.com


3. DAG Pause/Unpause

위 그림과 같이 Unpause 상태여야 트리거 발생 시 정상적으로 동작한다.

Pause/Unpause 방법

  1. Airflow UI에 접속하여 직접 Pause/Unpause
    • http://localhost:8080에 접속 후 DAG를 찾아서 직접 Pause/Unpause 버튼을 클릭하여 상태를 변경한다.
      (주소는 사용자에 따라 다를 수 있음)
  2. API를 사용하여 Pause/Unpause
curl -u <ID>:<PASSWORD> \
     -X PATCH "http://<IP>:<PORT>/api/v1/dags/<DAG_ID>?update_mask=is_paused" \
     -H "Content-Type: application/json" \
     -d '{"is_paused": false}'

"is_paused": false인 경우 Unpause

"is_paused": true인 경우 Pause

 

예시

curl -u admin:admin \
     -X PATCH "http://localhost:8080/api/v1/dags/slack?update_mask=is_paused" \
     -H "Content-Type: application/json" \
     -d '{"is_paused": false}'

 

이를 Python Code로 변경하면 다음과 같다. (response check 부분은 없어도 된다.)

import requests

api_endpoint = 'http://localhost:8080/api/v1/dags/slack?update_mask=is_paused'

headers = {
    'Authorization': 'Basic YWRtaW46YWRtaW4=',  # base64 encoding
    'Content-Type': 'application/json'
}

json_data = {
    'is_paused': False
}

# headers 사용
response = requests.patch(api_endpoint,
                          headers=headers,
                          json=json_data)

# headers 미사용
# response = requests.patch(api_endpoint,
#                           json=json_data,
#                           auth=('admin', 'admin'))  # id, pw

# response check
if response.status_code == 200:
    print('DAG run successful.')
else:
    print('Failed to trigger DAG run. Status code:', response.status_code)
    print('Error:', response.json())
  • line 6 ('Authorization': 'Basic YWRtaW46YWRtaW4=')
    • Basic 뒤에 오는 문자열은 id:password를 base64 인코딩한 결과를 붙여넣는다.
    • 예시 (id: admin / password: admin)

4. Airflow DAG Trigger

curl -u <ID>:<PASSWORD> \
     -X POST "http://<IP>:<PORT>/api/v1/dags/<DAG_ID>/dagRuns" \
     -H "Content-Type: application/json" \
     -d '{"conf": {}}'

 

예시

curl -u admin:admin \
     -X POST "http://localhost:8080/api/v1/dags/slack/dagRuns" \
     -H "Content-Type: application/json" \
     -d '{"conf": {}}'

 

이를 Python Code로 변경하면 다음과 같다.

import requests

api_endpoint = 'http://localhost:8080/api/v1/dags/slack/dagRuns'

headers = {
    'Authorization': 'Basic YWRtaW46YWRtaW4=',  # base64 encoding
    'Content-Type': 'application/json'
}

json_data = {
    'conf': {}
}

# headers 사용
response = requests.post(api_endpoint,
                         headers=headers,
                         json=json_data)

# headers 미사용
# response = requests.post(api_endpoint,
#                          json=json_data,
#                          auth=('admin', 'admin'))  # id, pw

# response check
if response.status_code == 200:
    print('DAG run successful.')
else:
    print('Failed to trigger DAG run. Status code:', response.status_code)
    print('Error:', response.json())

 

dag_id는 DAG 코드에서 확인 가능