1
- from typing import Any , Dict , List
1
+ from typing import Any , Dict , List , Optional
2
2
3
3
from confluent_kafka import Consumer , Message , TopicPartition # type: ignore
4
4
from confluent_kafka .admin import TopicMetadata # type: ignore
8
8
from dlt .common .configuration import configspec
9
9
from dlt .common .configuration .specs import CredentialsConfiguration
10
10
from dlt .common .time import ensure_pendulum_datetime
11
- from dlt .common .typing import DictStrAny , TSecretValue , TAnyDateTime
11
+ from dlt .common .typing import DictStrAny , TSecretValue
12
12
from dlt .common .utils import digest128
13
13
14
14
@@ -218,9 +218,11 @@ class KafkaCredentials(CredentialsConfiguration):
218
218
bootstrap_servers : str = config .value
219
219
group_id : str = config .value
220
220
security_protocol : str = config .value
221
- sasl_mechanisms : str = config .value
222
- sasl_username : str = config .value
223
- sasl_password : TSecretValue = secrets .value
221
+
222
+ # Optional SASL credentials
223
+ sasl_mechanisms : Optional [str ] = config .value
224
+ sasl_username : Optional [str ] = config .value
225
+ sasl_password : Optional [TSecretValue ] = secrets .value
224
226
225
227
def init_consumer (self ) -> Consumer :
226
228
"""Init a Kafka consumer from this credentials.
@@ -232,9 +234,16 @@ def init_consumer(self) -> Consumer:
232
234
"bootstrap.servers" : self .bootstrap_servers ,
233
235
"group.id" : self .group_id ,
234
236
"security.protocol" : self .security_protocol ,
235
- "sasl.mechanisms" : self .sasl_mechanisms ,
236
- "sasl.username" : self .sasl_username ,
237
- "sasl.password" : self .sasl_password ,
238
237
"auto.offset.reset" : "earliest" ,
239
238
}
239
+
240
+ if self .sasl_mechanisms and self .sasl_username and self .sasl_password :
241
+ config .update (
242
+ {
243
+ "sasl.mechanisms" : self .sasl_mechanisms ,
244
+ "sasl.username" : self .sasl_username ,
245
+ "sasl.password" : self .sasl_password ,
246
+ }
247
+ )
248
+
240
249
return Consumer (config )
0 commit comments