From d32213850233cbc9806db98ec6971bf079a865ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoffer=20R=C3=B8dvik?= Date: Mon, 18 Aug 2025 01:05:23 +0200 Subject: [PATCH] feat: add procedure to relay --- api/libraries/server/context.ts | 33 ++++- api/libraries/server/request.ts | 42 +++--- api/libraries/server/storage.ts | 3 +- api/libraries/socket/mod.ts | 1 - api/libraries/socket/upgrade.ts | 33 +++-- api/package.json | 1 + api/procedures/event.ts | 22 +++ deno.lock | 9 ++ spec/relay/libraries/procedure.ts | 239 ++++++++++++++++++++++++++++++ spec/relay/libraries/route.ts | 4 +- spec/relay/libraries/types.ts | 6 + spec/relay/mod.ts | 1 + 12 files changed, 346 insertions(+), 48 deletions(-) delete mode 100644 api/libraries/socket/mod.ts create mode 100644 api/procedures/event.ts create mode 100644 spec/relay/libraries/procedure.ts create mode 100644 spec/relay/libraries/types.ts diff --git a/api/libraries/server/context.ts b/api/libraries/server/context.ts index 5f40a10..e030d46 100644 --- a/api/libraries/server/context.ts +++ b/api/libraries/server/context.ts @@ -1,4 +1,6 @@ -import { ServerContext, UnauthorizedError } from "@spec/relay"; +import { ServerContext } from "@spec/relay"; + +import type { Sockets } from "~libraries/socket/sockets.ts"; import { Session } from "../auth/auth.ts"; import { req } from "./request.ts"; @@ -11,15 +13,25 @@ declare module "@spec/relay" { request: Request; /** - * Get request session instance. + * Is the request authenticated. */ - session: Session; + isAuthenticated: boolean; /** * Get account id from session, throws an error if the request * does not have a valid session. */ accountId: string; + + /** + * Get request session instance. + */ + session: Session; + + /** + * Sockets instance attached to the server. + */ + sockets: Sockets; } } @@ -27,15 +39,20 @@ export function getRequestContext(request: Request): ServerContext { return { request, - get session(): Session { - if (req.session === undefined) { - throw new UnauthorizedError(); - } - return req.session; + get isAuthenticated(): boolean { + return req.isAuthenticated; }, get accountId() { return this.session.accountId; }, + + get session(): Session { + return req.session; + }, + + get sockets(): Sockets { + return req.sockets; + }, }; } diff --git a/api/libraries/server/request.ts b/api/libraries/server/request.ts index 7b5a425..1a8dad9 100644 --- a/api/libraries/server/request.ts +++ b/api/libraries/server/request.ts @@ -1,3 +1,5 @@ +import { InternalServerError, UnauthorizedError } from "@spec/relay"; + import { Session } from "../auth/auth.ts"; import { asyncLocalStorage } from "./storage.ts"; @@ -5,21 +7,16 @@ export const req = { get store() { const store = asyncLocalStorage.getStore(); if (store === undefined) { - throw new Error("Request > AsyncLocalStorage not defined."); + throw new InternalServerError("AsyncLocalStorage not defined."); } return store; }, - get socket() { - return this.store.socket; - }, - - /** - * Get store that is potentially undefined. - * Typically used when utility functions might run in and out of request scope. - */ - get unsafeStore() { - return asyncLocalStorage.getStore(); + get sockets() { + if (this.store.sockets === undefined) { + throw new InternalServerError("Sockets not defined."); + } + return this.store.sockets; }, /** @@ -32,7 +29,10 @@ export const req = { /** * Get current session. */ - get session(): Session | undefined { + get session(): Session { + if (this.store.session === undefined) { + throw new UnauthorizedError(); + } return this.store.session; }, @@ -44,14 +44,18 @@ export const req = { }, /** - * Sends a JSON-RPC 2.0 notification to the request if sent through a - * WebSocket connection. - * - * @param method - Method to send notification to. - * @param params - Params to pass to the method. + * Get current session. */ - notify(method: string, params: any): void { - this.socket?.send(JSON.stringify({ jsonrpc: "2.0", method, params })); + getSession(): Session | undefined { + return this.store.session; + }, + + /** + * Get store that is potentially undefined. + * Typically used when utility functions might run in and out of request scope. + */ + getStore() { + return asyncLocalStorage.getStore(); }, } as const; diff --git a/api/libraries/server/storage.ts b/api/libraries/server/storage.ts index 3cdf826..693aa76 100644 --- a/api/libraries/server/storage.ts +++ b/api/libraries/server/storage.ts @@ -1,6 +1,7 @@ import { AsyncLocalStorage } from "node:async_hooks"; import type { Session } from "~libraries/auth/mod.ts"; +import type { Sockets } from "~libraries/socket/sockets.ts"; export const asyncLocalStorage = new AsyncLocalStorage<{ session?: Session; @@ -9,7 +10,7 @@ export const asyncLocalStorage = new AsyncLocalStorage<{ start: number; end?: number; }; - socket?: WebSocket; + sockets?: Sockets; response: { headers: Headers; }; diff --git a/api/libraries/socket/mod.ts b/api/libraries/socket/mod.ts deleted file mode 100644 index 8496e56..0000000 --- a/api/libraries/socket/mod.ts +++ /dev/null @@ -1 +0,0 @@ -export { Sockets } from "./sockets.ts"; diff --git a/api/libraries/socket/upgrade.ts b/api/libraries/socket/upgrade.ts index 3bc7962..824faf8 100644 --- a/api/libraries/socket/upgrade.ts +++ b/api/libraries/socket/upgrade.ts @@ -2,6 +2,7 @@ import { toJsonRpc } from "@valkyr/json-rpc"; import { Session } from "~libraries/auth/mod.ts"; import { logger } from "~libraries/logger/mod.ts"; +import { asyncLocalStorage } from "~libraries/server/storage.ts"; import { sockets } from "./sockets.ts"; @@ -23,35 +24,35 @@ export function upgrade(request: Request, session?: Session) { return; } - const body = toJsonRpc(event.data); + const message = toJsonRpc(event.data); - logger.prefix("Socket").info(body); + logger.prefix("Socket").info(message); asyncLocalStorage.run( { session, info: { - method: body.method!, + method: message.method!, start: Date.now(), }, - socket, + sockets, response: { headers: new Headers(), }, }, async () => { - api - .handleCommand(body) - .then((response) => { - if (response !== undefined) { - logger.info({ response }); - socket.send(JSON.stringify(response)); - } - }) - .catch((error) => { - logger.info({ error }); - socket.send(JSON.stringify(error)); - }); + // api + // .send(body) + // .then((response) => { + // if (response !== undefined) { + // logger.info({ response }); + // socket.send(JSON.stringify(response)); + // } + // }) + // .catch((error) => { + // logger.info({ error }); + // socket.send(JSON.stringify(error)); + // }); }, ); }); diff --git a/api/package.json b/api/package.json index 06b5a34..12ca5fe 100644 --- a/api/package.json +++ b/api/package.json @@ -16,6 +16,7 @@ "@valkyr/auth": "npm:@jsr/valkyr__auth@2", "@valkyr/event-store": "npm:@jsr/valkyr__event-store@2.0.0-beta.6", "@valkyr/inverse": "npm:@jsr/valkyr__inverse@1", + "@valkyr/json-rpc": "npm:@jsr/valkyr__json-rpc@1", "cookie": "1", "mongodb": "6", "zod": "4" diff --git a/api/procedures/event.ts b/api/procedures/event.ts new file mode 100644 index 0000000..a252e41 --- /dev/null +++ b/api/procedures/event.ts @@ -0,0 +1,22 @@ +import { procedure } from "@spec/relay/mod.ts"; +import z from "zod"; + +const EventSchema = z.object({ + id: z.uuid(), + stream: z.uuid(), + type: z.string(), + data: z.any(), + meta: z.any(), + recorded: z.string(), + created: z.string(), +}); + +export default procedure + .method("event") + .access("public") + .params(EventSchema) + .response(z.uuid()) + .handle(async (event) => { + console.log(event); + return crypto.randomUUID(); + }); diff --git a/deno.lock b/deno.lock index 03e9af1..682ef00 100644 --- a/deno.lock +++ b/deno.lock @@ -14,6 +14,7 @@ "npm:@jsr/valkyr__event-emitter@1": "1.0.1", "npm:@jsr/valkyr__event-store@2.0.0-beta.6": "2.0.0-beta.6", "npm:@jsr/valkyr__inverse@1": "1.0.1", + "npm:@jsr/valkyr__json-rpc@1": "1.1.0", "npm:@tailwindcss/vite@4": "4.1.12_vite@7.1.2__picomatch@4.0.3_@types+node@22.15.15", "npm:@tanstack/react-query@5": "5.85.3_react@19.1.1", "npm:@tanstack/react-router-devtools@1": "1.131.18_@tanstack+react-router@1.131.18__react@19.1.1__react-dom@19.1.1___react@19.1.1_react@19.1.1_react-dom@19.1.1__react@19.1.1", @@ -567,6 +568,13 @@ "integrity": "sha512-uZpzPct9FGobgl6H+iR3VJlzZbTFVmJSrB4z5In8zHgIJCkmgYj0diU3soU6MuiKR7SFBfD4PGSuUpTTJHNMlg==", "tarball": "https://npm.jsr.io/~/11/@jsr/valkyr__inverse/1.0.1.tgz" }, + "@jsr/valkyr__json-rpc@1.1.0": { + "integrity": "sha512-i1dwWLI29i5mqRvS2NbI3jUyw8uZuO71hJRvT5+sGAexG8RmQJP4N+ETJkxq0RNwNAGGG1bocuzdqnawa2ahIA==", + "dependencies": [ + "@jsr/valkyr__event-emitter" + ], + "tarball": "https://npm.jsr.io/~/11/@jsr/valkyr__json-rpc/1.1.0.tgz" + }, "@jsr/valkyr__testcontainers@2.0.2": { "integrity": "sha512-YnmfraYFr3msoUGrIFeElm03nbQqXOaPu0QUT6JI3w6/mIYpVfzPxghkB7gn2RIc81QgrqjwKJE/AL3dltlR1w==", "dependencies": [ @@ -2136,6 +2144,7 @@ "npm:@jsr/valkyr__auth@2", "npm:@jsr/valkyr__event-store@2.0.0-beta.6", "npm:@jsr/valkyr__inverse@1", + "npm:@jsr/valkyr__json-rpc@1", "npm:cookie@1", "npm:mongodb@6", "npm:zod@4" diff --git a/spec/relay/libraries/procedure.ts b/spec/relay/libraries/procedure.ts new file mode 100644 index 0000000..1334758 --- /dev/null +++ b/spec/relay/libraries/procedure.ts @@ -0,0 +1,239 @@ +import z, { ZodType } from "zod"; + +import { ServerError, ServerErrorClass } from "./errors.ts"; +import { Access, ServerContext } from "./types.ts"; + +export class Procedure { + readonly type = "procedure" as const; + + declare readonly $params: TState["params"] extends ZodType ? z.input : never; + declare readonly $response: TState["response"] extends ZodType ? z.output : never; + + /** + * Instantiate a new Procedure instance. + * + * @param state - Procedure state. + */ + constructor(readonly state: TState) {} + + /** + * Procedure method value. + */ + get method(): State["method"] { + return this.state.method; + } + + /** + * Access level of the procedure which acts as the first barrier of entry + * to ensure that requests are valid. + * + * By default on the server the lack of access definition will result + * in an error as all procedures needs an access definition. + * + * @param access - Access level of the procedure. + * + * @examples + * + * ```ts + * procedure + * .method("users:create") + * .access("public") + * .handle(async () => { + * // ... + * }); + * + * procedure + * .method("users:get-by-id") + * .access("session") + * .params(z.string()) + * .handle(async (userId, context) => { + * if (userId !== context.session.userId) { + * return new ForbiddenError("Cannot read other users details."); + * } + * }); + * + * procedure + * .method("users:update") + * .access([resource("users", "update")]) + * .params(z.array(z.string(), z.object({ name: z.string() }))) + * .handle(async ([userId, payload], context) => { + * if (userId !== context.session.userId) { + * return new ForbiddenError("Cannot update other users details."); + * } + * console.log(userId, payload); // => string, { name: string } + * }); + * ``` + */ + access(access: TAccess): Procedure & { access: TAccess }> { + return new Procedure({ ...this.state, access: access as TAccess }); + } + + /** + * Defines the payload forwarded to the handler. + * + * @param params - Method payload. + * + * @examples + * + * ```ts + * procedure + * .method("users:create") + * .access([resource("users", "create")]) + * .params(z.object({ + * name: z.string(), + * email: z.email(), + * })) + * .handle(async ({ name, email }, context) => { + * return { name, email, createdBy: context.session.userId }; + * }); + * ``` + */ + params(params: TParams): Procedure & { params: TParams }> { + return new Procedure({ ...this.state, params }); + } + + /** + * Instances of the possible error responses this procedure produces. + * + * @param errors - Error shapes of the procedure. + * + * @examples + * + * ```ts + * procedure + * .method("users:list") + * .errors([ + * BadRequestError + * ]) + * .handle(async () => { + * return new BadRequestError(); + * }); + * ``` + */ + errors(errors: TErrors): Procedure & { errors: TErrors }> { + return new Procedure({ ...this.state, errors }); + } + + /** + * Shape of the success response this procedure produces. This is used by the transform + * tools to ensure the client receives parsed data. + * + * @param response - Response shape of the procedure. + * + * @examples + * + * ```ts + * procedure + * .post("users:list") + * .response( + * z.array( + * z.object({ + * name: z.string() + * }), + * ) + * ) + * .handle(async () => { + * return [{ name: "John Doe" }]; + * }); + * ``` + */ + response( + response: TResponse, + ): Procedure & { response: TResponse }> { + return new Procedure({ ...this.state, response }); + } + + /** + * Server handler callback method. + * + * Handler receives the params, query, body, actions in order of definition. + * So if your route has params, and body the route handle method will + * receive (params, body) as arguments. + * + * @param handle - Handle function to trigger when the route is executed. + * + * @examples + * + * ```ts + * procedure + * .method("users:list") + * .response( + * z.array( + * z.object({ + * name: z.string() + * }), + * ) + * ) + * .handle(async () => { + * return [{ name: "John Doe" }]; + * }); + * ``` + */ + handle, TState["response"]>>( + handle: THandleFn, + ): Procedure & { handle: THandleFn }> { + return new Procedure({ ...this.state, handle }); + } +} + +/* + |-------------------------------------------------------------------------------- + | Factories + |-------------------------------------------------------------------------------- + */ + +/** + * Route factories allowing for easy generation of relay compliant routes. + */ +export const procedure: { + /** + * Create a new procedure with given method name. + * + * @param method Name of the procedure used to match requests against. + * + * @examples + * + * ```ts + * procedure + * .method("users:get-by-id") + * .params( + * z.string().describe("Users unique identifier") + * ); + * ``` + */ + method(method: TMethod): Procedure<{ method: TMethod }>; +} = { + method(method: TMethod) { + return new Procedure({ method }); + }, +}; + +/* + |-------------------------------------------------------------------------------- + | Types + |-------------------------------------------------------------------------------- + */ + +export type Procedures = { + [key: string]: Procedures | Procedure; +}; + +type State = { + method: string; + access?: Access; + params?: ZodType; + errors?: ServerErrorClass[]; + response?: ZodType; + handle?: HandleFn; +}; + +type HandleFn = any[], TResponse = any> = ( + ...args: TArgs +) => TResponse extends ZodType + ? Promise | Response | ServerError> + : Promise; + +type ServerArgs = + HasInputArgs extends true ? [z.output, ServerContext] : [ServerContext]; + +type HasInputArgs = TState["params"] extends ZodType ? true : false; diff --git a/spec/relay/libraries/route.ts b/spec/relay/libraries/route.ts index 75c9e74..2942648 100644 --- a/spec/relay/libraries/route.ts +++ b/spec/relay/libraries/route.ts @@ -3,6 +3,7 @@ import z, { ZodObject, ZodRawShape, ZodType } from "zod"; import { ServerError, ServerErrorClass } from "./errors.ts"; import { Hooks } from "./hooks.ts"; +import { ServerContext } from "./types.ts"; export class Route { readonly type = "route" as const; @@ -447,9 +448,6 @@ export type RouteAccess = "public" | "session" | (() => boolean)[]; export type AccessFn = (resource: string, action: string) => () => boolean; -// eslint-disable-next-line @typescript-eslint/no-empty-object-type -export interface ServerContext {} - type HandleFn = any[], TResponse = any> = ( ...args: TArgs ) => TResponse extends ZodType diff --git a/spec/relay/libraries/types.ts b/spec/relay/libraries/types.ts new file mode 100644 index 0000000..3b879af --- /dev/null +++ b/spec/relay/libraries/types.ts @@ -0,0 +1,6 @@ +export type Access = "public" | "session" | (() => boolean)[]; + +export type AccessFn = (resource: string, action: string) => () => boolean; + +// eslint-disable-next-line @typescript-eslint/no-empty-object-type +export interface ServerContext {} diff --git a/spec/relay/mod.ts b/spec/relay/mod.ts index ad6275c..b2c68e6 100644 --- a/spec/relay/mod.ts +++ b/spec/relay/mod.ts @@ -2,4 +2,5 @@ export * from "./libraries/adapter.ts"; export * from "./libraries/client.ts"; export * from "./libraries/errors.ts"; export * from "./libraries/hooks.ts"; +export * from "./libraries/procedure.ts"; export * from "./libraries/route.ts";