Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@
"@hono/node-server": "^1.18.2",
"@hono/node-ws": "^1.1.1",
"@rivet-gg/actor-core": "^25.1.0",
"@rivetkit/engine-runner": "https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@472",
"@rivetkit/engine-runner": "https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@7f23f3a",
"@types/invariant": "^2",
"@types/node": "^22.13.1",
"@types/ws": "^8",
Expand Down
32 changes: 12 additions & 20 deletions packages/core/src/drivers/engine/actor-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,9 @@ import type {
} from "@rivetkit/engine-runner";
import { Runner } from "@rivetkit/engine-runner";
import * as cbor from "cbor-x";
import { WSContext, WSContextInit } from "hono/ws";
import { WSContext } from "hono/ws";
import invariant from "invariant";
import { ActionContext } from "@/actor/action";
import { generateConnId, generateConnToken } from "@/actor/connection";
import {
CONN_DRIVER_GENERIC_HTTP,
type GenericHttpDriverState,
} from "@/actor/generic-conn-driver";
import * as protoHttpAction from "@/actor/protocol/http/action";
import { deserialize, EncodingSchema, serialize } from "@/actor/protocol/serde";
import { ActorHandle } from "@/client/actor-handle";
import { EncodingSchema } from "@/actor/protocol/serde";
import type { Client } from "@/client/client";
import {
type ActorDriver,
Expand All @@ -27,10 +19,10 @@ import {
} from "@/driver-helpers/mod";
import type {
ActorRouter,
AnyActorInstance as CoreAnyActorInstance,
RegistryConfig,
RunConfig,
UniversalWebSocket,
UpgradeWebSocketArgs,
} from "@/mod";
import {
createActorRouter,
Expand All @@ -51,7 +43,6 @@ import { logger } from "./log";
interface ActorHandler {
actor?: AnyActorInstance;
actorStartPromise?: PromiseWithResolvers<void>;
metadata: RunnerActorConfig["metadata"];
genericConnGlobalState: GenericConnGlobalState;
persistedData?: Uint8Array;
}
Expand Down Expand Up @@ -93,6 +84,7 @@ export class EngineActorDriver implements ActorDriver {
addresses: config.addresses,
totalSlots: config.totalSlots,
runnerName: config.runnerName,
runnerKey: config.runnerKey,
prepopulateActorNames: Object.keys(this.#registryConfig.use),
onConnected: () => {
if (hasDisconnected) {
Expand Down Expand Up @@ -201,13 +193,13 @@ export class EngineActorDriver implements ActorDriver {
): Promise<void> {
logger().debug("runner actor starting", {
actorId,
name: config.metadata.actor.name,
keys: config.metadata.actor.keys,
name: config.name,
key: config.key,
generation,
});

// Deserialize input
let input;
let input: any;
if (config.input) {
input = cbor.decode(config.input);
}
Expand All @@ -218,19 +210,19 @@ export class EngineActorDriver implements ActorDriver {
handler = {
genericConnGlobalState: new GenericConnGlobalState(),
actorStartPromise: Promise.withResolvers(),
metadata: config.metadata,
persistedData: serializeEmptyPersistData(input),
};
this.#actors.set(actorId, handler);
}

const name = config.metadata.actor.name as string;
const key = deserializeActorKey(config.metadata.actor.keys[0]);
const name = config.name as string;
invariant(config.key, "actor should have a key");
const key = deserializeActorKey(config.key);

// Create actor instance
const definition = lookupInRegistry(
this.#registryConfig,
config.metadata.actor.name as string, // TODO: Remove cast
config.name as string, // TODO: Remove cast
);
handler.actor = definition.instantiate();

Expand Down Expand Up @@ -299,7 +291,7 @@ export class EngineActorDriver implements ActorDriver {
// Fetch WS handler
//
// We store the promise since we need to add WebSocket event listeners immediately that will wait for the promise to resolve
let wsHandlerPromise;
let wsHandlerPromise: Promise<UpgradeWebSocketArgs>;
if (url.pathname === PATH_CONNECT_WEBSOCKET) {
wsHandlerPromise = handleWebSocketConnect(
request,
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/drivers/engine/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ export const ConfigSchema = z
runnerName: z
.string()
.default(getEnvUniversal("RIVET_RUNNER") ?? "rivetkit"),
// TODO: Automatically attempt ot determine key by common env vars (e.g. k8s pod name)
runnerKey: z
.string()
.default(getEnvUniversal("RIVET_RUNNER_KEY") ?? crypto.randomUUID()),
totalSlots: z.number().default(100_000),
addresses: z
.record(
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/drivers/engine/kv.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export const KEYS = {
PERSIST_DATA: [Uint8Array.from([1, 1])],
PERSIST_DATA: Uint8Array.from([1, 1]),
};
1 change: 0 additions & 1 deletion packages/core/tsup.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ import defaultConfig from "../../tsup.base.ts";

export default defineConfig({
...defaultConfig,
noExternal: ["@rivetkit/engine-runner", "@rivetkit/engine-runner-protocol"],
});
67 changes: 54 additions & 13 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading