diff --git a/relay.test.ts b/relay.test.ts index 7db69cc..220a246 100644 --- a/relay.test.ts +++ b/relay.test.ts @@ -57,6 +57,19 @@ test('querying', async () => { expect(t2).toEqual(true) }, 10000) +test('async iterator', async () => { + let sub = relay.sub([ + { + ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'], + }, + ]) + + for await (const event of sub.events) { + expect(event).toHaveProperty('id', 'd7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027') + break + } +}) + test('get()', async () => { let event = await relay.get({ ids: ['d7dd5eb3ab747e16f8d0212d53032ea2a7cadef53837e5a6c66d42849fcb9027'], diff --git a/relay.ts b/relay.ts index e4caa8e..e8cf8f5 100644 --- a/relay.ts +++ b/relay.ts @@ -39,6 +39,7 @@ export type Sub = { unsub: () => void on: , U extends SubEvent[T]>(event: T, listener: U) => void off: , U extends SubEvent[T]>(event: T, listener: U) => void + events: AsyncGenerator, void, unknown> } export type SubscriptionOptions = { @@ -241,6 +242,47 @@ export function relayInit( } trySend([verb, subid, ...filters]) + async function* eventsGenerator(): AsyncGenerator, void, unknown> { + let nextResolve: ((event: Event) => void) | undefined + const eventQueue: Event[] = [] + + const pushToQueue = (event: Event) => { + if (nextResolve) { + nextResolve(event) + nextResolve = undefined + } else { + eventQueue.push(event) + } + } + + // Register the event listener + if (!subListeners[subid]) { + subListeners[subid] = { + event: [], + count: [], + eose: [] + } + } + subListeners[subid].event.push(pushToQueue) + + try { + while (true) { + if (eventQueue.length > 0) { + yield eventQueue.shift()! + } else { + const event = await new Promise>((resolve) => { + nextResolve = resolve + }) + yield event + } + } + } finally { + // Unregister the event listener when the generator is done + const idx = subListeners[subid].event.indexOf(pushToQueue) + if (idx >= 0) subListeners[subid].event.splice(idx, 1) + } + } + return { sub: (newFilters, newOpts = {}) => sub(newFilters || filters, { @@ -266,6 +308,9 @@ export function relayInit( let idx = listeners[type].indexOf(cb) if (idx >= 0) listeners[type].splice(idx, 1) }, + get events() { + return eventsGenerator() + } } }