From 05938a413bc0655c8b7ceed7be8de9b8ed6e1948 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoffer=20R=C3=B8dvik?= Date: Fri, 15 Aug 2025 01:45:33 +0200 Subject: [PATCH] feat: re-apply persistence --- deno.json | 2 +- src/databases/indexeddb/_storage.ts | 382 ---------------------------- src/databases/indexeddb/pending.ts | 61 +++++ src/databases/indexeddb/storage.ts | 31 +-- 4 files changed, 75 insertions(+), 401 deletions(-) delete mode 100644 src/databases/indexeddb/_storage.ts create mode 100644 src/databases/indexeddb/pending.ts diff --git a/deno.json b/deno.json index a4e275a..14d0573 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@valkyr/db", - "version": "2.0.0-beta.1", + "version": "2.0.0-beta.2", "exports": { ".": "./src/mod.ts" }, diff --git a/src/databases/indexeddb/_storage.ts b/src/databases/indexeddb/_storage.ts deleted file mode 100644 index 1de7bbe..0000000 --- a/src/databases/indexeddb/_storage.ts +++ /dev/null @@ -1,382 +0,0 @@ -import type { IDBPDatabase } from "idb"; -import { Query } from "mingo"; -import type { AnyVal } from "mingo/types"; - -import { DBLogger, InsertLog, QueryLog, RemoveLog, ReplaceLog, UpdateLog } from "../../logger.ts"; -import { Storage } from "../../storage/storage.ts"; -import { Document, Filter, UpdateFilter, WithId } from "../../types.ts"; -import { IndexedDbCache } from "./cache.ts"; - -const OBJECT_PROTOTYPE = Object.getPrototypeOf({}) as AnyVal; -const OBJECT_TAG = "[object Object]"; - -export class IndexedDbStorage extends Storage { - readonly #cache = new IndexedDbCache(); - readonly #promise: Promise; - - #db?: IDBPDatabase; - - constructor( - name: string, - promise: Promise, - readonly log: DBLogger, - ) { - super(name); - this.#promise = promise; - } - - async resolve() { - if (this.#db === undefined) { - this.#db = await this.#promise; - } - return this; - } - - async has(id: string): Promise { - const document = await this.db.getFromIndex(this.name, "id", id); - if (document !== undefined) { - return true; - } - return false; - } - - get db() { - if (this.#db === undefined) { - throw new Error("Database not initialized"); - } - return this.#db; - } - - /* - |-------------------------------------------------------------------------------- - | Insert - |-------------------------------------------------------------------------------- - */ - - async insertOne(data: Partial>): Promise { - const logger = new InsertLog(this.name); - - const document = { ...data, id: data.id ?? crypto.randomUUID() } as any; - if (await this.has(document.id)) { - throw new DuplicateDocumentError(document, this as any); - } - await this.db.transaction(this.name, "readwrite", { durability: "relaxed" }).store.add(document); - - this.broadcast("insertOne", document); - this.#cache.flush(); - - this.log(logger.result()); - - return getInsertOneResult(document); - } - - async insertMany(data: Partial>[]): Promise { - const logger = new InsertLog(this.name); - - const documents: WithId[] = []; - - const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); - await Promise.all( - data.map((data) => { - const document = { ...data, id: data.id ?? nanoid() } as WithId; - documents.push(document); - return tx.store.add(document); - }), - ); - await tx.done; - - this.broadcast("insertMany", documents); - this.#cache.flush(); - - this.log(logger.result()); - - return getInsertManyResult(documents); - } - - /* - |-------------------------------------------------------------------------------- - | Read - |-------------------------------------------------------------------------------- - */ - - async findById(id: string): Promise | undefined> { - return this.db.getFromIndex(this.name, "id", id); - } - - async find(filter: Filter>, options: Options = {}): Promise[]> { - const logger = new QueryLog(this.name, { filter, options }); - - const hashCode = this.#cache.hash(filter, options); - const cached = this.#cache.get(hashCode); - if (cached !== undefined) { - this.log(logger.result({ cached: true })); - return cached; - } - - const indexes = this.#resolveIndexes(filter); - let cursor = new Query(filter).find(await this.#getAll({ ...options, ...indexes })); - if (options !== undefined) { - cursor = addOptions(cursor, options); - } - - const documents = cursor.all() as WithId[]; - this.#cache.set(this.#cache.hash(filter, options), documents); - - this.log(logger.result()); - - return documents; - } - - /** - * TODO: Prototype! Needs to cover more mongodb query cases and investigation around - * nested indexing in indexeddb. - */ - #resolveIndexes(filter: any): { index?: { [key: string]: any } } { - const indexNames = this.db.transaction(this.name, "readonly").store.indexNames; - const index: { [key: string]: any } = {}; - for (const key in filter) { - if (indexNames.contains(key) === true) { - let val: any; - if (isObject(filter[key]) === true) { - if (filter[key]["$in"] !== undefined) { - val = filter[key]["$in"]; - } - } else { - val = filter[key]; - } - if (val !== undefined) { - index[key] = val; - } - } - } - if (Object.keys(index).length > 0) { - return { index }; - } - return {}; - } - - async #getAll({ index, offset, range, limit }: Options) { - if (index !== undefined) { - return this.#getAllByIndex(index); - } - if (range !== undefined) { - return this.db.getAll(this.name, IDBKeyRange.bound(range.from, range.to)); - } - if (offset !== undefined) { - return this.#getAllByOffset(offset.value, offset.direction, limit); - } - return this.db.getAll(this.name, undefined, limit); - } - - async #getAllByIndex(index: Index) { - let result = new Set(); - for (const key in index) { - const value = index[key]; - if (Array.isArray(value)) { - for (const idx of value) { - const values = await this.db.getAllFromIndex(this.name, key, idx); - result = new Set([...result, ...values]); - } - } else { - const values = await this.db.getAllFromIndex(this.name, key, value); - result = new Set([...result, ...values]); - } - } - return Array.from(result); - } - - async #getAllByOffset(value: string, direction: 1 | -1, limit?: number) { - if (direction === 1) { - return this.db.getAll(this.name, IDBKeyRange.lowerBound(value), limit); - } - return this.#getAllByDescOffset(value, limit); - } - - async #getAllByDescOffset(value: string, limit?: number) { - if (limit === undefined) { - return this.db.getAll(this.name, IDBKeyRange.upperBound(value)); - } - const result = []; - let cursor = await this.db - .transaction(this.name, "readonly") - .store.openCursor(IDBKeyRange.upperBound(value), "prev"); - for (let i = 0; i < limit; i++) { - if (cursor === null) { - break; - } - result.push(cursor.value); - cursor = await cursor.continue(); - } - return result.reverse(); - } - - /* - |-------------------------------------------------------------------------------- - | Update - |-------------------------------------------------------------------------------- - */ - - async updateOne(filter: Filter>, operators: UpdateFilter): Promise { - if (typeof filter.id === "string") { - return this.#update(filter.id, filter, operators); - } - const documents = await this.find(filter); - if (documents.length > 0) { - return this.#update(documents[0].id, filter, operators); - } - return new UpdateResult(0, 0); - } - - async updateMany(filter: Filter>, operators: UpdateFilter): Promise { - const logger = new UpdateLog(this.name, { filter, operators }); - - const ids = await this.find(filter).then((data) => data.map((d) => d.id)); - - const documents: WithId[] = []; - let modifiedCount = 0; - - const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); - await Promise.all( - ids.map((id) => - tx.store.get(id).then((current) => { - if (current === undefined) { - return; - } - const { modified, document } = update(filter, operators, current); - if (modified) { - modifiedCount += 1; - documents.push(document); - return tx.store.put(document); - } - }), - ), - ); - - await tx.done; - - this.broadcast("updateMany", documents); - this.#cache.flush(); - - this.log(logger.result()); - - return new UpdateResult(ids.length, modifiedCount); - } - - async replace(filter: Filter>, document: TSchema): Promise { - const logger = new ReplaceLog(this.name, document); - - const ids = await this.find(filter).then((data) => data.map((d) => d.id)); - - const documents: WithId[] = []; - const count = ids.length; - - const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); - await Promise.all( - ids.map((id) => { - const next = { ...document, id }; - documents.push(next); - return tx.store.put(next); - }), - ); - await tx.done; - - this.broadcast("updateMany", documents); - this.#cache.flush(); - - this.log(logger.result({ count })); - - return new UpdateResult(count, count); - } - - async #update( - id: string | number, - filter: Filter>, - operators: UpdateFilter, - ): Promise { - const logger = new UpdateLog(this.name, { filter, operators }); - - const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); - - const current = await tx.store.get(id); - if (current === undefined) { - await tx.done; - return new UpdateResult(0, 0); - } - - const { modified, document } = await update(filter, operators, current); - if (modified === true) { - await tx.store.put(document); - } - await tx.done; - - if (modified === true) { - this.broadcast("updateOne", document); - this.log(logger.result()); - this.#cache.flush(); - return new UpdateResult(1, 1); - } - - return new UpdateResult(1); - } - - /* - |-------------------------------------------------------------------------------- - | Remove - |-------------------------------------------------------------------------------- - */ - - async remove(filter: Filter>): Promise { - const logger = new RemoveLog(this.name, { filter }); - - const documents = await this.find(filter); - const tx = this.db.transaction(this.name, "readwrite"); - - await Promise.all(documents.map((data) => tx.store.delete(data.id))); - await tx.done; - - this.broadcast("remove", documents); - this.#cache.flush(); - - this.log(logger.result({ count: documents.length })); - - return new RemoveResult(documents.length); - } - - /* - |-------------------------------------------------------------------------------- - | Count - |-------------------------------------------------------------------------------- - */ - - async count(filter?: Filter>): Promise { - if (filter !== undefined) { - return (await this.find(filter)).length; - } - return this.db.count(this.name); - } - - /* - |-------------------------------------------------------------------------------- - | Flush - |-------------------------------------------------------------------------------- - */ - - async flush(): Promise { - await this.db.clear(this.name); - this.#cache.flush(); - } -} - -/* - |-------------------------------------------------------------------------------- - | Utils - |-------------------------------------------------------------------------------- - */ - -export function isObject(v: AnyVal): v is object { - if (!v) { - return false; - } - const proto = Object.getPrototypeOf(v) as AnyVal; - return (proto === OBJECT_PROTOTYPE || proto === null) && OBJECT_TAG === Object.prototype.toString.call(v); -} diff --git a/src/databases/indexeddb/pending.ts b/src/databases/indexeddb/pending.ts new file mode 100644 index 0000000..25ec13b --- /dev/null +++ b/src/databases/indexeddb/pending.ts @@ -0,0 +1,61 @@ +import { Document } from "../../types.ts"; +import { IndexedDbStorage } from "./storage.ts"; + +export class Pending { + readonly #upsert: any[] = []; + readonly #remove: string[] = []; + + readonly #chunkSize = 500; + + #saving: Promise | null = null; + + #storage: IndexedDbStorage; + + constructor(storage: IndexedDbStorage) { + this.#storage = storage; + } + + get isSaving() { + return this.#saving !== null; + } + + upsert(document: any): void { + this.#upsert.push(document); + this.save(); + } + + remove(id: any): void { + this.#remove.push(id); + this.save(); + } + + async save() { + if (this.#saving) { + return; + } + + this.#saving = (async () => { + try { + while (this.#upsert.length > 0 || this.#remove.length > 0) { + const tx = this.#storage.db.transaction(this.#storage.name, "readwrite", { durability: "relaxed" }); + + if (this.#remove.length > 0) { + const removals = this.#remove.splice(0, this.#chunkSize); + await Promise.all(removals.map((id) => tx.store.delete(id))); + } + + if (this.#upsert.length > 0) { + const upserts = this.#upsert.splice(0, this.#chunkSize); + await Promise.all(upserts.map((doc) => tx.store.put(doc))); + } + + await tx.done; + } + } finally { + this.#saving = null; + } + })(); + + await this.#saving; + } +} diff --git a/src/databases/indexeddb/storage.ts b/src/databases/indexeddb/storage.ts index 0d8da16..f07f156 100644 --- a/src/databases/indexeddb/storage.ts +++ b/src/databases/indexeddb/storage.ts @@ -16,12 +16,16 @@ import { UpdateResult } from "../../storage/operators/update.ts"; import { addOptions, Options, Storage } from "../../storage/storage.ts"; import type { Document, Filter, WithId } from "../../types.ts"; import { IndexedDbCache } from "./cache.ts"; +import { Pending } from "./pending.ts"; const update = createUpdater({ cloneMode: "deep" }); export class IndexedDbStorage extends Storage { readonly #cache = new IndexedDbCache(); readonly #documents = new Map>(); + + readonly #pending: Pending; + readonly #promise: Promise; #db?: IDBPDatabase; @@ -33,6 +37,7 @@ export class IndexedDbStorage extends Stora ) { super(name); this.#promise = promise; + this.#pending = new Pending(this); } async resolve() { @@ -70,7 +75,9 @@ export class IndexedDbStorage extends Stora if (await this.has(document.id)) { throw new DuplicateDocumentError(document, this as any); } + this.#documents.set(document.id, document); + this.#pending.upsert(document); this.broadcast("insertOne", document); this.#cache.flush(); @@ -88,6 +95,7 @@ export class IndexedDbStorage extends Stora const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId; result.push(document); this.#documents.set(document.id, document); + this.#pending.upsert(document); } this.broadcast("insertMany", result); @@ -144,6 +152,7 @@ export class IndexedDbStorage extends Stora const modified = update(document, expr, arrayFilters, condition, options); if (modified.length > 0) { this.#documents.set(document.id, document); + this.#pending.upsert(document); this.broadcast("updateOne", document); return new UpdateResult(1, 1); } @@ -176,6 +185,7 @@ export class IndexedDbStorage extends Stora modifiedCount += 1; documents.push(document); this.#documents.set(document.id, document); + this.#pending.upsert(document); } } } @@ -204,6 +214,7 @@ export class IndexedDbStorage extends Stora modifiedCount += 1; documents.push(document); this.#documents.set(document.id, document); + this.#pending.upsert(document); } } @@ -229,6 +240,7 @@ export class IndexedDbStorage extends Stora for (const document of documents) { if (query.test(document) === true) { this.#documents.delete(document.id); + this.#pending.remove(document.id); this.broadcast("remove", document); count += 1; } @@ -266,23 +278,6 @@ export class IndexedDbStorage extends Stora */ async save(): Promise { - // this.db. + this.#pending.save(); } } - -/* -const logger = new InsertLog(this.name); - - const document = { ...data, id: data.id ?? crypto.randomUUID() } as any; - if (await this.has(document.id)) { - throw new DuplicateDocumentError(document, this as any); - } - await this.db.transaction(this.name, "readwrite", { durability: "relaxed" }).store.add(document); - - this.broadcast("insertOne", document); - this.#cache.flush(); - - this.log(logger.result()); - - return getInsertOneResult(document); -*/