relay: get rid of the message queue, because js is single-threaded.

This commit is contained in:
fiatjaf
2026-02-02 09:06:56 -03:00
parent c2423f7f31
commit be9b91318f
3 changed files with 104 additions and 214 deletions

View File

@@ -3,9 +3,8 @@
import type { Event, EventTemplate, VerifiedEvent, Nostr, NostrEvent } from './core.ts'
import { matchFilters, type Filter } from './filter.ts'
import { getHex64, getSubscriptionId } from './fakejson.ts'
import { Queue, normalizeURL } from './utils.ts'
import { normalizeURL } from './utils.ts'
import { makeAuthEvent } from './nip42.ts'
import { yieldThread } from './helpers.ts'
type RelayWebSocket = WebSocket & {
ping?(): void
@@ -51,8 +50,6 @@ export class AbstractRelay {
private openCountRequests = new Map<string, CountResolver>()
private openEventPublishes = new Map<string, EventPublishResolver>()
private ws: RelayWebSocket | undefined
private incomingMessageQueue = new Queue<string>()
private queueRunning = false
private challenge: string | undefined
private authPromise: Promise<string> | undefined
private serial: number = 0
@@ -269,122 +266,6 @@ export class AbstractRelay {
}
}
private async runQueue() {
this.queueRunning = true
while (true) {
if (false === this.handleNext()) {
break
}
await yieldThread()
}
this.queueRunning = false
}
private handleNext(): undefined | false {
const json = this.incomingMessageQueue.dequeue()
if (!json) {
return false
}
// shortcut EVENT sub
const subid = getSubscriptionId(json)
if (subid) {
const so = this.openSubs.get(subid as string)
if (!so) {
// this is an EVENT message, but for a subscription we don't have, so just stop here
return
}
// this will be called only when this message is a EVENT message for a subscription we have
// we do this before parsing the JSON to not have to do that for duplicate events
// since JSON parsing is slow
const id = getHex64(json, 'id')
const alreadyHave = so.alreadyHaveEvent?.(id)
// notify any interested client that the relay has this event
// (do this after alreadyHaveEvent() because the client may rely on this to answer that)
so.receivedEvent?.(this, id)
if (alreadyHave) {
// if we had already seen this event we can just stop here
return
}
}
try {
let data = JSON.parse(json)
// we won't do any checks against the data since all failures (i.e. invalid messages from relays)
// will naturally be caught by the encompassing try..catch block
switch (data[0]) {
case 'EVENT': {
const so = this.openSubs.get(data[1] as string) as Subscription
const event = data[2] as NostrEvent
if (this.verifyEvent(event) && matchFilters(so.filters, event)) {
so.onevent(event)
}
if (!so.lastEmitted || so.lastEmitted < event.created_at) so.lastEmitted = event.created_at
return
}
case 'COUNT': {
const id: string = data[1]
const payload = data[2] as { count: number }
const cr = this.openCountRequests.get(id) as CountResolver
if (cr) {
cr.resolve(payload.count)
this.openCountRequests.delete(id)
}
return
}
case 'EOSE': {
const so = this.openSubs.get(data[1] as string)
if (!so) return
so.receivedEose()
return
}
case 'OK': {
const id: string = data[1]
const ok: boolean = data[2]
const reason: string = data[3]
const ep = this.openEventPublishes.get(id) as EventPublishResolver
if (ep) {
clearTimeout(ep.timeout)
if (ok) ep.resolve(reason)
else ep.reject(new Error(reason))
this.openEventPublishes.delete(id)
}
return
}
case 'CLOSED': {
const id: string = data[1]
const so = this.openSubs.get(id)
if (!so) return
so.closed = true
so.close(data[2] as string)
return
}
case 'NOTICE': {
this.onnotice(data[1] as string)
return
}
case 'AUTH': {
this.challenge = data[1] as string
if (this.onauth) {
this.auth(this.onauth)
}
return
}
default: {
const so = this.openSubs.get(data[1])
so?.oncustom?.(data)
return
}
}
} catch (err) {
return
}
}
public async send(message: string) {
if (!this.connectionPromise) throw new SendingOnClosedConnection(message, this.url)
@@ -488,9 +369,109 @@ export class AbstractRelay {
// this is the function assigned to this.ws.onmessage
// it's exposed for testing and debugging purposes
public _onmessage(ev: MessageEvent<any>) {
this.incomingMessageQueue.enqueue(ev.data as string)
if (!this.queueRunning) {
this.runQueue()
const json = ev.data
if (!json) {
return false
}
// shortcut EVENT sub
const subid = getSubscriptionId(json)
if (subid) {
const so = this.openSubs.get(subid as string)
if (!so) {
// this is an EVENT message, but for a subscription we don't have, so just stop here
return
}
// this will be called only when this message is a EVENT message for a subscription we have
// we do this before parsing the JSON to not have to do that for duplicate events
// since JSON parsing is slow
const id = getHex64(json, 'id')
const alreadyHave = so.alreadyHaveEvent?.(id)
// notify any interested client that the relay has this event
// (do this after alreadyHaveEvent() because the client may rely on this to answer that)
so.receivedEvent?.(this, id)
if (alreadyHave) {
// if we had already seen this event we can just stop here
return
}
}
try {
let data = JSON.parse(json)
// we won't do any checks against the data since all failures (i.e. invalid messages from relays)
// will naturally be caught by the encompassing try..catch block
switch (data[0]) {
case 'EVENT': {
const so = this.openSubs.get(data[1] as string) as Subscription
const event = data[2] as NostrEvent
if (this.verifyEvent(event) && matchFilters(so.filters, event)) {
so.onevent(event)
}
if (!so.lastEmitted || so.lastEmitted < event.created_at) so.lastEmitted = event.created_at
return
}
case 'COUNT': {
const id: string = data[1]
const payload = data[2] as { count: number }
const cr = this.openCountRequests.get(id) as CountResolver
if (cr) {
cr.resolve(payload.count)
this.openCountRequests.delete(id)
}
return
}
case 'EOSE': {
const so = this.openSubs.get(data[1] as string)
if (!so) return
so.receivedEose()
return
}
case 'OK': {
const id: string = data[1]
const ok: boolean = data[2]
const reason: string = data[3]
const ep = this.openEventPublishes.get(id) as EventPublishResolver
if (ep) {
clearTimeout(ep.timeout)
if (ok) ep.resolve(reason)
else ep.reject(new Error(reason))
this.openEventPublishes.delete(id)
}
return
}
case 'CLOSED': {
const id: string = data[1]
const so = this.openSubs.get(id)
if (!so) return
so.closed = true
so.close(data[2] as string)
return
}
case 'NOTICE': {
this.onnotice(data[1] as string)
return
}
case 'AUTH': {
this.challenge = data[1] as string
if (this.onauth) {
this.auth(this.onauth)
}
return
}
default: {
const so = this.openSubs.get(data[1])
so?.oncustom?.(data)
return
}
}
} catch (err) {
const [_, __, event] = JSON.parse(json)
;(window as any).printer.maybe(event.pubkey, ':: caught err', event, this.url, err)
return
}
}
}

View File

@@ -1,37 +1,5 @@
import { verifiedSymbol, type Event, type Nostr, VerifiedEvent } from './core.ts'
export async function yieldThread() {
return new Promise<void>((resolve, reject) => {
try {
// Check if MessageChannel is available
if (typeof MessageChannel !== 'undefined') {
const ch = new MessageChannel()
const handler = () => {
// @ts-ignore (typescript thinks this property should be called `removeListener`, but in fact it's `removeEventListener`)
ch.port1.removeEventListener('message', handler)
resolve()
}
// @ts-ignore (typescript thinks this property should be called `addListener`, but in fact it's `addEventListener`)
ch.port1.addEventListener('message', handler)
ch.port2.postMessage(0)
ch.port1.start()
} else {
if (typeof setImmediate !== 'undefined') {
setImmediate(resolve)
} else if (typeof setTimeout !== 'undefined') {
setTimeout(resolve, 0)
} else {
// Last resort - resolve immediately
resolve()
}
}
} catch (e) {
console.error('during yield: ', e)
reject(e)
}
})
}
export const alwaysTrue: Nostr['verifyEvent'] = (t: Event): t is VerifiedEvent => {
t[verifiedSymbol] = true
return true

View File

@@ -123,62 +123,3 @@ export function mergeReverseSortedLists(list1: NostrEvent[], list2: NostrEvent[]
return result
}
export class QueueNode<V> {
public value: V
public next: QueueNode<V> | null = null
public prev: QueueNode<V> | null = null
constructor(message: V) {
this.value = message
}
}
export class Queue<V> {
public first: QueueNode<V> | null
public last: QueueNode<V> | null
constructor() {
this.first = null
this.last = null
}
enqueue(value: V): boolean {
const newNode = new QueueNode(value)
if (!this.last) {
// list is empty
this.first = newNode
this.last = newNode
} else if (this.last === this.first) {
// list has a single element
this.last = newNode
this.last.prev = this.first
this.first.next = newNode
} else {
// list has elements, add as last
newNode.prev = this.last
this.last.next = newNode
this.last = newNode
}
return true
}
dequeue(): V | null {
if (!this.first) return null
if (this.first === this.last) {
const target = this.first
this.first = null
this.last = null
return target.value
}
const target = this.first
this.first = target.next
if (this.first) {
this.first.prev = null // fix: clean up prev pointer
}
return target.value
}
}