Feature: Join Latest #874
Merged
Changes from 19 commits
35 commits
5f6d11e [JOIN] Split _as_stateful into helper functions (gwaramadze)
fa9a869 [JOIN] Refactor _as_stateful to accept sdf (gwaramadze)
92fb704 [JOIN] Implement join (gwaramadze)
4fa3557 Use sdf.register_store method (gwaramadze)
14c94f0 First test (gwaramadze)
6901e2a Refactor test into fixtures (gwaramadze)
ae883cd Ensure both sides are copartitioned (gwaramadze)
6b59a97 Add on_overlap and merger params (gwaramadze)
7548ffd Add how param with inner and left options (gwaramadze)
bc1d81c Invert left<>right in tests (gwaramadze)
c64bc70 Add retention_ms param (gwaramadze)
d3167d3 Correct after rebase (gwaramadze)
5a1c06f Rename to join_latest (gwaramadze)
707ee03 Change order right > left (gwaramadze)
6ef5a5f Fix return type (gwaramadze)
4c7cfb8 Self joins not supported (gwaramadze)
21b1c03 Optimize TimestampedPartitionTransaction._get_min_eligible_timestamp (daniil-quix)
637aab2 Optimize RocksDBStorePartition.iter_items(backwards=True) (daniil-quix)
40dbaa7 WIP: refactor StreamingDataFrame.join_latest (daniil-quix)
20f93b2 join_latest: register store only for the right side (daniil-quix)
1ddfab5 join_latest: rename on_overlap -> on_merge (daniil-quix)
c6a20ba Move ensure_topics_copartitioned() to TopicManager (daniil-quix)
155ee63 Replace WindowStoreAlreadyRegisteredError (daniil-quix)
dfb7247 TimestampedStore: Rename get_last -> get_latest (daniil-quix)
3d6cd33 Tests for test_register_timestamped_store_twice (daniil-quix)
c58b1ef join_latest: update docstring (daniil-quix)
8be9263 join_latest: use is None check in mergers (daniil-quix)
a2c5790 Rename join_latest -> join_asof (daniil-quix)
c129d8d join_latest: docs (daniil-quix)
e1f5dd8 Add counter to timestamped store (#886) (gwaramadze)
ada71fb join_latest: remove newlines in docs (daniil-quix)
0711de7 join_latest: add custom on_merge example (daniil-quix)
583341f join_latest: docs spelling (daniil-quix)
cc2f5b1 Update docs/joins.md (daniil-quix)
4e5629b Update docs/joins.md (daniil-quix)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from .join_latest import How as How | ||
from .join_latest import JoinLatest as JoinLatest | ||
from .join_latest import OnOverlap as OnOverlap |
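Reviewer note: `How` and `OnOverlap` are `Literal` aliases, and the module validates them at runtime by comparing against `typing.get_args(...)`. The following is a minimal, standalone sketch of that pattern. The helper name `validate_how` and the constant `HOW_CHOICES` are hypothetical and are not part of the PR.

```python
from typing import Literal, get_args

# Same alias shape as in the diff; get_args() returns the literal values as a tuple.
How = Literal["inner", "left"]
HOW_CHOICES = get_args(How)


def validate_how(how: str) -> str:
    """Hypothetical helper: return `how` unchanged if valid, else raise ValueError."""
    if how not in HOW_CHOICES:
        raise ValueError(
            f'Invalid "how" value: {how}. '
            f"Valid choices are: {', '.join(HOW_CHOICES)}."
        )
    return how
```

Keeping the `Literal` alias as the single source of truth means the type checker and the runtime check cannot drift apart when a new option is added.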
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
import typing | ||
from datetime import timedelta | ||
from typing import Any, Callable, Literal, Union, cast, get_args | ||
|
||
from quixstreams.context import message_context | ||
from quixstreams.dataframe.utils import ensure_milliseconds | ||
from quixstreams.models.topics.manager import TopicManager | ||
from quixstreams.state.rocksdb.timestamped import TimestampedPartitionTransaction | ||
|
||
from .utils import keep_left_merger, keep_right_merger, raise_merger | ||
|
||
if typing.TYPE_CHECKING: | ||
from quixstreams.dataframe.dataframe import StreamingDataFrame | ||
|
||
DISCARDED = object() | ||
How = Literal["inner", "left"] | ||
How_choices = get_args(How) | ||
|
||
OnOverlap = Literal["keep-left", "keep-right", "raise"] | ||
OnOverlap_choices = get_args(OnOverlap) | ||
|
||
|
||
class JoinLatest: | ||
    def __init__( | ||
        self, | ||
        how: How, | ||
        on_overlap: Union[OnOverlap, Callable[[Any, Any], Any]], | ||
        grace_ms: Union[int, timedelta], | ||
        store_name: str = "join", | ||
    ): | ||
        if how not in How_choices: | ||
            raise ValueError( | ||
                f'Invalid "how" value: {how}. ' | ||
                f"Valid choices are: {', '.join(How_choices)}." | ||
            ) | ||
        self._how = how | ||
|
||
        # Resolve the merge strategy: a custom callable or one of the built-ins. | ||
        if callable(on_overlap): | ||
            self._merger = on_overlap | ||
        elif on_overlap == "keep-left": | ||
            self._merger = keep_left_merger | ||
        elif on_overlap == "keep-right": | ||
            self._merger = keep_right_merger | ||
        elif on_overlap == "raise": | ||
            self._merger = raise_merger | ||
        else: | ||
            raise ValueError( | ||
                f'Invalid "on_overlap" value: {on_overlap}. ' | ||
                f"Provide either one of {', '.join(OnOverlap_choices)} or " | ||
                f"a callable to merge records manually." | ||
            ) | ||
|
||
        self._retention_ms = ensure_milliseconds(grace_ms) | ||
        self._store_name = store_name | ||
|
||
    def join( | ||
        self, | ||
        left: "StreamingDataFrame", | ||
        right: "StreamingDataFrame", | ||
    ) -> "StreamingDataFrame": | ||
        if left.stream_id == right.stream_id: | ||
            raise ValueError( | ||
                "Joining dataframes originating from " | ||
                "the same topic is not yet supported.", | ||
            ) | ||
        left.ensure_topics_copartitioned(*left.topics, *right.topics) | ||
|
||
        for sdf in (left, right): | ||
            changelog_config = TopicManager.derive_topic_config(sdf.topics) | ||
            sdf.processing_context.state_manager.register_timestamped_store( | ||
                stream_id=sdf.stream_id, | ||
                store_name=self._store_name, | ||
                changelog_config=changelog_config, | ||
            ) | ||
|
||
        is_inner_join = self._how == "inner" | ||
|
||
        # Left side: look up the latest right-side value and merge it in. | ||
        def left_func(value, key, timestamp, headers): | ||
            tx = cast( | ||
                TimestampedPartitionTransaction, | ||
                right.processing_context.checkpoint.get_store_transaction( | ||
                    stream_id=right.stream_id, | ||
                    partition=message_context().partition, | ||
                    store_name=self._store_name, | ||
                ), | ||
            ) | ||
|
||
            right_value = tx.get_last(timestamp=timestamp, prefix=key) | ||
            # Check for None explicitly so falsy matches (e.g. {} or 0) still join. | ||
            if is_inner_join and right_value is None: | ||
                return DISCARDED | ||
            return self._merger(value, right_value) | ||
|
||
        # Right side: only store the value; it is never emitted downstream. | ||
        def right_func(value, key, timestamp, headers): | ||
            tx = cast( | ||
                TimestampedPartitionTransaction, | ||
                right.processing_context.checkpoint.get_store_transaction( | ||
                    stream_id=right.stream_id, | ||
                    partition=message_context().partition, | ||
                    store_name=self._store_name, | ||
                ), | ||
            ) | ||
            tx.set_for_timestamp( | ||
                timestamp=timestamp, | ||
                value=value, | ||
                prefix=key, | ||
                retention_ms=self._retention_ms, | ||
            ) | ||
|
||
        right = right.update(right_func, metadata=True).filter(lambda value: False) | ||
        left = left.apply(left_func, metadata=True).filter( | ||
            lambda value: value is not DISCARDED | ||
        ) | ||
        return left.concat(right) |
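Reviewer note: the join above stores every right-side record by key and timestamp, then for each left-side record looks up the most recent stored right-side value. The sketch below illustrates that lookup with a plain in-memory store instead of the RocksDB-backed `TimestampedPartitionTransaction`. Everything in it is an assumption made for illustration: the class `InMemoryTimestampedStore`, the function `join_left_record`, the "greatest timestamp less than or equal to the lookup timestamp" rule, and the fixed keep-right merge. Retention is not modelled.

```python
import bisect
from collections import defaultdict
from typing import Any, Optional


class InMemoryTimestampedStore:
    """Hypothetical stand-in for the timestamped state store used by the join."""

    def __init__(self) -> None:
        self._timestamps = defaultdict(list)  # key -> sorted list of timestamps
        self._values = defaultdict(dict)      # key -> {timestamp: value}

    def set_for_timestamp(self, key: str, timestamp: int, value: Any) -> None:
        # Insert the timestamp in sorted order only the first time it is seen.
        if timestamp not in self._values[key]:
            bisect.insort(self._timestamps[key], timestamp)
        self._values[key][timestamp] = value

    def get_last(self, key: str, timestamp: int) -> Optional[Any]:
        """Return the value with the greatest stored timestamp <= `timestamp`."""
        timestamps = self._timestamps.get(key, [])
        idx = bisect.bisect_right(timestamps, timestamp)
        if idx == 0:
            return None  # nothing stored at or before `timestamp`
        return self._values[key][timestamps[idx - 1]]


def join_left_record(
    store: InMemoryTimestampedStore,
    key: str,
    timestamp: int,
    value: dict,
    how: str = "inner",
) -> Optional[dict]:
    """Join one left-side record; return None when an inner join finds no match."""
    right_value = store.get_last(key, timestamp)
    if right_value is None:
        if how == "inner":
            return None
        right_value = {}
    # Keep-right merge: right-side columns win on overlap.
    return {**value, **right_value}


store = InMemoryTimestampedStore()
store.set_for_timestamp("sensor-1", 10, {"unit": "C"})
store.set_for_timestamp("sensor-1", 20, {"unit": "F"})

matched = join_left_record(store, "sensor-1", 15, {"temp": 21})
unmatched_inner = join_left_record(store, "sensor-1", 5, {"temp": 21})
unmatched_left = join_left_record(store, "sensor-1", 5, {"temp": 21}, how="left")
```

With this data, the record at timestamp 15 picks up the value stored at timestamp 10. The record at timestamp 5 has no earlier right-side value, so it is dropped for `how="inner"` and passed through unchanged for `how="left"`.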
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from typing import Mapping, Optional | ||
|
||
|
||
def keep_left_merger(left: Optional[Mapping], right: Optional[Mapping]) -> dict: | ||
    """ | ||
    Merge two dictionaries, preferring values from the left dictionary | ||
    """ | ||
    left = left or {} | ||
    right = right or {} | ||
    return {**right, **left} | ||
|
||
|
||
def keep_right_merger(left: Optional[Mapping], right: Optional[Mapping]) -> dict: | ||
    """ | ||
    Merge two dictionaries, preferring values from the right dictionary | ||
    """ | ||
    left = left or {} | ||
    right = right or {} | ||
    # TODO: Add try-except everywhere and tell to pass a callback if one of the objects is not a mapping | ||
    return {**left, **right} | ||
|
||
|
||
def raise_merger(left: Optional[Mapping], right: Optional[Mapping]) -> dict: | ||
    """ | ||
    Merge two dictionaries and raise an error if overlapping keys are detected | ||
    """ | ||
    left = left or {} | ||
    right = right or {} | ||
    if overlapping_columns := left.keys() & right.keys(): | ||
        overlapping_columns_str = ", ".join(sorted(overlapping_columns)) | ||
        raise ValueError( | ||
            f"Overlapping columns: {overlapping_columns_str}. " | ||
            'You need to provide either an "on_overlap" value of ' | ||
            "'keep-left' or 'keep-right' or a custom merger function." | ||
        ) | ||
    return {**left, **right} |
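Reviewer note: besides the three built-in mergers, `on_overlap` accepts any callable taking the left and right values. Below is a sketch of what such a custom merger could look like. The function `prefix_right_merger` and its `right_` prefix are hypothetical and only meant to show the callable shape; it mirrors the built-ins by treating a missing side as an empty mapping.

```python
from typing import Any, Mapping, Optional


def prefix_right_merger(
    left: Optional[Mapping[str, Any]],
    right: Optional[Mapping[str, Any]],
) -> dict:
    """Hypothetical merger: keep both sides, renaming right-side keys that collide."""
    left = left or {}
    right = right or {}
    merged = dict(left)
    for column, value in right.items():
        # Only colliding columns get the "right_" prefix; others are copied as-is.
        target = f"right_{column}" if column in left else column
        merged[target] = value
    return merged


merged = prefix_right_merger({"id": 1, "temp": 20}, {"id": 2, "unit": "C"})
```

This sketch does not guard against a left-side column that is already named `right_<column>`; a production merger would need its own rule for that case.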