- DAG예시 -
slack으로 메세지를 보내는 간단한 작업을 수행
사용 가능한 API 목록
Airflow REST API
airflow.apache.org
1. Auth 설정
1-1. 컨테이너 생성 시 설정 (docker-compose.yaml)

AIRFLOW__API__AUTH_BACKENDS 환경 변수 추가
- Airflow 2 버전의 기본값은 외부에서 접근 불가
- id, pw를 입력하여 접근 가능하도록 basic_auth로 변경
1-2. 컨테이너 내부에서 설정
- 컨테이너 접속
docker exec -it <CONTAINER_ID> /bin/bash
- airflow.cfg 파일 변경
vim airflow.cfg

auth_backends = airflow.api.auth.backend.basic_auth로 변경
2. DAG 작성
import json
import requests
import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="slack",
start_date=datetime.datetime(2023, 6, 19),
)
def alert_to_slack(url, **context):
data = {"text": "hello"}
req = requests.post(
url=url,
data=json.dumps(data)
)
slack_alarm = PythonOperator(
task_id='slack_alarm',
python_callable=alert_to_slack,
provide_context=True,
op_args=["<SLACK_HOOK_URL>"],
dag=dag
)
slack_alarm
<SLACK_HOOK_URL> 확인 방법
Slack Webhook(API) 사용하여 메세지 보내기
1. Slack에서 Incoming WebHooks 추가 2. 메시지를 보낼 채널 설정 3. 웹후크 URL 복사 URL 형식은 다음과 같다. 4. 메세지 전송을 위한 코드 작성 (python) import json import requests def send_message(url): data = {"text": "he
ssun-g.tistory.com
3. DAG Pause/Unpause

위 그림과 같이 Unpause 상태여야 트리거 발생 시 정상적으로 동작한다.
Pause/Unpause 방법
- Airflow UI에 접속하여 직접 Pause/Unpause
- http://localhost:8080에 접속 후 DAG를 찾아서 직접 Pause/Unpause 버튼을 클릭하여 상태를 변경한다.
(주소는 사용자에 따라 다를 수 있음)
- http://localhost:8080에 접속 후 DAG를 찾아서 직접 Pause/Unpause 버튼을 클릭하여 상태를 변경한다.
- API를 사용하여 Pause/Unpause
curl -u <ID>:<PASSWORD> \
-X PATCH "http://<IP>:<PORT>/api/v1/dags/<DAG_ID>?update_mask=is_paused" \
-H "Content-Type: application/json" \
-d '{"is_paused": false}'
"is_paused": false인 경우 Unpause
"is_paused": true인 경우 Pause
예시
curl -u admin:admin \
-X PATCH "http://localhost:8080/api/v1/dags/slack?update_mask=is_paused" \
-H "Content-Type: application/json" \
-d '{"is_paused": false}'
이를 Python Code로 변경하면 다음과 같다. (response check 부분은 없어도 된다.)
import requests
api_endpoint = 'http://localhost:8080/api/v1/dags/slack?update_mask=is_paused'
headers = {
'Authorization': 'Basic YWRtaW46YWRtaW4=', # base64 encoding
'Content-Type': 'application/json'
}
json_data = {
'is_paused': False
}
# headers 사용
response = requests.patch(api_endpoint,
headers=headers,
json=json_data)
# headers 미사용
# response = requests.patch(api_endpoint,
# json=json_data,
# auth=('admin', 'admin')) # id, pw
# response check
if response.status_code == 200:
print('DAG run successful.')
else:
print('Failed to trigger DAG run. Status code:', response.status_code)
print('Error:', response.json())
- line 6 ('Authorization': 'Basic YWRtaW46YWRtaW4=')
- Basic 뒤에 오는 문자열은 id:password를 base64 인코딩한 결과를 붙여넣는다.
- 예시 (id: admin / password: admin)
4. Airflow DAG Trigger
curl -u <ID>:<PASSWORD> \
-X POST "http://<IP>:<PORT>/api/v1/dags/<DAG_ID>/dagRuns" \
-H "Content-Type: application/json" \
-d '{"conf": {}}'
예시
curl -u admin:admin \
-X POST "http://localhost:8080/api/v1/dags/slack/dagRuns" \
-H "Content-Type: application/json" \
-d '{"conf": {}}'
이를 Python Code로 변경하면 다음과 같다.
import requests
api_endpoint = 'http://localhost:8080/api/v1/dags/slack/dagRuns'
headers = {
'Authorization': 'Basic YWRtaW46YWRtaW4=', # base64 encoding
'Content-Type': 'application/json'
}
json_data = {
'conf': {}
}
# headers 사용
response = requests.post(api_endpoint,
headers=headers,
json=json_data)
# headers 미사용
# response = requests.post(api_endpoint,
# json=json_data,
# auth=('admin', 'admin')) # id, pw
# response check
if response.status_code == 200:
print('DAG run successful.')
else:
print('Failed to trigger DAG run. Status code:', response.status_code)
print('Error:', response.json())
dag_id는 DAG 코드에서 확인 가능
