Skip to content

Commit a4d0f29

Browse files
authored
Merge pull request #686 from nats-io/pub-msg
[FEAT] [CORE] added the ability to publish by providing a Msg argument
2 parents e3ccac2 + 9781b7b commit a4d0f29

File tree

4 files changed

+117
-2
lines changed

4 files changed

+117
-2
lines changed

Makefile

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
.PHONY: build test bundle lint
22

3-
export DENO_JOBS=4
4-
53
build: test
64

75
lint:

nats-base-client/core.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,20 @@ export interface NatsConnection {
549549
options?: PublishOptions,
550550
): void;
551551

552+
/**
553+
* Publishes using the subject of the specified message, specifying the
554+
* data, headers and reply found in the message if any.
555+
* @param msg
556+
*/
557+
publishMessage(msg: Msg): void;
558+
559+
/**
560+
* Replies using the reply subject of the specified message, specifying the
561+
* data, headers in the message if any.
562+
* @param msg
563+
*/
564+
respondMessage(msg: Msg): boolean;
565+
552566
/**
553567
* Subscribe expresses interest in the specified subject. The subject may
554568
* have wildcards. Messages are delivered to the {@link SubOpts#callback |SubscriptionOptions callback}

nats-base-client/nats.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,24 @@ export class NatsConnectionImpl implements NatsConnection {
126126
this.protocol.publish(subject, data, options);
127127
}
128128

129+
publishMessage(msg: Msg) {
130+
return this.publish(msg.subject, msg.data, {
131+
reply: msg.reply,
132+
headers: msg.headers,
133+
});
134+
}
135+
136+
respondMessage(msg: Msg) {
137+
if (msg.reply) {
138+
this.publish(msg.reply, msg.data, {
139+
reply: msg.reply,
140+
headers: msg.headers,
141+
});
142+
return true;
143+
}
144+
return false;
145+
}
146+
129147
subscribe(
130148
subject: string,
131149
opts: SubscriptionOptions = {},

tests/basics_test.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,17 @@ import {
5151
headers,
5252
isIP,
5353
NatsConnectionImpl,
54+
Payload,
55+
PublishOptions,
5456
RequestStrategy,
5557
SubscriptionImpl,
5658
} from "../nats-base-client/internal_mod.ts";
5759
import { Feature } from "../nats-base-client/semver.ts";
5860
import { syncIterator } from "../nats-base-client/core.ts";
61+
import {
62+
MsgHdrs,
63+
Publisher,
64+
} from "https://deno.land/x/nats@v1.18.0/nats-base-client/core.ts";
5965

6066
Deno.test("basics - connect port", async () => {
6167
const ns = await NatsServer.start();
@@ -1374,3 +1380,82 @@ Deno.test("basics - sync subscription", async () => {
13741380

13751381
await cleanup(ns, nc);
13761382
});
1383+
1384+
Deno.test("basics - publish message", async () => {
1385+
const { ns, nc } = await setup();
1386+
const sub = nc.subscribe("q");
1387+
1388+
const nis = new MM(nc);
1389+
nis.data = new TextEncoder().encode("not in service");
1390+
1391+
(async () => {
1392+
for await (const m of sub) {
1393+
if (m.reply) {
1394+
nis.subject = m.reply;
1395+
nc.publishMessage(nis);
1396+
}
1397+
}
1398+
})().then();
1399+
1400+
const r = await nc.request("q");
1401+
assertEquals(r.string(), "not in service");
1402+
1403+
await cleanup(ns, nc);
1404+
});
1405+
1406+
Deno.test("basics - respond message", async () => {
1407+
const { ns, nc } = await setup();
1408+
const sub = nc.subscribe("q");
1409+
1410+
const nis = new MM(nc);
1411+
nis.data = new TextEncoder().encode("not in service");
1412+
1413+
(async () => {
1414+
for await (const m of sub) {
1415+
if (m.reply) {
1416+
nis.reply = m.reply;
1417+
nc.respondMessage(nis);
1418+
}
1419+
}
1420+
})().then();
1421+
1422+
const r = await nc.request("q");
1423+
assertEquals(r.string(), "not in service");
1424+
1425+
await cleanup(ns, nc);
1426+
});
1427+
1428+
class MM implements Msg {
1429+
data!: Uint8Array;
1430+
sid: number;
1431+
subject!: string;
1432+
reply?: string;
1433+
headers?: MsgHdrs;
1434+
publisher: Publisher;
1435+
1436+
constructor(p: Publisher) {
1437+
this.publisher = p;
1438+
this.sid = -1;
1439+
}
1440+
1441+
json<T>(): T {
1442+
throw new Error("not implemented");
1443+
}
1444+
1445+
respond(payload?: Payload, opts?: PublishOptions): boolean {
1446+
if (!this.reply) {
1447+
return false;
1448+
}
1449+
payload = payload || Empty;
1450+
this.publisher.publish(this.reply, payload, opts);
1451+
return true;
1452+
}
1453+
1454+
respondMessage(m: Msg): boolean {
1455+
return this.respond(m.data, { headers: m.headers, reply: m.reply });
1456+
}
1457+
1458+
string(): string {
1459+
return "";
1460+
}
1461+
}

0 commit comments

Comments
 (0)