Skip to content

Commit cc0062c

Browse files
committed
chore(core): impl rivet engine alarms (#1320)
1 parent 89b58dd commit cc0062c

File tree

13 files changed

+148
-101
lines changed

13 files changed

+148
-101
lines changed

packages/rivetkit/fixtures/driver-test-suite/sleep.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { actor, type UniversalWebSocket } from "rivetkit";
22
import { promiseWithResolvers } from "rivetkit/utils";
33

4-
export const SLEEP_TIMEOUT = 500;
4+
export const SLEEP_TIMEOUT = 1000;
55

66
export const sleep = actor({
77
state: { startCount: 0, sleepCount: 0 },

packages/rivetkit/src/actor/instance.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,9 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
373373
this.#config.options.connectionLivenessInterval,
374374
);
375375
this.#checkConnectionsLiveness();
376+
377+
// Trigger any pending alarms
378+
await this._onAlarm();
376379
}
377380

378381
async #scheduleEventInner(newEvent: PersistedScheduleEvent) {
@@ -401,6 +404,12 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
401404
}
402405
}
403406

407+
/**
408+
* Triggers any pending alarms.
409+
*
410+
* This method is idempotent. It's called automatically when the actor wakes
411+
* in order to trigger any pending alarms.
412+
*/
404413
async _onAlarm() {
405414
const now = Date.now();
406415
this.actorContext.log.debug({
@@ -424,7 +433,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
424433
this.#rLog.warn({ msg: "no events are due yet, time may have broken" });
425434
if (this.#persist.scheduledEvents.length > 0) {
426435
const nextTs = this.#persist.scheduledEvents[0].timestamp;
427-
this.actorContext.log.warn({
436+
this.actorContext.log.debug({
428437
msg: "alarm fired early, rescheduling for next event",
429438
now,
430439
nextTs,
@@ -786,7 +795,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
786795
}
787796

788797
/**
789-
* Connection disconnected.
798+
* Call when conn is disconnected. Used by transports.
790799
*
791800
* If a clean diconnect, will be removed immediately.
792801
*
@@ -800,7 +809,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
800809
// If socket ID is provided, check if it matches the current socket ID
801810
// If it doesn't match, this is a stale disconnect event from an old socket
802811
if (socketId && conn.__socket && socketId !== conn.__socket.socketId) {
803-
this.rLog.debug({
812+
this.#rLog.debug({
804813
msg: "ignoring stale disconnect event",
805814
connId: conn.id,
806815
eventSocketId: socketId,
@@ -825,6 +834,9 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
825834

826835
// Remove socket
827836
conn.__socket = undefined;
837+
838+
// Update sleep
839+
this.#resetSleepTimer();
828840
}
829841
}
830842

@@ -848,6 +860,7 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
848860

849861
// Remove from state
850862
this.#connections.delete(conn.id);
863+
this.#rLog.debug({ msg: "removed conn", connId: conn.id });
851864

852865
// Remove subscriptions
853866
for (const eventName of [...conn.subscriptions.values()]) {

packages/rivetkit/src/actor/router-endpoints.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,8 @@ export async function handleWebSocketConnect(
292292
// Handle cleanup asynchronously
293293
handlersPromise
294294
.then(({ conn, actor }) => {
295-
actor.__connDisconnected(conn, event.wasClean, socketId);
295+
const wasClean = event.wasClean || event.code === 1000;
296+
actor.__connDisconnected(conn, wasClean, socketId);
296297
})
297298
.catch((error) => {
298299
deconstructError(

packages/rivetkit/src/actor/router.ts

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ export type ActorRouter = Hono<{ Bindings: ActorRouterBindings }>;
6969
export function createActorRouter(
7070
runConfig: RunConfig,
7171
actorDriver: ActorDriver,
72+
isTest: boolean,
7273
): ActorRouter {
7374
const router = new Hono<{ Bindings: ActorRouterBindings }>({ strict: false });
7475

@@ -84,37 +85,39 @@ export function createActorRouter(
8485
return c.text("ok");
8586
});
8687

87-
// Test endpoint to force disconnect a connection non-cleanly
88-
router.post("/.test/force-disconnect", async (c) => {
89-
const connId = c.req.query("conn");
88+
if (isTest) {
89+
// Test endpoint to force disconnect a connection non-cleanly
90+
router.post("/.test/force-disconnect", async (c) => {
91+
const connId = c.req.query("conn");
9092

91-
if (!connId) {
92-
return c.text("Missing conn query parameter", 400);
93-
}
93+
if (!connId) {
94+
return c.text("Missing conn query parameter", 400);
95+
}
9496

95-
const actor = await actorDriver.loadActor(c.env.actorId);
96-
const conn = actor.__getConnForId(connId);
97+
const actor = await actorDriver.loadActor(c.env.actorId);
98+
const conn = actor.__getConnForId(connId);
9799

98-
if (!conn) {
99-
return c.text(`Connection not found: ${connId}`, 404);
100-
}
100+
if (!conn) {
101+
return c.text(`Connection not found: ${connId}`, 404);
102+
}
101103

102-
// Force close the websocket/SSE connection without clean shutdown
103-
const driverState = conn.__driverState;
104-
if (driverState && ConnDriverKind.WEBSOCKET in driverState) {
105-
const ws = driverState[ConnDriverKind.WEBSOCKET].websocket;
104+
// Force close the websocket/SSE connection without clean shutdown
105+
const driverState = conn.__driverState;
106+
if (driverState && ConnDriverKind.WEBSOCKET in driverState) {
107+
const ws = driverState[ConnDriverKind.WEBSOCKET].websocket;
106108

107-
// Force close without sending close frame
108-
(ws.raw as any).terminate();
109-
} else if (driverState && ConnDriverKind.SSE in driverState) {
110-
const stream = driverState[ConnDriverKind.SSE].stream;
109+
// Force close without sending close frame
110+
(ws.raw as any).terminate();
111+
} else if (driverState && ConnDriverKind.SSE in driverState) {
112+
const stream = driverState[ConnDriverKind.SSE].stream;
111113

112-
// Force close the SSE stream
113-
stream.abort();
114-
}
114+
// Force close the SSE stream
115+
stream.abort();
116+
}
115117

116-
return c.json({ success: true });
117-
});
118+
return c.json({ success: true });
119+
});
120+
}
118121

119122
router.get(PATH_CONNECT_WEBSOCKET, async (c) => {
120123
const upgradeWebSocket = runConfig.getUpgradeWebSocket?.();

packages/rivetkit/src/client/actor-conn.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -914,6 +914,8 @@ enc
914914
if (!this.#transport) {
915915
// Nothing to do
916916
} else if ("websocket" in this.#transport) {
917+
logger().debug("closing ws");
918+
917919
const ws = this.#transport.websocket;
918920
// Check if WebSocket is already closed or closing
919921
if (
@@ -927,10 +929,12 @@ enc
927929
logger().debug({ msg: "ws closed" });
928930
resolve(undefined);
929931
});
930-
ws.close();
932+
ws.close(1000, "Normal closure");
931933
await promise;
932934
}
933935
} else if ("sse" in this.#transport) {
936+
logger().debug("closing sse");
937+
934938
// Send close request to server for SSE connections
935939
if (this.#connectionId && this.#connectionToken) {
936940
try {

packages/rivetkit/src/driver-test-suite/mod.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export interface SkipTests {
3535
schedule?: boolean;
3636
sleep?: boolean;
3737
sse?: boolean;
38+
inline?: boolean;
3839
}
3940

4041
export interface DriverTestConfig {
@@ -79,7 +80,10 @@ export interface DriverDeployOutput {
7980
export function runDriverTests(
8081
driverTestConfigPartial: Omit<DriverTestConfig, "clientType" | "transport">,
8182
) {
82-
for (const clientType of ["http", "inline"] as ClientType[]) {
83+
const clientTypes: ClientType[] = driverTestConfigPartial.skip?.inline
84+
? ["http"]
85+
: ["http", "inline"];
86+
for (const clientType of clientTypes) {
8387
const driverTestConfig: DriverTestConfig = {
8488
...driverTestConfigPartial,
8589
clientType,
@@ -148,7 +152,12 @@ export function runDriverTests(
148152
export async function createTestRuntime(
149153
registryPath: string,
150154
driverFactory: (registry: Registry<any>) => Promise<{
151-
rivetEngine?: { endpoint: string; namespace: string; runnerName: string };
155+
rivetEngine?: {
156+
endpoint: string;
157+
namespace: string;
158+
runnerName: string;
159+
token: string;
160+
};
152161
driver: DriverConfig;
153162
cleanup?: () => Promise<void>;
154163
}>,

packages/rivetkit/src/driver-test-suite/tests/actor-schedule.ts

Lines changed: 14 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,21 @@ export function runActorScheduleTests(driverTestConfig: DriverTestConfig) {
66
describe.skipIf(driverTestConfig.skip?.schedule)(
77
"Actor Schedule Tests",
88
() => {
9+
// See alarm + actor sleeping test in actor-sleep.ts
10+
911
describe("Scheduled Alarms", () => {
1012
test("executes c.schedule.at() with specific timestamp", async (c) => {
1113
const { client } = await setupDriverTest(c, driverTestConfig);
1214

1315
// Create instance
1416
const scheduled = client.scheduled.getOrCreate();
1517

16-
// Schedule a task to run in 100ms using timestamp
17-
const timestamp = Date.now() + 100;
18+
// Schedule a task to run using timestamp
19+
const timestamp = Date.now() + 250;
1820
await scheduled.scheduleTaskAt(timestamp);
1921

2022
// Wait for longer than the scheduled time
21-
await waitFor(driverTestConfig, 200);
23+
await waitFor(driverTestConfig, 500);
2224

2325
// Verify the scheduled task ran
2426
const lastRun = await scheduled.getLastRun();
@@ -34,11 +36,11 @@ export function runActorScheduleTests(driverTestConfig: DriverTestConfig) {
3436
// Create instance
3537
const scheduled = client.scheduled.getOrCreate();
3638

37-
// Schedule a task to run in 100ms using delay
38-
await scheduled.scheduleTaskAfter(100);
39+
// Schedule a task to run using delay
40+
await scheduled.scheduleTaskAfter(250);
3941

4042
// Wait for longer than the scheduled time
41-
await waitFor(driverTestConfig, 200);
43+
await waitFor(driverTestConfig, 500);
4244

4345
// Verify the scheduled task ran
4446
const lastRun = await scheduled.getLastRun();
@@ -48,31 +50,6 @@ export function runActorScheduleTests(driverTestConfig: DriverTestConfig) {
4850
expect(scheduledCount).toBe(1);
4951
});
5052

51-
test("scheduled tasks persist across actor restarts", async (c) => {
52-
const { client } = await setupDriverTest(c, driverTestConfig);
53-
54-
// Create instance and schedule
55-
const scheduled = client.scheduled.getOrCreate();
56-
await scheduled.scheduleTaskAfter(200);
57-
58-
// Wait a little so the schedule is stored but hasn't triggered yet
59-
await waitFor(driverTestConfig, 100);
60-
61-
// Get a new reference to simulate actor restart
62-
const newInstance = client.scheduled.getOrCreate();
63-
64-
// Verify the schedule still exists but hasn't run yet
65-
const initialCount = await newInstance.getScheduledCount();
66-
expect(initialCount).toBe(0);
67-
68-
// Wait for the scheduled task to execute
69-
await waitFor(driverTestConfig, 200);
70-
71-
// Verify the scheduled task ran after "restart"
72-
const scheduledCount = await newInstance.getScheduledCount();
73-
expect(scheduledCount).toBe(1);
74-
});
75-
7653
test("multiple scheduled tasks execute in order", async (c) => {
7754
const { client } = await setupDriverTest(c, driverTestConfig);
7855

@@ -83,22 +60,22 @@ export function runActorScheduleTests(driverTestConfig: DriverTestConfig) {
8360
await scheduled.clearHistory();
8461

8562
// Schedule multiple tasks with different delays
86-
await scheduled.scheduleTaskAfterWithId("first", 100);
87-
await scheduled.scheduleTaskAfterWithId("second", 300);
88-
await scheduled.scheduleTaskAfterWithId("third", 500);
63+
await scheduled.scheduleTaskAfterWithId("first", 250);
64+
await scheduled.scheduleTaskAfterWithId("second", 750);
65+
await scheduled.scheduleTaskAfterWithId("third", 1250);
8966

9067
// Wait for first task only
91-
await waitFor(driverTestConfig, 200);
68+
await waitFor(driverTestConfig, 500);
9269
const history1 = await scheduled.getTaskHistory();
9370
expect(history1).toEqual(["first"]);
9471

9572
// Wait for second task
96-
await waitFor(driverTestConfig, 200);
73+
await waitFor(driverTestConfig, 500);
9774
const history2 = await scheduled.getTaskHistory();
9875
expect(history2).toEqual(["first", "second"]);
9976

10077
// Wait for third task
101-
await waitFor(driverTestConfig, 200);
78+
await waitFor(driverTestConfig, 500);
10279
const history3 = await scheduled.getTaskHistory();
10380
expect(history3).toEqual(["first", "second", "third"]);
10481
});

0 commit comments

Comments
 (0)