Skip to content

Commit 34e4bd3

Browse files
committed
Add OAUTHBEARER token refresh callback
1 parent fd13955 commit 34e4bd3

14 files changed

+409
-22
lines changed

lib/client.js

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ var util = require('util');
1515
var Kafka = require('../librdkafka.js');
1616
var assert = require('assert');
1717

18-
const bindingVersion = require('./util').bindingVersion;
18+
const { bindingVersion, dictToStringList } = require('./util');
1919

2020
var LibrdKafkaError = require('./error');
2121

@@ -100,6 +100,40 @@ function Client(globalConf, SubClientType, topicConf) {
100100
}.bind(this);
101101
}
102102

103+
if (Object.hasOwn(this._cb_configs.global, 'oauthbearer_token_refresh_cb')) {
104+
const savedCallback = this._cb_configs.global.oauthbearer_token_refresh_cb;
105+
this._cb_configs.global.oauthbearer_token_refresh_cb = (oauthbearer_config) => {
106+
if (this._isDisconnecting) {
107+
// Don't call the callback if we're in the middle of disconnecting.
108+
// This is especially important when the credentials are wrong, and
109+
// we might want to disconnect without ever completing connection.
110+
return;
111+
}
112+
savedCallback(oauthbearer_config, (err, token) => {
113+
try {
114+
if (err) {
115+
throw err;
116+
}
117+
let { tokenValue, lifetime, principal, extensions } = token;
118+
119+
// If the principal isn't there, set an empty principal.
120+
if (!principal) {
121+
principal = '';
122+
}
123+
124+
// Convert extensions from a Map/object to a list that librdkafka expects.
125+
extensions = dictToStringList(extensions);
126+
127+
this._client.setOAuthBearerToken(tokenValue, lifetime, principal, extensions);
128+
} catch (e) {
129+
e.message = "oauthbearer_token_refresh_cb: " + e.message;
130+
this._client.setOAuthBearerTokenFailure(e.message);
131+
this.emit('error', e);
132+
}
133+
});
134+
}
135+
}
136+
103137
this.metrics = {};
104138
this._isConnected = false;
105139
this.errorCounter = 0;

lib/util.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
* confluent-kafka-javascript - Node.js wrapper for RdKafka C/C++ library
33
*
44
* Copyright (c) 2016-2023 Blizzard Entertainment
5+
* 2024 Confluent, Inc.
56
*
67
* This software may be modified and distributed under the terms
78
* of the MIT license. See the LICENSE.txt file for details.
@@ -28,4 +29,27 @@ util.isObject = function (obj) {
2829
return obj && typeof obj === 'object';
2930
};
3031

32+
// Convert Map or object to a list of [key, value, key, value...].
33+
util.dictToStringList = function (mapOrObject) {
34+
let list = null;
35+
if (mapOrObject && (mapOrObject instanceof Map)) {
36+
list =
37+
Array
38+
.from(mapOrObject).reduce((acc, [key, value]) => {
39+
acc.push(key, value);
40+
return acc;
41+
}, [])
42+
.map(v => String(v));
43+
} else if (util.isObject(mapOrObject)) {
44+
list =
45+
Object
46+
.entries(mapOrObject).reduce((acc, [key, value]) => {
47+
acc.push(key, value);
48+
return acc;
49+
}, [])
50+
.map(v => String(v));
51+
}
52+
return list;
53+
};
54+
3155
util.bindingVersion = 'v0.1.11-devel';

src/admin.cc

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,41 @@ AdminClient::~AdminClient() {
3939
}
4040

4141
Baton AdminClient::Connect() {
42-
std::string errstr;
42+
if (IsConnected()) {
43+
return Baton(RdKafka::ERR_NO_ERROR);
44+
}
45+
46+
Baton baton = setupSaslOAuthBearerConfig();
47+
if (baton.err() != RdKafka::ERR_NO_ERROR) {
48+
return baton;
49+
}
4350

51+
// Activate the dispatchers before the connection, as some callbacks may run
52+
// on the background thread.
53+
// We will deactivate them if the connection fails.
54+
ActivateDispatchers();
55+
56+
std::string errstr;
4457
{
4558
scoped_shared_write_lock lock(m_connection_lock);
4659
m_client = RdKafka::Producer::create(m_gconfig, errstr);
4760
}
4861

4962
if (!m_client || !errstr.empty()) {
63+
DeactivateDispatchers();
5064
return Baton(RdKafka::ERR__STATE, errstr);
5165
}
5266

5367
if (rkqu == NULL) {
5468
rkqu = rd_kafka_queue_new(m_client->c_ptr());
5569
}
5670

57-
return Baton(RdKafka::ERR_NO_ERROR);
71+
baton = setupSaslOAuthBearerBackgroundQueue();
72+
if (baton.err() != RdKafka::ERR_NO_ERROR) {
73+
DeactivateDispatchers();
74+
}
75+
76+
return baton;
5877
}
5978

6079
Baton AdminClient::Disconnect() {
@@ -66,6 +85,8 @@ Baton AdminClient::Disconnect() {
6685
rkqu = NULL;
6786
}
6887

88+
DeactivateDispatchers();
89+
6990
delete m_client;
7091
m_client = NULL;
7192
}
@@ -99,6 +120,9 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
99120
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
100121
Nan::SetPrototypeMethod(tpl, "setSaslCredentials", NodeSetSaslCredentials);
101122
Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata);
123+
Nan::SetPrototypeMethod(tpl, "setOAuthBearerToken", NodeSetOAuthBearerToken);
124+
Nan::SetPrototypeMethod(tpl, "setOAuthBearerTokenFailure",
125+
NodeSetOAuthBearerTokenFailure);
102126

103127
constructor.Reset(
104128
(tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked());

src/callbacks.cc

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@
1010

1111
#include <string>
1212
#include <vector>
13-
#include <algorithm>
1413

15-
#include "src/callbacks.h"
1614
#include "src/kafka-consumer.h"
1715

1816
using v8::Local;
@@ -547,6 +545,37 @@ void OffsetCommit::offset_commit_cb(RdKafka::ErrorCode err,
547545
dispatcher.Execute();
548546
}
549547

548+
// OAuthBearerTokenRefresh callback
549+
void OAuthBearerTokenRefreshDispatcher::Add(
550+
const std::string &oauthbearer_config) {
551+
scoped_mutex_lock lock(async_lock);
552+
m_oauthbearer_config = oauthbearer_config;
553+
}
554+
555+
void OAuthBearerTokenRefreshDispatcher::Flush() {
556+
Nan::HandleScope scope;
557+
558+
const unsigned int argc = 1;
559+
560+
std::string oauthbearer_config;
561+
{
562+
scoped_mutex_lock lock(async_lock);
563+
oauthbearer_config = m_oauthbearer_config;
564+
m_oauthbearer_config.clear();
565+
}
566+
567+
v8::Local<v8::Value> argv[argc] = {};
568+
argv[0] = Nan::New<v8::String>(oauthbearer_config.c_str()).ToLocalChecked();
569+
570+
Dispatch(argc, argv);
571+
}
572+
573+
void OAuthBearerTokenRefresh::oauthbearer_token_refresh_cb(
574+
RdKafka::Handle *handle, const std::string &oauthbearer_config) {
575+
dispatcher.Add(oauthbearer_config);
576+
dispatcher.Execute();
577+
}
578+
550579
// Partitioner callback
551580

552581
Partitioner::Partitioner() {}

src/callbacks.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,23 @@ class OffsetCommit : public RdKafka::OffsetCommitCb {
248248
v8::Persistent<v8::Function> m_cb;
249249
};
250250

251+
class OAuthBearerTokenRefreshDispatcher : public Dispatcher {
252+
public:
253+
OAuthBearerTokenRefreshDispatcher(){};
254+
~OAuthBearerTokenRefreshDispatcher(){};
255+
void Add(const std::string &oauthbearer_config);
256+
void Flush();
257+
258+
private:
259+
std::string m_oauthbearer_config;
260+
};
261+
262+
class OAuthBearerTokenRefresh : public RdKafka::OAuthBearerTokenRefreshCb {
263+
public:
264+
void oauthbearer_token_refresh_cb(RdKafka::Handle *, const std::string &);
265+
OAuthBearerTokenRefreshDispatcher dispatcher;
266+
};
267+
251268
class Partitioner : public RdKafka::PartitionerCb {
252269
public:
253270
Partitioner();

src/common.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,26 @@ std::vector<std::string> v8ArrayToStringVector(v8::Local<v8::Array> parameter) {
143143
return newItem;
144144
}
145145

146+
std::list<std::string> v8ArrayToStringList(v8::Local<v8::Array> parameter) {
147+
std::list<std::string> newItem;
148+
if (parameter->Length() >= 1) {
149+
for (unsigned int i = 0; i < parameter->Length(); i++) {
150+
v8::Local<v8::Value> v;
151+
if (!Nan::Get(parameter, i).ToLocal(&v)) {
152+
continue;
153+
}
154+
Nan::MaybeLocal<v8::String> p = Nan::To<v8::String>(v);
155+
if (p.IsEmpty()) {
156+
continue;
157+
}
158+
Nan::Utf8String pVal(p.ToLocalChecked());
159+
std::string pString(*pVal);
160+
newItem.push_back(pString);
161+
}
162+
}
163+
return newItem;
164+
}
165+
146166
template<> v8::Local<v8::Array> GetParameter<v8::Local<v8::Array> >(
147167
v8::Local<v8::Object> object, std::string field_name, v8::Local<v8::Array> def) {
148168
v8::Local<v8::String> field = Nan::New(field_name.c_str()).ToLocalChecked();

src/common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ template<> v8::Local<v8::Array> GetParameter<v8::Local<v8::Array> >(
3939
v8::Local<v8::Object>, std::string, v8::Local<v8::Array>);
4040
// template int GetParameter<int>(v8::Local<v8::Object, std::string, int);
4141
std::vector<std::string> v8ArrayToStringVector(v8::Local<v8::Array>);
42+
std::list<std::string> v8ArrayToStringList(v8::Local<v8::Array>);
4243

4344
class scoped_mutex_lock {
4445
public:

src/config.cc

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object,
8181
return NULL;
8282
}
8383
} else {
84-
// Do nothing - Connection::NodeConfigureCallbacks will handle this for each
85-
// of the three client types.
84+
// Do nothing - NodeConfigureCallbacks will handle this for each
85+
// of the three client types, called from within JavaScript.
8686
}
8787
}
8888

@@ -118,6 +118,23 @@ void Conf::ConfigureCallback(const std::string &string_key, const v8::Local<v8::
118118
offset_commit->dispatcher.RemoveCallback(cb);
119119
}
120120
}
121+
} else if (string_key.compare("oauthbearer_token_refresh_cb") == 0) {
122+
NodeKafka::Callbacks::OAuthBearerTokenRefresh *oauthbearer_token_refresh =
123+
oauthbearer_token_refresh_cb();
124+
if (add) {
125+
if (oauthbearer_token_refresh == NULL) {
126+
oauthbearer_token_refresh =
127+
new NodeKafka::Callbacks::OAuthBearerTokenRefresh();
128+
this->set(string_key, oauthbearer_token_refresh, errstr);
129+
}
130+
oauthbearer_token_refresh->dispatcher.AddCallback(cb);
131+
} else {
132+
if (oauthbearer_token_refresh != NULL) {
133+
oauthbearer_token_refresh->dispatcher.RemoveCallback(cb);
134+
}
135+
}
136+
} else {
137+
errstr = "Invalid callback type";
121138
}
122139
}
123140

@@ -131,6 +148,12 @@ void Conf::listen() {
131148
if (offset_commit) {
132149
offset_commit->dispatcher.Activate();
133150
}
151+
152+
NodeKafka::Callbacks::OAuthBearerTokenRefresh *oauthbearer_token_refresh =
153+
oauthbearer_token_refresh_cb();
154+
if (oauthbearer_token_refresh) {
155+
oauthbearer_token_refresh->dispatcher.Activate();
156+
}
134157
}
135158

136159
void Conf::stop() {
@@ -143,6 +166,12 @@ void Conf::stop() {
143166
if (offset_commit) {
144167
offset_commit->dispatcher.Deactivate();
145168
}
169+
170+
NodeKafka::Callbacks::OAuthBearerTokenRefresh *oauthbearer_token_refresh =
171+
oauthbearer_token_refresh_cb();
172+
if (oauthbearer_token_refresh) {
173+
oauthbearer_token_refresh->dispatcher.Deactivate();
174+
}
146175
}
147176

148177
Conf::~Conf() {
@@ -167,4 +196,21 @@ NodeKafka::Callbacks::OffsetCommit* Conf::offset_commit_cb() const {
167196
return static_cast<NodeKafka::Callbacks::OffsetCommit*>(cb);
168197
}
169198

199+
NodeKafka::Callbacks::OAuthBearerTokenRefresh *
200+
Conf::oauthbearer_token_refresh_cb() const {
201+
RdKafka::OAuthBearerTokenRefreshCb *cb = NULL;
202+
if (this->get(cb) != RdKafka::Conf::CONF_OK) {
203+
return NULL;
204+
}
205+
return static_cast<NodeKafka::Callbacks::OAuthBearerTokenRefresh *>(cb);
206+
}
207+
208+
bool Conf::is_sasl_oauthbearer() const {
209+
std::string sasl_mechanism;
210+
if (this->get("sasl.mechanisms", sasl_mechanism) != RdKafka::Conf::CONF_OK) {
211+
return false;
212+
}
213+
return sasl_mechanism.compare("OAUTHBEARER") == 0;
214+
}
215+
170216
} // namespace NodeKafka

src/config.h

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,23 @@ class Conf : public RdKafka::Conf {
3434

3535
void ConfigureCallback(const std::string &string_key, const v8::Local<v8::Function> &cb, bool add, std::string &errstr);
3636

37+
bool is_sasl_oauthbearer() const;
38+
3739
private:
38-
NodeKafka::Callbacks::Rebalance* rebalance_cb() const;
40+
NodeKafka::Callbacks::Rebalance *rebalance_cb() const;
3941
NodeKafka::Callbacks::OffsetCommit *offset_commit_cb() const;
42+
NodeKafka::Callbacks::OAuthBearerTokenRefresh *oauthbearer_token_refresh_cb()
43+
const;
4044

4145
// NOTE: Do NOT add any members to this class.
42-
// Internally, to get an instance of this class, we just cast RdKafka::Conf* that we
43-
// obtain from RdKafka::Conf::create(). However, that's internally an instance of a sub-class,
44-
// ConfImpl.
45-
// This means that any members here are aliased to that with the wrong name (for example, the first
46-
// member of this class, if it's a pointer, will be aliased to consume_cb_ in the ConfImpl, and
47-
// and changing one will change the other!)
48-
// TODO: Just don't inherit from RdKafka::Conf, and instead have a member of type RdKafka::Conf*.
46+
// Internally, to get an instance of this class, we just cast RdKafka::Conf*
47+
// that we obtain from RdKafka::Conf::create(). However, that's internally an
48+
// instance of a sub-class, ConfImpl. This means that any members here are
49+
// aliased to that with the wrong name (for example, the first member of this
50+
// class, if it's a pointer, will be aliased to consume_cb_ in the ConfImpl,
51+
// and and changing one will change the other!)
52+
// TODO: Just don't inherit from RdKafka::Conf, and instead have a member of
53+
// type RdKafka::Conf*.
4954
};
5055

5156
} // namespace NodeKafka

0 commit comments

Comments
 (0)