1
+ use crate :: common:: utils:: {
2
+ scylla_supports_tablets, setup_tracing, test_with_3_node_cluster, unique_keyspace_name, create_new_session_builder,
3
+ PerformDDL ,
4
+ } ;
5
+ use scylla:: client:: execution_profile:: ExecutionProfile ;
6
+ use scylla:: policies:: load_balancing:: { DefaultPolicy , LatencyAwarenessBuilder , LoadBalancingPolicy } ;
7
+ use scylla:: policies:: retry:: FallthroughRetryPolicy ;
8
+ use scylla:: client:: session:: Session ;
9
+ use scylla_cql:: frame:: protocol_features:: ProtocolFeatures ;
10
+ use scylla_cql:: frame:: types;
11
+ use std:: sync:: Arc ;
12
+ use tokio:: sync:: mpsc;
13
+
14
+ use scylla_proxy:: {
15
+ Condition , ProxyError , Reaction , RequestFrame , RequestOpcode , RequestReaction , RequestRule ,
16
+ ResponseOpcode , ResponseReaction , ResponseRule , ShardAwareness , TargetShard , WorkerError ,
17
+ } ;
18
+
19
+ async fn test_query_routed_according_to_policy ( policy : Arc < dyn LoadBalancingPolicy > , tablets_enabled : bool , _running_nodes_amount : i32 ) {
20
+ // Test that DefaultPolicy was as expected for LWT and regular queries for tablets and vnodes
21
+ // policy: DefaultPolicy object defined according to test
22
+ // supports_tablets: True (tablets test) / False (vnodes test)
23
+ // running_nodes_amount: cluster state - how much alive nodes (maximum is 3)
24
+ use scylla:: client:: session_builder:: SessionBuilder ;
25
+ setup_tracing ( ) ;
26
+
27
+ // This is just to increase the likelihood that only intended prepared statements (which contain this mark) are captured by the proxy.
28
+ const MAGIC_MARK : i32 = 123 ;
29
+
30
+ let res = test_with_3_node_cluster ( ShardAwareness :: QueryNode , |proxy_uris, translation_map, mut running_proxy| async move {
31
+
32
+ // We set up proxy, so that it informs us (via supported_rx) about cluster's Supported features (including LWT optimisation mark),
33
+ // and also passes us information about which node was queried (via prepared_rx).
34
+
35
+ let ( supported_tx, mut supported_rx) = mpsc:: unbounded_channel ( ) ;
36
+
37
+ running_proxy. running_nodes [ 0 ] . change_response_rules ( Some ( vec ! [ ResponseRule (
38
+ Condition :: ResponseOpcode ( ResponseOpcode :: Supported ) ,
39
+ ResponseReaction :: noop( ) . with_feedback_when_performed( supported_tx)
40
+ ) ] ) ) ;
41
+
42
+ let prepared_rule = |tx| RequestRule (
43
+ Condition :: and ( Condition :: RequestOpcode ( RequestOpcode :: Execute ) , Condition :: BodyContainsCaseSensitive ( Box :: new ( MAGIC_MARK . to_be_bytes ( ) ) ) ) ,
44
+ RequestReaction :: noop ( ) . with_feedback_when_performed ( tx)
45
+ ) ;
46
+
47
+ let mut prepared_rxs = [ 0 , 1 , 2 ] . map ( |i| {
48
+ let ( prepared_tx, prepared_rx) = mpsc:: unbounded_channel ( ) ;
49
+ running_proxy. running_nodes [ i] . change_request_rules ( Some ( vec ! [ prepared_rule( prepared_tx) ] ) ) ;
50
+ prepared_rx
51
+ } ) ;
52
+
53
+ let handle = ExecutionProfile :: builder ( )
54
+ . load_balancing_policy ( policy)
55
+ . retry_policy ( Arc :: new ( FallthroughRetryPolicy ) )
56
+ . build ( )
57
+ . into_handle ( ) ;
58
+
59
+ // DB preparation phase
60
+ let session: Session = SessionBuilder :: new ( )
61
+ . known_node ( proxy_uris[ 0 ] . as_str ( ) )
62
+ . default_execution_profile_handle ( handle)
63
+ . address_translator ( Arc :: new ( translation_map) )
64
+ . build ( )
65
+ . await
66
+ . unwrap ( ) ;
67
+
68
+ let ( supported_frame, _shard) = supported_rx. recv ( ) . await . unwrap ( ) ;
69
+ let supported_options = types:: read_string_multimap ( & mut & * supported_frame. body ) . unwrap ( ) ;
70
+ let supported_features = ProtocolFeatures :: parse_from_supported ( & supported_options) ;
71
+
72
+ // This will branch our test for cases both when cluster supports the optimisations and when it does not.
73
+ let supports_optimisation_mark = supported_features. lwt_optimization_meta_bit_mask . is_some ( ) ;
74
+
75
+ // Create schema
76
+ let ks = unique_keyspace_name ( ) ;
77
+
78
+ // Enable or disable tablets for the keyspace
79
+ let mut tablets_clause = "" ;
80
+ if scylla_supports_tablets ( & session) . await {
81
+ if tablets_enabled{
82
+ tablets_clause = " AND tablets = {'enabled': true}" ;
83
+ } else {
84
+ tablets_clause = " AND tablets = {'enabled': false}" ;
85
+ }
86
+ }
87
+
88
+ let create_ks = format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}{}" , ks, tablets_clause) ;
89
+
90
+ session. ddl ( create_ks) . await . unwrap ( ) ;
91
+ session. use_keyspace ( ks, false ) . await . unwrap ( ) ;
92
+
93
+ session
94
+ . ddl ( "CREATE TABLE t (a int primary key, b int)" )
95
+ . await
96
+ . unwrap ( ) ;
97
+
98
+ type Rxs = [ mpsc:: UnboundedReceiver < ( RequestFrame , Option < TargetShard > ) > ; 3 ] ;
99
+
100
+ fn clear_rxs ( rxs : & mut Rxs ) {
101
+ for rx in rxs. iter_mut ( ) {
102
+ while rx. try_recv ( ) . is_ok ( ) { }
103
+ }
104
+ }
105
+
106
+ fn get_num_queried ( rxs : & mut Rxs ) -> usize {
107
+ let num_queried = rxs. iter_mut ( ) . filter_map ( |rx| rx. try_recv ( ) . ok ( ) ) . count ( ) ;
108
+ clear_rxs ( rxs) ;
109
+ num_queried
110
+ }
111
+
112
+ fn assert_multiple_replicas_queried ( rxs : & mut Rxs ) {
113
+ let num_queried = get_num_queried ( rxs) ;
114
+
115
+ assert ! ( num_queried > 1 ) ;
116
+ }
117
+
118
+ fn assert_one_replica_queried ( rxs : & mut Rxs ) {
119
+ let num_queried = get_num_queried ( rxs) ;
120
+
121
+ assert ! ( num_queried == 1 ) ;
122
+ }
123
+
124
+ #[ allow( unused) ]
125
+ fn who_was_queried ( rxs : & mut Rxs ) {
126
+ for ( i, rx) in rxs. iter_mut ( ) . enumerate ( ) {
127
+ if rx. try_recv ( ) . is_ok ( ) {
128
+ println ! ( "{} was queried." , i) ;
129
+ return ;
130
+ }
131
+ }
132
+ println ! ( "NOBODY was queried!" ) ;
133
+ }
134
+
135
+ // We will check which nodes were queried, for both LWT and non-LWT prepared statements.
136
+ let prepared_non_lwt = session. prepare ( "INSERT INTO t (a, b) VALUES (?, 1)" ) . await . unwrap ( ) ;
137
+ let prepared_lwt = session. prepare ( "UPDATE t SET b=3 WHERE a=? IF b=2" ) . await . unwrap ( ) ;
138
+
139
+ if supports_optimisation_mark {
140
+ // We make sure that the driver properly marked prepared statements wrt being LWT.
141
+ assert ! ( !prepared_non_lwt. is_confirmed_lwt( ) ) ;
142
+ assert ! ( prepared_lwt. is_confirmed_lwt( ) ) ;
143
+ }
144
+
145
+ assert ! ( prepared_non_lwt. is_token_aware( ) ) ;
146
+ assert ! ( prepared_lwt. is_token_aware( ) ) ;
147
+
148
+ // We execute non-LWT statements and ensure that multiple nodes were queried.
149
+ //
150
+ // Note that our DefaultPolicy no longer performs round robin, but instead randomly picks a replica.
151
+ // To see multiple replicas here, we cannot choose a fixed pick seed, so we must rely on randomness.
152
+ // It happened several times in CI that *not all* replicas were queried, but now we only
153
+ // assert that *more than one* replica is queried. Moreover, we increased iterations
154
+ // from 15 to 30 in hope this will suffice to prevent flakiness.
155
+ // Alternatively, we could give up this part of the test and only test LWT part, but then
156
+ // we couldn't be sure that in non-LWT case the driver truly chooses various replicas.
157
+ for _ in 0 ..30 {
158
+ session. execute_unpaged ( & prepared_non_lwt, ( MAGIC_MARK , ) ) . await . unwrap ( ) ;
159
+ }
160
+
161
+ assert_multiple_replicas_queried ( & mut prepared_rxs) ;
162
+
163
+ // We execute LWT statements, and...
164
+ for _ in 0 ..15 {
165
+ session. execute_unpaged ( & prepared_lwt, ( MAGIC_MARK , ) ) . await . unwrap ( ) ;
166
+ }
167
+
168
+ if supports_optimisation_mark {
169
+ // ...if cluster supports LWT, we assert that one replica was always queried first (and effectively only that one was queried).
170
+ assert_one_replica_queried ( & mut prepared_rxs) ;
171
+ } else {
172
+ // ...else we assert that replicas were shuffled as in case of non-LWT.
173
+ assert_multiple_replicas_queried ( & mut prepared_rxs) ;
174
+ }
175
+
176
+ running_proxy
177
+ } ) . await ;
178
+
179
+ match res {
180
+ Ok ( ( ) ) => ( ) ,
181
+ Err ( ProxyError :: Worker ( WorkerError :: DriverDisconnected ( _) ) ) => ( ) ,
182
+ Err ( err) => panic ! ( "{}" , err) ,
183
+ }
184
+ }
185
+
186
+ #[ tokio:: test]
187
+ #[ ntest:: timeout( 20000 ) ]
188
+ #[ cfg( not( scylla_cloud_tests) ) ]
189
+ #[ cfg_attr( not( ccm_tests) , ignore) ]
190
+ async fn query_routed_optimally_with_tablets_and_policy_token_aware_on_preferences_none_permit_failover_on ( ) {
191
+ let policy = DefaultPolicy :: builder ( )
192
+ . token_aware ( true )
193
+ . permit_dc_failover ( true )
194
+ . build ( ) ;
195
+ test_query_routed_according_to_policy ( policy, true , 3 ) ;
196
+ }
197
+
198
+ #[ tokio:: test]
199
+ #[ ntest:: timeout( 20000 ) ]
200
+ #[ cfg( not( scylla_cloud_tests) ) ]
201
+ #[ cfg_attr( not( ccm_tests) , ignore) ]
202
+ async fn query_routed_optimally_with_tablets_and_policy_token_aware_on_preferences_none_permit_failover_off ( ) {
203
+ let policy = DefaultPolicy :: builder ( )
204
+ . token_aware ( true )
205
+ . permit_dc_failover ( false )
206
+ . build ( ) ;
207
+ test_query_routed_according_to_policy ( policy, true , 3 ) ;
208
+ }
209
+
210
+ #[ tokio:: test]
211
+ #[ ntest:: timeout( 20000 ) ]
212
+ #[ cfg( not( scylla_cloud_tests) ) ]
213
+ #[ cfg_attr( not( ccm_tests) , ignore) ]
214
+ async fn query_routed_optimally_with_tablets_and_policy_token_aware_on_preferences_dc_permit_failover_on ( ) {
215
+ let policy = DefaultPolicy :: builder ( )
216
+ . prefer_datacenter ( "dc1" . to_string ( ) )
217
+ . token_aware ( true )
218
+ . permit_dc_failover ( false )
219
+ . build ( ) ;
220
+ test_query_routed_according_to_policy ( policy, true , 3 ) ;
221
+ }
222
+
223
+ // This is a regression test for #696.
224
+ #[ tokio:: test]
225
+ #[ ntest:: timeout( 2000 ) ]
226
+ async fn latency_aware_query_completes ( ) {
227
+ setup_tracing ( ) ;
228
+ let policy = DefaultPolicy :: builder ( )
229
+ . latency_awareness ( LatencyAwarenessBuilder :: default ( ) )
230
+ . build ( ) ;
231
+ let handle = ExecutionProfile :: builder ( )
232
+ . load_balancing_policy ( policy)
233
+ . build ( )
234
+ . into_handle ( ) ;
235
+
236
+ let session = create_new_session_builder ( )
237
+ . default_execution_profile_handle ( handle)
238
+ . build ( )
239
+ . await
240
+ . unwrap ( ) ;
241
+
242
+ session. query_unpaged ( "whatever" , ( ) ) . await . unwrap_err ( ) ;
243
+ }
0 commit comments