diff --git a/adapters/postgres/database.ts b/adapters/postgres/database.ts index b3dd20d..39760e0 100644 --- a/adapters/postgres/database.ts +++ b/adapters/postgres/database.ts @@ -1,4 +1,4 @@ -import postgres, { type Sql } from "postgres"; +import postgres, { type Sql, TransactionSql } from "postgres"; import { PostgresConnection } from "./connection.ts"; @@ -34,3 +34,7 @@ export class PostgresDatabase { export type DatabaseAccessor = { sql: Sql; }; + +export type Options = { + tx?: TransactionSql; +}; diff --git a/adapters/postgres/providers/event.ts b/adapters/postgres/providers/event.ts index 274f49e..c8fbe30 100644 --- a/adapters/postgres/providers/event.ts +++ b/adapters/postgres/providers/event.ts @@ -3,7 +3,7 @@ import type { Helper } from "postgres"; import type { EventRecord } from "../../../libraries/event.ts"; import type { EventsProvider } from "../../../types/adapter.ts"; import type { EventReadOptions } from "../../../types/query.ts"; -import type { PostgresDatabase } from "../database.ts"; +import type { Options, PostgresDatabase } from "../database.ts"; type PGEventRecord = Omit & { data: string; meta: string }; @@ -25,8 +25,8 @@ export class PostgresEventsProvider implements EventsProvider { * * @param record - Event record to insert. */ - async insert(record: EventRecord): Promise { - await this.db.sql`INSERT INTO ${this.table} ${this.db.sql(this.#toDriver(record))}`.catch((error) => { + async insert(record: EventRecord, { tx }: Options = {}): Promise { + await (tx ?? this.db.sql)`INSERT INTO ${this.table} ${this.db.sql(this.#toDriver(record))}`.catch((error) => { throw new Error(`EventStore > 'events.insert' failed with postgres error: ${error.message}`); }); } @@ -37,16 +37,22 @@ export class PostgresEventsProvider implements EventsProvider { * @param records - Event records to insert. * @param batchSize - Batch size for the insert loop. */ - async insertMany(records: EventRecord[], batchSize: number = 1_000): Promise { - await this.db.sql - .begin(async (sql) => { - for (let i = 0; i < records.length; i += batchSize) { - await sql`INSERT INTO ${this.table} ${this.db.sql(records.slice(i, i + batchSize).map(this.#toDriver))}`; - } - }) - .catch((error) => { - throw new Error(`EventStore > 'events.insertMany' failed with postgres error: ${error.message}`); - }); + async insertMany(records: EventRecord[], batchSize: number = 1_000, { tx }: Options = {}): Promise { + if (tx !== undefined) { + for (let i = 0; i < records.length; i += batchSize) { + await tx`INSERT INTO ${this.table} ${this.db.sql(records.slice(i, i + batchSize).map(this.#toDriver))}`; + } + } else { + await this.db.sql + .begin(async (sql) => { + for (let i = 0; i < records.length; i += batchSize) { + await sql`INSERT INTO ${this.table} ${this.db.sql(records.slice(i, i + batchSize).map(this.#toDriver))}`; + } + }) + .catch((error) => { + throw new Error(`EventStore > 'events.insertMany' failed with postgres error: ${error.message}`); + }); + } } /** @@ -55,10 +61,10 @@ export class PostgresEventsProvider implements EventsProvider { * * @param options - Find options. */ - async get(options: EventReadOptions): Promise { + async get(options: EventReadOptions, { tx }: Options = {}): Promise { if (options !== undefined) { const { filter, cursor, direction, limit } = options; - return this.db.sql` + return (tx ?? this.db.sql)` SELECT * FROM ${this.table} WHERE ${filter?.types ? this.#withTypes(filter.types) : this.db.sql``} @@ -79,8 +85,9 @@ export class PostgresEventsProvider implements EventsProvider { async getByStream( stream: string, { filter, cursor, direction, limit }: EventReadOptions = {}, + { tx }: Options = {}, ): Promise { - return this.db.sql` + return (tx ?? this.db.sql)` SELECT * FROM ${this.table} WHERE stream = ${stream} @@ -100,8 +107,9 @@ export class PostgresEventsProvider implements EventsProvider { async getByStreams( streams: string[], { filter, cursor, direction, limit }: EventReadOptions = {}, + { tx }: Options = {}, ): Promise { - return this.db.sql` + return (tx ?? this.db.sql)` SELECT * FROM ${this.table} WHERE stream IN ${this.db.sql(streams)} @@ -117,8 +125,8 @@ export class PostgresEventsProvider implements EventsProvider { * * @param id - Event id. */ - async getById(id: string): Promise { - return this.db.sql`SELECT * FROM ${this.table} WHERE id = ${id}` + async getById(id: string, { tx }: Options = {}): Promise { + return (tx ?? this.db.sql)`SELECT * FROM ${this.table} WHERE id = ${id}` .then(this.#fromDriver) .then(([record]) => record); } @@ -126,8 +134,8 @@ export class PostgresEventsProvider implements EventsProvider { /** * Check if the given event is outdated in relation to the local event data. */ - async checkOutdated({ stream, type, created }: EventRecord): Promise { - const count = await await this.db.sql` + async checkOutdated({ stream, type, created }: EventRecord, { tx }: Options = {}): Promise { + const count = await (tx ?? this.db.sql)` SELECT COUNT(*) AS count FROM ${this.table} WHERE diff --git a/adapters/postgres/providers/relations.ts b/adapters/postgres/providers/relations.ts index e827c1c..1d38f2c 100644 --- a/adapters/postgres/providers/relations.ts +++ b/adapters/postgres/providers/relations.ts @@ -1,7 +1,7 @@ import type { Helper } from "postgres"; import type { Relation, RelationPayload, RelationsProvider } from "../../../types/adapter.ts"; -import type { PostgresDatabase } from "../database.ts"; +import type { Options, PostgresDatabase } from "../database.ts"; export class PostgresRelationsProvider implements RelationsProvider { constructor( @@ -34,8 +34,9 @@ export class PostgresRelationsProvider implements RelationsProvider { * @param key - Relational key to add stream to. * @param stream - Stream to add to the key. */ - async insert(key: string, stream: string): Promise { - await this.db.sql`INSERT INTO ${this.table} (key, stream) VALUES (${key}, ${stream}) ON CONFLICT DO NOTHING`.catch( + async insert(key: string, stream: string, { tx }: Options = {}): Promise { + await (tx ?? + this.db.sql)`INSERT INTO ${this.table} (key, stream) VALUES (${key}, ${stream}) ON CONFLICT DO NOTHING`.catch( (error) => { throw new Error(`EventStore > 'relations.insert' failed with postgres error: ${error.message}`); }, @@ -48,17 +49,24 @@ export class PostgresRelationsProvider implements RelationsProvider { * @param relations - Relations to insert. * @param batchSize - Batch size for the insert loop. */ - async insertMany(relations: RelationPayload[], batchSize: number = 1_000): Promise { - await this.db.sql - .begin(async (sql) => { - for (let i = 0; i < relations.length; i += batchSize) { - const values = relations.slice(i, i + batchSize).map(({ key, stream }) => [key, stream]); - await sql`INSERT INTO ${this.table} (key, stream) VALUES ${sql(values)} ON CONFLICT DO NOTHING`; - } - }) - .catch((error) => { - throw new Error(`EventStore > 'relations.insertMany' failed with postgres error: ${error.message}`); - }); + async insertMany(relations: RelationPayload[], batchSize: number = 1_000, { tx }: Options = {}): Promise { + if (tx !== undefined) { + for (let i = 0; i < relations.length; i += batchSize) { + const values = relations.slice(i, i + batchSize).map(({ key, stream }) => [key, stream]); + await tx`INSERT INTO ${this.table} (key, stream) VALUES ${this.db.sql(values)} ON CONFLICT DO NOTHING`; + } + } else { + await this.db.sql + .begin(async (sql) => { + for (let i = 0; i < relations.length; i += batchSize) { + const values = relations.slice(i, i + batchSize).map(({ key, stream }) => [key, stream]); + await sql`INSERT INTO ${this.table} (key, stream) VALUES ${sql(values)} ON CONFLICT DO NOTHING`; + } + }) + .catch((error) => { + throw new Error(`EventStore > 'relations.insertMany' failed with postgres error: ${error.message}`); + }); + } } /** @@ -66,8 +74,8 @@ export class PostgresRelationsProvider implements RelationsProvider { * * @param key - Relational key to get event streams for. */ - async getByKey(key: string): Promise { - return this.db.sql`SELECT stream FROM ${this.table} WHERE key = ${key}` + async getByKey(key: string, { tx }: Options = {}): Promise { + return (tx ?? this.db.sql)`SELECT stream FROM ${this.table} WHERE key = ${key}` .then((rows) => rows.map(({ stream }) => stream)) .catch((error) => { throw new Error(`EventStore > 'relations.getByKey' failed with postgres error: ${error.message}`); @@ -79,8 +87,8 @@ export class PostgresRelationsProvider implements RelationsProvider { * * @param keys - Relational keys to get event streams for. */ - async getByKeys(keys: string[]): Promise { - return this.db.sql`SELECT DISTINCT stream FROM ${this.table} WHERE key IN ${this.db.sql(keys)}` + async getByKeys(keys: string[], { tx }: Options = {}): Promise { + return (tx ?? this.db.sql)`SELECT DISTINCT stream FROM ${this.table} WHERE key IN ${this.db.sql(keys)}` .then((rows) => rows.map(({ stream }) => stream)) .catch((error) => { throw new Error(`EventStore > 'relations.getByKeys' failed with postgres error: ${error.message}`); @@ -93,8 +101,8 @@ export class PostgresRelationsProvider implements RelationsProvider { * @param key - Relational key to remove stream from. * @param stream - Stream to remove from relation. */ - async remove(key: string, stream: string): Promise { - await this.db.sql`DELETE FROM ${this.table} WHERE key = ${key} AND stream = ${stream}`.catch((error) => { + async remove(key: string, stream: string, { tx }: Options = {}): Promise { + await (tx ?? this.db.sql)`DELETE FROM ${this.table} WHERE key = ${key} AND stream = ${stream}`.catch((error) => { throw new Error(`EventStore > 'relations.remove' failed with postgres error: ${error.message}`); }); } @@ -105,19 +113,28 @@ export class PostgresRelationsProvider implements RelationsProvider { * @param relations - Relations to remove stream from. * @param batchSize - Batch size for the insert loop. */ - async removeMany(relations: RelationPayload[], batchSize: number = 1_000): Promise { - await this.db.sql - .begin(async (sql) => { - for (let i = 0; i < relations.length; i += batchSize) { - const conditions = relations - .slice(i, i + batchSize) - .map(({ key, stream }) => `(key = '${key}' AND stream = '${stream}')`); - await sql`DELETE FROM ${this.table} WHERE ${this.db.sql.unsafe(conditions.join(" OR "))}`; - } - }) - .catch((error) => { - throw new Error(`EventStore > 'relations.removeMany' failed with postgres error: ${error.message}`); - }); + async removeMany(relations: RelationPayload[], batchSize: number = 1_000, { tx }: Options = {}): Promise { + if (tx !== undefined) { + for (let i = 0; i < relations.length; i += batchSize) { + const conditions = relations + .slice(i, i + batchSize) + .map(({ key, stream }) => `(key = '${key}' AND stream = '${stream}')`); + await tx`DELETE FROM ${this.table} WHERE ${this.db.sql.unsafe(conditions.join(" OR "))}`; + } + } else { + await this.db.sql + .begin(async (sql) => { + for (let i = 0; i < relations.length; i += batchSize) { + const conditions = relations + .slice(i, i + batchSize) + .map(({ key, stream }) => `(key = '${key}' AND stream = '${stream}')`); + await sql`DELETE FROM ${this.table} WHERE ${this.db.sql.unsafe(conditions.join(" OR "))}`; + } + }) + .catch((error) => { + throw new Error(`EventStore > 'relations.removeMany' failed with postgres error: ${error.message}`); + }); + } } /** @@ -125,8 +142,8 @@ export class PostgresRelationsProvider implements RelationsProvider { * * @param keys - Relational keys to remove from the relational table. */ - async removeByKeys(keys: string[]): Promise { - await this.db.sql`DELETE FROM ${this.table} WHERE key IN ${this.db.sql(keys)}`.catch((error) => { + async removeByKeys(keys: string[], { tx }: Options = {}): Promise { + await (tx ?? this.db.sql)`DELETE FROM ${this.table} WHERE key IN ${this.db.sql(keys)}`.catch((error) => { throw new Error(`EventStore > 'relations.removeByKeys' failed with postgres error: ${error.message}`); }); } @@ -136,8 +153,8 @@ export class PostgresRelationsProvider implements RelationsProvider { * * @param streams - Streams to remove from the relational table. */ - async removeByStreams(streams: string[]): Promise { - await this.db.sql`DELETE FROM ${this.table} WHERE stream IN ${this.db.sql(streams)}`.catch((error) => { + async removeByStreams(streams: string[], { tx }: Options = {}): Promise { + await (tx ?? this.db.sql)`DELETE FROM ${this.table} WHERE stream IN ${this.db.sql(streams)}`.catch((error) => { throw new Error(`EventStore > 'relations.removeByStreams' failed with postgres error: ${error.message}`); }); } diff --git a/adapters/postgres/providers/snapshot.ts b/adapters/postgres/providers/snapshot.ts index 8887294..3c03e98 100644 --- a/adapters/postgres/providers/snapshot.ts +++ b/adapters/postgres/providers/snapshot.ts @@ -1,7 +1,7 @@ import type { Helper } from "postgres"; import type { Snapshot, SnapshotsProvider } from "../../../types/adapter.ts"; -import type { PostgresDatabase } from "../database.ts"; +import type { Options, PostgresDatabase } from "../database.ts"; type PGSnapshot = Omit & { state: string }; @@ -26,8 +26,8 @@ export class PostgresSnapshotsProvider implements SnapshotsProvider { * @param cursor - Cursor timestamp for the last event used in the snapshot. * @param state - State of the reduced events. */ - async insert(name: string, stream: string, cursor: string, state: any): Promise { - await this.db.sql` + async insert(name: string, stream: string, cursor: string, state: any, { tx }: Options = {}): Promise { + await (tx ?? this.db.sql)` INSERT INTO ${this.table} ${this.db.sql(this.#toDriver({ name, stream, cursor, state }))}`.catch((error) => { throw new Error(`EventStore > 'snapshots.insert' failed with postgres error: ${error.message}`); }); @@ -39,8 +39,8 @@ export class PostgresSnapshotsProvider implements SnapshotsProvider { * @param name - Name of the reducer which the state was created. * @param stream - Stream the state was reduced for. */ - async getByStream(name: string, stream: string): Promise { - return this.db.sql`SELECT * FROM ${this.table} WHERE name = ${name} AND stream = ${stream}` + async getByStream(name: string, stream: string, { tx }: Options = {}): Promise { + return (tx ?? this.db.sql)`SELECT * FROM ${this.table} WHERE name = ${name} AND stream = ${stream}` .then(this.#fromDriver) .then(([snapshot]) => snapshot) .catch((error) => { @@ -54,8 +54,8 @@ export class PostgresSnapshotsProvider implements SnapshotsProvider { * @param name - Name of the reducer the snapshot is attached to. * @param stream - Stream to remove from snapshots. */ - async remove(name: string, stream: string): Promise { - await this.db.sql`DELETE FROM ${this.table} WHERE name = ${name} AND stream = ${stream}`.catch((error) => { + async remove(name: string, stream: string, { tx }: Options = {}): Promise { + await (tx ?? this.db.sql)`DELETE FROM ${this.table} WHERE name = ${name} AND stream = ${stream}`.catch((error) => { throw new Error(`EventStore > 'snapshots.remove' failed with postgres error: ${error.message}`); }); } diff --git a/deno.json b/deno.json index d30456c..a36bfa0 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@valkyr/event-store", - "version": "2.0.1", + "version": "2.0.2", "exports": { ".": "./mod.ts", "./browser": "./adapters/browser/adapter.ts", diff --git a/libraries/event-store.ts b/libraries/event-store.ts index d59da2b..7912f0a 100644 --- a/libraries/event-store.ts +++ b/libraries/event-store.ts @@ -559,7 +559,7 @@ type EventStoreConfig; }; -export type EventsInsertSettings = { +export interface EventsInsertSettings { /** * Should the event store emit events after successfull insertion. * This only takes false as value and by default events are always