From 804403f5746c1e6709c81c508860012d268fc65d Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Mon, 18 Dec 2023 17:11:16 -0300 Subject: [PATCH] change the way eose and connection timeouts work. --- pool.test.ts | 2 +- pool.ts | 50 +++++++++++++++++--------------- relay.ts | 82 ++++++++++++++++++++++++++++++++++++---------------- 3 files changed, 85 insertions(+), 49 deletions(-) diff --git a/pool.test.ts b/pool.test.ts index 0f592c2..3b9565d 100644 --- a/pool.test.ts +++ b/pool.test.ts @@ -74,7 +74,7 @@ test('same with double querying', async () => { }) test('querySync()', async () => { - let events = await pool.querySync([...relays, 'wss://offchain.pub', 'wss://eden.nostr.land'], { + let events = await pool.querySync([...relays.slice(2), 'wss://offchain.pub', 'wss://eden.nostr.land'], { authors: ['3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d'], kinds: [1], limit: 2, diff --git a/pool.ts b/pool.ts index f2e34d6..ac91a88 100644 --- a/pool.ts +++ b/pool.ts @@ -1,4 +1,4 @@ -import { relayConnect, type Relay, SubscriptionParams, Subscription } from './relay.ts' +import { Relay, SubscriptionParams, Subscription } from './relay.ts' import { normalizeURL } from './utils.ts' import type { Event } from './event.ts' @@ -7,7 +7,7 @@ import { type Filter } from './filter.ts' export type SubCloser = { close: () => void } export type SubscribeManyParams = Omit & { - eoseSubTimeout?: number + maxWait?: number onclose?: (reasons: string[]) => void id?: string } @@ -17,13 +17,18 @@ export class SimplePool { public seenOn = new Map>() public trackRelays: boolean = false - async ensureRelay(url: string): Promise { + public trustedRelayURLs = new Set() + + async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise { url = normalizeURL(url) let relay = this.relays.get(url) if (!relay) { - relay = relayConnect(url) + relay = new Relay(url) + if (params?.connectionTimeout) relay.connectionTimeout = params.connectionTimeout + if (this.trustedRelayURLs.has(relay.url)) relay.trusted = true this.relays.set(url, relay) + await relay.connect() } return relay @@ -45,26 +50,22 @@ export class SimplePool { const subs: Subscription[] = [] // batch all EOSEs into a single - let eosesMissing = relays.length - let handleEose = () => { - eosesMissing-- - if (eosesMissing === 0) { - clearTimeout(eoseTimeout) + const eosesReceived: boolean[] = [] + let handleEose = (i: number) => { + eosesReceived[i] = true + if (eosesReceived.filter(a => a).length === relays.length) { params.oneose?.() + handleEose = () => {} } } - const eoseTimeout = setTimeout(() => { - handleEose = () => {} - params.oneose?.() - }, params.eoseSubTimeout || 3400) - // batch all closes into a single const closesReceived: string[] = [] - const handleClose = (i: number, reason: string) => { - handleEose() + let handleClose = (i: number, reason: string) => { + handleEose(i) closesReceived[i] = reason - if (closesReceived.length === relays.length) { + if (closesReceived.filter(a => a).length === relays.length) { params.onclose?.(closesReceived) + handleClose = () => {} } } @@ -88,17 +89,20 @@ export class SimplePool { let relay: Relay try { - relay = await this.ensureRelay(url) + relay = await this.ensureRelay(url, { + connectionTimeout: params.maxWait ? Math.max(params.maxWait * 0.8, params.maxWait - 1000) : undefined, + }) } catch (err) { - handleEose() + handleClose(i, (err as any)?.message || String(err)) return } let subscription = await relay.subscribe(filters, { ...params, - oneose: handleEose, + oneose: () => handleEose(i), onclose: reason => handleClose(i, reason), alreadyHaveEvent: localAlreadyHaveEventHandler, + eoseTimeout: params.maxWait, }) subs.push(subscription) @@ -118,7 +122,7 @@ export class SimplePool { subscribeManyEose( relays: string[], filters: Filter[], - params: Pick, + params: Pick, ): SubCloser { const subcloser = this.subscribeMany(relays, filters, { ...params, @@ -132,7 +136,7 @@ export class SimplePool { async querySync( relays: string[], filter: Filter, - params?: Pick, + params?: Pick, ): Promise { return new Promise(async resolve => { const events: Event[] = [] @@ -151,7 +155,7 @@ export class SimplePool { async get( relays: string[], filter: Filter, - params?: Pick, + params?: Pick, ): Promise { filter.limit = 1 const events = await this.querySync(relays, filter, params) diff --git a/relay.ts b/relay.ts index e06b319..e608715 100644 --- a/relay.ts +++ b/relay.ts @@ -18,7 +18,11 @@ export class Relay { public trusted: boolean = false public onclose: (() => void) | null = null - public onnotice: (msg: string) => void = msg => console.log(`NOTICE from ${this.url}: ${msg}`) + public onnotice: (msg: string) => void = msg => console.debug(`NOTICE from ${this.url}: ${msg}`) + + public baseEoseTimeout: number = 4400 + public connectionTimeout: number = 8800 + private connectionTimeoutHandle: ReturnType | undefined private connectionPromise: Promise | undefined private openSubs = new Map() @@ -60,6 +64,13 @@ export class Relay { this.challenge = undefined this.connectionPromise = new Promise((resolve, reject) => { + this.connectionTimeoutHandle = setTimeout(() => { + reject('connection timed out') + this.connectionPromise = undefined + this.onclose?.() + this.closeAllSubscriptions('relay connection timed out') + }, this.connectionTimeout) + try { this.ws = new WebSocket(this.url) } catch (err) { @@ -68,6 +79,7 @@ export class Relay { } this.ws.onopen = () => { + clearTimeout(this.connectionTimeoutHandle) this._connected = true resolve() } @@ -166,9 +178,8 @@ export class Relay { } case 'EOSE': { const so = this.openSubs.get(data[1] as string) - if (!so || so.eosed) return - so.eosed = true - so.oneose?.() + if (!so) return + so.receivedEose() return } case 'OK': { @@ -204,7 +215,6 @@ export class Relay { } public async send(message: string) { - await this.connect() this.ws?.send(message) } @@ -237,20 +247,16 @@ export class Relay { public async subscribe(filters: Filter[], params: Partial): Promise { await this.connect() + const subscription = this.prepareSubscription(filters, params) + subscription.fire() + return subscription + } + + public prepareSubscription(filters: Filter[], params: Partial & { id?: string }): Subscription { this.serial++ const id = params.id || 'sub:' + this.serial - const subscription = new Subscription(this, filters, { - onevent: event => { - console.warn( - `onevent() callback not defined for subscription '${id}' in relay ${this.url}. event received:`, - event, - ) - }, - ...params, - id, - }) + const subscription = new Subscription(this, id, filters, params) this.openSubs.set(id, subscription) - this.send('["REQ","' + id + '",' + JSON.stringify(filters).substring(1)) return subscription } @@ -264,26 +270,52 @@ export class Relay { export class Subscription { public readonly relay: Relay public readonly id: string + public closed: boolean = false public eosed: boolean = false - + public filters: Filter[] public alreadyHaveEvent: ((id: string) => boolean) | undefined public receivedEvent: ((relay: Relay, id: string) => void) | undefined - public readonly filters: Filter[] public onevent: (evt: Event) => void public oneose: (() => void) | undefined public onclose: ((reason: string) => void) | undefined - constructor(relay: Relay, filters: Filter[], params: SubscriptionParams) { + public eoseTimeout: number + private eoseTimeoutHandle: ReturnType | undefined + + constructor(relay: Relay, id: string, filters: Filter[], params: SubscriptionParams) { this.relay = relay this.filters = filters - this.id = params.id - this.onevent = params.onevent - this.oneose = params.oneose - this.onclose = params.onclose + this.id = id this.alreadyHaveEvent = params.alreadyHaveEvent this.receivedEvent = params.receivedEvent + this.eoseTimeout = params.eoseTimeout || relay.baseEoseTimeout + + this.oneose = params.oneose + this.onclose = params.onclose + this.onevent = + params.onevent || + (event => { + console.warn( + `onevent() callback not defined for subscription '${this.id}' in relay ${this.relay.url}. event received:`, + event, + ) + }) + } + + public fire() { + this.relay.send('["REQ","' + this.id + '",' + JSON.stringify(this.filters).substring(1)) + + // only now we start counting the eoseTimeout + this.eoseTimeoutHandle = setTimeout(this.receivedEose.bind(this), this.eoseTimeout) + } + + public receivedEose() { + if (this.eosed) return + clearTimeout(this.eoseTimeoutHandle) + this.eosed = true + this.oneose?.() } public close(reason: string = 'closed by caller') { @@ -298,12 +330,12 @@ export class Subscription { } export type SubscriptionParams = { - id: string - onevent: (evt: Event) => void + onevent?: (evt: Event) => void oneose?: () => void onclose?: (reason: string) => void alreadyHaveEvent?: (id: string) => boolean receivedEvent?: (relay: Relay, id: string) => void + eoseTimeout?: number } export type CountResolver = {