Commit 6d7c4be

(fix) tableau output fixes - datatypes, archive bucket, and sort order (#507)
* sort objects from list_s3 query by date to get the "last X files" logic to work properly
* add correct str representation for pyarrow types to convert to the proper hyper type
* output devgreen data to the private archive bucket instead of public
1 parent 4b61fdc commit 6d7c4be

File tree

4 files changed: +32 -11 lines changed


runners/run_gtfs_rt_parquet_converter.py (+14 -5)

```diff
@@ -22,10 +22,18 @@ def start_devgreen_gtfs_rt_parquet_updates_local() -> None:
     parquet_update_jobs: List[HyperJob] = [HyperDevGreenGtfsRtVehiclePositions, HyperDevGreenGtfsRtTripUpdates]
 
     for job in parquet_update_jobs:
-        # breakpoint()
         job.run_parquet(None)
-        # outs = job.create_local_hyper()
-        # print(outs)
+
+
+def start_devgreen_gtfs_rt_parquet_updates_local_hyper() -> None:
+    """Run all gtfs_rt Parquet Update jobs"""
+
+    parquet_update_jobs: List[HyperJob] = [HyperDevGreenGtfsRtVehiclePositions, HyperDevGreenGtfsRtTripUpdates]
+
+    for job in parquet_update_jobs:
+        job.run_parquet(None)
+        outs = job.create_local_hyper()
+        print(outs)
 
 
 def start_gtfs_rt_parquet_updates_local() -> None:
@@ -41,5 +49,6 @@ def start_gtfs_rt_parquet_updates_local() -> None:
 
 
 if __name__ == "__main__":
-    start_gtfs_rt_parquet_updates_local()
-    start_devgreen_gtfs_rt_parquet_updates_local()
+    # start_gtfs_rt_parquet_updates_local()
+    # start_devgreen_gtfs_rt_parquet_updates_local()
+    start_devgreen_gtfs_rt_parquet_updates_local_hyper()
```

src/lamp_py/runtime_utils/remote_files.py (+2 -2)

```diff
@@ -165,12 +165,12 @@ def s3_uri(self) -> str:
 
 # DEVGREEN
 tableau_devgreen_rt_vehicle_positions_lightrail_60_day = S3Location(
-    bucket=S3_PUBLIC,
+    bucket=S3_ARCHIVE,
    prefix=os.path.join(TABLEAU, "devgreen-gtfs-rt", "LAMP_DEVGREEN_RT_VehiclePositions_LR_60_day.parquet"),
 )
 # light rail output file - to be converted to .hyper
 tableau_devgreen_rt_trip_updates_lightrail_60_day = S3Location(
-    bucket=S3_PUBLIC,
+    bucket=S3_ARCHIVE,
     prefix=os.path.join(TABLEAU, "devgreen-gtfs-rt", "LAMP_DEVGREEN_RT_TripUpdates_LR_60_day.parquet"),
 )
```
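The bucket swap is the entire devgreen routing fix: the hunk's `s3_uri` context suggests each `S3Location` composes bucket and prefix into the URI that downstream writers use, so changing `bucket=` reroutes output with no other code changes. A minimal sketch of that shape (hypothetical reconstruction; the repo's actual class and constant values may differ):

```python
import os
from dataclasses import dataclass

S3_PUBLIC = "public-bucket"    # hypothetical values; the real constants
S3_ARCHIVE = "archive-bucket"  # live elsewhere in remote_files.py

@dataclass
class S3Location:
    bucket: str
    prefix: str

    @property
    def s3_uri(self) -> str:
        # bucket and prefix compose into the full URI, so swapping
        # bucket=S3_PUBLIC for bucket=S3_ARCHIVE reroutes every consumer.
        return f"s3://{os.path.join(self.bucket, self.prefix)}"

loc = S3Location(bucket=S3_ARCHIVE, prefix="tableau/devgreen-gtfs-rt/example.parquet")
print(loc.s3_uri)  # s3://archive-bucket/tableau/devgreen-gtfs-rt/example.parquet
```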

src/lamp_py/tableau/hyper.py (+5 -3)

```diff
@@ -95,14 +95,16 @@ def convert_parquet_dtype(self, dtype: pyarrow.DataType) -> SqlType:
         dtype = str(dtype)
         dtype_map = {
             "int8": SqlType.small_int(),
+            "uint8": SqlType.small_int(),
             "int16": SqlType.small_int(),
+            "uint16": SqlType.int(),
             "int32": SqlType.int(),
             "uint32": SqlType.big_int(),
             "int64": SqlType.big_int(),
             "bool": SqlType.bool(),
-            "float16": SqlType.double(),
-            "float32": SqlType.double(),
-            "float64": SqlType.double(),
+            "halffloat": SqlType.double(),
+            "float": SqlType.double(),
+            "double": SqlType.double(),
         }
 
         map_check = dtype_map.get(dtype)
```
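The float renames fix a silent mismatch: `str()` on a pyarrow `DataType` yields Arrow's own type names (`halffloat`, `float`, `double`), not the numpy-style `float16`/`float32`/`float64` keys the map previously used, so float columns never matched a map entry. The integer names do match, which is why only the float keys changed; the new unsigned entries step up a size (`uint16` to `int`, `uint32` to `big_int`) because Hyper's integer types are signed. A quick check of the string forms:

```python
import pyarrow

# str() of a pyarrow DataType uses Arrow's type names, which differ
# from numpy-style names for floats but match for integers and bool.
print(str(pyarrow.float16()))  # halffloat
print(str(pyarrow.float32()))  # float
print(str(pyarrow.float64()))  # double
print(str(pyarrow.int16()))    # int16
print(str(pyarrow.uint32()))   # uint32
print(str(pyarrow.bool_()))    # bool
```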

src/lamp_py/tableau/jobs/filtered_hyper.py (+11 -1)

```diff
@@ -79,6 +79,7 @@ def update_parquet(self, _: None) -> bool:
         if self.first_run:
             self.create_tableau_parquet(num_files=self.rollup_num_days)
             self.first_run = False
+            return True
 
         # only run once per day after 11AM UTC
         if object_exists(self.remote_input_location.s3_uri):
@@ -106,7 +107,15 @@ def create_tableau_parquet(self, num_files: Optional[int]) -> None:
         s3_uris = file_list_from_s3(
             bucket_name=self.remote_input_location.bucket, file_prefix=file_prefix, in_filter=self.object_filter
         )
+
         ds_paths = [s.replace("s3://", "") for s in s3_uris]
+
+        # s3 list returns in lexicographical order,
+        # so month=4/day=4 comes after month=4/day=30. This sort grabs the last part,
+        # e.g. 2025-04-22T00:00:00.parquet as the sort key, and re-orders by that instead
+        # to get it in date order
+        ds_paths = sorted(ds_paths, key=lambda x: os.path.split(x)[1])
+
         if num_files is not None:
             ds_paths = ds_paths[-num_files:]
 
@@ -115,8 +124,9 @@ def create_tableau_parquet(self, num_files: Optional[int]) -> None:
             format="parquet",
             filesystem=S3FileSystem(),
         )
-        process_logger = ProcessLogger("filtered_hyper_create", file_prefix=file_prefix)
+        process_logger = ProcessLogger("filtered_hyper_create", file_prefix=file_prefix, num_days=num_files)
         process_logger.log_start()
+        process_logger.add_metadata(first_file=ds_paths[0], last_file=ds_paths[-1])
         with pq.ParquetWriter(self.local_parquet_path, schema=self.processed_schema) as writer:
             for batch in ds.to_batches(
                 batch_size=500_000, columns=self.processed_schema.names, filter=self.parquet_filter
```
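The sort is the core of the "last X files" repair: S3 listings come back in lexicographic key order, so unpadded Hive-style partitions like `month=4/day=4` sort after `month=4/day=30`, and slicing the tail of the raw listing silently drops recent days. Sorting on the fixed-width ISO-8601 basename is chronological. A self-contained illustration with hypothetical keys:

```python
import os

# Hypothetical S3 keys, in the lexicographic order a listing returns them
# ("day=30" < "day=4" because "3" < "4" character by character).
keys = [
    "bucket/tableau/year=2025/month=4/day=30/2025-04-30T00:00:00.parquet",
    "bucket/tableau/year=2025/month=4/day=4/2025-04-04T00:00:00.parquet",
    "bucket/tableau/year=2025/month=5/day=1/2025-05-01T00:00:00.parquet",
]

# Slicing the tail of the raw listing keeps April 4 and drops April 30:
print(keys[-2:])  # [... day=4 ..., ... month=5/day=1 ...]

# Sorting on the basename uses the zero-padded timestamp, which sorts
# lexicographically the same as chronologically:
fixed = sorted(keys, key=lambda k: os.path.split(k)[1])
print(fixed[-2:])  # [... day=30 ..., ... month=5/day=1 ...]
```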
