Skip to content

Commit 57a8223

Browse files
committed
Add mTLS support
Added mTLS support to set up OpenSearch connection with client certificate Signed-off-by: Andrey Pleskach <ples@aiven.io>
1 parent 8ed2745 commit 57a8223

File tree

10 files changed

+800
-87
lines changed

10 files changed

+800
-87
lines changed

build.gradle

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ plugins {
3535
id "com.diffplug.spotless" version "6.25.0"
3636
}
3737

38+
idea {
39+
module {
40+
downloadSources = true
41+
}
42+
}
43+
3844
wrapper {
3945
distributionType = 'ALL'
4046
doLast {
@@ -173,6 +179,9 @@ dependencies {
173179
implementation "org.slf4j:slf4j-api:$slf4jVersion"
174180
implementation "com.google.code.gson:gson:2.10.1"
175181
implementation "org.opensearch.client:opensearch-rest-high-level-client:$openSearchVersion"
182+
implementation "org.bouncycastle:bcprov-jdk18on:1.78.1"
183+
implementation "org.bouncycastle:bcpkix-jdk18on:1.78.1"
184+
176185

177186
testImplementation "org.junit.jupiter:junit-jupiter:5.10.2"
178187
testImplementation "org.mockito:mockito-core:5.11.0"
@@ -184,6 +193,7 @@ dependencies {
184193
testImplementation "com.fasterxml.jackson.core:jackson-core:2.17.0"
185194
testImplementation "com.fasterxml.jackson.core:jackson-databind:2.17.0"
186195
testImplementation "com.fasterxml.jackson.core:jackson-annotations:2.17.0"
196+
testImplementation "org.apache.commons:commons-lang3:3.14.0"
187197
testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
188198

189199
integrationTestImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"

docs/opensearch-sink-connector-config-options.rst

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,4 +231,78 @@ Authentication
231231
* Default: null
232232
* Importance: medium
233233

234+
SSL Client Settings
235+
^^^^^^^^^^^^^^^^^^^
236+
237+
``connection.ssl.protocol.type``
238+
SSL protocol type. Default value is TLSv1.3, supported are: TLSv1.2, TLSv1.3
239+
240+
* Type: string
241+
* Default: TLSv1.3
242+
* Valid Values: TLSv1.2, TLSv1.3
243+
* Importance: medium
244+
245+
``connection.access.key.password``
246+
User access key password
247+
248+
* Type: password
249+
* Default: null
250+
* Importance: medium
251+
252+
``connection.ca.certificate.location``
253+
Path to X.509 root CAs file (PEM format)
254+
255+
* Type: string
256+
* Default: null
257+
* Importance: medium
258+
259+
``connection.access.certificate.location``
260+
Path to X.509 user access certificate file (PEM format)
261+
262+
* Type: string
263+
* Default: null
264+
* Importance: medium
265+
266+
``connection.access.key.location``
267+
Path to the user certificate’s keys (PKCS #8) file (PEM format)
268+
269+
* Type: string
270+
* Default: null
271+
* Importance: medium
272+
273+
``connection.truststore.location``
274+
Path to the Truststore file (JKS format)
275+
276+
* Type: string
277+
* Default: null
278+
* Importance: medium
279+
280+
``connection.truststore.password``
281+
Truststore password
282+
283+
* Type: password
284+
* Default: null
285+
* Importance: medium
286+
287+
``connection.keystore.location``
288+
Path to the Keystore file (PKCS12/PFX format)
289+
290+
* Type: string
291+
* Default: null
292+
* Importance: medium
293+
294+
``connection.keystore.type``
295+
Keystore type. The default is JKS. Supported values are: JKS, PKCS12 or PFX
296+
297+
* Type: string
298+
* Default: JKS
299+
* Importance: medium
300+
301+
``connection.keystore.password``
302+
Keystore password
303+
304+
* Type: password
305+
* Default: null
306+
* Importance: medium
307+
234308

src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
*/
1616
package io.aiven.kafka.connect.opensearch;
1717

18-
import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_PASSWORD_CONFIG;
19-
import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_USERNAME_CONFIG;
2018
import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
19+
import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CONNECTION_PASSWORD_CONFIG;
20+
import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CONNECTION_USERNAME_CONFIG;
2121

2222
import java.io.IOException;
2323
import java.io.UncheckedIOException;

src/main/java/io/aiven/kafka/connect/opensearch/OpensearchBasicAuthConfigurator.java

Lines changed: 0 additions & 83 deletions
This file was deleted.
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright 2019 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.aiven.kafka.connect.opensearch.auth;
17+
18+
import static io.aiven.kafka.connect.opensearch.auth.SSLContextBuilder.SUPPORTED_PROTOCOLS;
19+
20+
import java.util.Objects;
21+
22+
import org.apache.kafka.common.config.ConfigDef;
23+
import org.apache.kafka.common.config.ConfigDef.Importance;
24+
import org.apache.kafka.common.config.ConfigDef.Type;
25+
import org.apache.kafka.common.config.ConfigDef.Width;
26+
import org.apache.kafka.common.config.ConfigException;
27+
import org.apache.kafka.common.config.types.Password;
28+
29+
import io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig;
30+
import io.aiven.kafka.connect.opensearch.spi.ConfigDefContributor;
31+
import io.aiven.kafka.connect.opensearch.spi.OpensearchClientConfigurator;
32+
33+
import org.apache.http.auth.AuthScope;
34+
import org.apache.http.auth.UsernamePasswordCredentials;
35+
import org.apache.http.impl.client.BasicCredentialsProvider;
36+
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
37+
38+
/**
39+
* Adds basic authentication to the {@index HttpAsyncClientBuilder} for Opensearch client if configured.
40+
*/
41+
public class OpensearchBasicAuthConfigurator implements OpensearchClientConfigurator, ConfigDefContributor {
42+
43+
private final static String SSL_SETTINGS_GROUP_NAME = "SSL Client Settings";
44+
45+
public static final String CONNECTION_USERNAME_CONFIG = "connection.username";
46+
private static final String CONNECTION_USERNAME_DOC = "The username used to authenticate with OpenSearch. "
47+
+ "The default is the null, and authentication will only be performed if "
48+
+ " both the username and password are non-null.";
49+
public static final String CONNECTION_PASSWORD_CONFIG = "connection.password";
50+
private static final String CONNECTION_PASSWORD_DOC = "The password used to authenticate with OpenSearch. "
51+
+ "The default is the null, and authentication will only be performed if "
52+
+ " both the username and password are non-null.";
53+
54+
public static final String CLIENT_SSL_PROTOCOL_TYPE = "connection.ssl.protocol.type";
55+
public static final String CLIENT_SSL_PROTOCOL_TYPE_DOC = "SSL protocol type. Default value is "
56+
+ SSLContextBuilder.TLS_1_3 + ", supported are: " + SSLContextBuilder.TLS_1_2 + ", "
57+
+ SSLContextBuilder.TLS_1_3;
58+
59+
public static final String CLIENT_SSL_CA_CERTIFICATE_LOCATION = "connection.ca.certificate.location";
60+
private static final String CLIENT_SSL_CA_CERTIFICATE_LOCATION_DOC = "Path to X.509 root CAs file (PEM format)";
61+
62+
public static final String CLIENT_SSL_ACCESS_CERTIFICATE_LOCATION = "connection.access.certificate.location";
63+
private static final String CLIENT_SSL_ACCESS_CERTIFICATE_LOCATION_DOC = "Path to X.509 user access certificate file (PEM format)";
64+
65+
public static final String CLIENT_SSL_ACCESS_KEY_LOCATION = "connection.access.key.location";
66+
private static final String CLIENT_SSL_ACCESS_KEY_LOCATION_DOC = "Path to the user certificate’s keys (PKCS #8) file (PEM format)";
67+
68+
public static final String CLIENT_SSL_ACCESS_KEY_PASSWORD = "connection.access.key.password";
69+
private static final String CLIENT_SSL_ACCESS_KEY_PASSWORD_DOC = "User access key password";
70+
71+
public static final String CLIENT_SSL_TRUSTSTORE_LOCATION = "connection.truststore.location";
72+
private static final String CLIENT_SSL_TRUSTSTORE_LOCATION_DOC = "Path to the Truststore file (JKS format)";
73+
74+
public static final String CLIENT_SSL_TRUSTSTORE_PASSWORD = "connection.truststore.password";
75+
private static final String CLIENT_SSL_TRUSTSTORE_PASSWORD_DOC = "Truststore password";
76+
77+
public static final String CLIENT_SSL_KEYSTORE_LOCATION = "connection.keystore.location";
78+
private static final String CLIENT_SSL_KEYSTORE_LOCATION_DOC = "Path to the Keystore file (PKCS12/PFX format)";
79+
80+
public static final String CLIENT_SSL_KEYSTORE_TYPE = "connection.keystore.type";
81+
private static final String CLIENT_SSL_KEYSTORE_TYPE_DOC = "Keystore type. The default is JKS. Supported values are: JKS, PKCS12 or PFX";
82+
83+
public static final String CLIENT_SSL_KEYSTORE_PASSWORD = "connection.keystore.password";
84+
private static final String CLIENT_SSL_KEYSTORE_PASSWORD_DOC = "Keystore password";
85+
86+
private static final String SUPPORTED_SSL_PROTOCOLS_MESSAGE = String.join(", ", SUPPORTED_PROTOCOLS);
87+
88+
@Override
89+
public boolean apply(final OpensearchSinkConnectorConfig config, final HttpAsyncClientBuilder builder) {
90+
if (!isAuthenticatedConnection(config)) {
91+
return false;
92+
}
93+
94+
final var credentialsProvider = new BasicCredentialsProvider();
95+
for (final var httpHost : config.httpHosts()) {
96+
credentialsProvider.setCredentials(new AuthScope(httpHost),
97+
new UsernamePasswordCredentials(connectionUsername(config), connectionPassword(config).value()));
98+
}
99+
SSLContextBuilder.buildSSLContext(config).map(builder::setSSLContext);
100+
return true;
101+
}
102+
103+
@Override
104+
public void addConfig(final ConfigDef config) {
105+
int order = -1;
106+
config.define(CONNECTION_USERNAME_CONFIG, Type.STRING, null, Importance.MEDIUM, CONNECTION_USERNAME_DOC,
107+
"Authentication", order++, Width.SHORT, "Connection Username")
108+
.define(CONNECTION_PASSWORD_CONFIG, Type.PASSWORD, null, Importance.MEDIUM, CONNECTION_PASSWORD_DOC,
109+
"Authentication", order++, Width.SHORT, "Connection Password");
110+
111+
// Common SSL settings
112+
config.define(CLIENT_SSL_PROTOCOL_TYPE, Type.STRING, SSLContextBuilder.TLS_1_3, new ConfigDef.Validator() {
113+
@Override
114+
public void ensureValid(String name, Object value) {
115+
assert value instanceof String;
116+
final var s = (String) value;
117+
if (!SSLContextBuilder.TLS_1_3.equalsIgnoreCase(s) && !SSLContextBuilder.TLS_1_2.equalsIgnoreCase(s)) {
118+
throw new ConfigException("Unsupported SSL protocol type " + s + ". Supported are: "
119+
+ SUPPORTED_SSL_PROTOCOLS_MESSAGE);
120+
}
121+
}
122+
123+
@Override
124+
public String toString() {
125+
return SUPPORTED_SSL_PROTOCOLS_MESSAGE;
126+
}
127+
}, Importance.MEDIUM, CLIENT_SSL_PROTOCOL_TYPE_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
128+
"SSL protocol type")
129+
.define(CLIENT_SSL_ACCESS_KEY_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
130+
CLIENT_SSL_ACCESS_KEY_PASSWORD_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
131+
"User access key password");
132+
// PEM Certificates settings
133+
config.define(CLIENT_SSL_CA_CERTIFICATE_LOCATION, Type.STRING, null, Importance.MEDIUM,
134+
CLIENT_SSL_CA_CERTIFICATE_LOCATION_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT, "Root CAs")
135+
.define(CLIENT_SSL_ACCESS_CERTIFICATE_LOCATION, Type.STRING, null, Importance.MEDIUM,
136+
CLIENT_SSL_ACCESS_CERTIFICATE_LOCATION_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
137+
"User access certificate")
138+
.define(CLIENT_SSL_ACCESS_KEY_LOCATION, Type.STRING, null, Importance.MEDIUM,
139+
CLIENT_SSL_ACCESS_KEY_LOCATION_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
140+
"User certificate’s key");
141+
// KeyStore and TrustStore files settings
142+
config.define(CLIENT_SSL_TRUSTSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM,
143+
CLIENT_SSL_TRUSTSTORE_LOCATION_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
144+
"Trust store location")
145+
.define(CLIENT_SSL_TRUSTSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
146+
CLIENT_SSL_TRUSTSTORE_PASSWORD_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
147+
"Trust store password")
148+
.define(CLIENT_SSL_KEYSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM,
149+
CLIENT_SSL_KEYSTORE_LOCATION_DOC, SSL_SETTINGS_GROUP_NAME, order, Width.SHORT,
150+
"Key store location")
151+
.define(CLIENT_SSL_KEYSTORE_TYPE, Type.STRING, "JKS", Importance.MEDIUM, CLIENT_SSL_KEYSTORE_TYPE_DOC,
152+
SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT, "Key store type")
153+
.define(CLIENT_SSL_KEYSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
154+
CLIENT_SSL_KEYSTORE_PASSWORD_DOC, SSL_SETTINGS_GROUP_NAME, order + 1, Width.SHORT,
155+
"Key store password");
156+
157+
}
158+
159+
private static boolean isAuthenticatedConnection(final OpensearchSinkConnectorConfig config) {
160+
return Objects.nonNull(connectionUsername(config)) && Objects.nonNull(connectionPassword(config));
161+
}
162+
163+
private static String connectionUsername(final OpensearchSinkConnectorConfig config) {
164+
return config.getString(CONNECTION_USERNAME_CONFIG);
165+
}
166+
167+
private static Password connectionPassword(final OpensearchSinkConnectorConfig config) {
168+
return config.getPassword(CONNECTION_PASSWORD_CONFIG);
169+
}
170+
171+
}

0 commit comments

Comments
 (0)