Skip to content

Commit 9476060

Browse files
committed
Add examples and tests for non-promisified OAUTHBEARER token refresh callback
1 parent 34e4bd3 commit 9476060

File tree

4 files changed

+366
-0
lines changed

4 files changed

+366
-0
lines changed

e2e/oauthbearer_cb.spec.js

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* confluent-kafka-javascript - Node.js wrapper for RdKafka C/C++ library
3+
*
4+
* Copyright (c) 2024 Confluent, Inc.
5+
*
6+
* This software may be modified and distributed under the terms
7+
* of the MIT license. See the LICENSE.txt file for details.
8+
*/
9+
10+
var Kafka = require('../');
11+
var t = require('assert');
12+
13+
var eventListener = require('./listener');
14+
15+
var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
16+
17+
describe('Client with oauthbearer_cb', function () {
18+
const oauthbearer_config = 'key=value';
19+
let oauthbearer_cb_called = 0;
20+
let oauthbearer_cb = function (config, cb) {
21+
console.log("Called oauthbearer_cb with given config: " + config);
22+
t.equal(config, oauthbearer_config);
23+
oauthbearer_cb_called++;
24+
25+
// The broker is not expected to be configured for oauthbearer authentication.
26+
// We just want to make sure that token refresh callback is triggered.
27+
cb(new Error('oauthbearer_cb error'), null);
28+
};
29+
30+
const commonConfig = {
31+
'metadata.broker.list': kafkaBrokerList,
32+
'debug': 'all',
33+
'security.protocol': 'SASL_PLAINTEXT',
34+
'sasl.mechanisms': 'OAUTHBEARER',
35+
'oauthbearer_token_refresh_cb': oauthbearer_cb,
36+
'sasl.oauthbearer.config': oauthbearer_config,
37+
}
38+
39+
const checkClient = function (client, done) {
40+
eventListener(client);
41+
42+
client.on('error', function (e) {
43+
t.match(e.message, /oauthbearer_cb error/);
44+
});
45+
46+
client.connect();
47+
48+
// We don't actually expect the connection to succeed, but we want to
49+
// make sure that the oauthbearer_cb is called so give it a couple seconds.
50+
setTimeout(() => {
51+
client.disconnect();
52+
client = null;
53+
t.equal(oauthbearer_cb_called >= 1, true);
54+
done();
55+
}, 2000);
56+
}
57+
58+
beforeEach(function (done) {
59+
oauthbearer_cb_called = 0;
60+
done();
61+
});
62+
63+
it('as producer', function (done) {
64+
let producer = new Kafka.Producer(commonConfig);
65+
checkClient(producer, done);
66+
producer = null;
67+
}).timeout(2500);
68+
69+
it('as consumer', function (done) {
70+
const config = Object.assign({ 'group.id': 'gid' }, commonConfig);
71+
let consumer = new Kafka.KafkaConsumer(config);
72+
checkClient(consumer, done);
73+
consumer = null;
74+
}).timeout(2500);
75+
76+
it('as admin', function (done) {
77+
let admin = new Kafka.AdminClient.create(commonConfig);
78+
checkClient(admin, done);
79+
admin = null;
80+
}).timeout(2500);
81+
82+
});
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
const Kafka = require('@confluentinc/kafka-javascript');
2+
var jwt = require('jsonwebtoken');
3+
4+
// This example uses the Producer for demonstration purposes.
5+
// It is the same whether you use a Consumer/AdminClient.
6+
7+
function token_refresh(oauthbearer_config /* string - passed from config */, cb) {
8+
console.log("Called token_refresh with given config: " + oauthbearer_config);
9+
// At this point, we can use the information in the token, make
10+
// some API calls, fetch something from a file...
11+
// For the illustration, everything is hard-coded.
12+
const principal = 'admin';
13+
// In seconds - needed by jsonwebtoken library
14+
const exp_seconds = Math.floor(Date.now() / 1000) + (60 * 60);
15+
// In milliseconds - needed by kafka-javascript.
16+
const exp_ms = exp_seconds * 1000;
17+
18+
// For illustration, we're not signing our JWT (algorithm: none).
19+
// For production uses-cases, it should be signed.
20+
const tokenValue = jwt.sign(
21+
{ 'sub': principal, exp: exp_seconds, 'scope': 'requiredScope' }, '', { algorithm: 'none' });
22+
23+
// SASL extensions can be passed as Map or key/value pairs in an object.
24+
const extensions = {
25+
traceId: '123'
26+
};
27+
28+
// The callback is called with the new token, its lifetime, and the principal.
29+
// The extensions are optional and may be omitted.
30+
console.log("Finished token_refresh, triggering callback: with tokenValue: " +
31+
tokenValue.slice(0, 10) + "..., lifetime: " + exp_ms +
32+
", principal: " + principal + ", extensions: " + JSON.stringify(extensions));
33+
cb(
34+
// If no token could be fetched or an error occurred, a new Error can be
35+
// and passed as the first parameter and the second parameter omitted.
36+
null,
37+
{ tokenValue, lifetime: exp_ms, principal, extensions });
38+
}
39+
40+
function run() {
41+
const producer = new Kafka.Producer({
42+
'metadata.broker.list': 'localhost:60125',
43+
'dr_cb': true,
44+
// 'debug': 'all'
45+
46+
// Config important for OAUTHBEARER:
47+
'security.protocol': 'SASL_PLAINTEXT',
48+
'sasl.mechanisms': 'OAUTHBEARER',
49+
'sasl.oauthbearer.config': 'someConfigPropertiesKey=value',
50+
'oauthbearer_token_refresh_cb': token_refresh,
51+
});
52+
53+
producer.connect();
54+
55+
producer.on('event.log', (event) => {
56+
console.log(event);
57+
});
58+
59+
producer.on('ready', () => {
60+
console.log('Producer is ready!');
61+
producer.setPollInterval(1000);
62+
console.log("Producing message.");
63+
producer.produce(
64+
'topic',
65+
null, // partition - let partitioner choose
66+
Buffer.from('messageValue'),
67+
'messageKey',
68+
);
69+
});
70+
71+
producer.on('error', (err) => {
72+
console.error("Encountered error in producer: " + err.message);
73+
});
74+
75+
producer.on('delivery-report', function (err, report) {
76+
console.log('delivery-report: ' + JSON.stringify(report));
77+
// since we just want to produce one message, close shop.
78+
producer.disconnect();
79+
});
80+
}
81+
82+
run();

examples/node-rdkafka/oauthbearer_callback_authentication/package-lock.json

Lines changed: 187 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"name": "oauthbearer_callback_authentication",
3+
"version": "1.0.0",
4+
"description": "",
5+
"main": "oauthbearer_callback_authentication.js",
6+
"scripts": {
7+
"test": "echo \"Error: no test specified\" && exit 1"
8+
},
9+
"author": "",
10+
"license": "MIT",
11+
"dependencies": {
12+
"@confluentinc/kafka-javascript": "file:../../..",
13+
"jsonwebtoken": "^9.0.2"
14+
}
15+
}

0 commit comments

Comments
 (0)