Skip to content

Commit be3d5f5

Browse files
committed
[kafka] feat: Making SASL variables optional, as not every Kafka setup may have them
1 parent d29f1b9 commit be3d5f5

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

sources/kafka/helpers.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, Dict, List
1+
from typing import Any, Dict, List, Optional
22

33
from confluent_kafka import Consumer, Message, TopicPartition # type: ignore
44
from confluent_kafka.admin import TopicMetadata # type: ignore
@@ -8,7 +8,7 @@
88
from dlt.common.configuration import configspec
99
from dlt.common.configuration.specs import CredentialsConfiguration
1010
from dlt.common.time import ensure_pendulum_datetime
11-
from dlt.common.typing import DictStrAny, TSecretValue, TAnyDateTime
11+
from dlt.common.typing import DictStrAny, TSecretValue
1212
from dlt.common.utils import digest128
1313

1414

@@ -231,9 +231,11 @@ class KafkaCredentials(CredentialsConfiguration):
231231
bootstrap_servers: str = config.value
232232
group_id: str = config.value
233233
security_protocol: str = config.value
234-
sasl_mechanisms: str = config.value
235-
sasl_username: str = config.value
236-
sasl_password: TSecretValue = secrets.value
234+
235+
# Optional SASL credentials
236+
sasl_mechanisms: Optional[str] = config.value
237+
sasl_username: Optional[str] = config.value
238+
sasl_password: Optional[TSecretValue] = secrets.value
237239

238240
def init_consumer(self) -> Consumer:
239241
"""Init a Kafka consumer from this credentials.
@@ -245,9 +247,16 @@ def init_consumer(self) -> Consumer:
245247
"bootstrap.servers": self.bootstrap_servers,
246248
"group.id": self.group_id,
247249
"security.protocol": self.security_protocol,
248-
"sasl.mechanisms": self.sasl_mechanisms,
249-
"sasl.username": self.sasl_username,
250-
"sasl.password": self.sasl_password,
251250
"auto.offset.reset": "earliest",
252251
}
252+
253+
if self.sasl_mechanisms and self.sasl_username and self.sasl_password:
254+
config.update(
255+
{
256+
"sasl.mechanisms": self.sasl_mechanisms,
257+
"sasl.username": self.sasl_username,
258+
"sasl.password": self.sasl_password,
259+
}
260+
)
261+
253262
return Consumer(config)

0 commit comments

Comments
 (0)