Commit 178d395

Add metadata to offset commits

Parent: 8fe9107

File tree

3 files changed: +153 −21 lines

  e2e/consumer.spec.js
  e2e/topicUtils.js
  src/common.cc
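
The API change this commit exercises is small: a commit may now carry a metadata string alongside its offset, and committed() returns that string on each TopicPartition. A minimal sketch of the round trip, mirroring the e2e tests below (the broker address, group id, topic name, offset, and metadata value are placeholders; the require path is the in-repo one the tests use):

var Kafka = require('../'); // in-repo path, as the e2e tests load the library

var consumer = new Kafka.KafkaConsumer({
  'bootstrap.servers': 'localhost:9092', // placeholder broker
  'group.id': 'metadata-demo',           // placeholder group
  'enable.auto.commit': false
}, {});

consumer.connect({ timeout: 2000 }, function(err) {
  if (err) throw err;
  consumer.assign([{ topic: 'my-topic', partition: 0 }]);

  // The metadata string is committed to the broker together with the offset.
  consumer.commitSync({ topic: 'my-topic', partition: 0, offset: 42, metadata: 'checkpoint-7' });

  consumer.committed(null, 1000, function(err, committed) {
    if (err) throw err;
    console.log(committed[0].offset);   // 42
    console.log(committed[0].metadata); // 'checkpoint-7'
    consumer.disconnect();
  });
});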

e2e/consumer.spec.js

Lines changed: 46 additions & 20 deletions
@@ -10,24 +10,33 @@ var t = require('assert');
 var crypto = require('crypto');
 
 var eventListener = require('./listener');
+const { createTopics, deleteTopics } = require('./topicUtils');
 
 var KafkaConsumer = require('../').KafkaConsumer;
 
 var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
-var topic = 'test';
 
 describe('Consumer', function() {
   var gcfg;
+  let topic;
+  let createdTopics = [];
 
-  beforeEach(function() {
+  beforeEach(function(done) {
     var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
+    topic = 'test' + crypto.randomBytes(20).toString('hex');
     gcfg = {
       'bootstrap.servers': kafkaBrokerList,
       'group.id': grp,
       'debug': 'all',
       'rebalance_cb': true,
       'enable.auto.commit': false
     };
+    createTopics([{topic, num_partitions: 1, replication_factor: 1}], kafkaBrokerList, done);
+    createdTopics.push(topic);
+  });
+
+  after(function(done) {
+    deleteTopics(createdTopics, kafkaBrokerList, done);
   });
 
   describe('commit', function() {
@@ -94,34 +103,50 @@ describe('Consumer', function() {
       t.equal(position.length, 0);
     });
 
-    it('after assign, should get committed array without offsets ', function(done) {
-      consumer.assign([{topic:topic, partition:0}]);
-      // Defer this for a second
-      setTimeout(function() {
-        consumer.committed(null, 1000, function(err, committed) {
-          t.ifError(err);
-          t.equal(committed.length, 1);
-          t.equal(typeof committed[0], 'object', 'TopicPartition should be an object');
-          t.deepStrictEqual(committed[0].partition, 0);
-          t.equal(committed[0].offset, undefined);
-          done();
-        });
+    it('after assign, should get committed array without offsets ', function (done) {
+      consumer.assign([{ topic: topic, partition: 0 }]);
+      consumer.committed(null, 1000, function (err, committed) {
+        t.ifError(err);
+        t.equal(committed.length, 1);
+        t.equal(typeof committed[0], 'object', 'TopicPartition should be an object');
+        t.deepStrictEqual(committed[0].partition, 0);
+        t.equal(committed[0].offset, undefined);
+        done();
      }, 1000);
    });
 
-    it('after assign and commit, should get committed offsets', function(done) {
+    it('after assign and commit, should get committed offsets with same metadata', function(done) {
      consumer.assign([{topic:topic, partition:0}]);
-      consumer.commitSync({topic:topic, partition:0, offset:1000});
+      consumer.commitSync({topic:topic, partition:0, offset:1000, metadata: 'A string with unicode ǂ'});
      consumer.committed(null, 1000, function(err, committed) {
        t.ifError(err);
        t.equal(committed.length, 1);
        t.equal(typeof committed[0], 'object', 'TopicPartition should be an object');
        t.deepStrictEqual(committed[0].partition, 0);
        t.deepStrictEqual(committed[0].offset, 1000);
+        t.deepStrictEqual(committed[0].metadata, 'A string with unicode ǂ');
        done();
      });
    });
 
+    it('after assign and commit, a different consumer should get the same committed offsets and metadata', function(done) {
+      consumer.assign([{topic:topic, partition:0}]);
+      consumer.commitSync({topic:topic, partition:0, offset:1000, metadata: 'A string with unicode ǂ'});
+
+      let consumer2 = new KafkaConsumer(gcfg, {});
+      consumer2.connect({ timeout: 2000 }, function (err, info) {
+        consumer2.committed([{ topic, partition: 0 }], 1000, function (err, committed) {
+          t.ifError(err);
+          t.equal(committed.length, 1);
+          t.equal(typeof committed[0], 'object', 'TopicPartition should be an object');
+          t.deepStrictEqual(committed[0].partition, 0);
+          t.deepStrictEqual(committed[0].offset, 1000);
+          t.deepStrictEqual(committed[0].metadata, 'A string with unicode ǂ');
+          consumer2.disconnect(done);
+        });
+      });
+    });
+
    it('after assign, before consume, position should return an array without offsets', function(done) {
      consumer.assign([{topic:topic, partition:0}]);
      var position = consumer.position();
@@ -154,7 +179,7 @@ describe('Consumer', function() {
      consumer.connect({ timeout: 2000 }, function(err, info) {
        t.ifError(err);
        consumer.assign([{
-          topic: 'test',
+          topic,
          partition: 0,
          offset: 0
        }]);
@@ -172,7 +197,7 @@ describe('Consumer', function() {
 
    it('should be able to seek', function(cb) {
      consumer.seek({
-        topic: 'test',
+        topic,
        partition: 0,
        offset: 0
      }, 1, function(err) {
@@ -183,7 +208,7 @@ describe('Consumer', function() {
 
    it('should be able to seek with a timeout of 0', function(cb) {
      consumer.seek({
-        topic: 'test',
+        topic,
        partition: 0,
        offset: 0
      }, 0, function(err) {
@@ -217,7 +242,7 @@ describe('Consumer', function() {
      t.equal(0, consumer.subscription().length);
      consumer.subscribe([topic]);
      t.equal(1, consumer.subscription().length);
-      t.equal('test', consumer.subscription()[0]);
+      t.equal(topic, consumer.subscription()[0]);
      t.equal(0, consumer.assignments().length);
    });
 
@@ -308,6 +333,7 @@ describe('Consumer', function() {
 
      consumer.subscribe([topic]);
 
+      consumer.setDefaultConsumeTimeout(500); // Topic might not have any messages.
      consumer.consume(1, function(err, messages) {
        t.ifError(err);
 
e2e/topicUtils.js

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+/*
+ * confluent-kafka-javascript - Node.js wrapper for RdKafka C/C++ library
+ * Copyright (c) 2024 Confluent, Inc.
+ *
+ * This software may be modified and distributed under the terms
+ * of the MIT license. See the LICENSE.txt file for details.
+ */
+
+module.exports = { createTopics, deleteTopics };
+
+var Kafka = require('../');
+
+// Create topics and wait for them to be created in the metadata.
+function createTopics(topics, brokerList, cb) {
+  const client = Kafka.AdminClient.create({
+    'client.id': 'kafka-test-admin-client',
+    'metadata.broker.list': brokerList,
+  });
+  let promises = [];
+  for (const topic of topics) {
+    // Construct each promise before issuing the request so Promise.all
+    // below actually waits for every createTopic callback.
+    promises.push(new Promise((resolve, reject) => {
+      client.createTopic(topic, (err) => {
+        if (err && err.code !== Kafka.CODES.ERR_TOPIC_ALREADY_EXISTS) {
+          reject(err);
+        }
+        resolve();
+      });
+    }));
+  }
+
+  Promise.all(promises).then(() => {
+    let interval = setInterval(() => {
+      client.listTopics((err, topicList) => {
+        if (err) {
+          client.disconnect();
+          clearInterval(interval);
+          cb(err);
+          return;
+        }
+        for (const topic of topics) {
+          if (!topicList.includes(topic.topic)) {
+            return; // Not in the metadata yet; retry on the next tick.
+          }
+        }
+        client.disconnect();
+        clearInterval(interval);
+        cb();
+      });
+    }, 100);
+  }).catch((err) => {
+    client.disconnect();
+    cb(err);
+  });
+}
+
+// Delete topics.
+function deleteTopics(topics, brokerList, cb) {
+  const client = Kafka.AdminClient.create({
+    'client.id': 'kafka-test-admin-client',
+    'metadata.broker.list': brokerList,
+  });
+  let promises = [];
+  for (const topic of topics) {
+    promises.push(new Promise((resolve, reject) => {
+      client.deleteTopic(topic, (err) => {
+        if (err && err.code !== Kafka.CODES.ERR_UNKNOWN_TOPIC_OR_PART) {
+          reject(err);
+        }
+        resolve();
+      });
+    }));
+  }
+
+  Promise.all(promises).then(() => {
+    client.disconnect();
+    cb();
+  }).catch((err) => {
+    client.disconnect();
+    cb(err);
+  });
+}
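
Both helpers are callback-based so they drop straight into mocha's beforeEach/after hooks, and createTopics polls listTopics every 100 ms because topic creation completes asynchronously on the broker; a test that assigns or produces immediately after the admin call could otherwise race metadata propagation. A sketch of standalone usage (the topic name is a placeholder; deleteTopics takes plain topic names, as createdTopics holds in the spec above):

var { createTopics, deleteTopics } = require('./topicUtils');

var broker = process.env.KAFKA_HOST || 'localhost:9092';

createTopics([{ topic: 'scratch-topic', num_partitions: 1, replication_factor: 1 }], broker, function(err) {
  if (err) throw err;
  // ... exercise the topic here ...
  deleteTopics(['scratch-topic'], broker, function(err) {
    if (err) throw err;
  });
});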

src/common.cc

Lines changed: 26 additions & 1 deletion
@@ -325,6 +325,16 @@ v8::Local<v8::Array> ToV8Array(
      Nan::Set(obj, Nan::New("offset").ToLocalChecked(),
        Nan::New<v8::Number>(topic_partition->offset()));
    }
+
+    // If present, size >= 1, since it will include at least the null terminator.
+    if (topic_partition->get_metadata().size() > 0) {
+      Nan::Set(obj, Nan::New("metadata").ToLocalChecked(),
+        Nan::New<v8::String>(
+          reinterpret_cast<const char*>(topic_partition->get_metadata().data()),
+          topic_partition->get_metadata().size() - 1)  // Null terminator is not required by the constructor.
+        .ToLocalChecked());
+    }
+
    Nan::Set(obj, Nan::New("partition").ToLocalChecked(),
      Nan::New<v8::Number>(topic_partition->partition()));
    Nan::Set(obj, Nan::New("topic").ToLocalChecked(),
@@ -428,7 +438,22 @@ RdKafka::TopicPartition * FromV8Object(v8::Local<v8::Object> topic_partition) {
    return NULL;
  }
 
-  return RdKafka::TopicPartition::create(topic, partition, offset);
+  RdKafka::TopicPartition *toppar = RdKafka::TopicPartition::create(topic, partition, offset);
+
+  v8::Local<v8::String> metadataKey = Nan::New("metadata").ToLocalChecked();
+  if (Nan::Has(topic_partition, metadataKey).FromMaybe(false)) {
+    v8::Local<v8::Value> metadataValue = Nan::Get(topic_partition, metadataKey).ToLocalChecked();
+
+    if (metadataValue->IsString()) {
+      Nan::Utf8String metadataValueUtf8Str(metadataValue.As<v8::String>());
+      std::string metadataValueStr(*metadataValueUtf8Str);
+      std::vector<unsigned char> metadataVector(metadataValueStr.begin(), metadataValueStr.end());
+      metadataVector.push_back('\0');  // The null terminator is not included in the iterator.
+      toppar->set_metadata(metadataVector);
+    }
+  }
+
+  return toppar;
 }
 
 } // namespace TopicPartition
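
On the way in, FromV8Object converts the JavaScript string to UTF-8 bytes and appends a trailing '\0' before calling set_metadata(); on the way out, ToV8Array drops that terminator (size() - 1) and only sets the metadata property when bytes are present, so a TopicPartition with no committed metadata simply lacks the field. A sketch of the observable behaviour at the JavaScript layer, assuming a connected, assigned consumer and the placeholder topic from the sketch above:

// No metadata committed yet: ToV8Array skips the property entirely.
consumer.committed(null, 1000, function(err, committed) {
  if (err) throw err;
  console.log(committed[0].metadata); // undefined

  // Commit with metadata; multi-byte UTF-8 such as 'ǂ' survives the round
  // trip, and the internal null terminator never reaches JavaScript.
  consumer.commitSync({ topic: 'my-topic', partition: 0, offset: 1000, metadata: 'A string with unicode ǂ' });
  consumer.committed(null, 1000, function(err, committed) {
    if (err) throw err;
    console.log(committed[0].metadata); // 'A string with unicode ǂ'
  });
});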
