Skip to content

Commit 62d6b4e

Browse files
committed
Fix tumbling/hopping windows with partition closing strategy
1 parent 080c5a9 commit 62d6b4e

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

quixstreams/state/rocksdb/windowed/transaction.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99
PartitionTransactionStatus,
1010
validate_transaction_status,
1111
)
12+
from quixstreams.state.exceptions import StateSerializationError
1213
from quixstreams.state.metadata import DEFAULT_PREFIX, SEPARATOR
1314
from quixstreams.state.recovery import ChangelogProducer
1415
from quixstreams.state.serialization import (
1516
DumpsFunc,
1617
LoadsFunc,
18+
deserialize,
1719
serialize,
1820
)
1921
from quixstreams.state.types import ExpiredWindowDetail, WindowDetail
@@ -360,7 +362,8 @@ def expire_all_windows(
360362
end=end,
361363
prefix=prefix,
362364
)
363-
yield (start, end), aggregated, collected, prefix
365+
deserialized_prefix = self._deserialize_prefix(prefix)
366+
yield (start, end), aggregated, collected, deserialized_prefix
364367

365368
else:
366369
# If we don't have a saved last_expired value it means one of two cases
@@ -382,7 +385,8 @@ def expire_all_windows(
382385
prefix=prefix,
383386
)
384387

385-
yield (start, end), aggregated, collected, prefix
388+
deserialized_prefix = self._deserialize_prefix(prefix)
389+
yield (start, end), aggregated, collected, deserialized_prefix
386390

387391
if delete:
388392
for prefix, start, end in to_delete:
@@ -394,6 +398,12 @@ def expire_all_windows(
394398
prefix=b"", cache=self._last_expired_timestamps, timestamp_ms=last_expired
395399
)
396400

401+
def _deserialize_prefix(self, prefix: bytes) -> Any:
402+
try:
403+
return deserialize(prefix, loads=self._loads)
404+
except StateSerializationError:
405+
return prefix
406+
397407
def delete_windows(
398408
self, max_start_time: int, delete_values: bool, prefix: bytes
399409
) -> None:

quixstreams/state/types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
tuple[int, int], V, bytes
1313
] # (start, end), aggregated, key
1414
ExpiredWindowDetail: TypeAlias = tuple[
15-
tuple[int, int], V, list[V], bytes
15+
tuple[int, int], V, list[V], Any
1616
] # (start, end), aggregated, collected, key
1717

1818

0 commit comments

Comments
 (0)