import type { IDBPDatabase } from "idb"; import { Query } from "mingo"; import type { AnyVal } from "mingo/types"; import { DBLogger, InsertLog, QueryLog, RemoveLog, ReplaceLog, UpdateLog } from "../../logger.ts"; import { Storage } from "../../storage/storage.ts"; import { Document, Filter, UpdateFilter, WithId } from "../../types.ts"; import { IndexedDbCache } from "./cache.ts"; const OBJECT_PROTOTYPE = Object.getPrototypeOf({}) as AnyVal; const OBJECT_TAG = "[object Object]"; export class IndexedDbStorage extends Storage { readonly #cache = new IndexedDbCache(); readonly #promise: Promise; #db?: IDBPDatabase; constructor( name: string, promise: Promise, readonly log: DBLogger, ) { super(name); this.#promise = promise; } async resolve() { if (this.#db === undefined) { this.#db = await this.#promise; } return this; } async has(id: string): Promise { const document = await this.db.getFromIndex(this.name, "id", id); if (document !== undefined) { return true; } return false; } get db() { if (this.#db === undefined) { throw new Error("Database not initialized"); } return this.#db; } /* |-------------------------------------------------------------------------------- | Insert |-------------------------------------------------------------------------------- */ async insertOne(data: Partial>): Promise { const logger = new InsertLog(this.name); const document = { ...data, id: data.id ?? crypto.randomUUID() } as any; if (await this.has(document.id)) { throw new DuplicateDocumentError(document, this as any); } await this.db.transaction(this.name, "readwrite", { durability: "relaxed" }).store.add(document); this.broadcast("insertOne", document); this.#cache.flush(); this.log(logger.result()); return getInsertOneResult(document); } async insertMany(data: Partial>[]): Promise { const logger = new InsertLog(this.name); const documents: WithId[] = []; const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); await Promise.all( data.map((data) => { const document = { ...data, id: data.id ?? nanoid() } as WithId; documents.push(document); return tx.store.add(document); }), ); await tx.done; this.broadcast("insertMany", documents); this.#cache.flush(); this.log(logger.result()); return getInsertManyResult(documents); } /* |-------------------------------------------------------------------------------- | Read |-------------------------------------------------------------------------------- */ async findById(id: string): Promise | undefined> { return this.db.getFromIndex(this.name, "id", id); } async find(filter: Filter>, options: Options = {}): Promise[]> { const logger = new QueryLog(this.name, { filter, options }); const hashCode = this.#cache.hash(filter, options); const cached = this.#cache.get(hashCode); if (cached !== undefined) { this.log(logger.result({ cached: true })); return cached; } const indexes = this.#resolveIndexes(filter); let cursor = new Query(filter).find(await this.#getAll({ ...options, ...indexes })); if (options !== undefined) { cursor = addOptions(cursor, options); } const documents = cursor.all() as WithId[]; this.#cache.set(this.#cache.hash(filter, options), documents); this.log(logger.result()); return documents; } /** * TODO: Prototype! Needs to cover more mongodb query cases and investigation around * nested indexing in indexeddb. */ #resolveIndexes(filter: any): { index?: { [key: string]: any } } { const indexNames = this.db.transaction(this.name, "readonly").store.indexNames; const index: { [key: string]: any } = {}; for (const key in filter) { if (indexNames.contains(key) === true) { let val: any; if (isObject(filter[key]) === true) { if (filter[key]["$in"] !== undefined) { val = filter[key]["$in"]; } } else { val = filter[key]; } if (val !== undefined) { index[key] = val; } } } if (Object.keys(index).length > 0) { return { index }; } return {}; } async #getAll({ index, offset, range, limit }: Options) { if (index !== undefined) { return this.#getAllByIndex(index); } if (range !== undefined) { return this.db.getAll(this.name, IDBKeyRange.bound(range.from, range.to)); } if (offset !== undefined) { return this.#getAllByOffset(offset.value, offset.direction, limit); } return this.db.getAll(this.name, undefined, limit); } async #getAllByIndex(index: Index) { let result = new Set(); for (const key in index) { const value = index[key]; if (Array.isArray(value)) { for (const idx of value) { const values = await this.db.getAllFromIndex(this.name, key, idx); result = new Set([...result, ...values]); } } else { const values = await this.db.getAllFromIndex(this.name, key, value); result = new Set([...result, ...values]); } } return Array.from(result); } async #getAllByOffset(value: string, direction: 1 | -1, limit?: number) { if (direction === 1) { return this.db.getAll(this.name, IDBKeyRange.lowerBound(value), limit); } return this.#getAllByDescOffset(value, limit); } async #getAllByDescOffset(value: string, limit?: number) { if (limit === undefined) { return this.db.getAll(this.name, IDBKeyRange.upperBound(value)); } const result = []; let cursor = await this.db .transaction(this.name, "readonly") .store.openCursor(IDBKeyRange.upperBound(value), "prev"); for (let i = 0; i < limit; i++) { if (cursor === null) { break; } result.push(cursor.value); cursor = await cursor.continue(); } return result.reverse(); } /* |-------------------------------------------------------------------------------- | Update |-------------------------------------------------------------------------------- */ async updateOne(filter: Filter>, operators: UpdateFilter): Promise { if (typeof filter.id === "string") { return this.#update(filter.id, filter, operators); } const documents = await this.find(filter); if (documents.length > 0) { return this.#update(documents[0].id, filter, operators); } return new UpdateResult(0, 0); } async updateMany(filter: Filter>, operators: UpdateFilter): Promise { const logger = new UpdateLog(this.name, { filter, operators }); const ids = await this.find(filter).then((data) => data.map((d) => d.id)); const documents: WithId[] = []; let modifiedCount = 0; const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); await Promise.all( ids.map((id) => tx.store.get(id).then((current) => { if (current === undefined) { return; } const { modified, document } = update(filter, operators, current); if (modified) { modifiedCount += 1; documents.push(document); return tx.store.put(document); } }), ), ); await tx.done; this.broadcast("updateMany", documents); this.#cache.flush(); this.log(logger.result()); return new UpdateResult(ids.length, modifiedCount); } async replace(filter: Filter>, document: TSchema): Promise { const logger = new ReplaceLog(this.name, document); const ids = await this.find(filter).then((data) => data.map((d) => d.id)); const documents: WithId[] = []; const count = ids.length; const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); await Promise.all( ids.map((id) => { const next = { ...document, id }; documents.push(next); return tx.store.put(next); }), ); await tx.done; this.broadcast("updateMany", documents); this.#cache.flush(); this.log(logger.result({ count })); return new UpdateResult(count, count); } async #update( id: string | number, filter: Filter>, operators: UpdateFilter, ): Promise { const logger = new UpdateLog(this.name, { filter, operators }); const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); const current = await tx.store.get(id); if (current === undefined) { await tx.done; return new UpdateResult(0, 0); } const { modified, document } = await update(filter, operators, current); if (modified === true) { await tx.store.put(document); } await tx.done; if (modified === true) { this.broadcast("updateOne", document); this.log(logger.result()); this.#cache.flush(); return new UpdateResult(1, 1); } return new UpdateResult(1); } /* |-------------------------------------------------------------------------------- | Remove |-------------------------------------------------------------------------------- */ async remove(filter: Filter>): Promise { const logger = new RemoveLog(this.name, { filter }); const documents = await this.find(filter); const tx = this.db.transaction(this.name, "readwrite"); await Promise.all(documents.map((data) => tx.store.delete(data.id))); await tx.done; this.broadcast("remove", documents); this.#cache.flush(); this.log(logger.result({ count: documents.length })); return new RemoveResult(documents.length); } /* |-------------------------------------------------------------------------------- | Count |-------------------------------------------------------------------------------- */ async count(filter?: Filter>): Promise { if (filter !== undefined) { return (await this.find(filter)).length; } return this.db.count(this.name); } /* |-------------------------------------------------------------------------------- | Flush |-------------------------------------------------------------------------------- */ async flush(): Promise { await this.db.clear(this.name); this.#cache.flush(); } } /* |-------------------------------------------------------------------------------- | Utils |-------------------------------------------------------------------------------- */ export function isObject(v: AnyVal): v is object { if (!v) { return false; } const proto = Object.getPrototypeOf(v) as AnyVal; return (proto === OBJECT_PROTOTYPE || proto === null) && OBJECT_TAG === Object.prototype.toString.call(v); }