diff --git a/package.json b/package.json index 0a51d93..84a5257 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nostr-tools", - "version": "0.3.2", + "version": "0.4.0", "description": "Tools for making a Nostr client.", "main": "index.js", "repository": { @@ -10,6 +10,7 @@ "dependencies": { "buffer": "^6.0.3", "noble-secp256k1": "^1.1.1", + "ramda": "^0.27.1", "websocket-polyfill": "^0.0.3" }, "keywords": [ @@ -23,14 +24,5 @@ "censorship", "censorship-resistance", "client" - ], - "devDependencies": { - "@babel/core": "^7.12.10", - "@babel/preset-env": "^7.12.11", - "@rollup/plugin-babel": "^5.2.2", - "@rollup/plugin-node-resolve": "^11.0.1", - "rollup": "^2.36.1", - "rollup-plugin-ignore": "^1.0.9", - "rollup-plugin-terser": "^7.0.2" - } + ] } diff --git a/pool.js b/pool.js index 03c3832..f1eb49a 100644 --- a/pool.js +++ b/pool.js @@ -1,10 +1,11 @@ +import R from 'ramda' + import {getEventHash, signEvent} from './event' import {relayConnect, normalizeRelayURL} from './relay' export function relayPool(globalPrivateKey) { const relays = {} const globalSub = [] - const attemptCallbacks = [] const eventCallbacks = [] const noticeCallbacks = [] @@ -20,21 +21,19 @@ export function relayPool(globalPrivateKey) { noticeCallbacks[i](notice, relay) } } - function propagateAttempt(eventId, status, relayURL) { - for (let i = 0; i < attemptCallbacks.length; i++) { - let {relay} = relays[relayURL] - attemptCallbacks[i](eventId, status, relay) - } - } - async function relaysEach(fn, policyFilter) { - for (let relayURL in relays) { - let {relay, policy} = relays[relayURL] - if (policyFilter.write && policy.write) { - await fn(relay) - } else if (policyFilter.read && policy.read) { - await fn(relay) - } + const sub = async (cb, params) => { + const subControllers = R.map(relay => { + return relay.sub(params, cb.bind(null, relay)) + }, R.filter(R.pipe(R.prop('policy'), R.prop('write'), R.equals(true)), relays)) + + return { + sub: (cb, params) => + R.map( + R.pipe(R.prop('sub'), R.flip(R.apply)([cb, params])), + subControllers + ), + unsub: () => R.map(R.pipe(R.prop('unsub'), R.call), subControllers) } } @@ -72,9 +71,6 @@ export function relayPool(globalPrivateKey) { relay.close() delete relays[relayURL] }, - onEvent(cb) { - eventCallbacks.push(cb) - }, offEvent(cb) { let index = eventCallbacks.indexOf(cb) if (index !== -1) eventCallbacks.splice(index, 1) @@ -86,14 +82,7 @@ export function relayPool(globalPrivateKey) { let index = noticeCallbacks.indexOf(cb) if (index !== -1) noticeCallbacks.splice(index, 1) }, - onAttempt(cb) { - attemptCallbacks.push(cb) - }, - offAttempt(cb) { - let index = attemptCallbacks.indexOf(cb) - if (index !== -1) attemptCallbacks.splice(index, 1) - }, - async publish(event) { + async publish(event, statusCallback) { if (!event.sig) { event.tags = event.tags || [] @@ -107,36 +96,23 @@ export function relayPool(globalPrivateKey) { } } - await relaysEach( - async relay => { - try { - await relay.publish(event) - propagateAttempt(event.id, 'sent', relay.url) - } catch (err) { - propagateAttempt(event.id, 'failed', relay.url) - } - }, - {write: true} - ) + await R.map(async relay => { + try { + await relay.publish(event) + statusCallback(0, relay.url) + let {unsub} = relay.sub( + () => { + statusCallback(1, relay.url) + }, + {id: event.id} + ) + setTimeout(unsub, 5000) + } catch (err) { + statusCallback(-1, relay.url) + } + }, R.filter(R.pipe(R.prop('policy'), R.prop('write'), R.equals(true)), relays)) return event - }, - async subKey(key) { - globalSub[key] = true - await relaysEach(async relay => relay.subKey(key), {read: true}) - }, - async unsubKey(key) { - delete globalSub[key] - await relaysEach(async relay => relay.unsubKey(key), {read: true}) - }, - async reqFeed(params = {}) { - await relaysEach(async relay => relay.reqFeed(params), {read: true}) - }, - async reqEvent(params) { - await relaysEach(async relay => relay.reqEvent(params), {read: true}) - }, - async reqKey(params) { - await relaysEach(async relay => relay.reqKey(params), {read: true}) } } } diff --git a/relay.js b/relay.js index b6ab91a..f3ed5ee 100644 --- a/relay.js +++ b/relay.js @@ -1,6 +1,8 @@ import 'websocket-polyfill' +import R from 'ramda' import {verifySignature} from './event' +import {sha256} from './utils' export function normalizeRelayURL(url) { let [host, ...qs] = url.split('?') @@ -10,10 +12,10 @@ export function normalizeRelayURL(url) { return [host, ...qs].join('?') } -export function relayConnect(url, onEvent, onNotice) { +export function relayConnect(url, onNotice) { url = normalizeRelayURL(url) - let ws, resolveOpen, untilOpen, rejectOpen + var ws, resolveOpen, untilOpen, rejectOpen let attemptNumber = 1 function resetOpenState() { @@ -23,6 +25,8 @@ export function relayConnect(url, onEvent, onNotice) { }) } + var channels = {} + function connect() { ws = new WebSocket( url + (url.indexOf('?') !== -1 ? '&' : '?') + `session=${Math.random()}` @@ -63,23 +67,29 @@ export function relayConnect(url, onEvent, onNotice) { return } - if (data[0] === 'notice') { + if (data[0] === 'NOTICE') { + if (data.length < 2) return + console.log('message from relay ' + url + ': ' + data[1]) onNotice(data[1]) return } - if (typeof data[0] === 'object') { - let event = data[0] - let context = data[1] + if (data[0] === 'EVENT') { + if (data.length < 3) return + + let channel = data[1] + let event = data[2] if (await verifySignature(event)) { - onEvent(event, context) + if (channels[channel]) { + channels[channel](event) + } } else { console.warn( 'got event with invalid signature from ' + url, event, - context + id ) } return @@ -94,7 +104,9 @@ export function relayConnect(url, onEvent, onNotice) { connect() } catch (err) {} - async function trySend(msg) { + async function trySend(params) { + let msg = JSON.stringify(params) + if (ws && ws.readyState === WebSocket.OPEN) { ws.send(msg) } else { @@ -107,23 +119,20 @@ export function relayConnect(url, onEvent, onNotice) { } } + const sub = async (channel, cb, params) => { + trySend(['REQ', channel, params]) + + channels[channel] = cb + + return { + sub: R.partial(sub, [channel]), + unsub: () => trySend(['CLOSE', channel]) + } + } + return { url, - async subKey(key) { - trySend('sub-key:' + key) - }, - async unsubKey(key) { - trySend('unsub-key:' + key) - }, - async reqFeed(params = {}) { - trySend('req-feed:' + JSON.stringify(params)) - }, - async reqEvent(params) { - trySend('req-event:' + JSON.stringify(params)) - }, - async reqKey(params) { - trySend('req-key:' + JSON.stringify(params)) - }, + sub: R.partial(sub, [sha256(Math.random().toString())]), async publish(event) { trySend(JSON.stringify(event)) },