
Commit 0d09b88

coder2j authored and committed
MINOR: Add tutorial of DAG with S3 and PostgreSQL hooks
1 parent cce4010 commit 0d09b88

3 files changed: +1071 -13 lines changed

README.md

Lines changed: 14 additions & 13 deletions
@@ -1,17 +1,18 @@
-# Apache Airflow Tutorial Series [YouTube](https://www.youtube.com/watch?v=z7xyNOF8tak&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT)
+# Apache Airflow Tutorial Series [YouTube](https://www.youtube.com/watch?v=z7xyNOF8tak&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT)
 ## Updated Tutorial Episode
-1. [Introduction and Local Installation](https://youtu.be/z7xyNOF8tak)
-2. [Get Airflow running in Docker](https://youtu.be/J6azvFhndLg)
-3. [Airflow Core Concepts in 5 mins](https://youtu.be/mtJHMdoi_Gg)
-4. [Airflow Task Lifecycle and Basic Architecture](https://youtu.be/UFsCvWjQT4w)
-5. [Airflow DAG with BashOperator](https://youtu.be/CLkzXrjrFKg)
-6. [Airflow DAG with PythonOperator and XComs](https://youtu.be/IumQX-mm20Y)
-7. [Airflow TaskFlow API](https://youtu.be/9y0mqWsok_4)
-8. [Airflow Catchup and Backfill](https://youtu.be/OXOiUeHOQ-0)
-9. [Schedule Airflow DAG with Cron Expression](https://youtu.be/tpuovQFUByk)
-10. [Airflow Connection and PostgresOperator](https://youtu.be/S1eapG6gjLU)
-11. [Add Python Dependencies via Airflow Docker Image Extending and Customizing](https://youtu.be/0UepvC9X4HY)
-12. [AWS S3 Key Sensor Operator](https://youtu.be/vuxrhipJMCk)
+1. [Introduction and Local Installation](https://www.youtube.com/watch?v=z7xyNOF8tak&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=1)
+2. [Get Airflow running in Docker](https://www.youtube.com/watch?v=J6azvFhndLg&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=2)
+3. [Airflow Core Concepts in 5 mins](https://www.youtube.com/watch?v=mtJHMdoi_Gg&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=3)
+4. [Airflow Task Lifecycle and Basic Architecture](https://www.youtube.com/watch?v=UFsCvWjQT4w&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=4)
+5. [Airflow DAG with BashOperator](https://www.youtube.com/watch?v=CLkzXrjrFKg&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=5)
+6. [Airflow DAG with PythonOperator and XComs](https://www.youtube.com/watch?v=IumQX-mm20Y&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=6)
+7. [Airflow TaskFlow API](https://www.youtube.com/watch?v=9y0mqWsok_4&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=7)
+8. [Airflow Catchup and Backfill](https://www.youtube.com/watch?v=OXOiUeHOQ-0&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=8)
+9. [Schedule Airflow DAG with Cron Expression](https://www.youtube.com/watch?v=tpuovQFUByk&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=9)
+10. [Airflow Connection and PostgresOperator](https://www.youtube.com/watch?v=S1eapG6gjLU&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=10)
+11. [Add Python Dependencies via Airflow Docker Image Extending and Customizing](https://www.youtube.com/watch?v=0UepvC9X4HY&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=11)
+12. [AWS S3 Key Sensor Operator](https://www.youtube.com/watch?v=vuxrhipJMCk&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=12)
+13. [Airflow Hooks S3 PostgreSQL](https://www.youtube.com/watch?v=rcG4WNwi900&list=PLwFJcsJ61oujAqYpMp1kdUBcPG0sE0QMT&index=13)
 
 ## Running apache airflow 2.0 in docker with local executor.
 Here are the steps to take to get airflow 2.0 running with docker on your machine.

dags/dag_with_postgres_hooks.py

Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
+import csv
+import logging
+from datetime import datetime, timedelta
+from tempfile import NamedTemporaryFile
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+default_args = {
+    'owner': 'coder2j',
+    'retries': 5,
+    'retry_delay': timedelta(minutes=10)
+}
+
+
+def postgres_to_s3(ds_nodash, next_ds_nodash):
+    # step 1: query data from postgresql db and save into text file
+    hook = PostgresHook(postgres_conn_id="postgres_localhost")
+    conn = hook.get_conn()
+    cursor = conn.cursor()
+    cursor.execute("select * from orders where date >= %s and date < %s",
+                   (ds_nodash, next_ds_nodash))
+    with NamedTemporaryFile(mode='w', suffix=f"{ds_nodash}") as f:
+    # with open(f"dags/get_orders_{ds_nodash}.txt", "w") as f:
+        csv_writer = csv.writer(f)
+        csv_writer.writerow([i[0] for i in cursor.description])
+        csv_writer.writerows(cursor)
+        f.flush()
+        cursor.close()
+        conn.close()
+        logging.info("Saved orders data in text file: %s", f.name)
+        # step 2: upload text file into S3
+        s3_hook = S3Hook(aws_conn_id="minio_conn")
+        s3_hook.load_file(
+            filename=f.name,
+            key=f"orders/{ds_nodash}.txt",
+            bucket_name="airflow",
+            replace=True
+        )
+        logging.info("Orders file %s has been pushed to S3!", f.name)
+
+
+with DAG(
+    dag_id="dag_with_postgres_hooks_v04",
+    default_args=default_args,
+    start_date=datetime(2022, 4, 30),
+    schedule_interval='@daily'
+) as dag:
+    task1 = PythonOperator(
+        task_id="postgres_to_s3",
+        python_callable=postgres_to_s3
+    )
+    task1
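
The query-to-temporary-file step in `postgres_to_s3` can be tried without Airflow, PostgreSQL, or MinIO. The sketch below is an illustration under stated assumptions, not the code from this commit: it swaps in the standard-library `sqlite3` module for `PostgresHook` (so placeholders are `?` rather than `%s`), and the names `dump_query_to_temp_csv`, `consume`, and `read_back` are hypothetical. It shows why the upload has to happen inside the `with` block: the temporary file is deleted as soon as the block exits. Reopening the file by name while it is still open is assumed to run on a POSIX system.

```python
import csv
import sqlite3
from tempfile import NamedTemporaryFile


def dump_query_to_temp_csv(conn, query, params, consume):
    """Write query results to a temporary CSV and pass its path to `consume`.

    `consume` is a hypothetical stand-in for the upload step
    (S3Hook.load_file in the DAG); it receives the temporary file's path.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(query, params)
        # newline="" is what the csv module recommends for files it writes.
        with NamedTemporaryFile(mode="w", suffix=".csv", newline="") as f:
            writer = csv.writer(f)
            # cursor.description holds one tuple per column; item 0 is the name.
            writer.writerow([col[0] for col in cursor.description])
            writer.writerows(cursor)
            # Flush so the data is on disk before another reader opens f.name.
            f.flush()
            # The file disappears when this block exits, so consume it here.
            return consume(f.name)
    finally:
        cursor.close()


def read_back(path):
    """Hypothetical consumer: read the CSV back instead of uploading it."""
    with open(path, newline="") as fh:
        return list(csv.reader(fh))


# In-memory stand-in for the orders table (assumed two-column schema).
conn = sqlite3.connect(":memory:")
conn.execute("create table orders (order_id integer, date text)")
conn.executemany(
    "insert into orders values (?, ?)",
    [(1, "2022-04-30"), (2, "2022-05-01"), (3, "2022-05-02")],
)

# Half-open date window, mirroring the `date >= ... and date < ...` filter.
rows = dump_query_to_temp_csv(
    conn,
    "select * from orders where date >= ? and date < ? order by order_id",
    ("2022-04-30", "2022-05-02"),
    read_back,
)
conn.close()
print(rows)
```

Passing the consumer in as a callable keeps the file's lifetime and its use in one place, which is the same constraint the DAG satisfies by calling `load_file` before the `with` block ends.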
