Skip to content

Commit 4d3c5fa

Browse files
authored
Update IV Building production pipelines in Airflow.py
1 parent a46855e commit 4d3c5fa

File tree

1 file changed

+68
-0
lines changed

1 file changed

+68
-0
lines changed

Introduction to Airflow in Python/IV Building production pipelines in Airflow.py

+68
Original file line numberDiff line numberDiff line change
@@ -316,3 +316,71 @@ def process_data(**context):
316316
#|
317317
#|
318318
### Adding the final changes to your pipeline
319+
from airflow.models import DAG
320+
from airflow.contrib.sensors.file_sensor import FileSensor
321+
from airflow.operators.bash_operator import BashOperator
322+
from airflow.operators.python_operator import PythonOperator
323+
from airflow.operators.python_operator import BranchPythonOperator
324+
from airflow.operators.dummy_operator import DummyOperator
325+
from airflow.operators.email_operator import EmailOperator
326+
from dags.process import process_data
327+
from datetime import datetime, timedelta
328+
329+
# Arguments handed to the DAG as its ``default_args``: the start date of the
# schedule and an SLA of 90 minutes.
default_args = dict(
    start_date=datetime(2019, 1, 1),
    sla=timedelta(minutes=90),
)

# The pipeline itself; every task below attaches to this DAG object.
dag = DAG(dag_id='etl_update', default_args=default_args)
337+
338+
# Step 1: wait for the trigger file, checking with poke_interval=45.
sensor = FileSensor(
    task_id='sense_file',
    filepath='/home/repl/workspace/startprocess.txt',
    poke_interval=45,
    dag=dag,
)

# Step 2: remove leftover *.tmp files (-f keeps rm quiet when none exist).
bash_task = BashOperator(
    task_id='cleanup_tempfiles',
    bash_command='rm -f /home/repl/*.tmp',
    dag=dag,
)

# Step 3: run the imported process_data callable; provide_context=True is
# set so the callable receives the task context.
python_task = PythonOperator(
    task_id='run_processing',
    python_callable=process_data,
    provide_context=True,
    dag=dag,
)
351+
352+
353+
# Templated subject line: {{ params.department }} comes from the ``params``
# dict given to the EmailOperator below, {{ ds_nodash }} from the run context.
# NOTE(review): the literal starts and ends with a newline, so the rendered
# subject carries them too -- confirm that is intended.
email_subject = """
Email report for {{ params.department }} on {{ ds_nodash }}
"""

# Branch target that sends the report e-mail (the HTML body is left empty).
email_report_task = EmailOperator(
    task_id='email_report_task',
    to='sales@mycompany.com',
    subject=email_subject,
    html_content='',
    params={'department': 'Data subscription services'},
    dag=dag,
)

# Branch target that does nothing; used when no e-mail should be sent.
no_email_task = DummyOperator(
    task_id='no_email_task',
    dag=dag,
)
367+
368+
369+
def check_weekend(**kwargs):
    """Choose which downstream task to run based on the execution date.

    Args:
        **kwargs: Task context. Must contain ``execution_date``, given either
            as an object exposing ``weekday()`` (e.g. a ``datetime``) or as a
            ``YYYY-MM-DD`` string.

    Returns:
        ``'email_report_task'`` when the date falls on Monday-Friday,
        ``'no_email_task'`` when it falls on Saturday or Sunday.

    Raises:
        KeyError: If ``execution_date`` is missing from the context.
        ValueError: If a string value does not match ``%Y-%m-%d``.
    """
    execution_date = kwargs['execution_date']
    # The context value may already be a datetime object, which strptime()
    # rejects with TypeError; only parse when it is actually a string.
    if isinstance(execution_date, str):
        run_day = datetime.strptime(execution_date, "%Y-%m-%d")
    else:
        run_day = execution_date

    # weekday() is 0-4 for Monday-Friday and 5-6 for Saturday/Sunday.
    if run_day.weekday() < 5:
        return 'email_report_task'
    return 'no_email_task'
376+
377+
378+
# Branching step: check_weekend returns the task_id of the branch to follow.
branch_task = BranchPythonOperator(
    task_id='check_if_weekend',
    python_callable=check_weekend,
    provide_context=True,
    dag=dag,
)
382+
383+
384+
# Wire the pipeline: sense file -> clean up -> process -> branch, where the
# branch fans out to either the e-mail task or the no-op task.
(
    sensor
    >> bash_task
    >> python_task
    >> branch_task
    >> [email_report_task, no_email_task]
)

0 commit comments

Comments
 (0)