diff --git a/pool.test.js b/pool.test.js index 4dda7f7..71fe3a8 100644 --- a/pool.test.js +++ b/pool.test.js @@ -12,18 +12,19 @@ const { let p = pool() let relays = [ - p.ensureRelay('wss://nostr-dev.wellorder.net/'), - p.ensureRelay('wss://relay.nostr.bg/'), - p.ensureRelay('wss://nostr.fmt.wiz.biz/'), - p.ensureRelay('wss://relay.nostr.band/'), - p.ensureRelay('wss://nostr.zebedee.cloud/') + 'wss://nostr-dev.wellorder.net/', + 'wss://relay.nostr.bg/', + 'wss://nostr.fmt.wiz.biz/', + 'wss://relay.nostr.band/', + 'wss://nostr.zebedee.cloud/' ] beforeAll(async () => { Promise.all( relays.map(relay => { try { - return relay.connect() + let r = p.ensureRelay(relay) + return r.connect() } catch (err) { /***/ } @@ -34,7 +35,8 @@ beforeAll(async () => { afterAll(async () => { relays.forEach(relay => { try { - relay.close() + let r = p.ensureRelay(relay) + r.close() } catch (err) { /***/ } @@ -45,13 +47,11 @@ test('removing duplicates when querying', async () => { let priv = generatePrivateKey() let pub = getPublicKey(priv) - let subs = relays.map(relay => - relay.sub([ - { - authors: [pub] - } - ]) - ) + let subs = p.sub(relays, [ + { + authors: [pub] + } + ]) let received = [] @@ -74,11 +74,46 @@ test('removing duplicates when querying', async () => { event.id = getEventHash(event) event.sig = signEvent(event, priv) - relays.forEach(relay => { - relay.publish(event) - }) + p.publish(relays, event) await new Promise(resolve => setTimeout(resolve, 1500)) - return expect(received).toHaveLength(1) + expect(received).toHaveLength(1) +}) + +test('removing duplicates correctly when double querying', async () => { + let priv = generatePrivateKey() + let pub = getPublicKey(priv) + + let subs1 = p.sub(relays, [ { authors: [pub] } ]) + let subs2 = p.sub(relays, [ { authors: [pub] } ]) + + let received = [] + + subs1.forEach(sub => + sub.on('event', event => { + received.push(event) + }) + ) + subs2.forEach(sub => + sub.on('event', event => { + received.push(event) + }) + ) + + let event = { + pubkey: pub, + created_at: Math.round(Date.now() / 1000), + content: 'test2', + kind: 22346, + tags: [] + } + event.id = getEventHash(event) + event.sig = signEvent(event, priv) + + p.publish(relays, event) + + await new Promise(resolve => setTimeout(resolve, 1500)) + + expect(received).toHaveLength(2) }) diff --git a/pool.ts b/pool.ts index 012b058..5884306 100644 --- a/pool.ts +++ b/pool.ts @@ -2,7 +2,7 @@ import {Relay, relayInit} from './relay' import {normalizeURL} from './utils' import {Filter} from './filter' import {Event} from './event' -import {SubscriptionOptions, Sub} from './relay' +import {SubscriptionOptions, Sub, Pub} from './relay' export function pool(defaultRelays: string[] = []) { return new SimplePool(defaultRelays) @@ -10,7 +10,6 @@ export function pool(defaultRelays: string[] = []) { class SimplePool { private _conn: {[url: string]: Relay} - private _knownIds: Set = new Set() constructor(defaultRelays: string[]) { this._conn = {} @@ -22,17 +21,52 @@ class SimplePool { const existing = this._conn[nm] if (existing) return existing - const hasEventId = (id: string): boolean => this._knownIds.has(id) - const relay = relayInit(nm, hasEventId) + const relay = relayInit(nm) this._conn[nm] = relay - let sub = relay.sub - relay.sub = (filters: Filter[], opts?: SubscriptionOptions): Sub => { - let s = sub(filters, opts) - s.on('event', (event: Event) => this._knownIds.add(event.id as string)) - return s - } - return relay } + + sub(relays: string[], filters: Filter[], opts?: SubscriptionOptions): Sub[] { + let _knownIds: Set = new Set() + let modifiedOpts = opts || {} + modifiedOpts.alreadyHaveEvent = id => _knownIds.has(id) + + return relays.map(relay => { + let r = this._conn[relay] + if (!r) return badSub() + let s = r.sub(filters, modifiedOpts) + s.on('event', (event: Event) => _knownIds.add(event.id as string)) + return s + }) + } + + publish(relays: string[], event: Event): Pub[] { + return relays.map(relay => { + let r = this._conn[relay] + if (!r) return badPub(relay) + let s = r.publish(event) + return s + }) + } +} + +function badSub(): Sub { + return { + on() {}, + off() {}, + sub(): Sub { + return badSub() + }, + unsub() {} + } +} + +function badPub(relay: string): Pub { + return { + on(typ, cb) { + if (typ === 'failed') cb(`relay ${relay} not connected`) + }, + off() {} + } } diff --git a/relay.ts b/relay.ts index ca23d43..f29ef4a 100644 --- a/relay.ts +++ b/relay.ts @@ -2,7 +2,7 @@ import {Event, verifySignature, validateEvent} from './event' import {Filter, matchFilters} from './filter' -import {getHex64} from './fakejson' +import {getHex64, getSubscriptionId} from './fakejson' type RelayEvent = 'connect' | 'disconnect' | 'error' | 'notice' @@ -29,13 +29,11 @@ export type Sub = { export type SubscriptionOptions = { skipVerification?: boolean + alreadyHaveEvent?: null | ((id: string) => boolean) id?: string } -export function relayInit( - url: string, - alreadyHaveEvent: (id: string) => boolean = () => false -): Relay { +export function relayInit(url: string): Relay { var ws: WebSocket var resolveClose: () => void var setOpen: (value: PromiseLike | void) => void @@ -104,8 +102,14 @@ export function relayInit( } var json = incomingMessageQueue.shift() - if (!json || alreadyHaveEvent(getHex64(json, 'id'))) { - return + if (!json) return + + let subid = getSubscriptionId(json) + if (subid) { + let {alreadyHaveEvent} = openSubs[subid] + if (alreadyHaveEvent && alreadyHaveEvent(getHex64(json, 'id'))) { + return + } } try { @@ -173,6 +177,7 @@ export function relayInit( filters: Filter[], { skipVerification = false, + alreadyHaveEvent = null, id = Math.random().toString().slice(2) }: SubscriptionOptions = {} ): Sub => { @@ -181,7 +186,8 @@ export function relayInit( openSubs[subid] = { id: subid, filters, - skipVerification + skipVerification, + alreadyHaveEvent } trySend(['REQ', subid, ...filters]) @@ -189,6 +195,7 @@ export function relayInit( sub: (newFilters, newOpts = {}) => sub(newFilters || filters, { skipVerification: newOpts.skipVerification || skipVerification, + alreadyHaveEvent: newOpts.alreadyHaveEvent || alreadyHaveEvent, id: subid }), unsub: () => {