Skip to content

Commit dd3bf22

Browse files
committed
Deserialize key for partition closing strategy
1 parent a36aa9c commit dd3bf22

File tree

2 files changed

+3
-1
lines changed

2 files changed

+3
-1
lines changed

quixstreams/dataframe/windows/time_based.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ def expire_by_partition(
229229

230230
def expire_by_key(
231231
self,
232-
key: bytes,
232+
key: Any,
233233
state: WindowedState,
234234
max_expired_start: int,
235235
collect: bool,

quixstreams/state/rocksdb/windowed/transaction.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from quixstreams.state.serialization import (
1515
DumpsFunc,
1616
LoadsFunc,
17+
deserialize,
1718
serialize,
1819
)
1920
from quixstreams.state.types import ExpiredWindowDetail, WindowDetail
@@ -360,6 +361,7 @@ def expire_all_windows(
360361
end=end,
361362
prefix=prefix,
362363
)
364+
prefix = deserialize(prefix, loads=self._loads)
363365
yield (start, end), aggregated, collected, prefix
364366

365367
else:

0 commit comments

Comments
 (0)