Skip to content

Commit b1877a0

Browse files
committed
feat: subscribe to events before connect
1 parent f8d641c commit b1877a0

File tree

15 files changed

+91
-4
lines changed

15 files changed

+91
-4
lines changed

examples/counter/scripts/connect.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ async function main() {
88

99
const counter = await client.counter.getOrCreate().connect();
1010

11+
counter.on("innit", () => console.log("Connected to counter actor"));
1112
counter.on("newCount", (count: number) => console.log("Event:", count));
1213

1314
for (let i = 0; i < 5; i++) {

examples/counter/src/registry.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ const counter = actor({
77
onAuth: () => {
88
return true;
99
},
10+
onConnect: (c) => {
11+
c.broadcast("innit", "Connected to counter actor");
12+
},
1013
actions: {
1114
increment: (c, x: number) => {
1215
c.state.count += x;

examples/hono-react/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"type": "module",
66
"scripts": {
77
"dev": "concurrently \"npm run dev:backend\" \"npm run dev:frontend\"",
8-
"dev:backend": "tsx --watch src/backend/server.ts",
8+
"dev:backend": "_LOG_LEVEL=1 tsx --watch src/backend/server.ts",
99
"dev:frontend": "vite",
1010
"build": "vite build",
1111
"check-types": "tsc --noEmit",

examples/hono-react/src/backend/registry.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export const counter = actor({
88
actions: {
99
increment: (c, x: number) => {
1010
c.state.count += x;
11+
console.log("increment", c.state.count);
1112
c.broadcast("newCount", c.state.count);
1213
return c.state.count;
1314
},

packages/core/src/actor/instance.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,7 @@ export class ActorInstance<
786786
state: CS,
787787
driverId: string,
788788
driverState: unknown,
789+
subs: string[] | undefined,
789790
authData: unknown,
790791
): Promise<Conn<S, CP, CS, V, I, AD, DB>> {
791792
this.#assertReady();
@@ -814,6 +815,12 @@ export class ActorInstance<
814815
);
815816
this.#connections.set(conn.id, conn);
816817

818+
if (subs) {
819+
for (const sub of subs) {
820+
this.#addSubscription(sub, conn, true);
821+
}
822+
}
823+
817824
// Add to persistence & save immediately
818825
this.#persist.c.push(persist);
819826
this.saveState({ immediate: true });
@@ -875,6 +882,7 @@ export class ActorInstance<
875882
return await this.executeAction(ctx, name, args);
876883
},
877884
onSubscribe: async (eventName, conn) => {
885+
console.log("subscribing to event", { eventName, connId: conn.id });
878886
this.inspector.emitter.emit("eventFired", {
879887
type: "subscribe",
880888
eventName,
@@ -1251,6 +1259,13 @@ export class ActorInstance<
12511259
_broadcast<Args extends Array<unknown>>(name: string, ...args: Args) {
12521260
this.#assertReady();
12531261

1262+
console.log("broadcasting event", {
1263+
name,
1264+
args,
1265+
actorId: this.id,
1266+
subscriptions: this.#subscriptionIndex.size,
1267+
connections: this.conns.size,
1268+
});
12541269
this.inspector.emitter.emit("eventFired", {
12551270
type: "broadcast",
12561271
eventName: name,

packages/core/src/actor/router-endpoints.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ export async function handleWebSocketConnect(
104104
actorId: string,
105105
encoding: Encoding,
106106
parameters: unknown,
107+
subs: string[] | undefined,
107108
authData: unknown,
108109
): Promise<UpgradeWebSocketArgs> {
109110
const exposeInternalError = c ? getRequestExposeInternalError(c.req) : false;
@@ -173,6 +174,7 @@ export async function handleWebSocketConnect(
173174
connState,
174175
CONN_DRIVER_GENERIC_WEBSOCKET,
175176
{ encoding } satisfies GenericWebSocketDriverState,
177+
subs,
176178
authData,
177179
);
178180

@@ -319,6 +321,7 @@ export async function handleSseConnect(
319321
runConfig: RunConfig,
320322
actorDriver: ActorDriver,
321323
actorId: string,
324+
subs: string[] | undefined,
322325
authData: unknown,
323326
) {
324327
const encoding = getRequestEncoding(c.req);
@@ -354,6 +357,7 @@ export async function handleSseConnect(
354357
connState,
355358
CONN_DRIVER_GENERIC_SSE,
356359
{ encoding } satisfies GenericSseDriverState,
360+
subs,
357361
authData,
358362
);
359363

@@ -482,6 +486,7 @@ export async function handleAction(
482486
connState,
483487
CONN_DRIVER_GENERIC_HTTP,
484488
{} satisfies GenericHttpDriverState,
489+
[],
485490
authData,
486491
);
487492

@@ -698,6 +703,8 @@ export const HEADER_CONN_ID = "X-RivetKit-Conn";
698703

699704
export const HEADER_CONN_TOKEN = "X-RivetKit-Conn-Token";
700705

706+
export const HEADER_CONN_SUBS = "X-RivetKit-Conn-Subs";
707+
701708
/**
702709
* Headers that publics can send from public clients.
703710
*
@@ -712,6 +719,7 @@ export const ALLOWED_PUBLIC_HEADERS = [
712719
HEADER_ACTOR_ID,
713720
HEADER_CONN_ID,
714721
HEADER_CONN_TOKEN,
722+
HEADER_CONN_SUBS,
715723
];
716724

717725
// Helper to get connection parameters for the request

packages/core/src/actor/router.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
HEADER_AUTH_DATA,
1313
HEADER_CONN_ID,
1414
HEADER_CONN_PARAMS,
15+
HEADER_CONN_SUBS,
1516
HEADER_CONN_TOKEN,
1617
HEADER_ENCODING,
1718
handleAction,
@@ -83,12 +84,14 @@ export function createActorRouter(
8384
const encodingRaw = c.req.header(HEADER_ENCODING);
8485
const connParamsRaw = c.req.header(HEADER_CONN_PARAMS);
8586
const authDataRaw = c.req.header(HEADER_AUTH_DATA);
87+
const subsRaw = c.req.header(HEADER_CONN_SUBS);
8688

8789
const encoding = EncodingSchema.parse(encodingRaw);
8890
const connParams = connParamsRaw
8991
? JSON.parse(connParamsRaw)
9092
: undefined;
9193
const authData = authDataRaw ? JSON.parse(authDataRaw) : undefined;
94+
const subs = subsRaw ? JSON.parse(subsRaw) : undefined;
9295

9396
return await handleWebSocketConnect(
9497
c as HonoContext,
@@ -97,6 +100,7 @@ export function createActorRouter(
97100
c.env.actorId,
98101
encoding,
99102
connParams,
103+
subs,
100104
authData,
101105
);
102106
})(c, noopNext());
@@ -114,8 +118,20 @@ export function createActorRouter(
114118
if (authDataRaw) {
115119
authData = JSON.parse(authDataRaw);
116120
}
121+
const subsRaw = c.req.header(HEADER_CONN_SUBS);
122+
let subs: string[] | undefined;
123+
if (subsRaw) {
124+
subs = JSON.parse(subsRaw);
125+
}
117126

118-
return handleSseConnect(c, runConfig, actorDriver, c.env.actorId, authData);
127+
return handleSseConnect(
128+
c,
129+
runConfig,
130+
actorDriver,
131+
c.env.actorId,
132+
subs,
133+
authData,
134+
);
119135
});
120136

121137
router.post("/action/:action", async (c) => {

packages/core/src/client/actor-conn.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ enc
259259
this.#actorQuery,
260260
this.#encodingKind,
261261
this.#params,
262+
["innit"],
262263
signal ? { signal } : undefined,
263264
);
264265
this.#transport = { websocket: ws };

packages/core/src/client/actor-handle.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ export class ActorHandleRaw {
4848
this.#params = params;
4949
}
5050

51+
#eventListeners = new Map<string, Set<Function>>();
52+
5153
/**
5254
* Call a raw action. This method sends an HTTP request to invoke the named action.
5355
*

packages/core/src/client/client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ export interface ClientDriver {
181181
actorQuery: ActorQuery,
182182
encodingKind: Encoding,
183183
params: unknown,
184+
subs: string[] | undefined,
184185
opts: { signal?: AbortSignal } | undefined,
185186
): Promise<WebSocket>;
186187
connectSse(

0 commit comments

Comments
 (0)