Skip to content

Commit 74cf549

Browse files
committed
Completes scalabale databasing lesson
1 parent e207903 commit 74cf549

File tree

1 file changed

+234
-9
lines changed

1 file changed

+234
-9
lines changed

Python-ML/Databasing-for-ML.ipynb

Lines changed: 234 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,18 @@
1212
"## Iris dataset in Pandas: recap"
1313
]
1414
},
15+
{
16+
"cell_type": "code",
17+
"execution_count": null,
18+
"id": "8ce260a9",
19+
"metadata": {},
20+
"outputs": [],
21+
"source": [
22+
"# define path prefix for work directory as it might differ based on the environment\n",
23+
"from pathlib import Path\n",
24+
"WORK = Path(\"work/Python-ML\")"
25+
]
26+
},
1527
{
1628
"cell_type": "code",
1729
"execution_count": null,
@@ -526,7 +538,7 @@
526538
"metadata": {},
527539
"outputs": [],
528540
"source": [
529-
"duckdb.from_df(iris).to_parquet('iris.parquet')"
541+
"duckdb.from_df(iris).to_parquet(f'{WORK}/iris.parquet')"
530542
]
531543
},
532544
{
@@ -544,7 +556,7 @@
544556
"metadata": {},
545557
"outputs": [],
546558
"source": [
547-
"iris.to_parquet('iris-pd.parquet', index=False)"
559+
"iris.to_parquet(WORK / 'iris-pd.parquet', index=False)"
548560
]
549561
},
550562
{
@@ -570,7 +582,7 @@
570582
"import pyarrow.parquet as pq\n",
571583
"\n",
572584
"table = pa.Table.from_pandas(iris)\n",
573-
"pq.write_to_dataset(table, root_path='iris', partition_cols=['target'],\n",
585+
"pq.write_to_dataset(table, root_path=WORK / 'iris', partition_cols=['target'],\n",
574586
" existing_data_behavior='delete_matching')"
575587
]
576588
},
@@ -591,7 +603,7 @@
591603
"metadata": {},
592604
"outputs": [],
593605
"source": [
594-
"pd.read_parquet(path='iris', partitioning='hive')"
606+
"pd.read_parquet(path=WORK / 'iris', partitioning='hive')"
595607
]
596608
},
597609
{
@@ -601,7 +613,7 @@
601613
"metadata": {},
602614
"outputs": [],
603615
"source": [
604-
"duckdb.read_parquet('iris/*/*.parquet', hive_partitioning=True)"
616+
"duckdb.read_parquet(f'{WORK}/iris/*/*.parquet', hive_partitioning=True)"
605617
]
606618
},
607619
{
@@ -676,7 +688,8 @@
676688
"outputs": [],
677689
"source": [
678690
"iris_large = iris.sample(n=100_000, replace=True)\n",
679-
"iris_large.to_parquet('iris_large.parquet')"
691+
"iris_large.to_parquet('work/Python-ML/iris_large.parquet')\n",
692+
"iris_large.to_csv('work/Python-ML/iris_large.csv', index=False)"
680693
]
681694
},
682695
{
@@ -695,7 +708,7 @@
695708
{
696709
"cell_type": "code",
697710
"execution_count": null,
698-
"id": "f4fbc5c2",
711+
"id": "27cfe96a",
699712
"metadata": {},
700713
"outputs": [],
701714
"source": [
@@ -707,15 +720,227 @@
707720
{
708721
"cell_type": "code",
709722
"execution_count": null,
710-
"id": "0cd05a51",
723+
"id": "a5514211",
724+
"metadata": {},
725+
"outputs": [],
726+
"source": [
727+
"%%time\n",
728+
"dbfile = f'{WORK}/iris_large.parquet'\n",
729+
"for i in range(30_000):\n",
730+
" res = duckdb.sql(f\"select target, count(*) from read_parquet('{dbfile}') group by target\")\n"
731+
]
732+
},
733+
{
734+
"cell_type": "code",
735+
"execution_count": null,
736+
"id": "1935ef00",
737+
"metadata": {},
738+
"outputs": [],
739+
"source": [
740+
"%%time\n",
741+
"for i in range(30_000):\n",
742+
" res = iris_large.groupby('target', observed=True).count()"
743+
]
744+
},
745+
{
746+
"cell_type": "markdown",
747+
"id": "7321c04d",
748+
"metadata": {},
749+
"source": [
750+
"There are also different algorithms available for compression (e.g. Gzip, Brotli, Zstd) and encoding (e.g. Delta, RLE, PLAIN, DICT). These could further optimize query performance and storage efficiency for specific use-cases."
751+
]
752+
},
753+
{
754+
"cell_type": "markdown",
755+
"id": "b8ee8098",
756+
"metadata": {},
757+
"source": [
758+
"#### Appending to a Parquet dataset\n",
759+
"\n",
760+
"In the OLAP notion, Parquet datasets are not designed for mutability. Hence, rows can't be updated or deleted. \n",
761+
"\n",
762+
"However, although rows can't simply be appended to a Parquet _file_, new rows _can_ be appended to a Parquet _dataset_, which can consist of multiple files:\n",
763+
"\n",
764+
"- We can simply write new rows to a new Parquet file and make sure it is included in the file glob passed to the `read_parquet()` function. (Usually this means it should be in the same directory as the existing Parquet files.)\n",
765+
"- We can also add new rows to an existing partitioned Parquet dataset. To allow existing files but prevent overwriting them, we need to a combination of basename template and existing data behavior.\n",
766+
"\n",
767+
"Let's say we have 3 batches of data but we receive them not all at once but in 3 separate batches:"
768+
]
769+
},
770+
{
771+
"cell_type": "code",
772+
"execution_count": null,
773+
"id": "c63f1593",
774+
"metadata": {},
775+
"outputs": [],
776+
"source": [
777+
"iris_shuffled = iris.sample(frac=1, replace=False)\n",
778+
"iris_b1 = iris_shuffled.iloc[:50]\n",
779+
"iris_b2 = iris_shuffled.iloc[50:100]\n",
780+
"iris_b3 = iris_shuffled.iloc[100:]"
781+
]
782+
},
783+
{
784+
"cell_type": "markdown",
785+
"id": "526b72a6",
786+
"metadata": {},
787+
"source": [
788+
"Write the first batch (assuming we don't have the others yet)"
789+
]
790+
},
791+
{
792+
"cell_type": "code",
793+
"execution_count": null,
794+
"id": "2ed33d79",
795+
"metadata": {},
796+
"outputs": [],
797+
"source": [
798+
"pq.write_to_dataset(pa.Table.from_pandas(iris_b1, preserve_index=False),\n",
799+
" root_path=f'{WORK}/iris-b', partition_cols=['target'],\n",
800+
" basename_template='b1-{i}.parquet',\n",
801+
" existing_data_behavior='overwrite_or_ignore')"
802+
]
803+
},
804+
{
805+
"cell_type": "markdown",
806+
"id": "0db850ec",
807+
"metadata": {},
808+
"source": [
809+
"Then we can keep appending batches as we get them (or write all batches at once if we have them):"
810+
]
811+
},
812+
{
813+
"cell_type": "code",
814+
"execution_count": null,
815+
"id": "87d256c9",
816+
"metadata": {},
817+
"outputs": [],
818+
"source": [
819+
"for b, batch in enumerate([iris_b2, iris_b3], start=2):\n",
820+
" pq.write_to_dataset(pa.Table.from_pandas(batch, preserve_index=False),\n",
821+
" root_path=f'{WORK}/iris-b', partition_cols=['target'],\n",
822+
" basename_template=f'b{b}'+'-{i}.parquet',\n",
823+
" existing_data_behavior='overwrite_or_ignore')"
824+
]
825+
},
826+
{
827+
"cell_type": "markdown",
828+
"id": "ca832844",
829+
"metadata": {},
830+
"source": [
831+
"How we query the dataset remains the same, regardless of whether the data was written all at once or in batches:"
832+
]
833+
},
834+
{
835+
"cell_type": "code",
836+
"execution_count": null,
837+
"id": "c8582c2c",
838+
"metadata": {},
839+
"outputs": [],
840+
"source": [
841+
"duckdb.sql(f\"select * from read_parquet('{WORK}/iris-b/*/*.parquet', hive_partitioning=True)\")"
842+
]
843+
},
844+
{
845+
"cell_type": "markdown",
846+
"id": "ba9266c5",
847+
"metadata": {},
848+
"source": [
849+
"Changing the schema of a Parquet dataset (such as by adding columns) is difficult. The best way to approach this is to create separate Parquet files for each consistent data batch, then use `duckdb.sql()` to combine them (presumably using some kind of OUTER JOIN), and writing the result back to a Parquet dataset.\n",
850+
"\n",
851+
"Say, our first batch doesn't have the petal measurements:"
852+
]
853+
},
854+
{
855+
"cell_type": "code",
856+
"execution_count": null,
857+
"id": "7162071f",
858+
"metadata": {},
859+
"outputs": [],
860+
"source": [
861+
"iris_b1.filter(regex=\"(sepal|target).*\").to_parquet(f'{WORK}/iris-c1.parquet', index=False)"
862+
]
863+
},
864+
{
865+
"cell_type": "markdown",
866+
"id": "2b9f0d5a",
867+
"metadata": {},
868+
"source": [
869+
"Then combine with the other two datasets"
870+
]
871+
},
872+
{
873+
"cell_type": "code",
874+
"execution_count": null,
875+
"id": "2c5aea8a",
876+
"metadata": {},
877+
"outputs": [],
878+
"source": [
879+
"rel = duckdb.sql(\"select * from iris_b2 UNION ALL \"\n",
880+
" \"select * from iris_b3 UNION ALL \"\n",
881+
" \"select sepal_length, sepal_width, null, null, target \" +\n",
882+
" f\"from read_parquet('{WORK}/iris-c1.parquet')\")\n",
883+
"rel"
884+
]
885+
},
886+
{
887+
"cell_type": "code",
888+
"execution_count": null,
889+
"id": "50452dcd",
890+
"metadata": {},
891+
"outputs": [],
892+
"source": [
893+
"rel.to_parquet(f'{WORK}/iris-c.parquet')\n",
894+
"# or alternatively as a partitioned dataset:\n",
895+
"pq.write_to_dataset(rel.arrow(), root_path=f'{WORK}/iris-c', partition_cols=['target'],\n",
896+
" existing_data_behavior='delete_matching')"
897+
]
898+
},
899+
{
900+
"cell_type": "code",
901+
"execution_count": null,
902+
"id": "bfebccbf",
903+
"metadata": {},
904+
"outputs": [],
905+
"source": [
906+
"duckdb.sql(f\"select * from read_parquet('{WORK}/iris-c.parquet')\")"
907+
]
908+
},
909+
{
910+
"cell_type": "code",
911+
"execution_count": null,
912+
"id": "149cad80",
913+
"metadata": {},
914+
"outputs": [],
915+
"source": [
916+
"duckdb.sql(f\"select target, \"\n",
917+
" \"count(*) as num_rows, count(petal_length) as num_petal_measures \"\n",
918+
" f\"from read_parquet('{WORK}/iris-c.parquet')\" +\n",
919+
" \"group by target\")"
920+
]
921+
},
922+
{
923+
"cell_type": "code",
924+
"execution_count": null,
925+
"id": "57ab8b1b",
926+
"metadata": {},
927+
"outputs": [],
928+
"source": [
929+
"duckdb.sql(f\"select * from read_parquet('{WORK}/iris-c/*/*.parquet', hive_partitioning=True)\")"
930+
]
931+
},
932+
{
933+
"cell_type": "code",
934+
"execution_count": null,
935+
"id": "d25f1509",
711936
"metadata": {},
712937
"outputs": [],
713938
"source": []
714939
}
715940
],
716941
"metadata": {
717942
"kernelspec": {
718-
"display_name": "Python 3",
943+
"display_name": "Python 3 (ipykernel)",
719944
"language": "python",
720945
"name": "python3"
721946
},

0 commit comments

Comments
 (0)