diff --git a/README.md b/README.md index fa3bb76..9ca3ed3 100644 --- a/README.md +++ b/README.md @@ -9,5 +9,3 @@ Attempts to provide a practical data storage solution that utilizes MongoDB synt The database was developed to overcome limitations in storing substantial amounts of data on the client side, which is not feasible with traditional storage solutions like localStorage. Instead, Valkyr Database relies on configurable database adapters such as in memory and indexeddb for browsers and async storage for hybrid mobile solutions, offering a larger storage capacity. Additionally, the solution is tailored to provide native observability and effective management functionality, removing the reliance on client side state management utilities. - -Go to the [docs](https://valkyrjs.com/db-introduction) for more information. diff --git a/deno.json b/deno.json index 14d0573..3cf06b3 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@valkyr/db", - "version": "2.0.0-beta.2", + "version": "2.0.0-beta.3", "exports": { ".": "./src/mod.ts" }, @@ -15,5 +15,8 @@ "test:publish": "deno publish --dry-run", "ncu": "npx ncu -u -p npm" }, + "compilerOptions": { + "lib": ["deno.window", "dom"] + }, "nodeModulesDir": "auto" } diff --git a/src/databases/indexeddb/database.ts b/src/databases/indexeddb/database.ts index 8275246..00992ef 100644 --- a/src/databases/indexeddb/database.ts +++ b/src/databases/indexeddb/database.ts @@ -6,19 +6,8 @@ import { Document } from "../../types.ts"; import { Registrars } from "../registrars.ts"; import { IndexedDbStorage } from "./storage.ts"; -function log() {} - -type StringRecord = { [x: string]: T }; - -type Options = { - name: string; - version?: number; - registrars: Registrars[]; - log?: DBLogger; -}; - -export class IndexedDatabase> { - readonly #collections = new Map>(); +export class IndexedDatabase> { + readonly #collections = new Map>(); readonly #db: Promise>; constructor(readonly options: Options) { @@ -44,7 +33,9 @@ export class IndexedDatabase> { |-------------------------------------------------------------------------------- */ - collection(name: Name): Collection { + collection( + name: Name, + ): Collection { const collection = this.#collections.get(name); if (collection === undefined) { throw new Error(`Collection '${name as string}' not found`); @@ -72,3 +63,14 @@ export class IndexedDatabase> { this.#db.then((db) => db.close()); } } + +function log() {} + +type StringRecord = { [x: string]: TCollections }; + +type Options = { + name: string; + version?: number; + registrars: Registrars[]; + log?: DBLogger; +}; diff --git a/src/databases/indexeddb/pending.ts b/src/databases/indexeddb/pending.ts deleted file mode 100644 index c1c298f..0000000 --- a/src/databases/indexeddb/pending.ts +++ /dev/null @@ -1,76 +0,0 @@ -import { Document } from "../../types.ts"; -import { IndexedDbStorage } from "./storage.ts"; - -export class Pending { - #storage: IndexedDbStorage; - - readonly #upsert = new Map(); - readonly #remove = new Set(); - - #chunkSize = 500; - #saveScheduled = false; - #saving: Promise | null = null; - - constructor(storage: IndexedDbStorage) { - this.#storage = storage; - } - - get isSaving() { - return this.#saving !== null; - } - - upsert(document: any): void { - this.#remove.delete(document.id); - this.#upsert.set(document.id, document); - this.#schedule(); - } - - remove(id: any): void { - this.#upsert.delete(id); - this.#remove.add(id); - this.#schedule(); - } - - #schedule() { - if (!this.#saveScheduled) { - this.#saveScheduled = true; - queueMicrotask(() => { - this.#saveScheduled = false; - void this.save(); - }); - } - } - - async save() { - if (this.#saving) return; - - this.#saving = (async () => { - try { - while (this.#upsert.size > 0 || this.#remove.size > 0) { - const tx = this.#storage.db.transaction(this.#storage.name, "readwrite", { durability: "relaxed" }); - const store = tx.store; - - // Process removals - if (this.#remove.size > 0) { - const removals = Array.from(this.#remove).slice(0, this.#chunkSize); - removals.forEach((id) => this.#remove.delete(id)); - await Promise.all(removals.map((id) => store.delete(id))); - } - - // Process upserts - if (this.#upsert.size > 0) { - const upserts = Array.from(this.#upsert.values()).slice(0, this.#chunkSize); - upserts.forEach((doc) => this.#upsert.delete(doc.id)); - await Promise.all(upserts.map((doc) => store.put(doc))); - } - - await tx.done; - } - } finally { - this.#saving = null; - } - })(); - - await this.#saving; - } -} diff --git a/src/databases/indexeddb/storage.ts b/src/databases/indexeddb/storage.ts index f07f156..f4ca81d 100644 --- a/src/databases/indexeddb/storage.ts +++ b/src/databases/indexeddb/storage.ts @@ -13,18 +13,17 @@ import { } from "../../storage/operators/insert.ts"; import { RemoveResult } from "../../storage/operators/remove.ts"; import { UpdateResult } from "../../storage/operators/update.ts"; -import { addOptions, Options, Storage } from "../../storage/storage.ts"; +import { addOptions, Index, Options, Storage } from "../../storage/storage.ts"; import type { Document, Filter, WithId } from "../../types.ts"; import { IndexedDbCache } from "./cache.ts"; -import { Pending } from "./pending.ts"; + +const OBJECT_PROTOTYPE = Object.getPrototypeOf({}); +const OBJECT_TAG = "[object Object]"; const update = createUpdater({ cloneMode: "deep" }); export class IndexedDbStorage extends Storage { readonly #cache = new IndexedDbCache(); - readonly #documents = new Map>(); - - readonly #pending: Pending; readonly #promise: Promise; @@ -37,22 +36,21 @@ export class IndexedDbStorage extends Stora ) { super(name); this.#promise = promise; - this.#pending = new Pending(this); } async resolve() { if (this.#db === undefined) { this.#db = await this.#promise; } - const documents = await this.db.getAll(this.name); - for (const document of documents) { - this.#documents.set(document.id, document); - } return this; } async has(id: string): Promise { - return this.#documents.has(id); + const document = await this.db.getFromIndex(this.name, "id", id); + if (document !== undefined) { + return true; + } + return false; } get db() { @@ -71,13 +69,11 @@ export class IndexedDbStorage extends Stora async insertOne(data: Partial): Promise { const logger = new InsertLog(this.name); - const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId; + const document = { ...data, id: data.id ?? crypto.randomUUID() } as any; if (await this.has(document.id)) { throw new DuplicateDocumentError(document, this as any); } - - this.#documents.set(document.id, document); - this.#pending.upsert(document); + await this.db.transaction(this.name, "readwrite", { durability: "relaxed" }).store.add(document); this.broadcast("insertOne", document); this.#cache.flush(); @@ -87,23 +83,27 @@ export class IndexedDbStorage extends Stora return getInsertOneResult(document); } - async insertMany(documents: Partial[]): Promise { + async insertMany(data: Partial[]): Promise { const logger = new InsertLog(this.name); - const result: TSchema[] = []; - for (const data of documents) { - const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId; - result.push(document); - this.#documents.set(document.id, document); - this.#pending.upsert(document); - } + const documents: WithId[] = []; - this.broadcast("insertMany", result); + const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); + await Promise.all( + data.map((data) => { + const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId; + documents.push(document); + return tx.store.add(document); + }), + ); + await tx.done; + + this.broadcast("insertMany", documents); this.#cache.flush(); this.log(logger.result()); - return getInsertManyResult(result); + return getInsertManyResult(documents); } /* @@ -113,7 +113,7 @@ export class IndexedDbStorage extends Stora */ async findById(id: string): Promise | undefined> { - return this.#documents.get(id); + return this.db.getFromIndex(this.name, "id", id); } async find(filter: Filter>, options: Options = {}): Promise[]> { @@ -126,11 +126,101 @@ export class IndexedDbStorage extends Stora return cached; } - let cursor = new Query(filter ?? {}).find(Array.from(this.#documents.values())); + const indexes = this.#resolveIndexes(filter); + let cursor = new Query(filter).find(await this.#getAll({ ...options, ...indexes })); if (options !== undefined) { cursor = addOptions(cursor, options); } - return cursor.all() as WithId[]; + + const documents = cursor.all() as WithId[]; + this.#cache.set(this.#cache.hash(filter, options), documents); + + this.log(logger.result()); + + return documents; + } + + /** + * TODO: Prototype! Needs to cover more mongodb query cases and investigation around + * nested indexing in indexeddb. + */ + #resolveIndexes(filter: any): { index?: { [key: string]: any } } { + const indexNames = this.db.transaction(this.name, "readonly").store.indexNames; + const index: { [key: string]: any } = {}; + for (const key in filter) { + if (indexNames.contains(key) === true) { + let val: any; + if (isObject(filter[key]) === true) { + if ((filter as any)[key]["$in"] !== undefined) { + val = (filter as any)[key]["$in"]; + } + } else { + val = filter[key]; + } + if (val !== undefined) { + index[key] = val; + } + } + } + if (Object.keys(index).length > 0) { + return { index }; + } + return {}; + } + + async #getAll({ index, offset, range, limit }: Options) { + if (index !== undefined) { + return this.#getAllByIndex(index); + } + if (range !== undefined) { + return this.db.getAll(this.name, IDBKeyRange.bound(range.from, range.to)); + } + if (offset !== undefined) { + return this.#getAllByOffset(offset.value, offset.direction, limit); + } + return this.db.getAll(this.name, undefined, limit); + } + + async #getAllByIndex(index: Index) { + let result = new Set(); + for (const key in index) { + const value = index[key]; + if (Array.isArray(value)) { + for (const idx of value) { + const values = await this.db.getAllFromIndex(this.name, key, idx); + result = new Set([...result, ...values]); + } + } else { + const values = await this.db.getAllFromIndex(this.name, key, value); + result = new Set([...result, ...values]); + } + } + return Array.from(result); + } + + async #getAllByOffset(value: string, direction: 1 | -1, limit?: number) { + if (direction === 1) { + return this.db.getAll(this.name, IDBKeyRange.lowerBound(value), limit); + } + return this.#getAllByDescOffset(value, limit); + } + + async #getAllByDescOffset(value: string, limit?: number) { + if (limit === undefined) { + return this.db.getAll(this.name, IDBKeyRange.upperBound(value)); + } + const result = []; + let cursor = await this.db + .transaction(this.name, "readonly") + .store.openCursor(IDBKeyRange.upperBound(value), "prev"); + for (let i = 0; i < limit; i++) { + if (cursor === null) { + break; + } + result.push(cursor.value); + cursor = await cursor.continue(); + } + return result.reverse(); } /* @@ -146,18 +236,12 @@ export class IndexedDbStorage extends Stora condition?: Filter>, options?: UpdateOptions, ): Promise { - const query = new Query(filter); - for (const document of Array.from(this.#documents.values())) { - if (query.test(document) === true) { - const modified = update(document, expr, arrayFilters, condition, options); - if (modified.length > 0) { - this.#documents.set(document.id, document); - this.#pending.upsert(document); - this.broadcast("updateOne", document); - return new UpdateResult(1, 1); - } - return new UpdateResult(1, 0); - } + if (typeof filter.id === "string") { + return this.#update(filter.id, expr, arrayFilters, condition, options); + } + const documents = await this.find(filter); + if (documents.length > 0) { + return this.#update(documents[0].id, expr, arrayFilters, condition, options); } return new UpdateResult(0, 0); } @@ -170,60 +254,96 @@ export class IndexedDbStorage extends Stora options?: UpdateOptions, ): Promise { const logger = new UpdateLog(this.name, { filter, expr, arrayFilters, condition, options }); - const query = new Query(filter); + + const ids = await this.find(filter).then((data) => data.map((d) => d.id)); const documents: WithId[] = []; - - let matchedCount = 0; let modifiedCount = 0; - for (const document of Array.from(this.#documents.values())) { - if (query.test(document) === true) { - matchedCount += 1; - const modified = update(document, expr, arrayFilters, condition, options); - if (modified.length > 0) { - modifiedCount += 1; - documents.push(document); - this.#documents.set(document.id, document); - this.#pending.upsert(document); - } - } - } + const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); + await Promise.all( + ids.map((id) => + tx.store.get(id).then((current) => { + if (current === undefined) { + return; + } + const modified = update(current, expr, arrayFilters, condition, options); + if (modified.length > 0) { + modifiedCount += 1; + documents.push(current); + return tx.store.put(current); + } + }), + ), + ); + + await tx.done; this.broadcast("updateMany", documents); this.#cache.flush(); this.log(logger.result()); - return new UpdateResult(matchedCount, modifiedCount); + return new UpdateResult(ids.length, modifiedCount); } async replace(filter: Filter>, document: WithId): Promise { const logger = new ReplaceLog(this.name, document); - const query = new Query(filter); + const ids = await this.find(filter).then((data) => data.map((d) => d.id)); const documents: WithId[] = []; + const count = ids.length; - let matchedCount = 0; - let modifiedCount = 0; - - for (const current of Array.from(this.#documents.values())) { - if (query.test(current) === true) { - matchedCount += 1; - modifiedCount += 1; - documents.push(document); - this.#documents.set(document.id, document); - this.#pending.upsert(document); - } - } + const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); + await Promise.all( + ids.map((id) => { + const next = { ...document, id }; + documents.push(next); + return tx.store.put(next); + }), + ); + await tx.done; this.broadcast("updateMany", documents); this.#cache.flush(); - this.log(logger.result({ count: matchedCount })); + this.log(logger.result({ count })); - return new UpdateResult(matchedCount, modifiedCount); + return new UpdateResult(count, count); + } + + async #update( + id: string | number, + expr: UpdateExpression, + arrayFilters?: Filter>[], + condition?: Filter>, + options?: UpdateOptions, + ): Promise { + const logger = new UpdateLog(this.name, { id, expr }); + + const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" }); + + const current = await tx.store.get(id); + if (current === undefined) { + await tx.done; + return new UpdateResult(0, 0); + } + + const modified = await update(current, expr, arrayFilters, condition, options); + if (modified.length > 0) { + await tx.store.put(current); + } + await tx.done; + + if (modified.length > 0) { + this.broadcast("updateOne", current); + this.log(logger.result()); + this.#cache.flush(); + return new UpdateResult(1, 1); + } + + return new UpdateResult(1); } /* @@ -234,20 +354,19 @@ export class IndexedDbStorage extends Stora async remove(filter: Filter>): Promise { const logger = new RemoveLog(this.name, { filter }); - const documents = Array.from(this.#documents.values()); - const query = new Query(filter); - let count = 0; - for (const document of documents) { - if (query.test(document) === true) { - this.#documents.delete(document.id); - this.#pending.remove(document.id); - this.broadcast("remove", document); - count += 1; - } - } + + const documents = await this.find(filter); + const tx = this.db.transaction(this.name, "readwrite"); + + await Promise.all(documents.map((data) => tx.store.delete(data.id))); + await tx.done; + + this.broadcast("remove", documents); this.#cache.flush(); + this.log(logger.result({ count: documents.length })); - return new RemoveResult(count); + + return new RemoveResult(documents.length); } /* @@ -257,7 +376,10 @@ export class IndexedDbStorage extends Stora */ async count(filter?: Filter>): Promise { - return new Query(filter ?? {}).find(Array.from(this.#documents.values())).count(); + if (filter !== undefined) { + return (await this.find(filter)).length; + } + return this.db.count(this.name); } /* @@ -268,16 +390,19 @@ export class IndexedDbStorage extends Stora async flush(): Promise { await this.db.clear(this.name); - this.#documents.clear(); - } - - /* - |-------------------------------------------------------------------------------- - | Save - |-------------------------------------------------------------------------------- - */ - - async save(): Promise { - this.#pending.save(); } } + +/* + |-------------------------------------------------------------------------------- + | Utils + |-------------------------------------------------------------------------------- + */ + +export function isObject(v: any): v is object { + if (!v) { + return false; + } + const proto = Object.getPrototypeOf(v); + return (proto === OBJECT_PROTOTYPE || proto === null) && OBJECT_TAG === Object.prototype.toString.call(v); +} diff --git a/src/databases/memory/storage.ts b/src/databases/memory/storage.ts index d1e9bf9..dbd7a83 100644 --- a/src/databases/memory/storage.ts +++ b/src/databases/memory/storage.ts @@ -69,17 +69,14 @@ export class MemoryStorage extends Storage< condition?: Filter>, options?: UpdateOptions, ): Promise { - const query = new Query(filter); - for (const document of Array.from(this.#documents.values())) { - if (query.test(document) === true) { - const modified = update(document, expr, arrayFilters, condition, options); - if (modified.length > 0) { - this.#documents.set(document.id, document); - this.broadcast("updateOne", document); - return new UpdateResult(1, 1); - } - return new UpdateResult(1, 0); + for (const document of await this.find(filter)) { + const modified = update(document, expr, arrayFilters, condition, options); + if (modified.length > 0) { + this.#documents.set(document.id, document); + this.broadcast("updateOne", document); + return new UpdateResult(1, 1); } + return new UpdateResult(1, 0); } return new UpdateResult(0, 0); } @@ -91,41 +88,15 @@ export class MemoryStorage extends Storage< condition?: Filter>, options?: UpdateOptions, ): Promise { - const query = new Query(filter); - const documents: WithId[] = []; let matchedCount = 0; let modifiedCount = 0; - for (const document of Array.from(this.#documents.values())) { - if (query.test(document) === true) { - matchedCount += 1; - const modified = update(document, expr, arrayFilters, condition, options); - if (modified.length > 0) { - modifiedCount += 1; - documents.push(document); - this.#documents.set(document.id, document); - } - } - } - - this.broadcast("updateMany", documents); - - return new UpdateResult(matchedCount, modifiedCount); - } - - async replace(filter: Filter>, document: WithId): Promise { - const query = new Query(filter); - - const documents: WithId[] = []; - - let matchedCount = 0; - let modifiedCount = 0; - - for (const current of Array.from(this.#documents.values())) { - if (query.test(current) === true) { - matchedCount += 1; + for (const document of await this.find(filter)) { + matchedCount += 1; + const modified = update(document, expr, arrayFilters, condition, options); + if (modified.length > 0) { modifiedCount += 1; documents.push(document); this.#documents.set(document.id, document); @@ -137,18 +108,31 @@ export class MemoryStorage extends Storage< return new UpdateResult(matchedCount, modifiedCount); } - async remove(filter: Filter>): Promise { - const documents = Array.from(this.#documents.values()); - const query = new Query(filter); - let count = 0; - for (const document of documents) { - if (query.test(document) === true) { - this.#documents.delete(document.id); - this.broadcast("remove", document); - count += 1; - } + async replace(filter: Filter>, document: WithId): Promise { + const documents: WithId[] = []; + + let matchedCount = 0; + let modifiedCount = 0; + + for (const current of await this.find(filter)) { + matchedCount += 1; + modifiedCount += 1; + documents.push(document); + this.#documents.set(current.id, document); } - return new RemoveResult(count); + + this.broadcast("updateMany", documents); + + return new UpdateResult(matchedCount, modifiedCount); + } + + async remove(filter: Filter>): Promise { + const documents = await this.find(filter); + for (const document of documents) { + this.#documents.delete(document.id); + } + this.broadcast("remove", documents); + return new RemoveResult(documents.length); } async count(filter?: Filter>): Promise {