import { Cursor } from "mingo/cursor";
import { nanoid } from "nanoid";
import { Subject } from "rxjs";

import { BroadcastChannel, StorageBroadcast } from "../Broadcast.js";
import { Document, Filter, UpdateFilter, WithId } from "../Types.js";
import { InsertManyResult, InsertOneResult } from "./Operators/Insert/mod.js";
import { RemoveResult } from "./Operators/Remove/mod.js";
import { UpdateResult } from "./Operators/Update/mod.js";

/**
 * Base class for storage adapters.
 *
 * Owns two observables (`change` and `flush`) and a broadcast channel named
 * `valkyr:db:<name>`. Messages arriving on the channel are forwarded to the
 * matching observable, and `broadcast()` pushes local events both to the
 * observables and out on the channel.
 *
 * NOTE(review): the generic type arguments in this file were reconstructed
 * from how each value is used and from the imported result types. Confirm
 * them against the concrete adapters that extend this class.
 *
 * @typeParam TSchema - Shape of the documents held by the storage.
 */
export abstract class Storage<TSchema extends Document = Document> {
  /** Streams that listeners subscribe to for document changes and flushes. */
  readonly observable = {
    change: new Subject<ChangeEvent<TSchema>>(),
    flush: new Subject<void>()
  };

  /** Starts as "loading"; nothing in this class changes it. */
  status: Status = "loading";

  readonly #channel: BroadcastChannel;

  /**
   * @param name - Storage name; used for the channel name and to filter
   *               incoming messages.
   * @param id   - Instance identifier, defaults to a fresh `nanoid()`.
   */
  constructor(readonly name: string, readonly id = nanoid()) {
    this.#channel = new BroadcastChannel(`valkyr:db:${name}`);
    this.#channel.onmessage = ({ data }: MessageEvent<StorageBroadcast<TSchema>>) => {
      // Ignore messages addressed to a differently named storage.
      if (data.name !== this.name) {
        return;
      }
      switch (data.type) {
        case "flush": {
          this.observable.flush.next();
          break;
        }
        default: {
          // Every non-flush message is forwarded as a change event.
          this.observable.change.next(data);
          break;
        }
      }
    };
  }

  /*
   |--------------------------------------------------------------------------------
   | Resolver
   |--------------------------------------------------------------------------------
   */

  // NOTE(review): return type reconstructed as Promise<this> — confirm against
  // the concrete adapters.
  abstract resolve(): Promise<this>;

  /*
   |--------------------------------------------------------------------------------
   | Status
   |--------------------------------------------------------------------------------
   */

  /**
   * Check whether the storage is currently in the given status.
   *
   * @param status - Status to compare against.
   * @returns `true` when `this.status` is strictly equal to `status`.
   */
  is(status: Status): boolean {
    return this.status === status;
  }

  /*
   |--------------------------------------------------------------------------------
   | Broadcaster
   |--------------------------------------------------------------------------------
   |
   | Broadcast local changes to any change listeners in the current and other
   | browser tabs and windows.
   |
   */

  /**
   * Emit an event on the local observables, then post it on the channel.
   *
   * @param type - Broadcast type; "flush" goes to the flush observable, any
   *               other value goes to the change observable.
   * @param data - Affected document or documents; not used for "flush".
   */
  broadcast(type: StorageBroadcast<TSchema>["type"], data?: TSchema | TSchema[]): void {
    switch (type) {
      case "flush": {
        this.observable.flush.next();
        break;
      }
      default: {
        // `type` and `data` are not correlated at the type level here, so the
        // payload is cast to satisfy the ChangeEvent union.
        this.observable.change.next({ type, data: data as any });
        break;
      }
    }
    this.#channel.postMessage({ name: this.name, type, data });
  }

  /*
   |--------------------------------------------------------------------------------
   | Operations
   |--------------------------------------------------------------------------------
   |
   | Implemented by the concrete storage adapters; this class only declares the
   | signatures.
   |
   | NOTE(review): the boolean / number / void return types on has, count and
   | flush are reconstructed — confirm against the adapters.
   |
   */

  abstract has(id: string): Promise<boolean>;

  abstract insertOne(document: Partial<WithId<TSchema>>): Promise<InsertOneResult>;

  abstract insertMany(documents: Partial<WithId<TSchema>>[]): Promise<InsertManyResult>;

  abstract findById(id: string): Promise<WithId<TSchema> | undefined>;

  abstract find(filter?: Filter<WithId<TSchema>>, options?: Options): Promise<WithId<TSchema>[]>;

  abstract updateOne(filter: Filter<WithId<TSchema>>, operators: UpdateFilter<TSchema>): Promise<UpdateResult>;

  abstract updateMany(filter: Filter<WithId<TSchema>>, operators: UpdateFilter<TSchema>): Promise<UpdateResult>;

  abstract replace(filter: Filter<WithId<TSchema>>, document: TSchema): Promise<UpdateResult>;

  abstract remove(filter: Filter<WithId<TSchema>>): Promise<RemoveResult>;

  abstract count(filter?: Filter<WithId<TSchema>>): Promise<number>;

  abstract flush(): Promise<void>;

  /*
   |--------------------------------------------------------------------------------
   | Destructor
   |--------------------------------------------------------------------------------
   */

  /** Close the broadcast channel. The observables are left untouched. */
  destroy(): void {
    this.#channel.close();
  }
}

/*
 |--------------------------------------------------------------------------------
 | Utilities
 |--------------------------------------------------------------------------------
 */

/**
 * Apply the `sort`, `skip` and `limit` options to a cursor, in that order.
 *
 * `skip` and `limit` are applied whenever they are not `undefined`, so a value
 * of `0` is passed through. The remaining `Options` fields are not read here.
 *
 * NOTE(review): `Cursor` is written without type arguments, as in the visible
 * source — confirm this matches the installed mingo version.
 *
 * @param cursor  - Cursor to configure.
 * @param options - Query options to apply.
 * @returns The same cursor instance that was passed in.
 */
export function addOptions(cursor: Cursor, options: Options): Cursor {
  if (options.sort) {
    cursor.sort(options.sort);
  }
  if (options.skip !== undefined) {
    cursor.skip(options.skip);
  }
  if (options.limit !== undefined) {
    cursor.limit(options.limit);
  }
  return cursor;
}

/*
 |--------------------------------------------------------------------------------
 | Types
 |--------------------------------------------------------------------------------
 */

type Status = "loading" | "ready";

/**
 * Event emitted on `Storage.observable.change`. Single-document operations
 * carry one document, multi-document operations carry an array.
 */
export type ChangeEvent<TSchema extends Document = Document> =
  | {
      type: "insertOne" | "updateOne";
      data: WithId<TSchema>;
    }
  | {
      type: "insertMany" | "updateMany" | "remove";
      data: WithId<TSchema>[];
    };

/** Query options accepted by `Storage.find`. */
export type Options = {
  sort?: {
    [key: string]: 1 | -1;
  };
  skip?: number;
  range?: {
    from: string;
    to: string;
  };
  offset?: {
    value: string;
    direction: 1 | -1;
  };
  limit?: number;
  index?: Index;
};

export type Index = {
  [index: string]: any;
};