File tree 2 files changed +9
-5
lines changed
tests/test_quixstreams/test_dataframe
2 files changed +9
-5
lines changed Original file line number Diff line number Diff line change @@ -70,6 +70,7 @@ def register_groupby(
70
70
Register a "groupby" SDF, which is one generated with `SDF.group_by()`.
71
71
:param source_sdf: the SDF used by `sdf.group_by()`
72
72
:param new_sdf: the SDF generated by `sdf.group_by()`.
73
+ :param register_new_root: whether to register the new SDF as a root SDF.
73
74
"""
74
75
if source_sdf .stream_id in self ._repartition_origins :
75
76
raise GroupByNestingLimit (
@@ -78,7 +79,7 @@ def register_groupby(
78
79
79
80
if new_sdf .stream_id in self ._repartition_origins :
80
81
raise GroupByDuplicate (
81
- "A `SDF.group_by()` operation appears to be the same as another, "
82
+ "An `SDF.group_by()` operation appears to be the same as another, "
82
83
"either from using the same column or name parameter; "
83
84
"adjust by setting a unique name with `SDF.group_by(name=<NAME>)` "
84
85
)
@@ -90,7 +91,7 @@ def register_groupby(
90
91
self .register_root (new_sdf )
91
92
except StreamingDataFrameDuplicate :
92
93
raise GroupByDuplicate (
93
- "A `SDF.group_by()` operation appears to be the same as another, "
94
+ "An `SDF.group_by()` operation appears to be the same as another, "
94
95
"either from using the same column or name parameter; "
95
96
"adjust by setting a unique name with `SDF.group_by(name=<NAME>)` "
96
97
)
Original file line number Diff line number Diff line change @@ -1682,10 +1682,11 @@ def test_group_by_column(
1682
1682
sdf = sdf .group_by (col )
1683
1683
sdf [col ] = col_update
1684
1684
1685
- groupby_topic = sdf .topics [0 ]
1686
1685
if num_partitions == 1 :
1686
+ groupby_topic = topic
1687
1687
assert sdf_registry .consumer_topics == [topic ]
1688
1688
else :
1689
+ groupby_topic = sdf .topics [0 ]
1689
1690
assert sdf_registry .consumer_topics == [topic , groupby_topic ]
1690
1691
assert groupby_topic .name .startswith ("repartition__" )
1691
1692
@@ -1773,10 +1774,11 @@ def test_group_by_column_with_name(
1773
1774
sdf = sdf .group_by (col , name = op_name )
1774
1775
sdf [col ] = col_update
1775
1776
1776
- groupby_topic = sdf .topics [0 ]
1777
1777
if num_partitions == 1 :
1778
+ groupby_topic = topic
1778
1779
assert sdf_registry .consumer_topics == [topic ]
1779
1780
else :
1781
+ groupby_topic = sdf .topics [0 ]
1780
1782
assert sdf_registry .consumer_topics == [topic , groupby_topic ]
1781
1783
assert groupby_topic .name .startswith ("repartition__" )
1782
1784
@@ -1864,10 +1866,11 @@ def test_group_by_func(
1864
1866
sdf = sdf .group_by (lambda v : v [col ], name = op_name )
1865
1867
sdf [col ] = col_update
1866
1868
1867
- groupby_topic = sdf .topics [0 ]
1868
1869
if num_partitions == 1 :
1870
+ groupby_topic = topic
1869
1871
assert sdf_registry .consumer_topics == [topic ]
1870
1872
else :
1873
+ groupby_topic = sdf .topics [0 ]
1871
1874
assert sdf_registry .consumer_topics == [topic , groupby_topic ]
1872
1875
assert groupby_topic .name .startswith ("repartition__" )
1873
1876
You can’t perform that action at this time.
0 commit comments