Feature: Join Latest #874


Merged
merged 35 commits into from
May 21, 2025
35 commits
5f6d11e
[JOIN] Split _as_stateful into helper functions
gwaramadze Apr 7, 2025
fa9a869
[JOIN] Refactor _as_stateful to accept sdf
gwaramadze Apr 8, 2025
92fb704
[JOIN] Implement join
gwaramadze Apr 9, 2025
4fa3557
Use sdf.register_store method
gwaramadze Apr 21, 2025
14c94f0
First test
gwaramadze Apr 21, 2025
6901e2a
Refactor test into fixtures
gwaramadze Apr 21, 2025
ae883cd
Ensure both sides are copartitioned
gwaramadze Apr 22, 2025
6b59a97
Add on_overlap and merger params
gwaramadze Apr 22, 2025
7548ffd
Add how param with inner and left options
gwaramadze Apr 23, 2025
bc1d81c
Invert left<>right in tests
gwaramadze Apr 23, 2025
c64bc70
Add retention_ms param
gwaramadze Apr 23, 2025
d3167d3
Correct after rebase
gwaramadze May 6, 2025
5a1c06f
Rename to join_latest
gwaramadze May 8, 2025
707ee03
Change order right > left
gwaramadze May 9, 2025
6ef5a5f
Fix return type
gwaramadze May 9, 2025
4c7cfb8
Self joins not supported
gwaramadze May 9, 2025
21b1c03
Optimize TimestampedPartitionTransaction._get_min_eligible_timestamp
daniil-quix May 14, 2025
637aab2
Optimize RocksDBStorePartition.iter_items(backwards=True)
daniil-quix May 15, 2025
40dbaa7
WIP: refactor StreamingDataFrame.join_latest
daniil-quix May 19, 2025
20f93b2
join_latest: register store only for the right side
daniil-quix May 19, 2025
1ddfab5
join_latest: rename on_overlap -> on_merge
daniil-quix May 19, 2025
c6a20ba
Move ensure_topics_copartitioned() to TopicManager
daniil-quix May 19, 2025
155ee63
Replace WindowStoreAlreadyRegisteredError
daniil-quix May 19, 2025
dfb7247
TimestampedStore: Rename get_last -> get_latest
daniil-quix May 19, 2025
3d6cd33
Tests for test_register_timestamped_store_twice
daniil-quix May 19, 2025
c58b1ef
join_latest: update docstring
daniil-quix May 19, 2025
8be9263
join_latest: use is None check in mergers
daniil-quix May 19, 2025
a2c5790
Rename join_latest -> join_asof
daniil-quix May 20, 2025
c129d8d
join_latest: docs
daniil-quix May 21, 2025
e1f5dd8
Add counter to timestamped store (#886)
gwaramadze May 21, 2025
ada71fb
join_latest: remove newlines in docs
daniil-quix May 21, 2025
0711de7
join_latest: add custom on_merge example
daniil-quix May 21, 2025
583341f
join_latest: docs spelling
daniil-quix May 21, 2025
cc2f5b1
Update docs/joins.md
daniil-quix May 21, 2025
4e5629b
Update docs/joins.md
daniil-quix May 21, 2025
Binary file added docs/img/join-asof.png
146 changes: 146 additions & 0 deletions docs/joins.md
@@ -0,0 +1,146 @@
# Joins
## Join as-of


Use `StreamingDataFrame.join_asof()` to join two topics into a new stream where each left record
is merged with the most recent right record that has the same key and a timestamp less than or equal to the left record's timestamp.

This join is built with time-series enrichment use cases in mind, where the left side represents some measurements and the right side represents events.

Some examples:

- Matching sensor measurements with events in the system.
- Joining purchases with the effective prices of goods.

During the as-of join, records on the right side are stored in a lookup table in the state, and records from the left side query this state for matches.

![As-of join](img/join-asof.png)

### Requirements
To perform a join, the underlying topics must meet these requirements:

1. **Both topics must have the same number of partitions.**
   A join is a stateful operation, and it requires the matching partitions of the left and right topics to be assigned to the same application instance during processing.

2. **Keys in both topics must be distributed across partitions using the same algorithm.**
   For example, messages with the key `A` must go to the same partition number in both the left and right topics. This is Kafka's default behaviour (see the sketch below).

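If the application creates the topics itself, you can pin both to the same partition count explicitly. A minimal sketch, assuming the `TopicConfig` helper from `quixstreams.models` (the exact import path may differ between versions):

```python
from quixstreams import Application
from quixstreams.models import TopicConfig

app = Application(...)

# Pin both topics to the same partition count so that the copartitioning
# requirement holds when the application creates the topics itself.
topic_config = TopicConfig(num_partitions=4, replication_factor=1)

measurements = app.topic("measurements", config=topic_config)
metadata = app.topic("metadata", config=topic_config)
```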

### Example

Join records from the topic "measurements" with the latest effective records from
the topic "metadata" using the "inner" join strategy and a grace period of 14 days:

```python
from datetime import timedelta

from quixstreams import Application

app = Application(...)

sdf_measurements = app.dataframe(app.topic("measurements"))
sdf_metadata = app.dataframe(app.topic("metadata"))

# Join records from the topic "measurements"
# with the latest effective records from the topic "metadata",
# using the "inner" join strategy and keeping the "metadata" records stored for 14 days in event time.
sdf_joined = sdf_measurements.join_asof(
    right=sdf_metadata,
    how="inner",  # Emit updates only if the match is found in the store.
    on_merge="keep-left",  # Prefer the columns from the left dataframe if they overlap with the right.
    grace_ms=timedelta(days=14),  # Keep the state for 14 days (measured in event time, similar to windows).
)

if __name__ == '__main__':
    app.run()
```


### How it works

Here is a description of the as-of join algorithm:

- Records from the right side get written to the state store without emitting any updates downstream.
- Records on the left side query the right store for a value with the same **key** and the largest timestamp less than or equal to the record's timestamp.
- If a match is found, the two records are merged together into a new one according to the `on_merge` logic.
- The size of the right store is controlled by `grace_ms`:
  a newly added "right" record expires other values with the same key with timestamps below `<new record's timestamp> - <grace_ms>` (see the sketch below).
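
To make the lookup and expiration semantics concrete, here is a minimal, illustrative in-memory model of the right store (the real implementation uses a RocksDB-backed timestamped store managed by the framework):

```python
# key -> list of (timestamp_ms, value) pairs for the right side, kept sorted
right_store: dict = {}

GRACE_MS = 14 * 24 * 60 * 60 * 1000  # e.g. 14 days, in milliseconds


def on_right(key, timestamp_ms, value):
    """Store a right record and expire values older than <max ts> - GRACE_MS."""
    rows = right_store.setdefault(key, [])
    rows.append((timestamp_ms, value))
    rows.sort(key=lambda row: row[0])
    min_eligible = rows[-1][0] - GRACE_MS
    right_store[key] = [(ts, v) for ts, v in rows if ts >= min_eligible]


def on_left(key, timestamp_ms):
    """Return the latest right value with a timestamp <= the left timestamp."""
    matches = [v for ts, v in right_store.get(key, []) if ts <= timestamp_ms]
    return matches[-1] if matches else None


on_right("sensor-1", 1_000, {"unit": "kPa"})
print(on_left("sensor-1", 1_500))  # {'unit': 'kPa'}
print(on_left("sensor-1", 500))    # None: no right record at or before ts=500
```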

#### Joining strategies
As-of join supports the following joining strategies:

- `inner` - emit the output for the left record only when a match is found (default).
- `left` - emit the output for the left record even without a match.


#### Merging records together
When a match is found, the two records are merged according to the `on_merge` parameter.

The out-of-the-box implementations assume that records are **dictionaries**.
To merge other data types (or to customize the behavior), use the callback option.

Possible values:

- `raise` - merge two records together into a new dictionary and raise an exception if the same keys are found in both dictionaries.
  This is the default behavior.

- `keep-left` - merge two records together into a new dictionary and prefer keys from the **left** record in case of overlap.

- `keep-right` - merge two records together into a new dictionary and prefer keys from the **right** record in case of overlap.

- custom callback - pass a callback `(<left>, <right>) -> <merged>` to merge the records manually.
  Use it when non-dictionary types are expected, or when you want to customize the returned object:

```python
from typing import Optional

from quixstreams import Application

app = Application(...)

sdf_measurements = app.dataframe(app.topic("measurements"))
sdf_metadata = app.dataframe(app.topic("metadata"))


def on_merge(left: int, right: Optional[str]) -> dict:
    """
    Merge non-dictionary items into a dict
    """
    return {'measurement': left, 'metadata': right}


sdf_joined = sdf_measurements.join_asof(right=sdf_metadata, on_merge=on_merge)

if __name__ == '__main__':
    app.run()
```
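
For reference, a sketch of how the built-in dictionary mergers behave on overlapping keys (illustrative values; key order in the merged dictionary is not guaranteed):

```python
left = {"id": 1, "value": 42}
right = {"id": 2, "unit": "kPa"}

# on_merge="raise"      -> raises an error: the key "id" exists in both records
# on_merge="keep-left"  -> {"id": 1, "value": 42, "unit": "kPa"}  (left "id" wins)
# on_merge="keep-right" -> {"id": 2, "value": 42, "unit": "kPa"}  (right "id" wins)
```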



#### State expiration
`StreamingDataFrame.join_asof` stores the right records in the state.
The `grace_ms` parameter limits the state's lifetime (default: 7 days) to prevent it from growing indefinitely.

It shares some similarities with `grace_ms` in [Windows](windowing.md/#lateness-and-out-of-order-processing):

- The timestamps are obtained from the records.
- The store tracks the maximum observed timestamp for **each individual key**.
- Older values are expired only when a larger timestamp is stored to the state.

Adjust `grace_ms` based on the expected time gap between the left and the right side of the join.
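
`grace_ms` accepts either an `int` in milliseconds or a `timedelta`; reusing the dataframes from the example above, the following two forms are equivalent:

```python
from datetime import timedelta

sdf_joined = sdf_measurements.join_asof(sdf_metadata, grace_ms=timedelta(days=14))
# Equivalent, as plain milliseconds:
# sdf_joined = sdf_measurements.join_asof(sdf_metadata, grace_ms=14 * 24 * 60 * 60 * 1000)
```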

### Limitations

- Joining dataframes that belong to the same topic (a "self-join") is not supported.
- The as-of join preserves headers only for the left dataframe.
  If you need the headers of the right-side records, consider adding them to the value.

## Message ordering between partitions
Joins use [`StreamingDataFrame.concat()`](concatenating.md) under the hood, which means that the application's internal consumer goes into a special "buffered" mode
when the join is used.

In this mode, it buffers messages per partition in order to process them in timestamp order across different topics.
Timestamp alignment applies only to partitions **with the same number**: partition zero is aligned with other zero partitions, but not with partition one.

Note that message ordering works only when the messages are consumed from the topics.
If you change a record's timestamp during processing, the records will still be processed in their original order.
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -44,6 +44,7 @@ nav:
- Windowing: windowing.md
- Aggregations: aggregations.md
- Concatenating Topics: concatenating.md
- Joins: joins.md
- Branching StreamingDataFrames: branching.md
- Configuration: configuration.md

132 changes: 92 additions & 40 deletions quixstreams/dataframe/dataframe.py
@@ -49,13 +49,15 @@
from quixstreams.models.serializers import DeserializerType, SerializerType
from quixstreams.sinks import BaseSink
from quixstreams.state.base import State
from quixstreams.state.manager import StoreTypes
from quixstreams.utils.printing import (
DEFAULT_COLUMN_NAME,
DEFAULT_LIVE,
DEFAULT_LIVE_SLOWDOWN,
)

from .exceptions import InvalidOperation, TopicPartitionsMismatch
from .exceptions import InvalidOperation
from .joins import JoinAsOf, JoinAsOfHow, OnOverlap
from .registry import DataFrameRegistry
from .series import StreamingSeries
from .utils import ensure_milliseconds
@@ -275,7 +277,7 @@ def func(d: dict, state: State):
Default - `False`.
"""
if stateful:
self._register_store()
self.register_store()
# Force the callback to accept metadata
if metadata:
with_metadata_func = cast(ApplyWithMetadataCallbackStateful, func)
@@ -284,11 +286,7 @@
cast(ApplyCallbackStateful, func)
)

stateful_func = _as_stateful(
func=with_metadata_func,
processing_context=self._processing_context,
stream_id=self.stream_id,
)
stateful_func = _as_stateful(with_metadata_func, self)
stream = self.stream.add_apply(stateful_func, expand=expand, metadata=True) # type: ignore[call-overload]
else:
stream = self.stream.add_apply(
@@ -384,7 +382,7 @@ def func(values: list, state: State):
:return: the updated StreamingDataFrame instance (reassignment NOT required).
"""
if stateful:
self._register_store()
self.register_store()
# Force the callback to accept metadata
if metadata:
with_metadata_func = cast(UpdateWithMetadataCallbackStateful, func)
@@ -393,11 +391,7 @@
cast(UpdateCallbackStateful, func)
)

stateful_func = _as_stateful(
func=with_metadata_func,
processing_context=self._processing_context,
stream_id=self.stream_id,
)
stateful_func = _as_stateful(with_metadata_func, self)
return self._add_update(stateful_func, metadata=True)
else:
return self._add_update(
@@ -486,7 +480,7 @@ def func(d: dict, state: State):
"""

if stateful:
self._register_store()
self.register_store()
# Force the callback to accept metadata
if metadata:
with_metadata_func = cast(FilterWithMetadataCallbackStateful, func)
@@ -495,11 +489,7 @@
cast(FilterCallbackStateful, func)
)

stateful_func = _as_stateful(
func=with_metadata_func,
processing_context=self._processing_context,
stream_id=self.stream_id,
)
stateful_func = _as_stateful(with_metadata_func, self)
stream = self.stream.add_filter(stateful_func, metadata=True)
else:
stream = self.stream.add_filter( # type: ignore[call-overload]
@@ -1656,16 +1646,79 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
*self.topics, *other.topics, stream=merged_stream
)

def ensure_topics_copartitioned(self):
partitions_counts = set(t.broker_config.num_partitions for t in self._topics)
if len(partitions_counts) > 1:
msg = ", ".join(
f'"{t.name}" ({t.broker_config.num_partitions} partitions)'
for t in self._topics
)
raise TopicPartitionsMismatch(
f"The underlying topics must have the same number of partitions to use State; got {msg}"
)
def join_asof(
self,
right: "StreamingDataFrame",
how: JoinAsOfHow = "inner",
on_merge: Union[OnOverlap, Callable[[Any, Any], Any]] = "raise",
grace_ms: Union[int, timedelta] = timedelta(days=7),
name: Optional[str] = None,
) -> "StreamingDataFrame":
"""
Join the left dataframe with the records of the right dataframe with
the same key whose timestamp is less than or equal to the left timestamp.
This join is built with the enrichment use case in mind, where the left side
represents some measurements and the right side is metadata.

To be joined, the underlying topics of the dataframes must have the same number of partitions
and use the same partitioner (all keys should be distributed across partitions using the same algorithm).

Joining dataframes belonging to the same topics (aka "self-join") is not supported as of now.

How it works:
- Records from the right side get written to the state store without emitting any updates downstream.
- Records on the left side query the right store for the values with the same **key** and the timestamp lower or equal to the record's timestamp.
Left side emits data downstream.
- If the match is found, the two records are merged together into a new one according to the `on_merge` logic.
- The size of the right store is controlled by the "grace_ms":
a newly added "right" record expires other values with the same key with timestamps below "<current timestamp> - <grace_ms>".

:param right: a StreamingDataFrame to join with.

:param how: the join strategy. Can be one of:
- "inner" - emits the result when the match on the right side is found for the left record.
- "left" - emits the result for each left record even if there is no match on the right side.
Default - `"inner"`.

:param on_merge: how to merge the matched records together assuming they are dictionaries:
- "raise" - fail with an error if the same keys are found in both dictionaries
- "keep-left" - prefer the keys from the left record.
- "keep-right" - prefer the keys from the right record
- callback - a callback in form "(<left>, <right>) -> <new record>" to merge the records manually.
Use it to customize the merging logic or when one of the records is not a dictionary.

:param grace_ms: how long to keep the right records in the store in event time.
(the time is taken from the records' timestamps).
It can be specified as either an `int` representing milliseconds or as a `timedelta` object.
The records are expired per key when the new record gets added.
Default - 7 days.

:param name: The unique identifier of the underlying state store for the "right" dataframe.
If not provided, it will be generated based on the underlying topic names.
Provide a custom name if you need to join the same right dataframe multiple times
within the application.

Example:

```python
from datetime import timedelta
from quixstreams import Application

app = Application()

sdf_measurements = app.dataframe(app.topic("measurements"))
sdf_metadata = app.dataframe(app.topic("metadata"))

# Join records from the topic "measurements"
# with the latest effective records from the topic "metadata"
# using the "inner" join strategy and keeping the "metadata" records stored for 14 days in event time.
sdf_joined = sdf_measurements.join_asof(sdf_metadata, how="inner", grace_ms=timedelta(days=14))
```

"""
return JoinAsOf(
how=how, on_merge=on_merge, grace_ms=grace_ms, store_name=name
).join(self, right)

def _produce(
self,
@@ -1689,17 +1742,19 @@ def _add_update(
self._stream = self._stream.add_update(func, metadata=metadata) # type: ignore[call-overload]
return self

def _register_store(self):
def register_store(self, store_type: Optional[StoreTypes] = None) -> None:
"""
Register the default store for the current stream_id in StateStoreManager.
"""
self.ensure_topics_copartitioned()
TopicManager.ensure_topics_copartitioned(*self._topics)

# Generate a changelog topic config based on the underlying topics.
changelog_topic_config = self._topic_manager.derive_topic_config(self._topics)

self._processing_context.state_manager.register_store(
stream_id=self.stream_id, changelog_config=changelog_topic_config
stream_id=self.stream_id,
store_type=store_type,
changelog_config=changelog_topic_config,
)

def _groupby_key(
@@ -1847,19 +1902,16 @@ def wrapper(

def _as_stateful(
func: Callable[[Any, Any, int, Any, State], T],
processing_context: ProcessingContext,
stream_id: str,
sdf: StreamingDataFrame,
) -> Callable[[Any, Any, int, Any], T]:
@functools.wraps(func)
def wrapper(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
ctx = message_context()
transaction = processing_context.checkpoint.get_store_transaction(
stream_id=stream_id,
partition=ctx.partition,
)
# Pass a State object with an interface limited to the key updates only
# and prefix all the state keys by the message key
state = transaction.as_state(prefix=key)
state = sdf.processing_context.checkpoint.get_store_transaction(
stream_id=sdf.stream_id,
partition=message_context().partition,
).as_state(prefix=key)
return func(value, key, timestamp, headers, state)

return wrapper
4 changes: 0 additions & 4 deletions quixstreams/dataframe/exceptions.py
@@ -7,7 +7,6 @@
"ColumnDoesNotExist",
"StreamingDataFrameDuplicate",
"GroupByDuplicate",
"TopicPartitionsMismatch",
)


@@ -27,6 +26,3 @@ class GroupByDuplicate(QuixException): ...


class StreamingDataFrameDuplicate(QuixException): ...


class TopicPartitionsMismatch(QuixException): ...
3 changes: 3 additions & 0 deletions quixstreams/dataframe/joins/__init__.py
@@ -0,0 +1,3 @@
from .join_asof import JoinAsOf as JoinAsOf
from .join_asof import JoinAsOfHow as JoinAsOfHow
from .join_asof import OnOverlap as OnOverlap