Skip to content

Commit 869f421

Browse files
committed
Self joins not supported
1 parent 795f95f commit 869f421

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

quixstreams/dataframe/dataframe.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1661,6 +1661,12 @@ def join_latest(
16611661
merger: Optional[Callable[[Any, Any], Any]] = None,
16621662
retention_ms: Union[int, timedelta] = timedelta(days=7),
16631663
) -> "StreamingDataFrame":
1664+
if self.stream_id == right.stream_id:
1665+
raise ValueError(
1666+
"Joining dataframes originating from "
1667+
"the same topic is not yet supported.",
1668+
)
1669+
16641670
if how not in get_args(JoinHow):
16651671
raise ValueError(
16661672
f"Invalid how value: {how}. "

tests/test_quixstreams/test_dataframe/test_dataframe.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2904,3 +2904,19 @@ def test_retention_ms(
29042904

29052905
assert publish_left(timestamp=4) == []
29062906
assert publish_left(timestamp=5) == [({"left": 4, "right": 2}, b"key", 5, None)]
2907+
2908+
def test_self_join_not_supported(self, create_topic, create_sdf):
2909+
topic = create_topic()
2910+
match = (
2911+
"Joining dataframes originating from the same topic is not yet supported."
2912+
)
2913+
2914+
# The very same sdf object
2915+
sdf = create_sdf(topic)
2916+
with pytest.raises(ValueError, match=match):
2917+
sdf.join_latest(sdf)
2918+
2919+
# Same topic, different branch
2920+
sdf2 = sdf.apply(lambda v: v)
2921+
with pytest.raises(ValueError, match=match):
2922+
sdf.join_latest(sdf2)

0 commit comments

Comments
 (0)