refactor: collection setup

This commit is contained in:
2026-01-03 00:44:59 +01:00
parent b9b12f790d
commit 7df5ed685d
30 changed files with 1056 additions and 1234 deletions

View File

@@ -1,4 +1,4 @@
import type { Document, WithId } from "./types.ts";
import type { AnyObject } from "mingo/types";
export const BroadcastChannel =
globalThis.BroadcastChannel ??
@@ -8,16 +8,16 @@ export const BroadcastChannel =
close() {}
};
export type StorageBroadcast<TSchema extends Document = Document> =
export type StorageBroadcast =
| {
name: string;
type: "insertOne" | "updateOne";
data: WithId<TSchema>;
data: AnyObject;
}
| {
name: string;
type: "insertMany" | "updateMany" | "remove";
data: WithId<TSchema>[];
data: AnyObject[];
}
| {
name: string;

View File

@@ -1,18 +1,12 @@
import { UpdateOptions } from "mingo/core";
import { UpdateExpression } from "mingo/updater";
import { Observable, Subject, Subscription } from "rxjs";
import type { AnyObject, Criteria } from "mingo/types";
import type { Modifier } from "mingo/updater";
import { Observable, type Subject, type Subscription } from "rxjs";
import type z from "zod";
import type { ZodObject, ZodRawShape } from "zod";
import { observe, observeOne } from "./observe/mod.ts";
import {
ChangeEvent,
InsertManyResult,
InsertOneResult,
Options,
RemoveResult,
Storage,
UpdateResult,
} from "./storage/mod.ts";
import { Document, Filter, WithId } from "./types.ts";
import type { ChangeEvent, InsertResult, QueryOptions, Storage, UpdateResult } from "./storage/mod.ts";
import type { AnyDocument } from "./types.ts";
/*
|--------------------------------------------------------------------------------
@@ -20,59 +14,100 @@ import { Document, Filter, WithId } from "./types.ts";
|--------------------------------------------------------------------------------
*/
export class Collection<TSchema extends Document = Document> {
constructor(
readonly name: string,
readonly storage: Storage<TSchema>,
) {}
export class Collection<
TOptions extends AnyCollectionOptions = AnyCollectionOptions,
TAdapter extends Storage = TOptions["adapter"],
TPrimaryKey extends string = TOptions["primaryKey"],
TSchema extends AnyDocument = z.output<ZodObject<TOptions["schema"]>>,
> {
declare readonly $schema: TSchema;
constructor(readonly options: TOptions) {}
get observable(): {
change: Subject<ChangeEvent<TSchema>>;
change: Subject<ChangeEvent>;
flush: Subject<void>;
} {
return this.storage.observable;
}
get storage(): TAdapter {
return this.options.adapter;
}
/*
|--------------------------------------------------------------------------------
| Mutators
|--------------------------------------------------------------------------------
*/
async insertOne(document: Partial<WithId<TSchema>>): Promise<InsertOneResult> {
return this.storage.resolve().then((storage) => storage.insertOne(document));
async insertOne(values: TSchema | Omit<TSchema, TPrimaryKey>): Promise<InsertResult> {
return this.storage.resolve().then((storage) =>
storage.insertOne({
collection: this.options.name,
pkey: this.options.primaryKey,
values,
}),
);
}
async insertMany(documents: Partial<WithId<TSchema>>[]): Promise<InsertManyResult> {
return this.storage.resolve().then((storage) => storage.insertMany(documents));
async insertMany(values: (TSchema | Omit<TSchema, TPrimaryKey>)[]): Promise<InsertResult> {
return this.storage.resolve().then((storage) =>
storage.insertMany({
collection: this.options.name,
pkey: this.options.primaryKey,
values,
}),
);
}
async updateOne(
filter: Filter<WithId<TSchema>>,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
condition: Criteria<TSchema>,
modifier: Modifier<TSchema>,
arrayFilters?: AnyObject[],
): Promise<UpdateResult> {
return this.storage.resolve().then((storage) => storage.updateOne(filter, expr, arrayFilters, condition, options));
return this.storage.resolve().then((storage) =>
storage.updateOne({
collection: this.options.name,
pkey: this.options.primaryKey,
condition,
modifier,
arrayFilters,
}),
);
}
async updateMany(
filter: Filter<WithId<TSchema>>,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
condition: Criteria<TSchema>,
modifier: Modifier<TSchema>,
arrayFilters?: AnyObject[],
): Promise<UpdateResult> {
return this.storage.resolve().then((storage) => storage.updateMany(filter, expr, arrayFilters, condition, options));
return this.storage.resolve().then((storage) =>
storage.updateMany({
collection: this.options.name,
pkey: this.options.primaryKey,
condition,
modifier,
arrayFilters,
}),
);
}
async replaceOne(filter: Filter<WithId<TSchema>>, document: TSchema): Promise<UpdateResult> {
return this.storage.resolve().then((storage) => storage.replace(filter, document));
async replaceOne(condition: Criteria<TSchema>, document: TSchema): Promise<UpdateResult> {
return this.storage.resolve().then((storage) =>
storage.replace({
collection: this.options.name,
pkey: this.options.primaryKey,
condition,
document,
}),
);
}
async remove(filter: Filter<WithId<TSchema>>): Promise<RemoveResult> {
return this.storage.resolve().then((storage) => storage.remove(filter));
async remove(condition: Criteria<TSchema>): Promise<number> {
return this.storage
.resolve()
.then((storage) => storage.remove({ collection: this.options.name, pkey: this.options.primaryKey, condition }));
}
/*
@@ -82,37 +117,37 @@ export class Collection<TSchema extends Document = Document> {
*/
subscribe(
filter?: Filter<WithId<TSchema>>,
condition?: Criteria<TSchema>,
options?: SubscribeToSingle,
next?: (document: WithId<TSchema> | undefined) => void,
next?: (document: TSchema | undefined) => void,
): Subscription;
subscribe(
filter?: Filter<WithId<TSchema>>,
condition?: Criteria<TSchema>,
options?: SubscribeToMany,
next?: (documents: WithId<TSchema>[], changed: WithId<TSchema>[], type: ChangeEvent["type"]) => void,
next?: (documents: TSchema[], changed: TSchema[], type: ChangeEvent["type"]) => void,
): Subscription;
subscribe(filter: Filter<WithId<TSchema>> = {}, options?: Options, next?: (...args: any[]) => void): Subscription {
subscribe(condition: Criteria<TSchema> = {}, options?: QueryOptions, next?: (...args: any[]) => void): Subscription {
if (options?.limit === 1) {
return this.#observeOne(filter).subscribe({ next });
return this.#observeOne(condition).subscribe({ next });
}
return this.#observe(filter, options).subscribe({
next: (value: [WithId<TSchema>[], WithId<TSchema>[], ChangeEvent["type"]]) => next?.(...value),
return this.#observe(condition, options).subscribe({
next: (value: [TSchema[], TSchema[], ChangeEvent["type"]]) => next?.(...value),
});
}
#observe(
filter: Filter<WithId<TSchema>> = {},
options?: Options,
): Observable<[WithId<TSchema>[], WithId<TSchema>[], ChangeEvent["type"]]> {
return new Observable<[WithId<TSchema>[], WithId<TSchema>[], ChangeEvent["type"]]>((subscriber) => {
filter: Criteria<TSchema> = {},
options?: QueryOptions,
): Observable<[TSchema[], TSchema[], ChangeEvent["type"]]> {
return new Observable<[TSchema[], TSchema[], ChangeEvent["type"]]>((subscriber) => {
return observe(this as any, filter, options, (values, changed, type) =>
subscriber.next([values, changed, type] as any),
);
});
}
#observeOne(filter: Filter<WithId<TSchema>> = {}): Observable<WithId<TSchema> | undefined> {
return new Observable<WithId<TSchema> | undefined>((subscriber) => {
#observeOne(filter: Criteria<TSchema> = {}): Observable<TSchema | undefined> {
return new Observable<TSchema | undefined>((subscriber) => {
return observeOne(this as any, filter, (values) => subscriber.next(values as any));
});
}
@@ -126,32 +161,34 @@ export class Collection<TSchema extends Document = Document> {
/**
* Retrieve a record by the document 'id' key.
*/
async findById(id: string): Promise<WithId<TSchema> | undefined> {
return this.storage.resolve().then((storage) => storage.findById(id));
async findById(id: string): Promise<TSchema | undefined> {
return this.storage.resolve().then((storage) => storage.findById({ collection: this.options.name, id }));
}
/**
* Performs a mingo filter search over the collection data and returns
* a single document if one was found matching the filter and options.
*/
async findOne(filter: Filter<WithId<TSchema>> = {}, options?: Options): Promise<WithId<TSchema> | undefined> {
return this.find(filter, options).then(([document]) => document);
async findOne(condition: Criteria<TSchema> = {}, options?: QueryOptions): Promise<TSchema | undefined> {
return this.find(condition, options).then(([document]) => document);
}
/**
* Performs a mingo filter search over the collection data and returns any
* documents matching the provided filter and options.
*/
async find(filter: Filter<WithId<TSchema>> = {}, options?: Options): Promise<WithId<TSchema>[]> {
return this.storage.resolve().then((storage) => storage.find(filter, options));
async find(condition: Criteria<TSchema> = {}, options?: QueryOptions): Promise<TSchema[]> {
return this.storage
.resolve()
.then((storage) => storage.find({ collection: this.options.name, condition, options }));
}
/**
* Performs a mingo filter search over the collection data and returns
* the count of all documents found matching the filter and options.
*/
async count(filter?: Filter<WithId<TSchema>>): Promise<number> {
return this.storage.resolve().then((storage) => storage.count(filter));
async count(condition?: Criteria<TSchema>): Promise<number> {
return this.storage.resolve().then((storage) => storage.count({ collection: this.options.name, condition }));
}
/**
@@ -172,18 +209,32 @@ export class Collection<TSchema extends Document = Document> {
*/
export type SubscriptionOptions = {
sort?: Options["sort"];
skip?: Options["skip"];
range?: Options["range"];
offset?: Options["offset"];
limit?: Options["limit"];
index?: Options["index"];
sort?: QueryOptions["sort"];
skip?: QueryOptions["skip"];
range?: QueryOptions["range"];
offset?: QueryOptions["offset"];
limit?: QueryOptions["limit"];
index?: QueryOptions["index"];
};
export type SubscribeToSingle = Options & {
export type SubscribeToSingle = QueryOptions & {
limit: 1;
};
export type SubscribeToMany = Options & {
export type SubscribeToMany = QueryOptions & {
limit?: number;
};
type AnyCollectionOptions = CollectionOptions<any, any, any, any>;
type CollectionOptions<
TName extends string,
TAdapter extends Storage,
TPrimaryKey extends string | number | symbol,
TSchema extends ZodRawShape,
> = {
name: TName;
adapter: TAdapter;
primaryKey: TPrimaryKey;
schema: TSchema;
};

View File

@@ -1,16 +1,16 @@
import { hashCodeQuery } from "../../hash.ts";
import { Options } from "../../storage/mod.ts";
import type { Document, Filter, WithId } from "../../types.ts";
import type { QueryOptions } from "../../storage/mod.ts";
import type { Document, Filter } from "../../types.ts";
export class IndexedDbCache<TSchema extends Document = Document> {
export class IndexedDBCache<TSchema extends Document = Document> {
readonly #cache = new Map<number, string[]>();
readonly #documents = new Map<string, WithId<TSchema>>();
readonly #documents = new Map<string, TSchema>();
hash(filter: Filter<WithId<TSchema>>, options: Options): number {
hash(filter: Filter<TSchema>, options: QueryOptions): number {
return hashCodeQuery(filter, options);
}
set(hashCode: number, documents: WithId<TSchema>[]) {
set(hashCode: number, documents: TSchema[]) {
this.#cache.set(
hashCode,
documents.map((document) => document.id),
@@ -20,10 +20,10 @@ export class IndexedDbCache<TSchema extends Document = Document> {
}
}
get(hashCode: number): WithId<TSchema>[] | undefined {
get(hashCode: number): TSchema[] | undefined {
const ids = this.#cache.get(hashCode);
if (ids !== undefined) {
return ids.map((id) => this.#documents.get(id) as WithId<TSchema>);
return ids.map((id) => this.#documents.get(id) as TSchema);
}
}

View File

@@ -1,29 +1,32 @@
import { IDBPDatabase, openDB } from "idb";
import { type IDBPDatabase, openDB } from "idb";
import { Collection } from "../../collection.ts";
import { DBLogger } from "../../logger.ts";
import { Document } from "../../types.ts";
import { Registrars } from "../registrars.ts";
import { IndexedDbStorage } from "./storage.ts";
import type { DBLogger } from "../../logger.ts";
import type { Document } from "../../types.ts";
import type { Registrars } from "../registrars.ts";
import { IndexedDBStorage } from "./storage.ts";
export class IndexedDatabase<TCollections extends StringRecord<Document>> {
export class IndexedDB<TCollections extends StringRecord<Document>> {
readonly #collections = new Map<keyof TCollections, Collection<TCollections[keyof TCollections]>>();
readonly #db: Promise<IDBPDatabase<unknown>>;
constructor(readonly options: Options) {
this.#db = openDB(options.name, options.version ?? 1, {
upgrade: (db: IDBPDatabase) => {
for (const { name, indexes = [] } of options.registrars) {
const store = db.createObjectStore(name as string, { keyPath: "id" });
store.createIndex("id", "id", { unique: true });
for (const { name, primaryKey = "id", indexes = [] } of options.registrars) {
const store = db.createObjectStore(name as string, { keyPath: primaryKey });
store.createIndex(primaryKey, primaryKey, { unique: true });
for (const [keyPath, options] of indexes) {
store.createIndex(keyPath, keyPath, options);
}
}
},
});
for (const { name } of options.registrars) {
this.#collections.set(name, new Collection(name, new IndexedDbStorage(name, this.#db, options.log ?? log)));
for (const { name, primaryKey = "id" } of options.registrars) {
this.#collections.set(
name,
new Collection(name, new IndexedDBStorage(name, primaryKey, this.#db, options.log ?? log)),
);
}
}
@@ -33,9 +36,7 @@ export class IndexedDatabase<TCollections extends StringRecord<Document>> {
|--------------------------------------------------------------------------------
*/
collection<TSchema extends TCollections[Name], Name extends keyof TCollections = keyof TCollections>(
name: Name,
): Collection<TSchema> {
collection<Name extends keyof TCollections = keyof TCollections>(name: Name) {
const collection = this.#collections.get(name);
if (collection === undefined) {
throw new Error(`Collection '${name as string}' not found`);

View File

@@ -1,9 +1,10 @@
import { IDBPDatabase } from "idb";
import { createUpdater, Query } from "mingo";
import { UpdateOptions } from "mingo/core";
import { UpdateExpression } from "mingo/updater";
import type { IDBPDatabase } from "idb";
import { Query, update } from "mingo";
import type { Criteria, Options } from "mingo/types";
import type { CloneMode, Modifier } from "mingo/updater";
import { DBLogger, InsertLog, QueryLog, RemoveLog, ReplaceLog, UpdateLog } from "../../logger.ts";
import { type DBLogger, InsertLog, QueryLog, RemoveLog, ReplaceLog, UpdateLog } from "../../logger.ts";
import { getDocumentWithPrimaryKey } from "../../primary-key.ts";
import { DuplicateDocumentError } from "../../storage/errors.ts";
import {
getInsertManyResult,
@@ -13,17 +14,18 @@ import {
} from "../../storage/operators/insert.ts";
import { RemoveResult } from "../../storage/operators/remove.ts";
import { UpdateResult } from "../../storage/operators/update.ts";
import { addOptions, Index, Options, Storage } from "../../storage/storage.ts";
import type { Document, Filter, WithId } from "../../types.ts";
import { IndexedDbCache } from "./cache.ts";
import { addOptions, type Index, type QueryOptions, Storage } from "../../storage/storage.ts";
import type { Document, Filter } from "../../types.ts";
import { IndexedDBCache } from "./cache.ts";
const OBJECT_PROTOTYPE = Object.getPrototypeOf({});
const OBJECT_TAG = "[object Object]";
const update = createUpdater({ cloneMode: "deep" });
export class IndexedDbStorage<TSchema extends Document = Document> extends Storage<TSchema> {
readonly #cache = new IndexedDbCache<TSchema>();
export class IndexedDBStorage<TPrimaryKey extends string, TSchema extends Document = Document> extends Storage<
TPrimaryKey,
TSchema
> {
readonly #cache = new IndexedDBCache<TSchema>();
readonly #promise: Promise<IDBPDatabase>;
@@ -31,10 +33,11 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
constructor(
name: string,
primaryKey: TPrimaryKey,
promise: Promise<IDBPDatabase>,
readonly log: DBLogger,
) {
super(name);
super(name, primaryKey);
this.#promise = promise;
}
@@ -66,11 +69,12 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
|--------------------------------------------------------------------------------
*/
async insertOne(data: Partial<TSchema>): Promise<InsertOneResult> {
async insertOne(values: TSchema | Omit<TSchema, TPrimaryKey>): Promise<InsertOneResult> {
const logger = new InsertLog(this.name);
const document = { ...data, id: data.id ?? crypto.randomUUID() } as any;
if (await this.has(document.id)) {
const document = getDocumentWithPrimaryKey(this.primaryKey, values);
if (await this.has(document[this.primaryKey])) {
throw new DuplicateDocumentError(document, this as any);
}
await this.db.transaction(this.name, "readwrite", { durability: "relaxed" }).store.add(document);
@@ -83,15 +87,15 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
return getInsertOneResult(document);
}
async insertMany(data: Partial<TSchema>[]): Promise<InsertManyResult> {
async insertMany(values: (TSchema | Omit<TSchema, TPrimaryKey>)[]): Promise<InsertManyResult> {
const logger = new InsertLog(this.name);
const documents: WithId<TSchema>[] = [];
const documents: TSchema[] = [];
const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" });
await Promise.all(
data.map((data) => {
const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId<TSchema>;
values.map((values) => {
const document = getDocumentWithPrimaryKey(this.primaryKey, values);
documents.push(document);
return tx.store.add(document);
}),
@@ -112,11 +116,11 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
|--------------------------------------------------------------------------------
*/
async findById(id: string): Promise<WithId<TSchema> | undefined> {
async findById(id: string): Promise<TSchema | undefined> {
return this.db.getFromIndex(this.name, "id", id);
}
async find(filter: Filter<WithId<TSchema>>, options: Options = {}): Promise<WithId<TSchema>[]> {
async find(filter: Filter<TSchema>, options: QueryOptions = {}): Promise<TSchema[]> {
const logger = new QueryLog(this.name, { filter, options });
const hashCode = this.#cache.hash(filter, options);
@@ -132,7 +136,7 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
cursor = addOptions(cursor, options);
}
const documents = cursor.all() as WithId<TSchema>[];
const documents = cursor.all() as TSchema[];
this.#cache.set(this.#cache.hash(filter, options), documents);
this.log(logger.result());
@@ -151,8 +155,8 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
if (indexNames.contains(key) === true) {
let val: any;
if (isObject(filter[key]) === true) {
if ((filter as any)[key]["$in"] !== undefined) {
val = (filter as any)[key]["$in"];
if ((filter as any)[key].$in !== undefined) {
val = (filter as any)[key].$in;
}
} else {
val = filter[key];
@@ -168,7 +172,7 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
return {};
}
async #getAll({ index, offset, range, limit }: Options) {
async #getAll({ index, offset, range, limit }: QueryOptions) {
if (index !== undefined) {
return this.#getAllByIndex(index);
}
@@ -230,34 +234,34 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
*/
async updateOne(
filter: Filter<WithId<TSchema>>,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
filter: Filter<TSchema>,
modifier: Modifier<TSchema>,
arrayFilters?: Filter<TSchema>[],
condition?: Criteria<TSchema>,
options: { cloneMode?: CloneMode; queryOptions?: Partial<Options> } = { cloneMode: "deep" },
): Promise<UpdateResult> {
if (typeof filter.id === "string") {
return this.#update(filter.id, expr, arrayFilters, condition, options);
return this.#update(filter.id, modifier, arrayFilters, condition, options);
}
const documents = await this.find(filter);
if (documents.length > 0) {
return this.#update(documents[0].id, expr, arrayFilters, condition, options);
return this.#update(documents[0].id, modifier, arrayFilters, condition, options);
}
return new UpdateResult(0, 0);
}
async updateMany(
filter: Filter<WithId<TSchema>>,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
filter: Filter<TSchema>,
modifier: Modifier<TSchema>,
arrayFilters?: Filter<TSchema>[],
condition?: Criteria<TSchema>,
options: { cloneMode?: CloneMode; queryOptions?: Partial<Options> } = { cloneMode: "deep" },
): Promise<UpdateResult> {
const logger = new UpdateLog(this.name, { filter, expr, arrayFilters, condition, options });
const logger = new UpdateLog(this.name, { filter, modifier, arrayFilters, condition, options });
const ids = await this.find(filter).then((data) => data.map((d) => d.id));
const documents: WithId<TSchema>[] = [];
const documents: TSchema[] = [];
let modifiedCount = 0;
const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" });
@@ -267,7 +271,7 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
if (current === undefined) {
return;
}
const modified = update(current, expr, arrayFilters, condition, options);
const modified = update(current, modifier, arrayFilters, condition, options);
if (modified.length > 0) {
modifiedCount += 1;
documents.push(current);
@@ -287,12 +291,12 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
return new UpdateResult(ids.length, modifiedCount);
}
async replace(filter: Filter<WithId<TSchema>>, document: WithId<TSchema>): Promise<UpdateResult> {
async replace(filter: Filter<TSchema>, document: TSchema): Promise<UpdateResult> {
const logger = new ReplaceLog(this.name, document);
const ids = await this.find(filter).then((data) => data.map((d) => d.id));
const documents: WithId<TSchema>[] = [];
const documents: TSchema[] = [];
const count = ids.length;
const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" });
@@ -315,12 +319,12 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
async #update(
id: string | number,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
modifier: Modifier<TSchema>,
arrayFilters?: Filter<TSchema>[],
condition?: Criteria<TSchema>,
options: { cloneMode?: CloneMode; queryOptions?: Partial<Options> } = { cloneMode: "deep" },
): Promise<UpdateResult> {
const logger = new UpdateLog(this.name, { id, expr });
const logger = new UpdateLog(this.name, { id, modifier });
const tx = this.db.transaction(this.name, "readwrite", { durability: "relaxed" });
@@ -330,7 +334,7 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
return new UpdateResult(0, 0);
}
const modified = await update(current, expr, arrayFilters, condition, options);
const modified = await update(current, modifier, arrayFilters, condition, options);
if (modified.length > 0) {
await tx.store.put(current);
}
@@ -352,7 +356,7 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
|--------------------------------------------------------------------------------
*/
async remove(filter: Filter<WithId<TSchema>>): Promise<RemoveResult> {
async remove(filter: Filter<TSchema>): Promise<RemoveResult> {
const logger = new RemoveLog(this.name, { filter });
const documents = await this.find(filter);
@@ -375,7 +379,7 @@ export class IndexedDbStorage<TSchema extends Document = Document> extends Stora
|--------------------------------------------------------------------------------
*/
async count(filter?: Filter<WithId<TSchema>>): Promise<number> {
async count(filter?: Filter<TSchema>): Promise<number> {
if (filter !== undefined) {
return (await this.find(filter)).length;
}

View File

@@ -1,6 +1,6 @@
import { Collection } from "../../collection.ts";
import { Document } from "../../types.ts";
import { Registrars } from "../registrars.ts";
import type { Document } from "../../types.ts";
import type { Registrars } from "../registrars.ts";
import { MemoryStorage } from "./storage.ts";
type Options = {

View File

@@ -1,145 +1,155 @@
import { createUpdater, Query } from "mingo";
import { UpdateOptions } from "mingo/core";
import { UpdateExpression } from "mingo/updater";
import { Query, update } from "mingo";
import type { AnyObject } from "mingo/types";
import { DuplicateDocumentError } from "../../storage/errors.ts";
import { getDocumentWithPrimaryKey } from "../../primary-key.ts";
import { Collections } from "../../storage/collections.ts";
import type { UpdatePayload } from "../../storage/mod.ts";
import type { InsertResult } from "../../storage/operators/insert.ts";
import type { UpdateResult } from "../../storage/operators/update.ts";
import {
getInsertManyResult,
getInsertOneResult,
type InsertManyResult,
type InsertOneResult,
} from "../../storage/operators/insert.ts";
import { RemoveResult } from "../../storage/operators/remove.ts";
import { UpdateResult } from "../../storage/operators/update.ts";
import { addOptions, Options, Storage } from "../../storage/storage.ts";
import type { Document, Filter, WithId } from "../../types.ts";
addOptions,
type CountPayload,
type FindByIdPayload,
type FindPayload,
type InsertManyPayload,
type InsertOnePayload,
type RemovePayload,
type ReplacePayload,
Storage,
} from "../../storage/storage.ts";
import type { AnyDocument } from "../../types.ts";
const update = createUpdater({ cloneMode: "deep" });
export class MemoryStorage<TSchema extends Document = Document> extends Storage<TSchema> {
readonly #documents = new Map<string, WithId<TSchema>>();
export class MemoryStorage extends Storage {
readonly #collections = new Collections();
async resolve() {
return this;
}
async has(id: string): Promise<boolean> {
return this.#documents.has(id);
}
async insertOne({ pkey, values, ...payload }: InsertOnePayload): Promise<InsertResult> {
const collection = this.#collections.get(payload.collection);
async insertOne(data: Partial<TSchema>): Promise<InsertOneResult> {
const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId<TSchema>;
if (await this.has(document.id)) {
throw new DuplicateDocumentError(document, this as any);
const document = getDocumentWithPrimaryKey(pkey, values);
if (collection.has(document[pkey])) {
return { insertCount: 0, insertIds: [] };
}
this.#documents.set(document.id, document);
collection.set(document[pkey], document);
this.broadcast("insertOne", document);
return getInsertOneResult(document);
return { insertCount: 1, insertIds: [document[pkey]] };
}
async insertMany(documents: Partial<TSchema>[]): Promise<InsertManyResult> {
const result: TSchema[] = [];
for (const data of documents) {
const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId<TSchema>;
result.push(document);
this.#documents.set(document.id, document);
async insertMany({ pkey, values, ...payload }: InsertManyPayload): Promise<InsertResult> {
const collection = this.#collections.get(payload.collection);
const documents: AnyDocument[] = [];
for (const insert of values) {
const document = getDocumentWithPrimaryKey(pkey, insert);
if (collection.has(document[pkey])) {
continue;
}
collection.set(document[pkey], document);
documents.push(document);
}
this.broadcast("insertMany", result);
if (documents.length > 0) {
this.broadcast("insertMany", documents);
}
return getInsertManyResult(result);
return { insertCount: documents.length, insertIds: documents.map((document) => document[pkey]) };
}
async findById(id: string): Promise<WithId<TSchema> | undefined> {
return this.#documents.get(id);
async findById({ collection, id }: FindByIdPayload): Promise<AnyObject | undefined> {
return this.#collections.get(collection).get(id);
}
async find(filter?: Filter<WithId<TSchema>>, options?: Options): Promise<WithId<TSchema>[]> {
let cursor = new Query(filter ?? {}).find<TSchema>(Array.from(this.#documents.values()));
async find({ condition = {}, options, ...payload }: FindPayload): Promise<AnyDocument[]> {
let cursor = new Query(condition).find<AnyDocument>(this.#collections.documents(payload.collection));
if (options !== undefined) {
cursor = addOptions(cursor, options);
}
return cursor.all() as WithId<TSchema>[];
return cursor.all();
}
async updateOne(
filter: Filter<WithId<TSchema>>,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
): Promise<UpdateResult> {
for (const document of await this.find(filter)) {
const modified = update(document, expr, arrayFilters, condition, options);
if (modified.length > 0) {
this.#documents.set(document.id, document);
this.broadcast("updateOne", document);
return new UpdateResult(1, 1);
}
return new UpdateResult(1, 0);
}
return new UpdateResult(0, 0);
}
async updateMany(
filter: Filter<WithId<TSchema>>,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
): Promise<UpdateResult> {
const documents: WithId<TSchema>[] = [];
async updateOne({ pkey, condition, modifier, arrayFilters, ...payload }: UpdatePayload): Promise<UpdateResult> {
const collection = this.#collections.get(payload.collection);
let matchedCount = 0;
let modifiedCount = 0;
for (const document of await this.find(filter)) {
for (const document of await this.find({ collection: payload.collection, condition, options: { limit: 1 } })) {
const modified = update(document, modifier, arrayFilters, undefined, { cloneMode: "deep" });
if (modified.length > 0) {
collection.set(document[pkey], document);
this.broadcast("updateOne", document);
modifiedCount += 1;
}
matchedCount += 1;
const modified = update(document, expr, arrayFilters, condition, options);
}
return { matchedCount, modifiedCount };
}
async updateMany({ pkey, condition, modifier, arrayFilters, ...payload }: UpdatePayload): Promise<UpdateResult> {
const collection = this.#collections.get(payload.collection);
const documents: AnyDocument[] = [];
let matchedCount = 0;
let modifiedCount = 0;
for (const document of await this.find({ collection: payload.collection, condition })) {
matchedCount += 1;
const modified = update(document, modifier, arrayFilters, undefined, { cloneMode: "deep" });
if (modified.length > 0) {
modifiedCount += 1;
documents.push(document);
this.#documents.set(document.id, document);
collection.set(document[pkey], document);
}
}
this.broadcast("updateMany", documents);
return new UpdateResult(matchedCount, modifiedCount);
return { matchedCount, modifiedCount };
}
async replace(filter: Filter<WithId<TSchema>>, document: WithId<TSchema>): Promise<UpdateResult> {
const documents: WithId<TSchema>[] = [];
async replace({ pkey, condition, document, ...payload }: ReplacePayload): Promise<UpdateResult> {
const collection = this.#collections.get(payload.collection);
let matchedCount = 0;
let modifiedCount = 0;
for (const current of await this.find(filter)) {
const documents: AnyDocument[] = [];
for (const current of await this.find({ collection: payload.collection, condition })) {
matchedCount += 1;
modifiedCount += 1;
documents.push(document);
this.#documents.set(current.id, document);
collection.set(current[pkey], document);
}
this.broadcast("updateMany", documents);
return new UpdateResult(matchedCount, modifiedCount);
return { matchedCount, modifiedCount };
}
async remove(filter: Filter<WithId<TSchema>>): Promise<RemoveResult> {
const documents = await this.find(filter);
async remove({ pkey, condition, ...payload }: RemovePayload): Promise<number> {
const collection = this.#collections.get(payload.collection);
const documents = await this.find({ collection: payload.collection, condition });
for (const document of documents) {
this.#documents.delete(document.id);
collection.delete(document[pkey]);
}
this.broadcast("remove", documents);
return new RemoveResult(documents.length);
return documents.length;
}
async count(filter?: Filter<WithId<TSchema>>): Promise<number> {
return new Query(filter ?? {}).find(Array.from(this.#documents.values())).count();
async count({ collection, condition = {} }: CountPayload): Promise<number> {
return new Query(condition).find(this.#collections.documents(collection)).all().length;
}
async flush(): Promise<void> {
this.#documents.clear();
this.#collections.flush();
}
}

View File

@@ -1,23 +1,16 @@
import { createUpdater, Query } from "mingo";
import { UpdateOptions } from "mingo/core";
import { UpdateExpression } from "mingo/updater";
import { Query, update } from "mingo";
import type { Criteria, Options } from "mingo/types";
import type { CloneMode, Modifier } from "mingo/updater";
import { getDocumentWithPrimaryKey } from "../../primary-key.ts";
import { DuplicateDocumentError } from "../../storage/errors.ts";
import {
getInsertManyResult,
getInsertOneResult,
InsertManyResult,
InsertOneResult,
} from "../../storage/operators/insert.ts";
import { RemoveResult } from "../../storage/operators/remove.ts";
import type { InsertResult } from "../../storage/operators/insert.ts";
import { UpdateResult } from "../../storage/operators/update.ts";
import { addOptions, Options, Storage } from "../../storage/storage.ts";
import { Document, Filter, WithId } from "../../types.ts";
import { addOptions, type QueryOptions, Storage } from "../../storage/storage.ts";
import type { AnyDocument } from "../../types.ts";
const update = createUpdater({ cloneMode: "deep" });
export class ObserverStorage<TSchema extends Document = Document> extends Storage<TSchema> {
readonly #documents = new Map<string, WithId<TSchema>>();
export class ObserverStorage extends Storage {
readonly #documents = new Map<string, AnyDocument>();
async resolve() {
return this;
@@ -27,48 +20,48 @@ export class ObserverStorage<TSchema extends Document = Document> extends Storag
return this.#documents.has(id);
}
async insertOne(data: Partial<TSchema>): Promise<InsertOneResult> {
const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId<TSchema>;
if (await this.has(document.id)) {
async insertOne(values: AnyDocument): Promise<InsertResult> {
const document = getDocumentWithPrimaryKey(this.primaryKey, values);
if (await this.has(document[this.primaryKey])) {
throw new DuplicateDocumentError(document, this as any);
}
this.#documents.set(document.id, document);
this.#documents.set(document[this.primaryKey], document);
return getInsertOneResult(document);
}
async insertMany(documents: Partial<TSchema>[]): Promise<InsertManyResult> {
async insertMany(list: TSchema[]): Promise<InsertResult> {
const result: TSchema[] = [];
for (const data of documents) {
const document = { ...data, id: data.id ?? crypto.randomUUID() } as WithId<TSchema>;
for (const values of list) {
const document = getDocumentWithPrimaryKey(this.primaryKey, values);
result.push(document);
this.#documents.set(document.id, document);
}
return getInsertManyResult(result);
}
async findById(id: string): Promise<WithId<TSchema> | undefined> {
async findById(id: string): Promise<TSchema | undefined> {
return this.#documents.get(id);
}
async find(filter?: Filter<WithId<TSchema>>, options?: Options): Promise<WithId<TSchema>[]> {
async find(filter?: Filter<TSchema>, options?: QueryOptions): Promise<TSchema[]> {
let cursor = new Query(filter ?? {}).find<TSchema>(Array.from(this.#documents.values()));
if (options !== undefined) {
cursor = addOptions(cursor, options);
}
return cursor.all() as WithId<TSchema>[];
return cursor.all();
}
async updateOne(
filter: Filter<WithId<TSchema>>,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
filter: Filter<TSchema>,
modifier: Modifier<TSchema>,
arrayFilters?: Filter<TSchema>[],
condition?: Criteria<TSchema>,
options: { cloneMode?: CloneMode; queryOptions?: Partial<Options> } = { cloneMode: "deep" },
): Promise<UpdateResult> {
const query = new Query(filter);
for (const document of Array.from(this.#documents.values())) {
if (query.test(document) === true) {
const modified = update(document, expr, arrayFilters, condition, options);
const modified = update(document, modifier, arrayFilters, condition, options);
if (modified.length > 0) {
this.#documents.set(document.id, document);
this.broadcast("updateOne", document);
@@ -81,15 +74,15 @@ export class ObserverStorage<TSchema extends Document = Document> extends Storag
}
async updateMany(
filter: Filter<WithId<TSchema>>,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
filter: Filter<TSchema>,
modifier: Modifier<TSchema>,
arrayFilters?: Filter<TSchema>[],
condition?: Criteria<TSchema>,
options: { cloneMode?: CloneMode; queryOptions?: Partial<Options> } = { cloneMode: "deep" },
): Promise<UpdateResult> {
const query = new Query(filter);
const documents: WithId<TSchema>[] = [];
const documents: TSchema[] = [];
let matchedCount = 0;
let modifiedCount = 0;
@@ -97,7 +90,7 @@ export class ObserverStorage<TSchema extends Document = Document> extends Storag
for (const document of Array.from(this.#documents.values())) {
if (query.test(document) === true) {
matchedCount += 1;
const modified = update(filter, expr, arrayFilters, condition, options);
const modified = update(document, modifier, arrayFilters, condition, options);
if (modified.length > 0) {
modifiedCount += 1;
documents.push(document);
@@ -111,10 +104,10 @@ export class ObserverStorage<TSchema extends Document = Document> extends Storag
return new UpdateResult(matchedCount, modifiedCount);
}
async replace(filter: Filter<WithId<TSchema>>, document: WithId<TSchema>): Promise<UpdateResult> {
async replace(filter: Filter<TSchema>, document: TSchema): Promise<UpdateResult> {
const query = new Query(filter);
const documents: WithId<TSchema>[] = [];
const documents: TSchema[] = [];
let matchedCount = 0;
let modifiedCount = 0;
@@ -131,7 +124,7 @@ export class ObserverStorage<TSchema extends Document = Document> extends Storag
return new UpdateResult(matchedCount, modifiedCount);
}
async remove(filter: Filter<WithId<TSchema>>): Promise<RemoveResult> {
async remove(filter: Filter<TSchema>): Promise<RemoveResult> {
const documents = Array.from(this.#documents.values());
const query = new Query(filter);
let count = 0;
@@ -144,8 +137,8 @@ export class ObserverStorage<TSchema extends Document = Document> extends Storag
return new RemoveResult(count);
}
async count(filter?: Filter<WithId<TSchema>>): Promise<number> {
return new Query(filter ?? {}).find(Array.from(this.#documents.values())).count();
async count(filter?: Filter<TSchema>): Promise<number> {
return new Query(filter ?? {}).find(Array.from(this.#documents.values())).all().length;
}
async flush(): Promise<void> {

View File

@@ -1,6 +1,23 @@
export type Registrars = {
/**
* Name of the collection.
*/
name: string;
/**
* Set the primary key of the collection.
* Default: "id"
*/
primaryKey?: string;
/**
* List of custom indexes for the collection.
*/
indexes?: Index[];
};
type Index = [string, any?];
type Index = [IndexKey, IndexOptions?];
type IndexKey = string;
type IndexOptions = { unique: boolean };

View File

@@ -1,5 +1,5 @@
import { Collection } from "../collection.ts";
import { Document, Filter, WithId } from "../types.ts";
import type { Collection } from "../collection.ts";
import type { Document, Filter, WithId } from "../types.ts";
import { isMatch } from "./is-match.ts";
export function observeOne<TSchema extends Document = Document>(

View File

@@ -1,23 +1,24 @@
import { Query } from "mingo";
import type { AnyObject, Criteria } from "mingo/types";
import { Collection } from "../collection.ts";
import { addOptions, ChangeEvent, Options } from "../storage/mod.ts";
import { Document, Filter, WithId } from "../types.ts";
import type { Collection } from "../collection.ts";
import { addOptions, type ChangeEvent, type QueryOptions } from "../storage/mod.ts";
import type { AnyDocument } from "../types.ts";
import { Store } from "./store.ts";
export function observe<TSchema extends Document = Document>(
collection: Collection<TSchema>,
filter: Filter<WithId<TSchema>>,
options: Options | undefined,
onChange: (documents: WithId<TSchema>[], changed: WithId<TSchema>[], type: ChangeEvent<TSchema>["type"]) => void,
export function observe<TCollection extends Collection, TSchema extends AnyObject = TCollection["$schema"]>(
collection: TCollection,
condition: Criteria<TSchema>,
options: QueryOptions | undefined,
onChange: (documents: TSchema[], changed: TSchema[], type: ChangeEvent["type"]) => void,
): {
unsubscribe: () => void;
} {
const store = Store.create<TSchema>();
const store = Store.create();
let debounce: any;
collection.find(filter, options).then(async (documents) => {
collection.find(condition, options).then(async (documents) => {
const resolved = await store.resolve(documents);
onChange(resolved, resolved, "insertMany");
});
@@ -29,17 +30,17 @@ export function observe<TSchema extends Document = Document>(
onChange([], [], "remove");
}),
collection.observable.change.subscribe(async ({ type, data }) => {
let changed: WithId<TSchema>[] = [];
let changed: AnyObject[] = [];
switch (type) {
case "insertOne":
case "updateOne": {
changed = await store[type](data, filter);
changed = await store[type](data, condition);
break;
}
case "insertMany":
case "updateMany":
case "remove": {
changed = await store[type](data, filter);
changed = await store[type](data, condition);
break;
}
}
@@ -64,12 +65,9 @@ export function observe<TSchema extends Document = Document>(
};
}
function applyQueryOptions<TSchema extends Document = Document>(
documents: WithId<TSchema>[],
options?: Options,
): WithId<TSchema>[] {
function applyQueryOptions(documents: AnyDocument[], options?: QueryOptions): AnyDocument[] {
if (options !== undefined) {
return addOptions(new Query({}).find<TSchema>(documents), options).all() as WithId<TSchema>[];
return addOptions(new Query({}).find<AnyDocument>(documents), options).all();
}
return documents;
}

View File

@@ -1,29 +1,29 @@
import { ObserverStorage } from "../databases/observer/storage.ts";
import { Storage } from "../storage/mod.ts";
import { Document, Filter, WithId } from "../types.ts";
import type { Storage } from "../storage/mod.ts";
import type { AnyDocument } from "../types.ts";
import { isMatch } from "./is-match.ts";
export class Store<TSchema extends Document = Document> {
private constructor(private storage: Storage<TSchema>) {}
export class Store {
private constructor(private storage: Storage) {}
static create<TSchema extends Document = Document>() {
return new Store<TSchema>(new ObserverStorage<TSchema>(`observer[${crypto.randomUUID()}]`));
static create() {
return new Store(new ObserverStorage(`observer[${crypto.randomUUID()}]`));
}
get destroy() {
return this.storage.destroy.bind(this.storage);
}
async resolve(documents: WithId<TSchema>[]): Promise<WithId<TSchema>[]> {
async resolve(documents: AnyDocument[]): Promise<AnyDocument[]> {
await this.storage.insertMany(documents);
return this.getDocuments();
}
async getDocuments(): Promise<WithId<TSchema>[]> {
async getDocuments(): Promise<AnyDocument[]> {
return this.storage.find();
}
async insertMany(documents: WithId<TSchema>[], filter: Filter<WithId<TSchema>>): Promise<WithId<TSchema>[]> {
async insertMany(documents: AnyDocument[], filter: Filter<AnyDocument>): Promise<AnyDocument[]> {
const matched = [];
for (const document of documents) {
matched.push(...(await this.insertOne(document, filter)));
@@ -31,15 +31,15 @@ export class Store<TSchema extends Document = Document> {
return matched;
}
async insertOne(document: WithId<TSchema>, filter: Filter<WithId<TSchema>>): Promise<WithId<TSchema>[]> {
if (isMatch<TSchema>(document, filter)) {
async insertOne(document: AnyDocument, filter: Filter<AnyDocument>): Promise<AnyDocument[]> {
if (isMatch<AnyDocument>(document, filter)) {
await this.storage.insertOne(document);
return [document];
}
return [];
}
async updateMany(documents: WithId<TSchema>[], filter: Filter<WithId<TSchema>>): Promise<WithId<TSchema>[]> {
async updateMany(documents: AnyDocument[], filter: Filter<AnyDocument>): Promise<AnyDocument[]> {
const matched = [];
for (const document of documents) {
matched.push(...(await this.updateOne(document, filter)));
@@ -47,33 +47,33 @@ export class Store<TSchema extends Document = Document> {
return matched;
}
async updateOne(document: WithId<TSchema>, filter: Filter<WithId<TSchema>>): Promise<WithId<TSchema>[]> {
async updateOne(document: AnyDocument, filter: Filter<AnyDocument>): Promise<AnyDocument[]> {
if (await this.storage.has(document.id)) {
await this.#updateOrRemove(document, filter);
return [document];
} else if (isMatch<TSchema>(document, filter)) {
} else if (isMatch<AnyDocument>(document, filter)) {
await this.storage.insertOne(document);
return [document];
}
return [];
}
async remove(documents: WithId<TSchema>[]): Promise<WithId<TSchema>[]> {
async remove(documents: AnyDocument[]): Promise<AnyDocument[]> {
const matched = [];
for (const document of documents) {
if (isMatch<TSchema>(document, { id: document.id } as WithId<TSchema>)) {
await this.storage.remove({ id: document.id } as WithId<TSchema>);
if (isMatch<AnyDocument>(document, { id: document.id } as AnyDocument)) {
await this.storage.remove({ id: document.id } as AnyDocument);
matched.push(document);
}
}
return matched;
}
async #updateOrRemove(document: WithId<TSchema>, filter: Filter<WithId<TSchema>>): Promise<void> {
if (isMatch<TSchema>(document, filter)) {
await this.storage.replace({ id: document.id } as WithId<TSchema>, document);
async #updateOrRemove(document: AnyDocument, filter: Filter<AnyDocument>): Promise<void> {
if (isMatch<AnyDocument>(document, filter)) {
await this.storage.replace({ id: document.id } as AnyDocument, document);
} else {
await this.storage.remove({ id: document.id } as WithId<TSchema>);
await this.storage.remove({ id: document.id } as AnyDocument);
}
}

8
src/primary-key.ts Normal file
View File

@@ -0,0 +1,8 @@
import type { AnyDocument } from "./types.ts";
export function getDocumentWithPrimaryKey<TPKey extends string>(pkey: TPKey, document: AnyDocument): AnyDocument {
if (Object.hasOwn(document, pkey) === true) {
return document;
}
return { [pkey]: crypto.randomUUID(), ...document };
}

View File

@@ -0,0 +1,33 @@
import type { AnyObject } from "mingo/types";
import { CollectionNotFoundError } from "./errors.ts";
export class Collections {
#collections = new Map<string, Documents>();
has(name: string): boolean {
return this.#collections.has(name);
}
documents(name: string): AnyObject[] {
return Array.from(this.get(name).values());
}
get(name: string): Documents {
const collection = this.#collections.get(name);
if (collection === undefined) {
throw new CollectionNotFoundError(name);
}
return collection;
}
delete(name: string): boolean {
return this.#collections.delete(name);
}
flush() {
this.#collections.clear();
}
}
type Documents = Map<string, AnyObject>;

View File

@@ -1,25 +1,28 @@
import { RawObject } from "mingo/types";
import { Document } from "../types.ts";
import type { Storage } from "./storage.ts";
import type { AnyObject } from "mingo/types";
export class DuplicateDocumentError extends Error {
readonly type = "DuplicateDocumentError";
constructor(
readonly document: Document,
storage: Storage,
readonly collection: string,
readonly document: AnyObject,
) {
super(
`Collection Insert Violation: Document '${document.id}' already exists in ${storage.name} collection ${storage.id}`,
);
super(`Collection Insert Violation: Document '${document.id}' already exists in '${collection}' collection`);
}
}
export class CollectionNotFoundError extends Error {
readonly type = "CollectionNotFoundError";
constructor(readonly collection: string) {
super(`Collection Retrieve Violation: Collection '${collection}' does not exist`);
}
}
export class DocumentNotFoundError extends Error {
readonly type = "DocumentNotFoundError";
constructor(readonly criteria: RawObject) {
constructor(readonly criteria: AnyObject) {
super(`Collection Update Violation: Document matching criteria does not exists`);
}
}

View File

@@ -1,40 +1,4 @@
import type { Document } from "../../types.ts";
export function getInsertManyResult(documents: Document[]): InsertManyResult {
return {
acknowledged: true,
insertedCount: documents.length,
insertedIds: documents.reduce<{ [key: number]: string | number }>((map, document, index) => {
map[index] = document.id;
return map;
}, {}),
};
}
export function getInsertOneResult(document: Document): InsertOneResult {
return {
acknowledged: true,
insertedId: document.id,
};
}
export type InsertManyResult =
| {
acknowledged: false;
}
| {
acknowledged: true;
insertedCount: number;
insertedIds: {
[key: number]: string | number;
};
};
export type InsertOneResult =
| {
acknowledged: false;
}
| {
acknowledged: true;
insertedId: string | number;
};
export type InsertResult = {
insertCount: number;
insertIds: (string | number | symbol)[];
};

View File

@@ -1,3 +0,0 @@
export class RemoveResult {
constructor(readonly matched = 0) {}
}

View File

@@ -1,6 +1,4 @@
export class UpdateResult {
constructor(
readonly matched = 0,
readonly modified = 0,
) {}
}
export type UpdateResult = {
matchedCount: number;
modifiedCount: number;
};

View File

@@ -1,20 +1,19 @@
import { UpdateOptions } from "mingo/core";
import { Cursor } from "mingo/cursor";
import { UpdateExpression } from "mingo/updater";
import type { Cursor } from "mingo/cursor";
import type { AnyObject, Criteria } from "mingo/types";
import type { Modifier } from "mingo/updater";
import { Subject } from "rxjs";
import { BroadcastChannel, StorageBroadcast } from "../broadcast.ts";
import { Document, Filter, WithId } from "../types.ts";
import { InsertManyResult, InsertOneResult } from "./operators/insert.ts";
import { RemoveResult } from "./operators/remove.ts";
import { UpdateResult } from "./operators/update.ts";
import { BroadcastChannel, type StorageBroadcast } from "../broadcast.ts";
import type { Prettify } from "../types.ts";
import type { InsertResult } from "./operators/insert.ts";
import type { UpdateResult } from "./operators/update.ts";
export abstract class Storage<TSchema extends Document = Document> {
export abstract class Storage {
readonly observable: {
change: Subject<ChangeEvent<TSchema>>;
change: Subject<ChangeEvent>;
flush: Subject<void>;
} = {
change: new Subject<ChangeEvent<TSchema>>(),
change: new Subject<ChangeEvent>(),
flush: new Subject<void>(),
};
@@ -22,12 +21,9 @@ export abstract class Storage<TSchema extends Document = Document> {
readonly #channel: BroadcastChannel;
constructor(
readonly name: string,
readonly id: string = crypto.randomUUID(),
) {
constructor(readonly name: string) {
this.#channel = new BroadcastChannel(`valkyr:db:${name}`);
this.#channel.onmessage = ({ data }: MessageEvent<StorageBroadcast<TSchema>>) => {
this.#channel.onmessage = ({ data }: MessageEvent<StorageBroadcast>) => {
if (data.name !== this.name) {
return;
}
@@ -72,7 +68,7 @@ export abstract class Storage<TSchema extends Document = Document> {
|
*/
broadcast(type: StorageBroadcast<TSchema>["type"], data?: TSchema | TSchema[]): void {
broadcast(type: StorageBroadcast["type"], data?: AnyObject | AnyObject[]): void {
switch (type) {
case "flush": {
this.observable.flush.next();
@@ -92,37 +88,23 @@ export abstract class Storage<TSchema extends Document = Document> {
|--------------------------------------------------------------------------------
*/
abstract has(id: string): Promise<boolean>;
abstract insertOne(payload: InsertOnePayload): Promise<InsertResult>;
abstract insertOne(document: Partial<WithId<TSchema>>): Promise<InsertOneResult>;
abstract insertMany(payload: InsertManyPayload): Promise<InsertResult>;
abstract insertMany(documents: Partial<WithId<TSchema>>[]): Promise<InsertManyResult>;
abstract findById(payload: FindByIdPayload): Promise<AnyObject | undefined>;
abstract findById(id: string): Promise<WithId<TSchema> | undefined>;
abstract find(payload: FindPayload): Promise<AnyObject[]>;
abstract find(filter?: Filter<WithId<TSchema>>, options?: Options): Promise<WithId<TSchema>[]>;
abstract updateOne(payload: UpdatePayload): Promise<UpdateResult>;
abstract updateOne(
filter: Filter<WithId<TSchema>>,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
): Promise<UpdateResult>;
abstract updateMany(payload: UpdatePayload): Promise<UpdateResult>;
abstract updateMany(
filter: Filter<WithId<TSchema>>,
expr: UpdateExpression,
arrayFilters?: Filter<WithId<TSchema>>[],
condition?: Filter<WithId<TSchema>>,
options?: UpdateOptions,
): Promise<UpdateResult>;
abstract replace(payload: ReplacePayload): Promise<UpdateResult>;
abstract replace(filter: Filter<WithId<TSchema>>, document: TSchema): Promise<UpdateResult>;
abstract remove(payload: RemovePayload): Promise<number>;
abstract remove(filter: Filter<WithId<TSchema>>): Promise<RemoveResult>;
abstract count(filter?: Filter<WithId<TSchema>>): Promise<number>;
abstract count(payload: CountPayload): Promise<number>;
abstract flush(): Promise<void>;
@@ -143,9 +125,9 @@ export abstract class Storage<TSchema extends Document = Document> {
|--------------------------------------------------------------------------------
*/
export function addOptions<TSchema extends Document = Document>(
export function addOptions<TSchema extends AnyObject = AnyObject>(
cursor: Cursor<TSchema>,
options: Options,
options: QueryOptions,
): Cursor<TSchema> {
if (options.sort) {
cursor.sort(options.sort);
@@ -167,17 +149,17 @@ export function addOptions<TSchema extends Document = Document>(
type Status = "loading" | "ready";
export type ChangeEvent<TSchema extends Document = Document> =
export type ChangeEvent =
| {
type: "insertOne" | "updateOne";
data: WithId<TSchema>;
data: AnyObject;
}
| {
type: "insertMany" | "updateMany" | "remove";
data: WithId<TSchema>[];
data: AnyObject[];
};
export type Options = {
export type QueryOptions = {
sort?: {
[key: string]: 1 | -1;
};
@@ -197,3 +179,68 @@ export type Options = {
export type Index = {
[index: string]: any;
};
export type InsertOnePayload = Prettify<
CollectionPayload &
PrimaryKeyPayload & {
values: AnyObject;
}
>;
export type InsertManyPayload = Prettify<
CollectionPayload &
PrimaryKeyPayload & {
values: AnyObject[];
}
>;
export type FindByIdPayload = Prettify<
CollectionPayload & {
id: string;
}
>;
export type FindPayload = Prettify<
CollectionPayload & {
condition?: Criteria<AnyObject>;
options?: QueryOptions;
}
>;
export type UpdatePayload = Prettify<
CollectionPayload &
PrimaryKeyPayload & {
condition: Criteria<AnyObject>;
modifier: Modifier<AnyObject>;
arrayFilters?: AnyObject[];
}
>;
export type ReplacePayload = Prettify<
CollectionPayload &
PrimaryKeyPayload & {
condition: Criteria<AnyObject>;
document: AnyObject;
}
>;
export type RemovePayload = Prettify<
CollectionPayload &
PrimaryKeyPayload & {
condition: Criteria<AnyObject>;
}
>;
export type CountPayload = Prettify<
CollectionPayload & {
condition?: Criteria<AnyObject>;
}
>;
type CollectionPayload = {
collection: string;
};
type PrimaryKeyPayload = {
pkey: string;
};

View File

@@ -1,12 +1,8 @@
import type { BSONRegExp, BSONType } from "bson";
export type Document = {
[key: string]: any;
};
export type Prettify<T> = { [K in keyof T]: T[K] } & {};
export type WithId<TSchema> = {
id: string;
} & TSchema;
export type AnyDocument = Record<string, any>;
export type Filter<TSchema> = {
[P in keyof TSchema]?: Condition<TSchema[P]>;
@@ -129,12 +125,11 @@ type Flatten<Type> = Type extends ReadonlyArray<infer Item> ? Item : Type;
type IsAny<Type, ResultIfAny, ResultIfNotAny> = true extends false & Type ? ResultIfAny : ResultIfNotAny;
type FilterOperations<T> =
T extends Record<string, any>
? {
[key in keyof T]?: FilterOperators<T[key]>;
}
: FilterOperators<T>;
type FilterOperations<T> = T extends Record<string, any>
? {
[key in keyof T]?: FilterOperators<T[key]>;
}
: FilterOperators<T>;
type ArrayOperator<Type> = {
$each?: Array<Flatten<Type>>;