feat: version 2 beta
This commit is contained in:
131
adapters/mongo/providers/events.ts
Normal file
131
adapters/mongo/providers/events.ts
Normal file
@@ -0,0 +1,131 @@
|
||||
import type { Collection, FindCursor } from "mongodb";
|
||||
|
||||
import { EventRecord } from "../../../libraries/event.ts";
|
||||
import type { EventsProvider } from "../../../types/adapter.ts";
|
||||
import type { EventReadOptions } from "../../../types/query.ts";
|
||||
import { type EventSchema, schema } from "../collections/events.ts";
|
||||
import { DatabaseAccessor } from "../types.ts";
|
||||
import { toParsedRecord, toParsedRecords } from "../utilities.ts";
|
||||
|
||||
export class MongoEventsProvider implements EventsProvider {
|
||||
readonly #accessor: DatabaseAccessor;
|
||||
|
||||
constructor(accessor: DatabaseAccessor) {
|
||||
this.#accessor = accessor;
|
||||
}
|
||||
|
||||
get collection(): Collection<EventSchema> {
|
||||
return this.#accessor.db.collection<EventSchema>("events");
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert a new event record to the events table.
|
||||
*
|
||||
* @param record - Event record to insert.
|
||||
* @param tx - Transaction to insert the record within. (Optional)
|
||||
*/
|
||||
async insert(record: EventRecord): Promise<void> {
|
||||
await this.collection.insertOne(record, { forceServerObjectId: true });
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert many new event records to the events table.
|
||||
*
|
||||
* @param records - Event records to insert.
|
||||
*/
|
||||
async insertMany(records: EventRecord[]): Promise<void> {
|
||||
await this.collection.insertMany(records, { forceServerObjectId: true });
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve all the events in the events table. Optionally a cursor and direction
|
||||
* can be provided to reduce the list of events returned.
|
||||
*
|
||||
* @param options - Find options.
|
||||
*/
|
||||
async get(options: EventReadOptions = {}): Promise<EventRecord[]> {
|
||||
return (await this.#withReadOptions(this.collection.find(this.#withFilters(options)), options)
|
||||
.sort({ created: 1 })
|
||||
.toArray()
|
||||
.then(toParsedRecords(schema))) as EventRecord[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get events within the given stream.
|
||||
*
|
||||
* @param stream - Stream to fetch events for.
|
||||
* @param options - Read options for modifying the result.
|
||||
*/
|
||||
async getByStream(stream: string, options: EventReadOptions = {}): Promise<EventRecord[]> {
|
||||
return (await this.#withReadOptions(this.collection.find({ stream, ...this.#withFilters(options) }), options)
|
||||
.sort({ created: 1 })
|
||||
.toArray()
|
||||
.then(toParsedRecords(schema))) as EventRecord[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get events within given list of streams.
|
||||
*
|
||||
* @param streams - Stream to get events for.
|
||||
* @param options - Read options for modifying the result.
|
||||
*/
|
||||
async getByStreams(streams: string[], options: EventReadOptions = {}): Promise<EventRecord[]> {
|
||||
return (await this.#withReadOptions(this.collection.find({ stream: { $in: streams }, ...this.#withFilters(options) }), options)
|
||||
.sort({ created: 1 })
|
||||
.toArray()
|
||||
.then(toParsedRecords(schema))) as EventRecord[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a single event by its id.
|
||||
*
|
||||
* @param id - Event id.
|
||||
*/
|
||||
async getById(id: string): Promise<EventRecord | undefined> {
|
||||
return (await this.collection.findOne({ id }).then(toParsedRecord(schema))) as EventRecord | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the given event is outdated in relation to the local event data.
|
||||
*
|
||||
* @param event - Event record to check for outdated state for.
|
||||
*/
|
||||
async checkOutdated({ stream, type, created }: EventRecord): Promise<boolean> {
|
||||
const count = await this.collection.countDocuments({
|
||||
stream,
|
||||
type,
|
||||
created: {
|
||||
$gt: created,
|
||||
},
|
||||
});
|
||||
return count > 0;
|
||||
}
|
||||
|
||||
/*
|
||||
|--------------------------------------------------------------------------------
|
||||
| Utilities
|
||||
|--------------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#withFilters({ filter }: EventReadOptions): { type?: { $in: string[] } } {
|
||||
const types = filter?.types;
|
||||
if (types !== undefined) {
|
||||
return { type: { $in: types } };
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
#withReadOptions(fc: FindCursor, { cursor, direction, limit }: EventReadOptions): FindCursor {
|
||||
if (cursor !== undefined) {
|
||||
if (direction === "desc" || direction === -1) {
|
||||
fc.filter({ created: { $lt: cursor } });
|
||||
} else {
|
||||
fc.filter({ created: { $gt: cursor } });
|
||||
}
|
||||
}
|
||||
if (limit !== undefined) {
|
||||
fc.limit(limit);
|
||||
}
|
||||
return fc;
|
||||
}
|
||||
}
|
||||
168
adapters/mongo/providers/relations.ts
Normal file
168
adapters/mongo/providers/relations.ts
Normal file
@@ -0,0 +1,168 @@
|
||||
import type { Collection } from "mongodb";
|
||||
|
||||
import { Relation, RelationPayload, RelationsProvider } from "../../../types/adapter.ts";
|
||||
import { type RelationSchema, schema } from "../collections/relations.ts";
|
||||
import { DatabaseAccessor } from "../types.ts";
|
||||
import { toParsedRecord, toParsedRecords } from "../utilities.ts";
|
||||
|
||||
export class MongoRelationsProvider implements RelationsProvider {
|
||||
readonly #accessor: DatabaseAccessor;
|
||||
|
||||
constructor(accessor: DatabaseAccessor) {
|
||||
this.#accessor = accessor;
|
||||
}
|
||||
|
||||
get collection(): Collection<RelationSchema> {
|
||||
return this.#accessor.db.collection<RelationSchema>("relations");
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming relation operations.
|
||||
*
|
||||
* @param relations - List of relation operations to execute.
|
||||
*/
|
||||
async handle(relations: Relation[]): Promise<void> {
|
||||
await Promise.all([
|
||||
this.insertMany(relations.filter((relation) => relation.op === "insert")),
|
||||
this.removeMany(relations.filter((relation) => relation.op === "remove")),
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add stream to the relations table.
|
||||
*
|
||||
* @param key - Relational key to add stream to.
|
||||
* @param stream - Stream to add to the key.
|
||||
*/
|
||||
async insert(key: string, stream: string): Promise<void> {
|
||||
await this.collection.updateOne({ key }, { $addToSet: { streams: stream } }, { upsert: true });
|
||||
}
|
||||
|
||||
/**
|
||||
* Add stream to many relational keys onto the relations table.
|
||||
*
|
||||
* @param relations - Relations to insert.
|
||||
*/
|
||||
async insertMany(relations: RelationPayload[], batchSize = 1_000): Promise<void> {
|
||||
const reduced = relations.reduce((map, { key, stream }) => {
|
||||
if (map.has(key) === false) {
|
||||
map.set(key, new Set<string>());
|
||||
}
|
||||
map.get(key)!.add(stream);
|
||||
return map;
|
||||
}, new Map<string, Set<string>>());
|
||||
|
||||
const bulkOps = [];
|
||||
for (const [key, streams] of reduced) {
|
||||
bulkOps.push({
|
||||
updateOne: {
|
||||
filter: { key },
|
||||
update: { $addToSet: { streams: { $each: Array.from(streams) } } },
|
||||
upsert: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
for (let i = 0; i < bulkOps.length; i += batchSize) {
|
||||
await this.collection.bulkWrite(bulkOps.slice(i, i + batchSize), { ordered: false });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of event streams registered under the given relational key.
|
||||
*
|
||||
* @param key - Relational key to get event streams for.
|
||||
*/
|
||||
async getByKey(key: string): Promise<string[]> {
|
||||
const relations = await this.collection.findOne({ key }).then(toParsedRecord(schema));
|
||||
if (relations === undefined) {
|
||||
return [];
|
||||
}
|
||||
return relations.streams;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of event streams registered under the given relational keys.
|
||||
*
|
||||
* @param keys - Relational keys to get event streams for.
|
||||
*/
|
||||
async getByKeys(keys: string[]): Promise<string[]> {
|
||||
const streams = new Set<string>();
|
||||
|
||||
const documents = await this.collection
|
||||
.find({ key: { $in: keys } })
|
||||
.toArray()
|
||||
.then(toParsedRecords(schema));
|
||||
documents.forEach((document) => {
|
||||
for (const stream of document.streams) {
|
||||
streams.add(stream);
|
||||
}
|
||||
});
|
||||
|
||||
return Array.from(streams);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a stream from the relational table.
|
||||
*
|
||||
* @param key - Relational key to remove stream from.
|
||||
* @param stream - Stream to remove from relation.
|
||||
*/
|
||||
async remove(key: string, stream: string): Promise<void> {
|
||||
await this.collection.updateOne({ key }, { $pull: { streams: stream } });
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes multiple relational entries.
|
||||
*
|
||||
* @param relations - Relations to remove stream from.
|
||||
*/
|
||||
async removeMany(relations: RelationPayload[], batchSize = 1_000): Promise<void> {
|
||||
const reduced = relations.reduce((map, { key, stream }) => {
|
||||
if (!map.has(key)) {
|
||||
map.set(key, new Set());
|
||||
}
|
||||
map.get(key)!.add(stream);
|
||||
return map;
|
||||
}, new Map<string, Set<string>>());
|
||||
|
||||
const bulkOps = [];
|
||||
for (const [key, streams] of reduced) {
|
||||
bulkOps.push({
|
||||
updateOne: {
|
||||
filter: { key },
|
||||
update: { $pull: { streams: { $in: Array.from(streams) } } },
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
for (let i = 0; i < bulkOps.length; i += batchSize) {
|
||||
await this.collection.bulkWrite(bulkOps.slice(i, i + batchSize), { ordered: false });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all relations bound to the given relational keys.
|
||||
*
|
||||
* @param keys - Relational keys to remove from the relational table.
|
||||
*/
|
||||
async removeByKeys(keys: string[]): Promise<void> {
|
||||
await this.collection.deleteMany({ key: { $in: keys } });
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all relations bound to the given streams.
|
||||
*
|
||||
* @param streams - Streams to remove from the relational table.
|
||||
*/
|
||||
async removeByStreams(streams: string[]): Promise<void> {
|
||||
await this.collection.bulkWrite(
|
||||
streams.map((stream) => ({
|
||||
updateOne: {
|
||||
filter: { streams: stream },
|
||||
update: { $pull: { streams: stream } },
|
||||
},
|
||||
})),
|
||||
);
|
||||
}
|
||||
}
|
||||
50
adapters/mongo/providers/snapshots.ts
Normal file
50
adapters/mongo/providers/snapshots.ts
Normal file
@@ -0,0 +1,50 @@
|
||||
import type { Collection } from "mongodb";
|
||||
|
||||
import { SnapshotsProvider } from "../../../types/adapter.ts";
|
||||
import { schema, type SnapshotSchema } from "../collections/snapshots.ts";
|
||||
import { DatabaseAccessor } from "../types.ts";
|
||||
import { toParsedRecord } from "../utilities.ts";
|
||||
|
||||
export class MongoSnapshotsProvider implements SnapshotsProvider {
|
||||
readonly #accessor: DatabaseAccessor;
|
||||
|
||||
constructor(accessor: DatabaseAccessor) {
|
||||
this.#accessor = accessor;
|
||||
}
|
||||
|
||||
get collection(): Collection<SnapshotSchema> {
|
||||
return this.#accessor.db.collection<SnapshotSchema>("snapshots");
|
||||
}
|
||||
|
||||
/**
|
||||
* Add snapshot state under given reducer stream.
|
||||
*
|
||||
* @param name - Name of the reducer the snapshot is attached to.
|
||||
* @param stream - Stream the snapshot is attached to.
|
||||
* @param cursor - Cursor timestamp for the last event used in the snapshot.
|
||||
* @param state - State of the reduced events.
|
||||
*/
|
||||
async insert(name: string, stream: string, cursor: string, state: Record<string, unknown>): Promise<void> {
|
||||
await this.collection.updateOne({ name }, { $set: { stream, cursor, state } }, { upsert: true });
|
||||
}
|
||||
|
||||
/**
|
||||
* Get snapshot state by stream.
|
||||
*
|
||||
* @param name - Name of the reducer which the state was created.
|
||||
* @param stream - Stream the state was reduced for.
|
||||
*/
|
||||
async getByStream(name: string, stream: string): Promise<SnapshotSchema | undefined> {
|
||||
return this.collection.findOne({ name, stream }).then(toParsedRecord(schema));
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a snapshot for the given reducer stream.
|
||||
*
|
||||
* @param name - Name of the reducer the snapshot is attached to.
|
||||
* @param stream - Stream to remove from snapshots.
|
||||
*/
|
||||
async remove(name: string, stream: string): Promise<void> {
|
||||
await this.collection.deleteOne({ name, stream });
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user