import type { Helper } from "postgres";

import type { EventRecord } from "../../../libraries/event.ts";
import type { EventsProvider } from "../../../types/adapter.ts";
import type { EventReadOptions } from "../../../types/query.ts";
import type { PostgresDatabase } from "../database.ts";

/**
 * Shape of an event row as written to the driver by this provider: identical
 * to `EventRecord` except that `data` and `meta` are JSON encoded strings.
 */
type PGEventRecord = Omit<EventRecord, "data" | "meta"> & { data: string; meta: string };

/**
 * `EventsProvider` implementation that reads and writes event records through
 * the `sql` tag exposed by the given `PostgresDatabase`.
 */
export class PostgresEventsProvider implements EventsProvider {
  /**
   * @param db     - Database wrapper whose `sql` tag is used for every query.
   * @param schema - Optional schema holding the `events` table. When omitted
   *                 the table is addressed as `public.events`.
   */
  constructor(
    readonly db: PostgresDatabase,
    readonly schema?: string,
  ) {}

  /**
   * Identifier helper for the events table, qualified with the configured
   * schema, or with `public` when no schema was provided.
   */
  get table(): Helper<string, []> {
    if (this.schema !== undefined) {
      return this.db.sql(`${this.schema}.events`);
    }
    return this.db.sql("public.events");
  }

  /**
   * Insert a new event record to the events table.
   *
   * @param record - Event record to insert.
   *
   * @throws Error prefixed with `EventStore > 'events.insert'` when the query rejects.
   */
  async insert(record: EventRecord): Promise<void> {
    await this.db.sql`INSERT INTO ${this.table} ${this.db.sql(this.#toDriver(record))}`.catch((error) => {
      throw new Error(`EventStore > 'events.insert' failed with postgres error: ${error.message}`);
    });
  }

  /**
   * Insert many new event records to the events table. All batches are issued
   * inside a single `sql.begin` callback.
   *
   * @param records   - Event records to insert. An empty list is a no-op.
   * @param batchSize - Number of records per INSERT statement. Must be a positive integer.
   *
   * @throws Error when `batchSize` is not a positive integer, or prefixed with
   *         `EventStore > 'events.insertMany'` when the transaction rejects.
   */
  async insertMany(records: EventRecord[], batchSize: number = 1_000): Promise<void> {
    if (records.length === 0) {
      return; // nothing to write, skip opening a transaction
    }
    // A batch size below 1 would never advance the loop index past the end of
    // the list, so reject it up front instead of spinning or failing mid-way.
    if (!Number.isInteger(batchSize) || batchSize < 1) {
      throw new Error(`EventStore > 'events.insertMany' failed with invalid batch size: ${batchSize}`);
    }
    await this.db.sql
      .begin(async (sql) => {
        for (let i = 0; i < records.length; i += batchSize) {
          const batch = records.slice(i, i + batchSize).map((record) => this.#toDriver(record));
          await sql`INSERT INTO ${this.table} ${this.db.sql(batch)}`;
        }
      })
      .catch((error) => {
        throw new Error(`EventStore > 'events.insertMany' failed with postgres error: ${error.message}`);
      });
  }

  /**
   * Retrieve all the events in the events table, ordered by `created`
   * ascending. Optionally a type filter, cursor, direction and limit can be
   * provided to reduce the list of events returned.
   *
   * @param options - Find options. `undefined` is tolerated and treated as no options.
   */
  async get(options: EventReadOptions): Promise<EventRecord[]> {
    const { filter, cursor, direction, limit } = options ?? {};
    // `WHERE TRUE` anchors the clause so that each optional fragment, which
    // starts with `AND`, can be appended or left out without producing
    // `WHERE AND ...` or a dangling `WHERE`.
    return this.db.sql`
      SELECT * FROM ${this.table}
      WHERE TRUE
        ${filter?.types ? this.#withTypes(filter.types) : this.db.sql``}
        ${cursor ? this.#withCursor(cursor, direction) : this.db.sql``}
      ORDER BY created ASC
      ${limit ? this.#withLimit(limit) : this.db.sql``}
    `.then(this.#fromDriver);
  }

  /**
   * Get events within the given stream, ordered by `created` ascending.
   *
   * @param stream  - Stream to fetch events for.
   * @param options - Read options for modifying the result.
   */
  async getByStream(
    stream: string,
    { filter, cursor, direction, limit }: EventReadOptions = {},
  ): Promise<EventRecord[]> {
    return this.db.sql`
      SELECT * FROM ${this.table}
      WHERE
        stream = ${stream}
        ${filter?.types ? this.#withTypes(filter.types) : this.db.sql``}
        ${cursor ? this.#withCursor(cursor, direction) : this.db.sql``}
      ORDER BY created ASC
      ${limit ? this.#withLimit(limit) : this.db.sql``}
    `.then(this.#fromDriver);
  }

  /**
   * Get events within given list of streams, ordered by `created` ascending.
   *
   * @param streams - Streams to get events for. An empty list yields an empty result.
   * @param options - Read options for modifying the result.
   */
  async getByStreams(
    streams: string[],
    { filter, cursor, direction, limit }: EventReadOptions = {},
  ): Promise<EventRecord[]> {
    if (streams.length === 0) {
      return []; // `stream IN ()` is not valid SQL, and no stream can match anyway
    }
    return this.db.sql`
      SELECT * FROM ${this.table}
      WHERE
        stream IN ${this.db.sql(streams)}
        ${filter?.types ? this.#withTypes(filter.types) : this.db.sql``}
        ${cursor ? this.#withCursor(cursor, direction) : this.db.sql``}
      ORDER BY created ASC
      ${limit ? this.#withLimit(limit) : this.db.sql``}
    `.then(this.#fromDriver);
  }

  /**
   * Get a single event by its id.
   *
   * @param id - Event id.
   *
   * @returns The first matching record, or `undefined` when no row matches.
   */
  async getById(id: string): Promise<EventRecord | undefined> {
    return this.db.sql`SELECT * FROM ${this.table} WHERE id = ${id}`
      .then(this.#fromDriver)
      .then(([record]) => record);
  }

  /**
   * Check if the given event is outdated in relation to the local event data.
   *
   * @returns `true` when at least one stored event of the same stream and type
   *          has a `created` value greater than the given event's.
   */
  async checkOutdated({ stream, type, created }: EventRecord): Promise<boolean> {
    const [row] = await this.db.sql<{ count: string | number }[]>`
      SELECT COUNT(*) AS count
      FROM ${this.table}
      WHERE
        stream = ${stream}
        AND type = ${type}
        AND created > ${created}
    `;
    // The aggregate lives in the `count` column of the first row; converting
    // the row object itself would yield NaN and always report "not outdated".
    return Number(row?.count ?? 0) > 0;
  }

  /*
   |--------------------------------------------------------------------------------
   | Utilities
   |--------------------------------------------------------------------------------
   */

  /** Fragment restricting the result to the given event types. */
  #withTypes(types: string[]) {
    return this.db.sql`AND type IN ${this.db.sql(types)}`;
  }

  /**
   * Fragment restricting the result to events created before (`"desc"` / `-1`)
   * or after (any other direction) the cursor.
   *
   * NOTE(review): callers in this class always order by `created ASC`, also for
   * a descending cursor — confirm that is intended.
   */
  #withCursor(cursor: string, direction?: 1 | -1 | "asc" | "desc") {
    if (direction === "desc" || direction === -1) {
      return this.db.sql`AND created < ${cursor}`;
    }
    return this.db.sql`AND created > ${cursor}`;
  }

  /** Fragment capping the number of returned rows. */
  #withLimit(limit: number) {
    return this.db.sql`LIMIT ${limit}`;
  }

  /*
   |--------------------------------------------------------------------------------
   | Parsers
   |--------------------------------------------------------------------------------
   */

  /**
   * Convert driver rows to event records, JSON decoding `data` and `meta` when
   * they arrive as strings. Returns new objects; the driver rows are left untouched.
   */
  #fromDriver(records: PGEventRecord[]): EventRecord[] {
    return records.map((record) => {
      const data: unknown = typeof record.data === "string" ? JSON.parse(record.data) : record.data;
      const meta: unknown = typeof record.meta === "string" ? JSON.parse(record.meta) : record.meta;
      // The decoded payloads are not validated here, hence the explicit cast.
      return { ...record, data, meta } as unknown as EventRecord;
    });
  }

  /** Convert an event record to its driver shape by JSON encoding `data` and `meta`. */
  #toDriver(record: EventRecord): PGEventRecord {
    return {
      ...record,
      data: JSON.stringify(record.data),
      meta: JSON.stringify(record.meta),
    };
  }
}