Skip to content

Commit 20291dc

Browse files
authored
stream-management: Send ack on close (#1059)
1 parent ee361dc commit 20291dc

File tree

5 files changed

+89
-3
lines changed

5 files changed

+89
-3
lines changed

packages/connection/index.js

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,8 @@ class Connection extends EventEmitter {
285285
* https://tools.ietf.org/html/rfc7395#section-3.6
286286
*/
287287
async _closeStream(timeout = this.timeout) {
288+
await this.#runHooks("close");
289+
288290
const fragment = this.footer(this.footerElement());
289291

290292
await this.write(fragment);
@@ -360,6 +362,49 @@ class Connection extends EventEmitter {
360362

361363
// Override
362364
socketParameters() {}
365+
366+
/* Experimental hooks */
367+
#hooks = new Map();
368+
#hook_events = new Set(["close"]);
369+
hook(event, handler /*priority = 0 TODO */) {
370+
this.#assertHookEventName(event);
371+
372+
if (!this.#hooks.has(event)) {
373+
this.#hooks.set(event, new Set());
374+
}
375+
376+
this.#hooks.get(event).add([handler]);
377+
}
378+
#assertHookEventName(event) {
379+
if (!this.#hook_events.has(event)) {
380+
throw new Error(`Hook event name "${event}" is unknown.`);
381+
}
382+
}
383+
unhook(event, handler) {
384+
this.#assertHookEventName(event);
385+
const handlers = this.#hooks.get("event");
386+
const item = [...handlers].find((item) => item.handler === handler);
387+
handlers.remove(item);
388+
}
389+
async #runHooks(event, ...args) {
390+
this.#assertHookEventName(event);
391+
392+
const hooks = this.#hooks.get(event);
393+
if (!hooks) return;
394+
395+
// TODO run hooks by priority
396+
// run hooks with the same priority in parallel
397+
398+
await Promise.all(
399+
[...hooks].map(async ([handler]) => {
400+
try {
401+
await handler(...args);
402+
} catch (err) {
403+
this.emit("error", err);
404+
}
405+
}),
406+
);
407+
}
363408
}
364409

365410
// Override

packages/middleware/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Supports Node.js and browsers.
66

77
## Install
88

9-
```
9+
```sh
1010
npm install @xmpp/middleware
1111
```
1212

packages/stream-management/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,19 @@ If the session fails to resume, entity will fallback to regular session establis
1313
- Automatically responds to acks.
1414
- Periodically request acks.
1515
- If server fails to respond, triggers a reconnect.
16+
- On reconnect retry sending the queue
17+
18+
When a stanza is re-sent, a [delay element](https://xmpp.org/extensions/xep-0203.html) will be added to it.
19+
20+
- `from` client jid
21+
- `stamp` [date/time](https://xmpp.org/extensions/xep-0082.html) at which the stanza was meant to be sent
22+
23+
```xml
24+
<delay xmlns="urn:xmpp:delay"
25+
from="username@example.net/resource"
26+
stamp="1990-01-01T00:00:00Z"
27+
/>
28+
```
1629

1730
## Events
1831

packages/stream-management/index.js

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,22 @@ export default function streamManagement({
6363
requestAckInterval: 30_000,
6464
});
6565

66+
async function sendAck() {
67+
try {
68+
await entity.send(xml("a", { xmlns: NS, h: sm.inbound }));
69+
} catch {}
70+
}
71+
6672
entity.on("disconnect", () => {
6773
clearTimeout(timeoutTimeout);
6874
clearTimeout(requestAckTimeout);
75+
sm.enabled = false;
76+
});
77+
78+
// It is RECOMMENDED that initiating entities (usually clients) send an element right before they gracefully close the stream, in order to inform the peer about received stanzas
79+
entity.hook("close", async () => {
80+
if (!sm.enabled) return;
81+
await sendAck();
6982
});
7083

7184
async function resumed(resumed) {
@@ -127,14 +140,14 @@ export default function streamManagement({
127140
sm.id = "";
128141
});
129142

130-
middleware.use((context, next) => {
143+
middleware.use(async (context, next) => {
131144
const { stanza } = context;
132145
clearTimeout(timeoutTimeout);
133146
if (["presence", "message", "iq"].includes(stanza.name)) {
134147
sm.inbound += 1;
135148
} else if (stanza.is("r", NS)) {
136149
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element.
137-
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
150+
await sendAck();
138151
} else if (stanza.is("a", NS)) {
139152
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
140153
ackQueue(+stanza.attrs.h);

packages/stream-management/stream-features.test.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,3 +336,18 @@ test("resume - failed with something in queue", async () => {
336336
expect(entity.streamManagement.outbound).toBe(0);
337337
expect(entity.streamManagement.outbound_q).toBeEmpty();
338338
});
339+
340+
test("sends an <a/> element before closing", async () => {
341+
const { entity, streamManagement } = mockClient();
342+
streamManagement.enabled = true;
343+
streamManagement.inbound = 42;
344+
entity.status = "online";
345+
346+
const promise_disconnect = entity.disconnect();
347+
348+
expect(await entity.catchOutgoing()).toEqual(
349+
<a xmlns="urn:xmpp:sm:3" h={streamManagement.inbound} />,
350+
);
351+
352+
await promise_disconnect;
353+
});

0 commit comments

Comments
 (0)