From 6da78b043d353c45222fd39c8fee935247dbd074 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoffer=20R=C3=B8dvik?= Date: Mon, 11 Aug 2025 14:04:38 +0200 Subject: [PATCH] feat: encapsulate snapshot --- libraries/aggregate.ts | 2 +- libraries/event-store.ts | 215 ++++++++++++++++++--------------- tests/store/create-snapshot.ts | 2 +- tests/store/reduce.ts | 2 +- 4 files changed, 118 insertions(+), 103 deletions(-) diff --git a/libraries/aggregate.ts b/libraries/aggregate.ts index a37eb00..2d14b12 100644 --- a/libraries/aggregate.ts +++ b/libraries/aggregate.ts @@ -153,7 +153,7 @@ export abstract class AggregateRoot { } await this.save(); const reducer = this.#store.aggregate.reducer(this.#self); - await this.#store.createSnapshot({ + await this.#store.snapshot.create({ name: this.#self.name, stream, reducer, diff --git a/libraries/event-store.ts b/libraries/event-store.ts index fc3f5a2..d59da2b 100644 --- a/libraries/event-store.ts +++ b/libraries/event-store.ts @@ -121,6 +121,14 @@ export class EventStore>[], @@ -141,6 +149,11 @@ export class EventStore>( aggregate: TAggregate, @@ -159,6 +172,11 @@ export class EventStore>( aggregate: TAggregate, @@ -177,6 +195,11 @@ export class EventStore>( aggregate: TAggregate, @@ -189,6 +212,12 @@ export class EventStore>( aggregate: TAggregate, @@ -382,18 +411,11 @@ export class EventStore( { name, stream, relation, reducer, ...query }: ReduceQuery, @@ -404,7 +426,7 @@ export class EventStore | undefined; let cursor: string | undefined; - const snapshot = await this.getSnapshot(name, id); + const snapshot = await this.snapshot.get(name, id); if (snapshot !== undefined) { cursor = snapshot.cursor; state = snapshot.state; @@ -436,99 +458,92 @@ export class EventStore({ - name, - stream, - relation, - reducer, - ...query - }: ReduceQuery): Promise { - const id = stream ?? relation; - const events = - stream !== undefined ? await this.getEventsByStreams([id], query) : await this.getEventsByRelations([id], query); - if (events.length === 0) { - return undefined; - } - await this.snapshots.insert(name, id, events.at(-1)!.created, reducer.reduce(events)); - } + readonly snapshot = { + /** + * Create a new snapshot for the given stream/relation and reducer. + * + * @param query - Reducer query to create snapshot from. + * + * @example + * ```ts + * await eventStore.createSnapshot({ stream, reducer }); + * await eventStore.createSnapshot({ relation: `foo:${foo}:bars`, reducer }); + * ``` + */ + create: async ({ + name, + stream, + relation, + reducer, + ...query + }: ReduceQuery): Promise => { + const id = stream ?? relation; + const events = + stream !== undefined + ? await this.getEventsByStreams([id], query) + : await this.getEventsByRelations([id], query); + if (events.length === 0) { + return undefined; + } + await this.snapshots.insert(name, id, events.at(-1)!.created, reducer.reduce(events)); + }, - /** - * Get an entity state snapshot from the database. These are useful for when we - * want to reduce the amount of events that has to be processed when fetching - * state history for a reducer. - * - * @param streamOrRelation - Stream, or Relation to get snapshot for. - * @param reducer - Reducer to get snapshot for. - * - * @example - * ```ts - * const snapshot = await eventStore.getSnapshot("foo:reducer", stream); - * console.log(snapshot); - * // { - * // cursor: "jxubdY-0", - * // state: { - * // foo: "bar" - * // } - * // } - * ``` - * - * @example - * ```ts - * const snapshot = await eventStore.getSnapshot("foo:reducer", `foo:${foo}:bars`); - * console.log(snapshot); - * // { - * // cursor: "jxubdY-0", - * // state: { - * // count: 1 - * // } - * // } - * ``` - */ - async getSnapshot>( - name: string, - streamOrRelation: string, - ): Promise<{ cursor: string; state: TState } | undefined> { - const snapshot = await this.snapshots.getByStream(name, streamOrRelation); - if (snapshot === undefined) { - return undefined; - } - return { cursor: snapshot.cursor, state: snapshot.state as TState }; - } + /** + * Get an entity state snapshot from the database. These are useful for when we + * want to reduce the amount of events that has to be processed when fetching + * state history for a reducer. + * + * @param streamOrRelation - Stream, or Relation to get snapshot for. + * @param reducer - Reducer to get snapshot for. + * + * @example + * ```ts + * const snapshot = await eventStore.getSnapshot("foo:reducer", stream); + * console.log(snapshot); + * // { + * // cursor: "jxubdY-0", + * // state: { + * // foo: "bar" + * // } + * // } + * + * const snapshot = await eventStore.getSnapshot("foo:reducer", `foo:${foo}:bars`); + * console.log(snapshot); + * // { + * // cursor: "jxubdY-0", + * // state: { + * // count: 1 + * // } + * // } + * ``` + */ + get: async >( + name: string, + streamOrRelation: string, + ): Promise<{ cursor: string; state: TState } | undefined> => { + const snapshot = await this.snapshots.getByStream(name, streamOrRelation); + if (snapshot === undefined) { + return undefined; + } + return { cursor: snapshot.cursor, state: snapshot.state as TState }; + }, - /** - * Delete a snapshot. - * - * @param streamOrRelation - Stream, or Relation to delete snapshot for. - * @param reducer - Reducer to remove snapshot for. - * - * @example - * ```ts - * await eventStore.deleteSnapshot("foo:reducer", stream); - * ``` - * - * @example - * ```ts - * await eventStore.deleteSnapshot("foo:reducer", `foo:${foo}:bars`); - * ``` - */ - async deleteSnapshot(name: string, streamOrRelation: string): Promise { - await this.snapshots.remove(name, streamOrRelation); - } + /** + * Delete a snapshot. + * + * @param streamOrRelation - Stream, or Relation to delete snapshot for. + * @param reducer - Reducer to remove snapshot for. + * + * @example + * ```ts + * await eventStore.deleteSnapshot("foo:reducer", stream); + * await eventStore.deleteSnapshot("foo:reducer", `foo:${foo}:bars`); + * ``` + */ + delete: async (name: string, streamOrRelation: string): Promise => { + await this.snapshots.remove(name, streamOrRelation); + }, + }; } /* diff --git a/tests/store/create-snapshot.ts b/tests/store/create-snapshot.ts index 76e75c6..0de245f 100644 --- a/tests/store/create-snapshot.ts +++ b/tests/store/create-snapshot.ts @@ -49,7 +49,7 @@ export default describe(".createSnapshot", (getEventStore) => { }), ); - await store.createSnapshot({ name: "user", stream, reducer: userReducer }); + await store.snapshot.create({ name: "user", stream, reducer: userReducer }); const snapshot = await store.snapshots.getByStream("user", stream); diff --git a/tests/store/reduce.ts b/tests/store/reduce.ts index bf48b04..a834311 100644 --- a/tests/store/reduce.ts +++ b/tests/store/reduce.ts @@ -81,7 +81,7 @@ export default describe(".reduce", (getEventStore) => { }), ); - await store.createSnapshot({ name: "user", stream, reducer: userReducer }); + await store.snapshot.create({ name: "user", stream, reducer: userReducer }); const state = await store.reduce({ name: "user", stream, reducer: userReducer });