feat: add in original idb implementation

This commit is contained in:
2025-08-16 16:50:34 +02:00
parent 9d57f4b751
commit c9211086d6
6 changed files with 275 additions and 239 deletions

View File

@@ -6,19 +6,8 @@ import { Document } from "../../types.ts";
import { Registrars } from "../registrars.ts";
import { IndexedDbStorage } from "./storage.ts";
function log() {}
type StringRecord<T> = { [x: string]: T };
type Options = {
name: string;
version?: number;
registrars: Registrars[];
log?: DBLogger;
};
export class IndexedDatabase<T extends StringRecord<Document>> {
readonly #collections = new Map<keyof T, Collection<T[keyof T]>>();
export class IndexedDatabase<TCollections extends StringRecord<Document>> {
readonly #collections = new Map<keyof TCollections, Collection<TCollections[keyof TCollections]>>();
readonly #db: Promise<IDBPDatabase<unknown>>;
constructor(readonly options: Options) {
@@ -44,7 +33,9 @@ export class IndexedDatabase<T extends StringRecord<Document>> {
|--------------------------------------------------------------------------------
*/
collection<TSchema extends T[Name], Name extends keyof T = keyof T>(name: Name): Collection<TSchema> {
collection<TSchema extends TCollections[Name], Name extends keyof TCollections = keyof TCollections>(
name: Name,
): Collection<TSchema> {
const collection = this.#collections.get(name);
if (collection === undefined) {
throw new Error(`Collection '${name as string}' not found`);
@@ -72,3 +63,14 @@ export class IndexedDatabase<T extends StringRecord<Document>> {
this.#db.then((db) => db.close());
}
}
function log() {}
type StringRecord<TCollections> = { [x: string]: TCollections };
type Options = {
name: string;
version?: number;
registrars: Registrars[];
log?: DBLogger;
};

View File

@@ -1,76 +0,0 @@
import { Document } from "../../types.ts";
import { IndexedDbStorage } from "./storage.ts";
export class Pending<TSchema extends Document = Document> {
#storage: IndexedDbStorage<TSchema>;
readonly #upsert = new Map<string, TSchema>();
readonly #remove = new Set<string>();
#chunkSize = 500;
#saveScheduled = false;
#saving: Promise<void> | null = null;
constructor(storage: IndexedDbStorage<TSchema>) {
this.#storage = storage;
}
get isSaving() {
return this.#saving !== null;
}
upsert(document: any): void {
this.#remove.delete(document.id);
this.#upsert.set(document.id, document);
this.#schedule();
}
remove(id: any): void {
this.#upsert.delete(id);
this.#remove.add(id);
this.#schedule();
}
#schedule() {
if (!this.#saveScheduled) {
this.#saveScheduled = true;
queueMicrotask(() => {
this.#saveScheduled = false;
void this.save();
});
}
}
async save() {
if (this.#saving) return;
this.#saving = (async () => {
try {
while (this.#upsert.size > 0 || this.#remove.size > 0) {
const tx = this.#storage.db.transaction(this.#storage.name, "readwrite", { durability: "relaxed" });
const store = tx.store;
// Process removals
if (this.#remove.size > 0) {
const removals = Array.from(this.#remove).slice(0, this.#chunkSize);
removals.forEach((id) => this.#remove.delete(id));
await Promise.all(removals.map((id) => store.delete(id)));
}
// Process upserts
if (this.#upsert.size > 0) {
const upserts = Array.from(this.#upsert.values()).slice(0, this.#chunkSize);
upserts.forEach((doc) => this.#upsert.delete(doc.id));
await Promise.all(upserts.map((doc) => store.put(doc)));
}
await tx.done;
}
} finally {
this.#saving = null;
}
})();
await this.#saving;
}
}

View File

@@ -13,18 +13,17 @@ import {
} from "../../storage/operators/insert.ts";
import { RemoveResult } from "../../storage/operators/remove.ts";
import { UpdateResult } from "../../storage/operators/update.ts";
import { addOptions, Options, Storage } from "../../storage/storage.ts";
import { addOptions, Index, Options, Storage } from "../../storage/storage.ts";
import type { Document, Filter, WithId } from "../../types.ts";
import { IndexedDbCache } from "./cache.ts";
import { Pending } from "./pending.ts";
const OBJECT_PROTOTYPE = Object.getPrototypeOf({});
const OBJECT_TAG = "[object Object]";
const update = createUpdater({ cloneMode: "deep" });
export class IndexedDbStorage<TSchema extends Document = Document> extends Storage<TSchema> {
readonly #cache = new IndexedDbCache<TSchema>();
readonly #documents = new Map<string, WithId<TSchema>>();
readonly #pending: Pending<TSchema>;
readonly #promise: Promise<IDBPDatabase>;
@@ -37,22 +36,21 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
) {
super(name);
this.#promise = promise;
this.#pending = new Pending(this);
}
async resolve() {
if (this.#db === undefined) {
this.#db = await this.#promise;
}
const documents = await this.db.getAll(this.name);
for (const document of documents) {
this.#documents.set(document.id, document);
}
return this;
}
async has(id: string): Promise<boolean> {
return this.#documents.has(id);
const document = await this.db.getFromIndex(this.name, "id", id);
if (document !== undefined) {
return true;
}
return false;
}
get db() {
@@ -71,13 +69,11 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
async insertOne(data: Partial<TSchema>): Promise<InsertOneResult> {
const logger = new InsertLog(this.name);
const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId<TSchema>;
const document = { ...data, id: data.id ?? crypto.randomUUID() } as any;
if (await this.has(document.id)) {
throw new DuplicateDocumentError(document, this as any);
}
this.#documents.set(document.id, document);
this.#pending.upsert(document);
await this.db.transaction(this.name, "readwrite", { durability: "relaxed" }).store.add(document);
this.broadcast("insertOne", document);
this.#cache.flush();
@@ -87,23 +83,27 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
return getInsertOneResult(document);
}
async insertMany(documents: Partial<TSchema>[]): Promise<InsertManyResult> {
async insertMany(data: Partial<TSchema>[]): Promise<InsertManyResult> {
const logger = new InsertLog(this.name);
const result: TSchema[] = [];
for (const data of documents) {
const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId<TSchema>;
result.push(document);
this.#documents.set(document.id, document);
this.#pending.upsert(document);
}
const documents: WithId<TSchema>[] = [];
this.broadcast("insertMany", result);
const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" });
await Promise.all(
data.map((data) => {
const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId<TSchema>;
documents.push(document);
return tx.store.add(document);
}),
);
await tx.done;
this.broadcast("insertMany", documents);
this.#cache.flush();
this.log(logger.result());
return getInsertManyResult(result);
return getInsertManyResult(documents);
}
/*
@@ -113,7 +113,7 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
*/
async findById(id: string): Promise<WithId<TSchema> | undefined> {
return this.#documents.get(id);
return this.db.getFromIndex(this.name, "id", id);
}
async find(filter: Filter<WithId<TSchema>>, options: Options = {}): Promise<WithId<TSchema>[]> {
@@ -126,11 +126,101 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
return cached;
}
let cursor = new Query(filter ?? {}).find<TSchema>(Array.from(this.#documents.values()));
const indexes = this.#resolveIndexes(filter);
let cursor = new Query(filter).find<TSchema>(await this.#getAll({ ...options, ...indexes }));
if (options !== undefined) {
cursor = addOptions(cursor, options);
}
return cursor.all() as WithId<TSchema>[];
const documents = cursor.all() as WithId<TSchema>[];
this.#cache.set(this.#cache.hash(filter, options), documents);
this.log(logger.result());
return documents;
}
/**
* TODO: Prototype! Needs to cover more mongodb query cases and investigation around
* nested indexing in indexeddb.
*/
#resolveIndexes(filter: any): { index?: { [key: string]: any } } {
const indexNames = this.db.transaction(this.name, "readonly").store.indexNames;
const index: { [key: string]: any } = {};
for (const key in filter) {
if (indexNames.contains(key) === true) {
let val: any;
if (isObject(filter[key]) === true) {
if ((filter as any)[key]["$in"] !== undefined) {
val = (filter as any)[key]["$in"];
}
} else {
val = filter[key];
}
if (val !== undefined) {
index[key] = val;
}
}
}
if (Object.keys(index).length > 0) {
return { index };
}
return {};
}
async #getAll({ index, offset, range, limit }: Options) {
if (index !== undefined) {
return this.#getAllByIndex(index);
}
if (range !== undefined) {
return this.db.getAll(this.name, IDBKeyRange.bound(range.from, range.to));
}
if (offset !== undefined) {
return this.#getAllByOffset(offset.value, offset.direction, limit);
}
return this.db.getAll(this.name, undefined, limit);
}
async #getAllByIndex(index: Index) {
let result = new Set();
for (const key in index) {
const value = index[key];
if (Array.isArray(value)) {
for (const idx of value) {
const values = await this.db.getAllFromIndex(this.name, key, idx);
result = new Set([...result, ...values]);
}
} else {
const values = await this.db.getAllFromIndex(this.name, key, value);
result = new Set([...result, ...values]);
}
}
return Array.from(result);
}
async #getAllByOffset(value: string, direction: 1 | -1, limit?: number) {
if (direction === 1) {
return this.db.getAll(this.name, IDBKeyRange.lowerBound(value), limit);
}
return this.#getAllByDescOffset(value, limit);
}
async #getAllByDescOffset(value: string, limit?: number) {
if (limit === undefined) {
return this.db.getAll(this.name, IDBKeyRange.upperBound(value));
}
const result = [];
let cursor = await this.db
.transaction(this.name, "readonly")
.store.openCursor(IDBKeyRange.upperBound(value), "prev");
for (let i = 0; i < limit; i++) {
if (cursor === null) {
break;
}
result.push(cursor.value);
cursor = await cursor.continue();
}
return result.reverse();
}
/*
@@ -146,18 +236,12 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
): Promise<UpdateResult> {
const query = new Query(filter);
for (const document of Array.from(this.#documents.values())) {
if (query.test(document) === true) {
const modified = update(document, expr, arrayFilters, condition, options);
if (modified.length > 0) {
this.#documents.set(document.id, document);
this.#pending.upsert(document);
this.broadcast("updateOne", document);
return new UpdateResult(1, 1);
}
return new UpdateResult(1, 0);
}
if (typeof filter.id === "string") {
return this.#update(filter.id, expr, arrayFilters, condition, options);
}
const documents = await this.find(filter);
if (documents.length > 0) {
return this.#update(documents[0].id, expr, arrayFilters, condition, options);
}
return new UpdateResult(0, 0);
}
@@ -170,60 +254,96 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
options?: UpdateOptions,
): Promise<UpdateResult> {
const logger = new UpdateLog(this.name, { filter, expr, arrayFilters, condition, options });
const query = new Query(filter);
const ids = await this.find(filter).then((data) => data.map((d) => d.id));
const documents: WithId<TSchema>[] = [];
let matchedCount = 0;
let modifiedCount = 0;
for (const document of Array.from(this.#documents.values())) {
if (query.test(document) === true) {
matchedCount += 1;
const modified = update(document, expr, arrayFilters, condition, options);
if (modified.length > 0) {
modifiedCount += 1;
documents.push(document);
this.#documents.set(document.id, document);
this.#pending.upsert(document);
}
}
}
const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" });
await Promise.all(
ids.map((id) =>
tx.store.get(id).then((current) => {
if (current === undefined) {
return;
}
const modified = update(current, expr, arrayFilters, condition, options);
if (modified.length > 0) {
modifiedCount += 1;
documents.push(current);
return tx.store.put(current);
}
}),
),
);
await tx.done;
this.broadcast("updateMany", documents);
this.#cache.flush();
this.log(logger.result());
return new UpdateResult(matchedCount, modifiedCount);
return new UpdateResult(ids.length, modifiedCount);
}
async replace(filter: Filter<WithId<TSchema>>, document: WithId<TSchema>): Promise<UpdateResult> {
const logger = new ReplaceLog(this.name, document);
const query = new Query(filter);
const ids = await this.find(filter).then((data) => data.map((d) => d.id));
const documents: WithId<TSchema>[] = [];
const count = ids.length;
let matchedCount = 0;
let modifiedCount = 0;
for (const current of Array.from(this.#documents.values())) {
if (query.test(current) === true) {
matchedCount += 1;
modifiedCount += 1;
documents.push(document);
this.#documents.set(document.id, document);
this.#pending.upsert(document);
}
}
const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" });
await Promise.all(
ids.map((id) => {
const next = { ...document, id };
documents.push(next);
return tx.store.put(next);
}),
);
await tx.done;
this.broadcast("updateMany", documents);
this.#cache.flush();
this.log(logger.result({ count: matchedCount }));
this.log(logger.result({ count }));
return new UpdateResult(matchedCount, modifiedCount);
return new UpdateResult(count, count);
}
async #update(
id: string | number,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
): Promise<UpdateResult> {
const logger = new UpdateLog(this.name, { id, expr });
const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" });
const current = await tx.store.get(id);
if (current === undefined) {
await tx.done;
return new UpdateResult(0, 0);
}
const modified = await update(current, expr, arrayFilters, condition, options);
if (modified.length > 0) {
await tx.store.put(current);
}
await tx.done;
if (modified.length > 0) {
this.broadcast("updateOne", current);
this.log(logger.result());
this.#cache.flush();
return new UpdateResult(1, 1);
}
return new UpdateResult(1);
}
/*
@@ -234,20 +354,19 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
async remove(filter: Filter<WithId<TSchema>>): Promise<RemoveResult> {
const logger = new RemoveLog(this.name, { filter });
const documents = Array.from(this.#documents.values());
const query = new Query(filter);
let count = 0;
for (const document of documents) {
if (query.test(document) === true) {
this.#documents.delete(document.id);
this.#pending.remove(document.id);
this.broadcast("remove", document);
count += 1;
}
}
const documents = await this.find(filter);
const tx = this.db.transaction(this.name, "readwrite");
await Promise.all(documents.map((data) => tx.store.delete(data.id)));
await tx.done;
this.broadcast("remove", documents);
this.#cache.flush();
this.log(logger.result({ count: documents.length }));
return new RemoveResult(count);
return new RemoveResult(documents.length);
}
/*
@@ -257,7 +376,10 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
*/
async count(filter?: Filter<WithId<TSchema>>): Promise<number> {
return new Query(filter ?? {}).find(Array.from(this.#documents.values())).count();
if (filter !== undefined) {
return (await this.find(filter)).length;
}
return this.db.count(this.name);
}
/*
@@ -268,16 +390,19 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
async flush(): Promise<void> {
await this.db.clear(this.name);
this.#documents.clear();
}
/*
|--------------------------------------------------------------------------------
| Save
|--------------------------------------------------------------------------------
*/
async save(): Promise<void> {
this.#pending.save();
}
}
/*
|--------------------------------------------------------------------------------
| Utils
|--------------------------------------------------------------------------------
*/
export function isObject(v: any): v is object {
if (!v) {
return false;
}
const proto = Object.getPrototypeOf(v);
return (proto === OBJECT_PROTOTYPE || proto === null) && OBJECT_TAG === Object.prototype.toString.call(v);
}

View File

@@ -69,17 +69,14 @@ export class MemoryStorage<TSchema extends Document = Document> extends Storage<
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
): Promise<UpdateResult> {
const query = new Query(filter);
for (const document of Array.from(this.#documents.values())) {
if (query.test(document) === true) {
const modified = update(document, expr, arrayFilters, condition, options);
if (modified.length > 0) {
this.#documents.set(document.id, document);
this.broadcast("updateOne", document);
return new UpdateResult(1, 1);
}
return new UpdateResult(1, 0);
for (const document of await this.find(filter)) {
const modified = update(document, expr, arrayFilters, condition, options);
if (modified.length > 0) {
this.#documents.set(document.id, document);
this.broadcast("updateOne", document);
return new UpdateResult(1, 1);
}
return new UpdateResult(1, 0);
}
return new UpdateResult(0, 0);
}
@@ -91,41 +88,15 @@ export class MemoryStorage<TSchema extends Document = Document> extends Storage<
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
): Promise<UpdateResult> {
const query = new Query(filter);
const documents: WithId<TSchema>[] = [];
let matchedCount = 0;
let modifiedCount = 0;
for (const document of Array.from(this.#documents.values())) {
if (query.test(document) === true) {
matchedCount += 1;
const modified = update(document, expr, arrayFilters, condition, options);
if (modified.length > 0) {
modifiedCount += 1;
documents.push(document);
this.#documents.set(document.id, document);
}
}
}
this.broadcast("updateMany", documents);
return new UpdateResult(matchedCount, modifiedCount);
}
async replace(filter: Filter<WithId<TSchema>>, document: WithId<TSchema>): Promise<UpdateResult> {
const query = new Query(filter);
const documents: WithId<TSchema>[] = [];
let matchedCount = 0;
let modifiedCount = 0;
for (const current of Array.from(this.#documents.values())) {
if (query.test(current) === true) {
matchedCount += 1;
for (const document of await this.find(filter)) {
matchedCount += 1;
const modified = update(document, expr, arrayFilters, condition, options);
if (modified.length > 0) {
modifiedCount += 1;
documents.push(document);
this.#documents.set(document.id, document);
@@ -137,18 +108,31 @@ export class MemoryStorage<TSchema extends Document = Document> extends Storage<
return new UpdateResult(matchedCount, modifiedCount);
}
async remove(filter: Filter<WithId<TSchema>>): Promise<RemoveResult> {
const documents = Array.from(this.#documents.values());
const query = new Query(filter);
let count = 0;
for (const document of documents) {
if (query.test(document) === true) {
this.#documents.delete(document.id);
this.broadcast("remove", document);
count += 1;
}
async replace(filter: Filter<WithId<TSchema>>, document: WithId<TSchema>): Promise<UpdateResult> {
const documents: WithId<TSchema>[] = [];
let matchedCount = 0;
let modifiedCount = 0;
for (const current of await this.find(filter)) {
matchedCount += 1;
modifiedCount += 1;
documents.push(document);
this.#documents.set(current.id, document);
}
return new RemoveResult(count);
this.broadcast("updateMany", documents);
return new UpdateResult(matchedCount, modifiedCount);
}
async remove(filter: Filter<WithId<TSchema>>): Promise<RemoveResult> {
const documents = await this.find(filter);
for (const document of documents) {
this.#documents.delete(document.id);
}
this.broadcast("remove", documents);
return new RemoveResult(documents.length);
}
async count(filter?: Filter<WithId<TSchema>>): Promise<number> {