Skip to content

Commit 127567f

Browse files
committed
Allow token refresh cb to be an async function
1 parent 9476060 commit 127567f

File tree

2 files changed

+97
-63
lines changed

2 files changed

+97
-63
lines changed

e2e/oauthbearer_cb.spec.js

Lines changed: 80 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -14,69 +14,88 @@ var eventListener = require('./listener');
1414

1515
var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
1616

17-
describe('Client with oauthbearer_cb', function () {
18-
const oauthbearer_config = 'key=value';
19-
let oauthbearer_cb_called = 0;
20-
let oauthbearer_cb = function (config, cb) {
21-
console.log("Called oauthbearer_cb with given config: " + config);
22-
t.equal(config, oauthbearer_config);
23-
oauthbearer_cb_called++;
24-
25-
// The broker is not expected to be configured for oauthbearer authentication.
26-
// We just want to make sure that token refresh callback is triggered.
27-
cb(new Error('oauthbearer_cb error'), null);
28-
};
29-
30-
const commonConfig = {
31-
'metadata.broker.list': kafkaBrokerList,
32-
'debug': 'all',
33-
'security.protocol': 'SASL_PLAINTEXT',
34-
'sasl.mechanisms': 'OAUTHBEARER',
35-
'oauthbearer_token_refresh_cb': oauthbearer_cb,
36-
'sasl.oauthbearer.config': oauthbearer_config,
37-
}
38-
39-
const checkClient = function (client, done) {
40-
eventListener(client);
41-
42-
client.on('error', function (e) {
43-
t.match(e.message, /oauthbearer_cb error/);
44-
});
17+
const oauthbearer_config = 'key=value';
18+
let oauthbearer_cb_called = 0;
19+
20+
let oauthbearer_cb_callback = function (config, cb) {
21+
console.log("Called oauthbearer_cb with given config: " + config);
22+
t.equal(config, oauthbearer_config);
23+
oauthbearer_cb_called++;
24+
25+
// The broker is not expected to be configured for oauthbearer authentication.
26+
// We just want to make sure that token refresh callback is triggered.
27+
cb(new Error('oauthbearer_cb error'), null);
28+
};
29+
30+
let oauthbearer_cb_async = async function (config) {
31+
console.log("Called oauthbearer_cb with given config: " + config);
32+
t.equal(config, oauthbearer_config);
33+
oauthbearer_cb_called++;
34+
35+
// The broker is not expected to be configured for oauthbearer authentication.
36+
// We just want to make sure that token refresh callback is triggered.
37+
throw new Error('oauthbearer_cb error');
38+
};
39+
40+
for (const oauthbearer_cb of [oauthbearer_cb_async, oauthbearer_cb_callback]) {
41+
describe('Client with ' + (oauthbearer_cb.name), function () {
42+
43+
const commonConfig = {
44+
'metadata.broker.list': kafkaBrokerList,
45+
'debug': 'all',
46+
'security.protocol': 'SASL_PLAINTEXT',
47+
'sasl.mechanisms': 'OAUTHBEARER',
48+
'oauthbearer_token_refresh_cb': oauthbearer_cb,
49+
'sasl.oauthbearer.config': oauthbearer_config,
50+
}
51+
52+
const checkClient = function (client, done, useCb) {
53+
eventListener(client);
4554

46-
client.connect();
55+
client.on('error', function (e) {
56+
t.match(e.message, /oauthbearer_cb error/);
57+
});
4758

48-
// We don't actually expect the connection to succeed, but we want to
49-
// make sure that the oauthbearer_cb is called so give it a couple seconds.
50-
setTimeout(() => {
51-
client.disconnect();
52-
client = null;
53-
t.equal(oauthbearer_cb_called >= 1, true);
59+
// The default timeout for the connect is 30s, so even if we
60+
// call disconnect() midway, the test ends up being at least 30s.
61+
client.connect({timeout: 2000});
62+
63+
// We don't actually expect the connection to succeed, but we want to
64+
// make sure that the oauthbearer_cb is called so give it a couple seconds.
65+
setTimeout(() => {
66+
t.equal(oauthbearer_cb_called >= 1, true);
67+
client.disconnect(() => {
68+
done();
69+
});
70+
client = null;
71+
if (!useCb) // for admin client, where disconnect is sync.
72+
done();
73+
}, 2000);
74+
}
75+
76+
beforeEach(function (done) {
77+
oauthbearer_cb_called = 0;
5478
done();
55-
}, 2000);
56-
}
79+
});
5780

58-
beforeEach(function (done) {
59-
oauthbearer_cb_called = 0;
60-
done();
61-
});
81+
it('as producer', function (done) {
82+
let producer = new Kafka.Producer(commonConfig);
83+
checkClient(producer, done, true);
84+
producer = null;
85+
}).timeout(5000);
6286

63-
it('as producer', function (done) {
64-
let producer = new Kafka.Producer(commonConfig);
65-
checkClient(producer, done);
66-
producer = null;
67-
}).timeout(2500);
68-
69-
it('as consumer', function (done) {
70-
const config = Object.assign({ 'group.id': 'gid' }, commonConfig);
71-
let consumer = new Kafka.KafkaConsumer(config);
72-
checkClient(consumer, done);
73-
consumer = null;
74-
}).timeout(2500);
75-
76-
it('as admin', function (done) {
77-
let admin = new Kafka.AdminClient.create(commonConfig);
78-
checkClient(admin, done);
79-
admin = null;
80-
}).timeout(2500);
81-
82-
});
87+
it('as consumer', function (done) {
88+
const config = Object.assign({ 'group.id': 'gid' }, commonConfig);
89+
let consumer = new Kafka.KafkaConsumer(config);
90+
checkClient(consumer, done, true);
91+
consumer = null;
92+
}).timeout(5000);
93+
94+
it('as admin', function (done) {
95+
let admin = new Kafka.AdminClient.create(commonConfig);
96+
checkClient(admin, done, false);
97+
admin = null;
98+
}).timeout(5000);
99+
100+
});
101+
}

lib/client.js

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,10 @@ function Client(globalConf, SubClientType, topicConf) {
109109
// we might want to disconnect without ever completing connection.
110110
return;
111111
}
112-
savedCallback(oauthbearer_config, (err, token) => {
112+
113+
// This sets the token or error within librdkafka, and emits any
114+
// errors on the emitter.
115+
const postProcessTokenRefresh = (err, token) => {
113116
try {
114117
if (err) {
115118
throw err;
@@ -130,7 +133,19 @@ function Client(globalConf, SubClientType, topicConf) {
130133
this._client.setOAuthBearerTokenFailure(e.message);
131134
this.emit('error', e);
132135
}
133-
});
136+
};
137+
const returnPromise = savedCallback(oauthbearer_config, postProcessTokenRefresh);
138+
139+
// If it looks like a promise, and quacks like a promise, it is a promise
140+
// (or an async function). We expect the callback NOT to have been called
141+
// in such a case.
142+
if (returnPromise && (typeof returnPromise.then === 'function')) {
143+
returnPromise.then((token) => {
144+
postProcessTokenRefresh(null, token);
145+
}).catch(err => {
146+
postProcessTokenRefresh(err);
147+
});
148+
}
134149
}
135150
}
136151

0 commit comments

Comments
 (0)