Skip to content

Commit 1e4d2cd

Browse files
authored
Add dependent admin client (#153)
* Fix memory leak in Connection::GetMetadata * Add dependent admin client to callback based API * Add dependent admin client to promisified API * Update comments and and examples * Add CHANGELOG.md entry * Add topic caching to dependent examples * Address review comments
1 parent 556aa90 commit 1e4d2cd

16 files changed

+603
-73
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ v0.5.0 is a limited availability feature release. It is supported for all usage.
88
1. Add support for an Admin API to delete records.(#141).
99
2. Fixes an issue with unresolved raced Promises leaking in the consumer (#151).
1010
3. Add support for an Admin API to describe topics.(#155).
11+
4. Add support for dependent Admin client (#153).
1112

1213

1314
# confluent-kafka-javascript v0.4.0

e2e/admin-dependent.spec.js

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* confluent-kafka-javascript - Node.js wrapper for RdKafka C/C++ library
3+
*
4+
* Copyright (c) 2024 Confluent, Inc.
5+
*
6+
* This software may be modified and distributed under the terms
7+
* of the MIT license. See the LICENSE.txt file for details.
8+
*/
9+
10+
var Kafka = require('../');
11+
var t = require('assert');
12+
13+
var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
14+
var time = Date.now();
15+
16+
describe('Dependent Admin', function () {
17+
describe('from Producer', function () {
18+
let producer;
19+
20+
this.beforeEach(function (done) {
21+
producer = new Kafka.Producer({
22+
'metadata.broker.list': kafkaBrokerList,
23+
});
24+
done();
25+
});
26+
27+
it('should be created and useable from connected producer', function (done) {
28+
producer.on('ready', function () {
29+
let admin = Kafka.AdminClient.createFrom(producer);
30+
admin.listTopics(null, function (err, res) {
31+
t.ifError(err);
32+
t.ok(res);
33+
producer.disconnect(done);
34+
admin = null;
35+
});
36+
t.ok(admin);
37+
});
38+
producer.connect();
39+
});
40+
41+
it('should fail to be created from unconnected producer', function (done) {
42+
t.throws(function () {
43+
Kafka.AdminClient.createFrom(producer);
44+
}, /Existing client must be connected before creating a new client from it/);
45+
done();
46+
});
47+
48+
});
49+
50+
describe('from Consumer', function () {
51+
let consumer;
52+
53+
this.beforeEach(function (done) {
54+
consumer = new Kafka.KafkaConsumer({
55+
'metadata.broker.list': kafkaBrokerList,
56+
'group.id': 'kafka-mocha-grp-' + time,
57+
});
58+
done();
59+
});
60+
61+
it('should be created and useable from connected consumer', function (done) {
62+
consumer.on('ready', function () {
63+
let admin = Kafka.AdminClient.createFrom(consumer);
64+
admin.listTopics(null, function (err, res) {
65+
t.ifError(err);
66+
t.ok(res);
67+
consumer.disconnect(done);
68+
admin = null;
69+
});
70+
t.ok(admin);
71+
});
72+
consumer.connect();
73+
});
74+
75+
it('should fail to be created from unconnected consumer', function (done) {
76+
t.throws(function () {
77+
Kafka.AdminClient.createFrom(consumer);
78+
}, /Existing client must be connected before creating a new client from it/);
79+
done();
80+
});
81+
82+
});
83+
});
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
2+
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
3+
4+
async function adminFromConsumer() {
5+
const kafka = new Kafka({
6+
kafkaJS: {
7+
brokers: ['localhost:9092'],
8+
}
9+
});
10+
11+
const consumer = kafka.consumer({
12+
kafkaJS: {
13+
groupId: 'test-group',
14+
fromBeginning: true,
15+
}
16+
});
17+
18+
await consumer.connect();
19+
20+
// The consumer can be used as normal
21+
await consumer.subscribe({ topic: 'test-topic' });
22+
consumer.run({
23+
eachMessage: async ({ topic, partition, message }) => {
24+
console.log({
25+
topic,
26+
partition,
27+
offset: message.offset,
28+
key: message.key?.toString(),
29+
value: message.value.toString(),
30+
});
31+
},
32+
});
33+
34+
// And the same consumer can create an admin client - the consumer must have successfully
35+
// been connected before the admin client can be created.
36+
const admin = consumer.dependentAdmin();
37+
await admin.connect();
38+
39+
// The admin client can be used until the consumer is connected.
40+
const listTopicsResult = await admin.listTopics();
41+
console.log(listTopicsResult);
42+
43+
await new Promise(resolve => setTimeout(resolve, 10000));
44+
45+
// Disconnect the consumer and admin clients in the correct order.
46+
await admin.disconnect();
47+
await consumer.disconnect();
48+
}
49+
50+
async function adminFromProducer() {
51+
const kafka = new Kafka({
52+
kafkaJS: {
53+
brokers: ['localhost:9092'],
54+
}
55+
});
56+
57+
const producer = kafka.producer({
58+
'metadata.max.age.ms': 900000, /* This is set to the default value. */
59+
});
60+
61+
await producer.connect();
62+
63+
// And the same producer can create an admin client - the producer must have successfully
64+
// been connected before the admin client can be created.
65+
const admin = producer.dependentAdmin();
66+
await admin.connect();
67+
68+
// The admin client can be used until the producer is connected.
69+
const listTopicsResult = await admin.listTopics();
70+
console.log(listTopicsResult);
71+
72+
// A common use case for the dependent admin client is to make sure the topic
73+
// is cached before producing to it. This avoids delay in sending the first
74+
// message to any topic. Using the admin client linked to the producer allows
75+
// us to do this, by calling `fetchTopicMetadata` before we produce.
76+
// Here, we cache all possible topics, but it's advisable to only cache the
77+
// topics you are going to produce to (if you know it in advance),
78+
// and avoid calling listTopics().
79+
// Once a topic is cached, it will stay cached for `metadata.max.age.ms`,
80+
// which is 15 minutes by default, after which it will be removed if
81+
// it has not been produced to.
82+
await admin.fetchTopicMetadata({ topics: listTopicsResult }).catch(e => {
83+
console.error('Error caching topics: ', e);
84+
})
85+
86+
// The producer can be used as usual.
87+
await producer.send({ topic: 'test-topic', messages: [{ value: 'Hello!' }] });
88+
89+
// Disconnect the producer and admin clients in the correct order.
90+
await admin.disconnect();
91+
await producer.disconnect();
92+
}
93+
94+
adminFromProducer().then(() => adminFromConsumer()).catch(console.error);
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
const Kafka = require('@confluentinc/kafka-javascript');
2+
const admin = require('../../lib/admin');
3+
4+
const bootstrapServers = 'localhost:9092';
5+
6+
function adminFromProducer(callback) {
7+
const producer = new Kafka.Producer({
8+
'bootstrap.servers': bootstrapServers,
9+
'dr_msg_cb': true,
10+
});
11+
12+
const createAdminAndListAndDescribeTopics = (done) => {
13+
// Create an admin client from the producer, which must be connected.
14+
// Thus, this is called from the producer's 'ready' event.
15+
const admin = Kafka.AdminClient.createFrom(producer);
16+
17+
// The admin client can be used until the producer is connected.
18+
admin.listTopics((err, topics) => {
19+
if (err) {
20+
console.error(err);
21+
return;
22+
}
23+
console.log("Topics: ", topics);
24+
25+
// A common use case for the dependent admin client is to make sure the topic
26+
// is cached before producing to it. This avoids delay in sending the first
27+
// message to any topic. Using the admin client linked to the producer allows
28+
// us to do this, by calling `describeTopics` before we produce.
29+
// Here, we cache all possible topics, but it's advisable to only cache the
30+
// topics you are going to produce to (if you know it in advance),
31+
// and avoid calling listTopics().
32+
// Once a topic is cached, it will stay cached for `metadata.max.age.ms`,
33+
// which is 15 minutes by default, after which it will be removed if
34+
// it has not been produced to.
35+
admin.describeTopics(topics, null, (err, topicDescriptions) => {
36+
if (err) {
37+
console.error(err);
38+
return;
39+
}
40+
console.log("Topic descriptions fetched successfully");
41+
admin.disconnect();
42+
done();
43+
});
44+
});
45+
};
46+
47+
producer.connect();
48+
49+
producer.on('ready', () => {
50+
console.log("Producer is ready");
51+
producer.setPollInterval(100);
52+
53+
// After the producer is ready, it can be used to create an admin client.
54+
createAdminAndListAndDescribeTopics(() => {
55+
// The producer can also be used normally to produce messages.
56+
producer.produce('test-topic', null, Buffer.from('Hello World!'), null, Date.now());
57+
});
58+
59+
});
60+
61+
producer.on('event.error', (err) => {
62+
console.error(err);
63+
producer.disconnect(callback);
64+
});
65+
66+
producer.on('delivery-report', (err, report) => {
67+
console.log("Delivery report received:", report);
68+
producer.disconnect(callback);
69+
});
70+
}
71+
72+
function adminFromConsumer() {
73+
const consumer = new Kafka.KafkaConsumer({
74+
'bootstrap.servers': bootstrapServers,
75+
'group.id': 'test-group',
76+
'auto.offset.reset': 'earliest',
77+
});
78+
79+
const createAdminAndListTopics = () => {
80+
// Create an admin client from the consumer, which must be connected.
81+
// Thus, this is called from the consumer's 'ready' event.
82+
const admin = Kafka.AdminClient.createFrom(consumer);
83+
84+
// The admin client can be used until the consumer is connected.
85+
admin.listTopics((err, topics) => {
86+
if (err) {
87+
console.error(err);
88+
return;
89+
}
90+
console.log("Topics: ", topics);
91+
admin.disconnect();
92+
});
93+
};
94+
95+
consumer.connect();
96+
97+
consumer.on('ready', () => {
98+
console.log("Consumer is ready");
99+
100+
// After the consumer is ready, it can be used to create an admin client.
101+
createAdminAndListTopics();
102+
103+
// It can also be used normally to consume messages.
104+
consumer.subscribe(['test-topic']);
105+
consumer.consume();
106+
});
107+
108+
consumer.on('data', (data) => {
109+
// Quit after receiving a message.
110+
console.log("Consumer:data", data);
111+
consumer.disconnect();
112+
});
113+
114+
consumer.on('event.error', (err) => {
115+
console.error("Consumer:error", err);
116+
consumer.disconnect();
117+
});
118+
}
119+
120+
adminFromProducer(() => adminFromConsumer());

0 commit comments

Comments
 (0)