feat: update pending logic
This commit is contained in:
@@ -2,15 +2,15 @@ import { Document } from "../../types.ts";
|
|||||||
import { IndexedDbStorage } from "./storage.ts";
|
import { IndexedDbStorage } from "./storage.ts";
|
||||||
|
|
||||||
export class Pending<TSchema extends Document = Document> {
|
export class Pending<TSchema extends Document = Document> {
|
||||||
readonly #upsert: any[] = [];
|
|
||||||
readonly #remove: string[] = [];
|
|
||||||
|
|
||||||
readonly #chunkSize = 500;
|
|
||||||
|
|
||||||
#saving: Promise<void> | null = null;
|
|
||||||
|
|
||||||
#storage: IndexedDbStorage<TSchema>;
|
#storage: IndexedDbStorage<TSchema>;
|
||||||
|
|
||||||
|
readonly #upsert = new Map<string, TSchema>();
|
||||||
|
readonly #remove = new Set<string>();
|
||||||
|
|
||||||
|
#chunkSize = 500;
|
||||||
|
#saveScheduled = false;
|
||||||
|
#saving: Promise<void> | null = null;
|
||||||
|
|
||||||
constructor(storage: IndexedDbStorage<TSchema>) {
|
constructor(storage: IndexedDbStorage<TSchema>) {
|
||||||
this.#storage = storage;
|
this.#storage = storage;
|
||||||
}
|
}
|
||||||
@@ -20,33 +20,48 @@ export class Pending<TSchema extends Document = Document> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
upsert(document: any): void {
|
upsert(document: any): void {
|
||||||
this.#upsert.push(document);
|
this.#remove.delete(document.id);
|
||||||
this.save();
|
this.#upsert.set(document.id, document);
|
||||||
|
this.#schedule();
|
||||||
}
|
}
|
||||||
|
|
||||||
remove(id: any): void {
|
remove(id: any): void {
|
||||||
this.#remove.push(id);
|
this.#upsert.delete(id);
|
||||||
this.save();
|
this.#remove.add(id);
|
||||||
|
this.#schedule();
|
||||||
|
}
|
||||||
|
|
||||||
|
#schedule() {
|
||||||
|
if (!this.#saveScheduled) {
|
||||||
|
this.#saveScheduled = true;
|
||||||
|
queueMicrotask(() => {
|
||||||
|
this.#saveScheduled = false;
|
||||||
|
void this.save();
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async save() {
|
async save() {
|
||||||
if (this.#saving) {
|
if (this.#saving) return;
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.#saving = (async () => {
|
this.#saving = (async () => {
|
||||||
try {
|
try {
|
||||||
while (this.#upsert.length > 0 || this.#remove.length > 0) {
|
while (this.#upsert.size > 0 || this.#remove.size > 0) {
|
||||||
const tx = this.#storage.db.transaction(this.#storage.name, "readwrite", { durability: "relaxed" });
|
const tx = this.#storage.db.transaction(this.#storage.name, "readwrite", { durability: "relaxed" });
|
||||||
|
const store = tx.store;
|
||||||
|
|
||||||
if (this.#remove.length > 0) {
|
// Process removals
|
||||||
const removals = this.#remove.splice(0, this.#chunkSize);
|
if (this.#remove.size > 0) {
|
||||||
await Promise.all(removals.map((id) => tx.store.delete(id)));
|
const removals = Array.from(this.#remove).slice(0, this.#chunkSize);
|
||||||
|
removals.forEach((id) => this.#remove.delete(id));
|
||||||
|
await Promise.all(removals.map((id) => store.delete(id)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.#upsert.length > 0) {
|
// Process upserts
|
||||||
const upserts = this.#upsert.splice(0, this.#chunkSize);
|
if (this.#upsert.size > 0) {
|
||||||
await Promise.all(upserts.map((doc) => tx.store.put(doc)));
|
const upserts = Array.from(this.#upsert.values()).slice(0, this.#chunkSize);
|
||||||
|
upserts.forEach((doc) => this.#upsert.delete(doc.id));
|
||||||
|
await Promise.all(upserts.map((doc) => store.put(doc)));
|
||||||
}
|
}
|
||||||
|
|
||||||
await tx.done;
|
await tx.done;
|
||||||
|
|||||||
Reference in New Issue
Block a user