Skip to content

Commit 82e7edc

Browse files
committed
All v3 tests green for HiveMQ client
1 parent a3366a0 commit 82e7edc

File tree

51 files changed

+1886
-25
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+1886
-25
lines changed

mqtt-hivemq/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ plugins {
33
}
44

55
dependencies {
6+
api(projects.micronautMqttSsl)
7+
8+
implementation(libs.bcpkix.jdk15on)
69
api(libs.managed.hivemq.client)
710
}
811

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright 2017-2023 original authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micronaut.mqtt.hivemq.bind;
17+
18+
import java.util.List;
19+
20+
/**
21+
* A MQTT message holds the payload and options
22+
* for message delivery.
23+
*
24+
* @author Sven Kobow
25+
*/
26+
public class MqttMessage {
27+
28+
private boolean mutable = true;
29+
private byte[] payload;
30+
private int qos = 1;
31+
private boolean retained = false;
32+
private boolean dup = false;
33+
private int messageId;
34+
private byte[] correlationData;
35+
private List<UserProperty> userProperties;
36+
37+
public MqttMessage() {
38+
setPayload(new byte[]{});
39+
}
40+
41+
public MqttMessage(final byte[] payload) {
42+
setPayload(payload);
43+
}
44+
45+
/**
46+
* Returns whether the message is mutable.
47+
* @return mutable
48+
*/
49+
public boolean getMutable() {
50+
return mutable;
51+
}
52+
53+
/**
54+
* Sets whether the message is mutable.
55+
* @param mutable boolean value
56+
*/
57+
public void setMutable(final boolean mutable) {
58+
this.mutable = mutable;
59+
}
60+
61+
/**
62+
* Returns the payload as byte array.
63+
* @return payload The payload byte array
64+
*/
65+
public byte[] getPayload() {
66+
return payload;
67+
}
68+
69+
/**
70+
* Sets the payload of the message.
71+
* @param payload The payload as byte array
72+
*/
73+
public void setPayload(final byte[] payload) {
74+
this.payload = payload;
75+
}
76+
77+
/**
78+
* Returns the quality of service level for the message.
79+
* @return MQTT quality of service level
80+
*/
81+
public int getQos() {
82+
return qos;
83+
}
84+
85+
/**
86+
* Sets the quality of service level for the message.
87+
* @param qos MQTT quality of service level
88+
*/
89+
public void setQos(final int qos) {
90+
this.qos = qos;
91+
}
92+
93+
/**
94+
* Returns whether the message is a retained message.
95+
* @return boolean value
96+
*/
97+
public boolean isRetained() {
98+
return retained;
99+
}
100+
101+
/**
102+
* Sets whether the message is retained.
103+
* @param retained boolean value
104+
*/
105+
public void setRetained(final boolean retained) {
106+
this.retained = retained;
107+
}
108+
109+
/**
110+
* Returns whether the message is flagged as duplicate.
111+
* @return boolean value
112+
*/
113+
public boolean getDup() {
114+
return dup;
115+
}
116+
117+
/**
118+
* Flags the message as duplicate.
119+
* @param dup boolean value
120+
*/
121+
public void setDup(final boolean dup) {
122+
this.dup = dup;
123+
}
124+
125+
/**
126+
* Returns the message id.
127+
* @return message id
128+
*/
129+
public int getId() {
130+
return messageId;
131+
}
132+
133+
/**
134+
* Sets the message id.
135+
* @param messageId message id
136+
*/
137+
public void setId(final int messageId) {
138+
this.messageId = messageId;
139+
}
140+
141+
/**
142+
* Sets the MQTT user properties.
143+
* @param userProperties MQTT user properties
144+
*/
145+
public void setUserProperties(final List<UserProperty> userProperties) {
146+
this.userProperties = userProperties;
147+
}
148+
149+
/**
150+
* Returns the MQTT user properties.
151+
* @return MQTT user properties
152+
*/
153+
public List<UserProperty> getUserProperties() {
154+
return this.userProperties;
155+
}
156+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2017-2023 original authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micronaut.mqtt.hivemq.bind;
17+
18+
/**
19+
* Represents a MQTT v5 property.
20+
*
21+
* @author Sven Kobow
22+
* @since 3.0.0
23+
*/
24+
public class UserProperty {
25+
private final String key;
26+
private final String value;
27+
28+
public UserProperty(String key, String value) {
29+
this.key = key;
30+
this.value = value;
31+
}
32+
33+
/**
34+
* The key of the property.
35+
* @return the key
36+
*/
37+
public String getKey() {
38+
return key;
39+
}
40+
41+
/**
42+
* The value of the property.
43+
* @return the value
44+
*/
45+
public String getValue() {
46+
return value;
47+
}
48+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2017-2023 original authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micronaut.mqtt.hivemq.client;
17+
18+
import io.micronaut.mqtt.bind.MqttBindingContext;
19+
import io.micronaut.mqtt.hivemq.bind.MqttMessage;
20+
21+
import java.util.Set;
22+
import java.util.function.Consumer;
23+
24+
public interface MqttClientAdapter {
25+
void subscribe(String[] topics, int[] qos, Consumer<MqttBindingContext<MqttMessage>> callback);
26+
27+
void unsubscribe(Set<String> topics);
28+
29+
boolean isConnected();
30+
31+
Object getClientIdentifier();
32+
}

mqtt-hivemq/src/main/java/io/micronaut/mqtt/hivemq/client/MqttClientFactory.java

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,58 @@
1515
*/
1616
package io.micronaut.mqtt.hivemq.client;
1717

18-
import io.micronaut.context.annotation.Factory;
18+
import io.micronaut.mqtt.hivemq.ssl.CertificateReader;
19+
import io.micronaut.mqtt.hivemq.ssl.KeyManagerFactoryCreationException;
20+
import io.micronaut.mqtt.hivemq.ssl.PrivateKeyReader;
21+
import io.micronaut.mqtt.hivemq.ssl.TrustManagerFactoryCreationException;
22+
import io.micronaut.mqtt.ssl.MqttCertificateConfiguration;
1923

20-
@Factory
21-
public class MqttClientFactory {
24+
import javax.net.ssl.KeyManagerFactory;
25+
import javax.net.ssl.TrustManagerFactory;
26+
import java.io.IOException;
27+
import java.security.*;
28+
import java.security.cert.Certificate;
29+
import java.security.cert.CertificateException;
30+
31+
public interface MqttClientFactory {
32+
33+
default KeyManagerFactory getKeyManagerFactory(final MqttCertificateConfiguration certConfiguration) throws KeyManagerFactoryCreationException {
34+
try {
35+
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
36+
final KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
37+
38+
final Certificate certificate = CertificateReader.readCertificate(certConfiguration.getCertificate());
39+
40+
final PrivateKey key = PrivateKeyReader.getPrivateKey(certConfiguration.getPrivateKey(), certConfiguration.getPassword());
41+
42+
keyStore.load(null, null);
43+
keyStore.setCertificateEntry("certificate", certificate);
44+
keyStore.setKeyEntry("private-key", key, certConfiguration.getPassword(), new Certificate[]{certificate});
45+
46+
kmf.init(keyStore, certConfiguration.getPassword());
47+
48+
return kmf;
49+
} catch (NoSuchAlgorithmException | KeyStoreException | IOException | CertificateException |
50+
UnrecoverableKeyException e) {
51+
throw new KeyManagerFactoryCreationException(e.getMessage(), e);
52+
}
53+
}
54+
55+
default TrustManagerFactory getTrustManagerFactory(final MqttCertificateConfiguration certConfiguration) throws TrustManagerFactoryCreationException {
56+
try {
57+
final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
58+
final KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
59+
60+
final Certificate certificate = CertificateReader.readCertificate(certConfiguration.getCertificateAuthority());
61+
62+
keyStore.load(null);
63+
keyStore.setCertificateEntry("ca-certificate", certificate);
64+
65+
tmf.init(keyStore);
66+
67+
return tmf;
68+
} catch (NoSuchAlgorithmException | KeyStoreException | IOException | CertificateException e) {
69+
throw new TrustManagerFactoryCreationException(e.getMessage(), e);
70+
}
71+
}
2272
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2017-2023 original authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micronaut.mqtt.hivemq.client.health;
17+
18+
import io.micronaut.context.annotation.Requires;
19+
import io.micronaut.core.async.publisher.Publishers;
20+
import io.micronaut.core.util.StringUtils;
21+
import io.micronaut.health.HealthStatus;
22+
import io.micronaut.management.endpoint.health.HealthEndpoint;
23+
import io.micronaut.management.health.indicator.HealthIndicator;
24+
import io.micronaut.management.health.indicator.HealthResult;
25+
import io.micronaut.mqtt.hivemq.client.MqttClientAdapter;
26+
import jakarta.inject.Singleton;
27+
import org.reactivestreams.Publisher;
28+
29+
import java.util.Collections;
30+
31+
@Requires(property = HealthEndpoint.PREFIX + ".mqtt.client.enabled", value = StringUtils.TRUE)
32+
@Requires(beans = HealthEndpoint.class)
33+
@Singleton
34+
public class MqttHealthIndicator implements HealthIndicator {
35+
public static final String NAME = "mqtt-client";
36+
private final MqttClientAdapter client;
37+
38+
public MqttHealthIndicator(final MqttClientAdapter client) {
39+
this.client = client;
40+
}
41+
42+
@Override
43+
public Publisher<HealthResult> getResult() {
44+
HealthStatus status = client.isConnected() ? HealthStatus.UP : HealthStatus.DOWN;
45+
HealthResult.Builder builder = HealthResult.builder(NAME, status).details(Collections.singletonMap("clientId", client.getClientIdentifier()));
46+
return Publishers.just(builder.build());
47+
}
48+
}

0 commit comments

Comments
 (0)