diff --git a/relay.pid b/relay.pid index fc8cd66..af1b06f 100644 --- a/relay.pid +++ b/relay.pid @@ -1 +1 @@ -1688521 +1912734 diff --git a/src/main.c b/src/main.c index 651e4bf..2e5e1d3 100644 --- a/src/main.c +++ b/src/main.c @@ -1808,6 +1808,9 @@ int main(int argc, char* argv[]) { return 1; } + // Initialize kind-based index for fast subscription lookup + init_kind_index(); + // Cleanup orphaned subscriptions from previous runs cleanup_all_subscriptions_on_startup(); diff --git a/src/main.h b/src/main.h index b10fd3f..77302df 100644 --- a/src/main.h +++ b/src/main.h @@ -13,8 +13,8 @@ // Using CRELAY_ prefix to avoid conflicts with nostr_core_lib VERSION macros #define CRELAY_VERSION_MAJOR 1 #define CRELAY_VERSION_MINOR 1 -#define CRELAY_VERSION_PATCH 3 -#define CRELAY_VERSION "v1.1.3" +#define CRELAY_VERSION_PATCH 4 +#define CRELAY_VERSION "v1.1.4" // Relay metadata (authoritative source for NIP-11 information) #define RELAY_NAME "C-Relay" diff --git a/src/subscriptions.c b/src/subscriptions.c index 6282b39..d806788 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -37,6 +37,111 @@ extern int get_config_bool(const char* key, int default_value); // Global subscription manager extern subscription_manager_t g_subscription_manager; +///////////////////////////////////////////////////////////////////////////////////////// +// KIND-BASED INDEX FOR FAST SUBSCRIPTION LOOKUP +///////////////////////////////////////////////////////////////////////////////////////// + +// Initialize the kind index (called once at startup) +void init_kind_index() { + DEBUG_LOG("Initializing kind index for 65536 possible kinds"); + + // Initialize all kind index entries to NULL + for (int i = 0; i < 65536; i++) { + g_subscription_manager.kind_index[i] = NULL; + } + + // Initialize no-kind-filter list + g_subscription_manager.no_kind_filter_subs = NULL; + + DEBUG_LOG("Kind index initialized successfully"); +} + +// Add a subscription to the kind index for all kinds it's interested in +// Must be called with subscriptions_lock held +void add_subscription_to_kind_index(subscription_t* sub) { + if (!sub) return; + + int has_kind_filter = 0; + + // Iterate through all filters in this subscription + subscription_filter_t* filter = sub->filters; + while (filter) { + // Check if this filter has a kinds array + if (filter->kinds && cJSON_IsArray(filter->kinds)) { + has_kind_filter = 1; + + // Add subscription to index for each kind in the filter + cJSON* kind_item = NULL; + cJSON_ArrayForEach(kind_item, filter->kinds) { + if (cJSON_IsNumber(kind_item)) { + int kind = (int)cJSON_GetNumberValue(kind_item); + + // Bounds check + if (kind < 0 || kind > 65535) { + DEBUG_WARN("add_subscription_to_kind_index: kind %d out of range, skipping", kind); + continue; + } + + // Create new index node + kind_subscription_node_t* node = malloc(sizeof(kind_subscription_node_t)); + if (!node) { + DEBUG_ERROR("add_subscription_to_kind_index: failed to allocate node for kind %d", kind); + continue; + } + + node->subscription = sub; + node->next = g_subscription_manager.kind_index[kind]; + g_subscription_manager.kind_index[kind] = node; + + DEBUG_TRACE("KIND_INDEX: Added subscription '%s' to kind %d index", sub->id, kind); + } + } + } + filter = filter->next; + } + + // If subscription has no kind filter, add to no-kind-filter list + if (!has_kind_filter) { + sub->next = g_subscription_manager.no_kind_filter_subs; + g_subscription_manager.no_kind_filter_subs = sub; + DEBUG_TRACE("KIND_INDEX: Added subscription '%s' to no-kind-filter list", sub->id); + } +} + +// Remove a subscription from the kind index +// Must be called with subscriptions_lock held +void remove_subscription_from_kind_index(subscription_t* sub) { + if (!sub) return; + + // Remove from all kind indexes + for (int kind = 0; kind < 65536; kind++) { + kind_subscription_node_t** current = &g_subscription_manager.kind_index[kind]; + + while (*current) { + if ((*current)->subscription == sub) { + kind_subscription_node_t* to_free = *current; + *current = (*current)->next; + free(to_free); + DEBUG_TRACE("KIND_INDEX: Removed subscription '%s' from kind %d index", sub->id, kind); + // Don't break - subscription might be in index multiple times if it has duplicate kinds + } else { + current = &((*current)->next); + } + } + } + + // Remove from no-kind-filter list + subscription_t** current = &g_subscription_manager.no_kind_filter_subs; + while (*current) { + if (*current == sub) { + *current = (*current)->next; + DEBUG_TRACE("KIND_INDEX: Removed subscription '%s' from no-kind-filter list", sub->id); + break; + } + current = &((*current)->next); + } +} + ///////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////// @@ -284,6 +389,14 @@ int add_subscription_to_manager(subscription_t* sub) { g_subscription_manager.total_created++; } + // Add to kind index for fast lookup (must be done while holding lock) + add_subscription_to_kind_index(sub); + + // If we found a duplicate, remove it from the kind index + if (duplicate_old) { + remove_subscription_from_kind_index(duplicate_old); + } + pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); // If we replaced an existing subscription, unlink it from the per-session list before freeing @@ -341,6 +454,9 @@ int remove_subscription_from_manager(const char* sub_id, struct lws* wsi) { // Match by ID and WebSocket connection if (strcmp(sub->id, sub_id) == 0 && (!wsi || sub->wsi == wsi)) { + // Remove from kind index first (while still holding lock) + remove_subscription_from_kind_index(sub); + // Remove from list *current = sub->next; g_subscription_manager.total_subscriptions--; @@ -654,19 +770,47 @@ int broadcast_event_to_subscriptions(cJSON* event) { temp_sub_t* matching_subs = NULL; int matching_count = 0; + // Get event kind for index lookup + int event_kind_val = -1; + if (event_kind && cJSON_IsNumber(event_kind)) { + event_kind_val = (int)cJSON_GetNumberValue(event_kind); + } + // First pass: collect matching subscriptions while holding lock pthread_mutex_lock(&g_subscription_manager.subscriptions_lock); - int total_subs = 0; - subscription_t* count_sub = g_subscription_manager.active_subscriptions; - while (count_sub) { - total_subs++; - count_sub = count_sub->next; - } - DEBUG_TRACE("BROADCAST: Checking %d active subscriptions", total_subs); + // Use kind index for fast lookup instead of checking all subscriptions + subscription_t* candidates_to_check[MAX_TOTAL_SUBSCRIPTIONS]; + int candidate_count = 0; - subscription_t* sub = g_subscription_manager.active_subscriptions; - while (sub) { + // Add subscriptions from kind index (if event has valid kind) + if (event_kind_val >= 0 && event_kind_val <= 65535) { + DEBUG_TRACE("BROADCAST: Using kind index for kind=%d", event_kind_val); + + kind_subscription_node_t* node = g_subscription_manager.kind_index[event_kind_val]; + while (node && candidate_count < MAX_TOTAL_SUBSCRIPTIONS) { + if (node->subscription && node->subscription->active) { + candidates_to_check[candidate_count++] = node->subscription; + } + node = node->next; + } + } + + // Add subscriptions with no kind filter (must check against all events) + subscription_t* no_kind_sub = g_subscription_manager.no_kind_filter_subs; + while (no_kind_sub && candidate_count < MAX_TOTAL_SUBSCRIPTIONS) { + if (no_kind_sub->active) { + candidates_to_check[candidate_count++] = no_kind_sub; + } + no_kind_sub = no_kind_sub->next; + } + + DEBUG_TRACE("BROADCAST: Checking %d candidate subscriptions (kind index optimization)", candidate_count); + + // Test each candidate subscription + for (int i = 0; i < candidate_count; i++) { + subscription_t* sub = candidates_to_check[i]; + if (sub->active && sub->wsi && event_matches_subscription(event, sub)) { temp_sub_t* temp = malloc(sizeof(temp_sub_t)); if (temp) { @@ -695,7 +839,6 @@ int broadcast_event_to_subscriptions(cJSON* event) { DEBUG_ERROR("broadcast_event_to_subscriptions: failed to allocate temp subscription"); } } - sub = sub->next; } pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock); diff --git a/src/subscriptions.h b/src/subscriptions.h index cae0d04..c3d0260 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -63,6 +63,12 @@ struct subscription { struct subscription* session_next; // Next subscription for this session }; +// Kind index entry - linked list of subscriptions interested in a specific kind +typedef struct kind_subscription_node { + subscription_t* subscription; // Pointer to subscription + struct kind_subscription_node* next; // Next subscription for this kind +} kind_subscription_node_t; + // Per-IP connection tracking typedef struct ip_connection_info { char ip_address[CLIENT_IP_MAX_LENGTH]; // IP address @@ -79,6 +85,10 @@ struct subscription_manager { pthread_mutex_t subscriptions_lock; // Global thread safety int total_subscriptions; // Current count + // Kind-based index for fast subscription lookup (10x performance improvement) + kind_subscription_node_t* kind_index[65536]; // Array of subscription lists, one per kind + subscription_t* no_kind_filter_subs; // Subscriptions with no kind filter (must check all events) + // Configuration int max_subscriptions_per_client; // Default: 20 int max_total_subscriptions; // Default: 5000 @@ -104,6 +114,11 @@ int event_matches_filter(cJSON* event, subscription_filter_t* filter); int event_matches_subscription(cJSON* event, subscription_t* subscription); int broadcast_event_to_subscriptions(cJSON* event); +// Kind index functions for performance optimization +void init_kind_index(void); +void add_subscription_to_kind_index(subscription_t* sub); +void remove_subscription_from_kind_index(subscription_t* sub); + // Per-IP connection tracking functions ip_connection_info_t* get_or_create_ip_connection(const char* client_ip); void update_ip_connection_activity(const char* client_ip);