Commit 17c9317

Feature: Join Latest (#874)

Co-authored-by: Remy Gwaramadze <gwaramadze@users.noreply.github.com>
Co-authored-by: Daniil Gusev <daniil@quix.io>

1 parent d168e85, commit 17c9317

27 files changed: +936 -172 lines

docs/img/join-asof.png

450 KB

docs/joins.md

Lines changed: 146 additions & 0 deletions

@@ -0,0 +1,146 @@

# Joins

## Join as-of

Use `StreamingDataFrame.join_asof()` to join two topics into a new stream where each left record
is merged with the right record with the same key whose timestamp is less than or equal to the left timestamp.

This join is built with time-series enrichment use cases in mind, where the left side represents measurements and the right side represents events.

Some examples:

- Matching sensor measurements with events in the system.
- Joining purchases with the effective prices of the goods.

During an as-of join, the records on the right side are stored in a lookup table in the state, and the records from the left side query this state for matches.

![img.png](img/join-asof.png)
### Requirements

To perform a join, the underlying topics must meet these requirements:

1. **Both topics must have the same number of partitions.**
   A join is a stateful operation, and it requires the partitions of the left and right topics to be assigned to the same application during processing.

2. **Keys in both topics must be distributed across partitions using the same algorithm.**
   For example, messages with the key `A` must go to the same partition number in both the left and right topics. This is Kafka's default behaviour.
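For illustration, here is a minimal sketch of declaring both topics with an explicit, matching partition count before joining them. The broker address and the `TopicConfig` values are assumptions for this example:

```python
from quixstreams import Application
from quixstreams.models import TopicConfig

app = Application(broker_address="localhost:9092")  # assumed broker address

# Declare both topics with the same number of partitions,
# so their partitions can be processed together during the join.
topic_config = TopicConfig(num_partitions=4, replication_factor=1)
measurements_topic = app.topic("measurements", config=topic_config)
metadata_topic = app.topic("metadata", config=topic_config)

sdf_measurements = app.dataframe(measurements_topic)
sdf_metadata = app.dataframe(metadata_topic)
```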
### Example

Join records from the topic "measurements" with the latest effective records from
the topic "metadata" using the "inner" join strategy and a grace period of 14 days:

```python
from datetime import timedelta

from quixstreams import Application

app = Application(...)

sdf_measurements = app.dataframe(app.topic("measurements"))
sdf_metadata = app.dataframe(app.topic("metadata"))

# Join records from the topic "measurements"
# with the latest effective records from the topic "metadata"
# using the "inner" join strategy and keeping the "metadata" records stored for 14 days in event time.
sdf_joined = sdf_measurements.join_asof(
    right=sdf_metadata,
    how="inner",  # Emit updates only if a match is found in the store.
    on_merge="keep-left",  # Prefer the columns from the left dataframe if they overlap with the right.
    grace_ms=timedelta(days=14),  # Keep the state for 14 days (measured in event time, similar to windows).
)

if __name__ == '__main__':
    app.run()
```
### How it works

Here is a description of the as-of join algorithm:

- Records from the right side are written to the state store without emitting any updates downstream.
- Records on the left side query the right store for values with the same **key** and a timestamp lower than or equal to the record's timestamp.
- If a match is found, the two records are merged together into a new one according to the `on_merge` logic.
- The size of the right store is controlled by `grace_ms`:
  a newly added "right" record expires other values with the same key with timestamps below "<current timestamp> - <grace_ms>".
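To make the algorithm concrete, here is a simplified, self-contained sketch of the lookup and expiration logic in plain Python. It uses an in-memory dict as a stand-in for the real state store and hardcodes a "keep-left" merge; it is illustrative only, not the library's implementation:

```python
import bisect
from typing import Any, Optional

GRACE_MS = 14 * 24 * 60 * 60 * 1000  # grace period: 14 days in milliseconds

# In-memory stand-in for the lookup table: key -> (sorted timestamps, values).
_store: dict[Any, tuple[list[int], list[dict]]] = {}


def on_right(key: Any, timestamp: int, value: dict) -> None:
    """Store a right record; expire values older than <max timestamp> - <grace_ms>."""
    timestamps, values = _store.setdefault(key, ([], []))
    idx = bisect.bisect_right(timestamps, timestamp)
    timestamps.insert(idx, timestamp)
    values.insert(idx, value)
    cutoff = bisect.bisect_left(timestamps, timestamps[-1] - GRACE_MS)
    del timestamps[:cutoff], values[:cutoff]
    # Right records emit nothing downstream.


def on_left(key: Any, timestamp: int, value: dict) -> Optional[dict]:
    """Find the latest right value with a timestamp <= the left timestamp and merge."""
    timestamps, values = _store.get(key, ([], []))
    idx = bisect.bisect_right(timestamps, timestamp)
    if idx == 0:
        # No match: an "inner" join emits nothing; a "left" join would emit the record anyway.
        return None
    # "keep-left" merge: the left record's keys win on overlap.
    return {**values[idx - 1], **value}
```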
#### Joining strategies

As-of join supports the following joining strategies:

- `inner` - emit the output for the left record only when a match is found (default).
- `left` - emit the output for the left record even without a match.
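For example, a "left" join can be paired with a merge callback to tolerate missing matches. This sketch assumes that with `how="left"` the right side passed to the callback is `None` when no match is found (consistent with the `Optional` typing in the callback example below):

```python
sdf_joined = sdf_measurements.join_asof(
    right=sdf_metadata,
    how="left",  # Emit the left record even when there is no match in the store.
    on_merge=lambda left, right: {**left, "metadata": right},
)
```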
#### Merging records together

When a match is found, the two records are merged according to the `on_merge` parameter.

The out-of-the-box implementations assume that records are **dictionaries**.
To merge other data types (or to customize the behavior), use the callback option.

Possible values:

- `raise` - merge the two records together into a new dictionary and raise an exception if the same keys are found in both dictionaries.
  This is the default behavior.

- `keep-left` - merge the two records together into a new dictionary and prefer keys from the **left** record in case of overlap.

- `keep-right` - merge the two records together into a new dictionary and prefer keys from the **right** record in case of overlap.

- custom callback - pass a callback `(<left>, <right>) -> <merged>` to merge the records manually.
  Use it when non-dictionary types are expected, or when you want to customize the returned object:

```python
from typing import Optional

from quixstreams import Application

app = Application(...)

sdf_measurements = app.dataframe(app.topic("measurements"))
sdf_metadata = app.dataframe(app.topic("metadata"))


def on_merge(left: int, right: Optional[str]) -> dict:
    """
    Merge non-dictionary items into a dict.
    """
    return {'measurement': left, 'metadata': right}


sdf_joined = sdf_measurements.join_asof(right=sdf_metadata, on_merge=on_merge)

if __name__ == '__main__':
    app.run()
```
#### State expiration

`StreamingDataFrame.join_asof` stores the right records in the state.
The `grace_ms` parameter regulates the state's lifetime (default - 7 days) to prevent it from growing forever.

It shares some similarities with `grace_ms` in [Windows](windowing.md/#lateness-and-out-of-order-processing):

- The timestamps are obtained from the records.
- The state keeps track of the maximum observed timestamp for **each individual key**.
- Older values are expired only when a larger timestamp is stored to the state.

Adjust `grace_ms` based on the expected time gap between the left and the right side of the join.
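`grace_ms` accepts either an `int` number of milliseconds or a `timedelta`. For example, both forms below keep the right records for 30 days in event time:

```python
from datetime import timedelta

sdf_joined = sdf_measurements.join_asof(sdf_metadata, grace_ms=timedelta(days=30))
# ...or, equivalently, as an integer number of milliseconds:
# sdf_joined = sdf_measurements.join_asof(sdf_metadata, grace_ms=30 * 24 * 60 * 60 * 1000)
```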
### Limitations

- Joining dataframes belonging to the same topic (aka "self-join") is not supported.
- As-of join preserves headers only for the left dataframe.
  If you need the headers of the right-side records, consider adding them to the value.
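For instance, the right dataframe can copy the message headers into the value before the join by using `apply` with `metadata=True`. This is a sketch that assumes the right values are dictionaries and stores the headers under an arbitrary `"_headers"` field:

```python
def headers_into_value(value: dict, key, timestamp: int, headers) -> dict:
    # Copy the Kafka message headers into the value so they survive the join.
    return {**value, "_headers": headers}


sdf_metadata = sdf_metadata.apply(headers_into_value, metadata=True)
sdf_joined = sdf_measurements.join_asof(right=sdf_metadata)
```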
## Message ordering between partitions

Joins use [`StreamingDataFrame.concat()`](concatenating.md) under the hood, which means that the application's internal consumer goes into a special "buffered" mode
when a join is used.

In this mode, it buffers messages per partition in order to process them in timestamp order across different topics.
Timestamp alignment is effective only for partitions **with the same numbers**: partition zero is aligned with other zero partitions, but not with partition one.

Note that message ordering applies only when the messages are consumed from the topics.
If you change the timestamps of records during processing, they will still be processed in the original order.

mkdocs.yml

Lines changed: 1 addition & 0 deletions

```diff
@@ -44,6 +44,7 @@ nav:
   - Windowing: windowing.md
   - Aggregations: aggregations.md
   - Concatenating Topics: concatenating.md
+  - Joins: joins.md
   - Branching StreamingDataFrames: branching.md
   - Configuration: configuration.md
```

quixstreams/dataframe/dataframe.py

Lines changed: 92 additions & 40 deletions

```diff
@@ -49,13 +49,15 @@
 from quixstreams.models.serializers import DeserializerType, SerializerType
 from quixstreams.sinks import BaseSink
 from quixstreams.state.base import State
+from quixstreams.state.manager import StoreTypes
 from quixstreams.utils.printing import (
     DEFAULT_COLUMN_NAME,
     DEFAULT_LIVE,
     DEFAULT_LIVE_SLOWDOWN,
 )
 
-from .exceptions import InvalidOperation, TopicPartitionsMismatch
+from .exceptions import InvalidOperation
+from .joins import JoinAsOf, JoinAsOfHow, OnOverlap
 from .registry import DataFrameRegistry
 from .series import StreamingSeries
 from .utils import ensure_milliseconds
@@ -275,7 +277,7 @@ def func(d: dict, state: State):
         Default - `False`.
         """
         if stateful:
-            self._register_store()
+            self.register_store()
         # Force the callback to accept metadata
         if metadata:
             with_metadata_func = cast(ApplyWithMetadataCallbackStateful, func)
@@ -284,11 +286,7 @@ def func(d: dict, state: State):
                 cast(ApplyCallbackStateful, func)
             )
 
-            stateful_func = _as_stateful(
-                func=with_metadata_func,
-                processing_context=self._processing_context,
-                stream_id=self.stream_id,
-            )
+            stateful_func = _as_stateful(with_metadata_func, self)
             stream = self.stream.add_apply(stateful_func, expand=expand, metadata=True)  # type: ignore[call-overload]
         else:
             stream = self.stream.add_apply(
@@ -384,7 +382,7 @@ def func(values: list, state: State):
         :return: the updated StreamingDataFrame instance (reassignment NOT required).
         """
         if stateful:
-            self._register_store()
+            self.register_store()
         # Force the callback to accept metadata
         if metadata:
             with_metadata_func = cast(UpdateWithMetadataCallbackStateful, func)
@@ -393,11 +391,7 @@ def func(values: list, state: State):
                 cast(UpdateCallbackStateful, func)
             )
 
-            stateful_func = _as_stateful(
-                func=with_metadata_func,
-                processing_context=self._processing_context,
-                stream_id=self.stream_id,
-            )
+            stateful_func = _as_stateful(with_metadata_func, self)
             return self._add_update(stateful_func, metadata=True)
         else:
             return self._add_update(
@@ -486,7 +480,7 @@ def func(d: dict, state: State):
         """
 
         if stateful:
-            self._register_store()
+            self.register_store()
         # Force the callback to accept metadata
         if metadata:
             with_metadata_func = cast(FilterWithMetadataCallbackStateful, func)
@@ -495,11 +489,7 @@ def func(d: dict, state: State):
                 cast(FilterCallbackStateful, func)
             )
 
-            stateful_func = _as_stateful(
-                func=with_metadata_func,
-                processing_context=self._processing_context,
-                stream_id=self.stream_id,
-            )
+            stateful_func = _as_stateful(with_metadata_func, self)
             stream = self.stream.add_filter(stateful_func, metadata=True)
         else:
             stream = self.stream.add_filter(  # type: ignore[call-overload]
@@ -1656,16 +1646,79 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
             *self.topics, *other.topics, stream=merged_stream
         )
 
-    def ensure_topics_copartitioned(self):
-        partitions_counts = set(t.broker_config.num_partitions for t in self._topics)
-        if len(partitions_counts) > 1:
-            msg = ", ".join(
-                f'"{t.name}" ({t.broker_config.num_partitions} partitions)'
-                for t in self._topics
-            )
-            raise TopicPartitionsMismatch(
-                f"The underlying topics must have the same number of partitions to use State; got {msg}"
-            )
+    def join_asof(
+        self,
+        right: "StreamingDataFrame",
+        how: JoinAsOfHow = "inner",
+        on_merge: Union[OnOverlap, Callable[[Any, Any], Any]] = "raise",
+        grace_ms: Union[int, timedelta] = timedelta(days=7),
+        name: Optional[str] = None,
+    ) -> "StreamingDataFrame":
+        """
+        Join the left dataframe with the records of the right dataframe with
+        the same key whose timestamp is less than or equal to the left timestamp.
+        This join is built with the enrichment use case in mind, where the left side
+        represents some measurements and the right side is metadata.
+
+        To be joined, the underlying topics of the dataframes must have the same number of partitions
+        and use the same partitioner (all keys should be distributed across partitions using the same algorithm).
+
+        Joining dataframes belonging to the same topics (aka "self-join") is not supported as of now.
+
+        How it works:
+            - Records from the right side get written to the state store without emitting any updates downstream.
+            - Records on the left side query the right store for the values with the same **key** and the timestamp lower or equal to the record's timestamp.
+              Left side emits data downstream.
+            - If the match is found, the two records are merged together into a new one according to the `on_merge` logic.
+            - The size of the right store is controlled by the "grace_ms":
+              a newly added "right" record expires other values with the same key with timestamps below "<current timestamp> - <grace_ms>".
+
+        :param right: a StreamingDataFrame to join with.
+
+        :param how: the join strategy. Can be one of:
+            - "inner" - emits the result when the match on the right side is found for the left record.
+            - "left" - emits the result for each left record even if there is no match on the right side.
+            Default - `"inner"`.
+
+        :param on_merge: how to merge the matched records together assuming they are dictionaries:
+            - "raise" - fail with an error if the same keys are found in both dictionaries
+            - "keep-left" - prefer the keys from the left record.
+            - "keep-right" - prefer the keys from the right record
+            - callback - a callback in form "(<left>, <right>) -> <new record>" to merge the records manually.
+              Use it to customize the merging logic or when one of the records is not a dictionary.
+
+        :param grace_ms: how long to keep the right records in the store in event time.
+            (the time is taken from the records' timestamps).
+            It can be specified as either an `int` representing milliseconds or as a `timedelta` object.
+            The records are expired per key when the new record gets added.
+            Default - 7 days.
+
+        :param name: The unique identifier of the underlying state store for the "right" dataframe.
+            If not provided, it will be generated based on the underlying topic names.
+            Provide a custom name if you need to join the same right dataframe multiple times
+            within the application.
+
+        Example:
+
+        ```python
+        from datetime import timedelta
+        from quixstreams import Application
+
+        app = Application()
+
+        sdf_measurements = app.dataframe(app.topic("measurements"))
+        sdf_metadata = app.dataframe(app.topic("metadata"))
+
+        # Join records from the topic "measurements"
+        # with the latest effective records from the topic "metadata"
+        # using the "inner" join strategy and keeping the "metadata" records stored for 14 days in event time.
+        sdf_joined = sdf_measurements.join_asof(sdf_metadata, how="inner", grace_ms=timedelta(days=14))
+        ```
+        """
+        return JoinAsOf(
+            how=how, on_merge=on_merge, grace_ms=grace_ms, store_name=name
+        ).join(self, right)
 
     def _produce(
         self,
@@ -1689,17 +1742,19 @@ def _add_update(
         self._stream = self._stream.add_update(func, metadata=metadata)  # type: ignore[call-overload]
         return self
 
-    def _register_store(self):
+    def register_store(self, store_type: Optional[StoreTypes] = None) -> None:
         """
         Register the default store for the current stream_id in StateStoreManager.
         """
-        self.ensure_topics_copartitioned()
+        TopicManager.ensure_topics_copartitioned(*self._topics)
 
         # Generate a changelog topic config based on the underlying topics.
         changelog_topic_config = self._topic_manager.derive_topic_config(self._topics)
 
         self._processing_context.state_manager.register_store(
-            stream_id=self.stream_id, changelog_config=changelog_topic_config
+            stream_id=self.stream_id,
+            store_type=store_type,
+            changelog_config=changelog_topic_config,
         )
 
     def _groupby_key(
@@ -1847,19 +1902,16 @@ def wrapper(
 
 def _as_stateful(
     func: Callable[[Any, Any, int, Any, State], T],
-    processing_context: ProcessingContext,
-    stream_id: str,
+    sdf: StreamingDataFrame,
 ) -> Callable[[Any, Any, int, Any], T]:
     @functools.wraps(func)
     def wrapper(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
-        ctx = message_context()
-        transaction = processing_context.checkpoint.get_store_transaction(
-            stream_id=stream_id,
-            partition=ctx.partition,
-        )
         # Pass a State object with an interface limited to the key updates only
         # and prefix all the state keys by the message key
-        state = transaction.as_state(prefix=key)
+        state = sdf.processing_context.checkpoint.get_store_transaction(
+            stream_id=sdf.stream_id,
+            partition=message_context().partition,
+        ).as_state(prefix=key)
         return func(value, key, timestamp, headers, state)
 
     return wrapper
```

quixstreams/dataframe/exceptions.py

Lines changed: 0 additions & 4 deletions

```diff
@@ -7,7 +7,6 @@
     "ColumnDoesNotExist",
     "StreamingDataFrameDuplicate",
     "GroupByDuplicate",
-    "TopicPartitionsMismatch",
 )
 
 
@@ -27,6 +26,3 @@ class GroupByDuplicate(QuixException): ...
 
 
 class StreamingDataFrameDuplicate(QuixException): ...
-
-
-class TopicPartitionsMismatch(QuixException): ...
```
quixstreams/dataframe/joins/__init__.py

Lines changed: 3 additions & 0 deletions

```diff
@@ -0,0 +1,3 @@
+from .join_asof import JoinAsOf as JoinAsOf
+from .join_asof import JoinAsOfHow as JoinAsOfHow
+from .join_asof import OnOverlap as OnOverlap
```
