diff --git a/abstract-pool.ts b/abstract-pool.ts index a5b6976..3caa12b 100644 --- a/abstract-pool.ts +++ b/abstract-pool.ts @@ -62,10 +62,127 @@ export class AbstractSimplePool { }) } - subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): SubCloser { - return this.subscribeManyMap(Object.fromEntries(relays.map(url => [url, filters])), params) + subscribe(relays: string[], filter: Filter, params: SubscribeManyParams): SubCloser { + return this.subscribeMap( + relays.map(url => ({ url, filter })), + params, + ) } + subscribeMany(relays: string[], filters: Filter[], params: SubscribeManyParams): SubCloser { + return this.subscribeMap( + relays.flatMap(url => filters.map(filter => ({ url, filter }))), + params, + ) + } + + subscribeMap(requests: { url: string; filter: Filter }[], params: SubscribeManyParams): SubCloser { + if (this.trackRelays) { + params.receivedEvent = (relay: AbstractRelay, id: string) => { + let set = this.seenOn.get(id) + if (!set) { + set = new Set() + this.seenOn.set(id, set) + } + set.add(relay) + } + } + + const _knownIds = new Set() + const subs: Subscription[] = [] + + // batch all EOSEs into a single + const eosesReceived: boolean[] = [] + let handleEose = (i: number) => { + if (eosesReceived[i]) return // do not act twice for the same relay + eosesReceived[i] = true + if (eosesReceived.filter(a => a).length === requests.length) { + params.oneose?.() + handleEose = () => {} + } + } + // batch all closes into a single + const closesReceived: string[] = [] + let handleClose = (i: number, reason: string) => { + if (closesReceived[i]) return // do not act twice for the same relay + handleEose(i) + closesReceived[i] = reason + if (closesReceived.filter(a => a).length === requests.length) { + params.onclose?.(closesReceived) + handleClose = () => {} + } + } + + const localAlreadyHaveEventHandler = (id: string) => { + if (params.alreadyHaveEvent?.(id)) { + return true + } + const have = _knownIds.has(id) + _knownIds.add(id) + return have + } + + // open a subscription in all given relays + const allOpened = Promise.all( + requests.map(async ({ url, filter }, i) => { + url = normalizeURL(url) + + let relay: AbstractRelay + try { + relay = await this.ensureRelay(url, { + connectionTimeout: params.maxWait ? Math.max(params.maxWait * 0.8, params.maxWait - 1000) : undefined, + }) + } catch (err) { + handleClose(i, (err as any)?.message || String(err)) + return + } + + let subscription = relay.subscribe([filter], { + ...params, + oneose: () => handleEose(i), + onclose: reason => { + if (reason.startsWith('auth-required:') && params.doauth) { + relay + .auth(params.doauth) + .then(() => { + relay.subscribe([filter], { + ...params, + oneose: () => handleEose(i), + onclose: reason => { + handleClose(i, reason) // the second time we won't try to auth anymore + }, + alreadyHaveEvent: localAlreadyHaveEventHandler, + eoseTimeout: params.maxWait, + }) + }) + .catch(err => { + handleClose(i, `auth was required and attempted, but failed with: ${err}`) + }) + } else { + handleClose(i, reason) + } + }, + alreadyHaveEvent: localAlreadyHaveEventHandler, + eoseTimeout: params.maxWait, + }) + + subs.push(subscription) + }), + ) + + return { + async close() { + await allOpened + subs.forEach(sub => { + sub.close() + }) + }, + } + } + + /** + * @deprecated Use subscribeMap instead. + */ subscribeManyMap(requests: { [relay: string]: Filter[] }, params: SubscribeManyParams): SubCloser { if (this.trackRelays) { params.receivedEvent = (relay: AbstractRelay, id: string) => { @@ -178,10 +295,24 @@ export class AbstractSimplePool { } } + subscribeEose( + relays: string[], + filter: Filter, + params: Pick, + ): SubCloser { + const subcloser = this.subscribe(relays, filter, { + ...params, + oneose() { + subcloser.close() + }, + }) + return subcloser + } + subscribeManyEose( relays: string[], filters: Filter[], - params: Pick, + params: Pick, ): SubCloser { const subcloser = this.subscribeMany(relays, filters, { ...params, @@ -199,7 +330,7 @@ export class AbstractSimplePool { ): Promise { return new Promise(async resolve => { const events: Event[] = [] - this.subscribeManyEose(relays, [filter], { + this.subscribeEose(relays, filter, { ...params, onevent(event: Event) { events.push(event)