-
Notifications
You must be signed in to change notification settings - Fork 77
Feature: Join Latest #874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Feature: Join Latest #874
Changes from all commits
Commits
Show all changes
35 commits
Select commit
Hold shift + click to select a range
5f6d11e
[JOIN] Split _as_stateful into helper functions
gwaramadze fa9a869
[JOIN] Refactor _as_stateful to accept sdf
gwaramadze 92fb704
[JOIN] Implement join
gwaramadze 4fa3557
Use sdf.register_store method
gwaramadze 14c94f0
First test
gwaramadze 6901e2a
Refactor test into fixtures
gwaramadze ae883cd
Ensure both sides are copartitioned
gwaramadze 6b59a97
Add on_overlap and merger params
gwaramadze 7548ffd
Add how param with inner and left options
gwaramadze bc1d81c
Invert left<>right in tests
gwaramadze c64bc70
Add retention_ms param
gwaramadze d3167d3
Correct after rebase
gwaramadze 5a1c06f
Rename to join_latest
gwaramadze 707ee03
Change order right > left
gwaramadze 6ef5a5f
Fix return type
gwaramadze 4c7cfb8
Self joins not supported
gwaramadze 21b1c03
Optimize TimestampedPartitionTransaction._get_min_eligible_timestamp
daniil-quix 637aab2
Optimize RocksDBStorePartition.iter_items(backwards=True)
daniil-quix 40dbaa7
WIP: refactor StreamingDataFrame.join_latest
daniil-quix 20f93b2
join_latest: register store only for the right side
daniil-quix 1ddfab5
join_latest: rename on_overlap -> on_merge
daniil-quix c6a20ba
Move ensure_topics_copartitioned() to TopicManager
daniil-quix 155ee63
Replace WindowStoreAlreadyRegisteredError
daniil-quix dfb7247
TimestampedStore: Rename get_last -> get_latest
daniil-quix 3d6cd33
Tests for test_register_timestamped_store_twice
daniil-quix c58b1ef
join_latest: update docstring
daniil-quix 8be9263
join_latest: use is None check in mergers
daniil-quix a2c5790
Rename join_latest -> join_asof
daniil-quix c129d8d
join_latest: docs
daniil-quix e1f5dd8
Add counter to timestamped store (#886)
gwaramadze ada71fb
join_latest: remove newlines in docs
daniil-quix 0711de7
join_latest: add custom on_merge example
daniil-quix 583341f
join_latest: docs spelling
daniil-quix cc2f5b1
Update docs/joins.md
daniil-quix 4e5629b
Update docs/joins.md
daniil-quix File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
# Joins | ||
## Join as-of | ||
|
||
|
||
Use `StreamingDataFrame.join_asof()` to join two topics into a new stream where each left record | ||
is merged with the right record with the same key whose timestamp is less than or equal to the left timestamp. | ||
|
||
This join is built with the timeseries enrichment use cases in mind, where the left side represents some measurements and the right side represents events. | ||
|
||
Some examples: | ||
|
||
- Matching of the sensor measurements with the events in the system. | ||
- Joining the purchases with the effective prices of the goods. | ||
|
||
During as-of join, the records on the right side get stored into a lookup table in the state, and the records from the left side query this state for matches. | ||
|
||
 | ||
|
||
### Requirements | ||
To perform a join, the underlying topics must follow these requirements: | ||
|
||
1. **Both topics must have the same number of partitions.** | ||
Join is a stateful operation, and it requires partitions of left and right topics to be assigned to the same application during processing. | ||
|
||
2. **Keys in both topics must be distributed across partitions using the same algorithm.** | ||
For example, messages with the key `A` must go to the same partition number for both left and right topics. This is Kafka's default behaviour. | ||
|
||
|
||
### Example | ||
|
||
Join records from the topic "measurements" with the latest effective records from | ||
the topic "metadata" using the "inner" join strategy and a grace period of 14 days: | ||
|
||
```python | ||
from datetime import timedelta | ||
|
||
from quixstreams import Application | ||
|
||
app = Application(...) | ||
|
||
sdf_measurements = app.dataframe(app.topic("measurements")) | ||
sdf_metadata = app.dataframe(app.topic("metadata")) | ||
|
||
# Join records from the topic "measurements" | ||
# with the latest effective records from the topic "metadata". | ||
# using the "inner" join strategy and keeping the "metadata" records stored for 14 days in event time. | ||
sdf_joined = sdf_measurements.join_asof( | ||
right=sdf_metadata, | ||
how="inner", # Emit updates only if the match is found in the store. | ||
on_merge="keep-left", # Prefer the columns from the left dataframe if they overlap with the right. | ||
grace_ms=timedelta(days=14), # Keep the state for 14 days (measured in event time similar to windows). | ||
) | ||
|
||
if __name__ == '__main__': | ||
app.run() | ||
``` | ||
|
||
|
||
### How it works | ||
|
||
Here is a description of the as-of join algorithm: | ||
|
||
- Records from the right side get written to the state store without emitting any updates downstream. | ||
- Records on the left side query the right store for the values with the same **key** and the timestamp lower or equal to the record's timestamp. | ||
- If the match is found, the two records are merged together into a new one according to the `on_merge` logic. | ||
- The size of the right store is controlled by the "grace_ms": | ||
a newly added "right" record expires other values with the same key with timestamps below "<current timestamp> - <grace_ms>". | ||
|
||
#### Joining strategies | ||
As-of join supports the following joining strategies: | ||
|
||
- `inner` - emit the output for the left record only when the match is found (default). | ||
- `left` - emit the output for the left record even without a match. | ||
|
||
|
||
#### Merging records together | ||
When the match is found, the two records are merged according to the `on_merge` parameter. | ||
|
||
Out-of-the-box implementations assume that records are **dictionaries**. | ||
For merging other data types (as well as customizing the behavior) use the callback option. | ||
|
||
Possible values: | ||
|
||
- `raise` - merge two records together into a new dictionary and raise an exception if the same keys are found in both dictionaries. | ||
This is a default behavior. | ||
|
||
- `keep-left` - merge two records together into a new dictionary and prefer keys from the **left** record in case of overlap. | ||
|
||
- `keep-right` - merge two records together into a new dictionary and prefer keys from the **right** record in case of overlap. | ||
|
||
- custom callback - pass a callback `(<left>, <right>) -> <merged>` to merge the records manually. | ||
Use it when non-dictionary types are expected, or you want to customize the returned object: | ||
|
||
```python | ||
from typing import Optional | ||
|
||
from quixstreams import Application | ||
|
||
app = Application(...) | ||
|
||
sdf_measurements = app.dataframe(app.topic("measurements")) | ||
sdf_metadata = app.dataframe(app.topic("metadata")) | ||
|
||
|
||
def on_merge(left: int, right: Optional[str]) -> dict: | ||
""" | ||
Merge non-dictionary items into a dict | ||
""" | ||
return {'measurement': left, 'metadata': right} | ||
|
||
|
||
sdf_joined = sdf_measurements.join_asof(right=sdf_metadata, on_merge=on_merge) | ||
|
||
if __name__ == '__main__': | ||
app.run() | ||
``` | ||
|
||
|
||
|
||
#### State expiration | ||
`StreamingDataFrame.join_asof` stores the right records to the state. | ||
The `grace_ms` parameter regulates the state's lifetime (default - 7 days) to prevent it from growing in size forever. | ||
|
||
It shares some similarities with `grace_ms` in [Windows](windowing.md/#lateness-and-out-of-order-processing): | ||
|
||
- The timestamps are obtained from the records. | ||
- The join key keeps track of the maximum observed timestamp for **each individual key**. | ||
- The older values get expired only when the larger timestamp gets stored to the state. | ||
|
||
Adjust `grace_ms` based on the expected time gap between the left and the right side of the join. | ||
|
||
### Limitations | ||
|
||
- Joining dataframes belonging to the same topics (aka "self-join") is not supported. | ||
- As-of join preserves headers only for the left dataframe. | ||
If you need headers of the right side records, consider adding them to the value. | ||
|
||
## Message ordering between partitions | ||
Joins use [`StreamingDataFrame.concat()`](concatenating.md) under the hood, which means that the application's internal consumer goes into a special "buffered" mode | ||
when the join is used. | ||
|
||
In this mode, it buffers messages per partition in order to process them in the timestamp order between different topics. | ||
Timestamp alignment is effective only for the partitions **with the same numbers**: partition zero is aligned with other zero partitions, but not with partition one. | ||
|
||
Note that message ordering works only when the messages are consumed from the topics. | ||
If you change timestamps of the record during processing, they will be processed in the original order. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from .join_asof import JoinAsOf as JoinAsOf | ||
from .join_asof import JoinAsOfHow as JoinAsOfHow | ||
from .join_asof import OnOverlap as OnOverlap |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.