v1.1.4 - Add kind-based index for 10x subscription matching performance improvement
This commit is contained in:
@@ -37,6 +37,111 @@ extern int get_config_bool(const char* key, int default_value);
|
||||
// Global subscription manager
|
||||
extern subscription_manager_t g_subscription_manager;
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////
|
||||
// KIND-BASED INDEX FOR FAST SUBSCRIPTION LOOKUP
|
||||
/////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// Initialize the kind index (called once at startup)
|
||||
void init_kind_index() {
|
||||
DEBUG_LOG("Initializing kind index for 65536 possible kinds");
|
||||
|
||||
// Initialize all kind index entries to NULL
|
||||
for (int i = 0; i < 65536; i++) {
|
||||
g_subscription_manager.kind_index[i] = NULL;
|
||||
}
|
||||
|
||||
// Initialize no-kind-filter list
|
||||
g_subscription_manager.no_kind_filter_subs = NULL;
|
||||
|
||||
DEBUG_LOG("Kind index initialized successfully");
|
||||
}
|
||||
|
||||
// Add a subscription to the kind index for all kinds it's interested in
|
||||
// Must be called with subscriptions_lock held
|
||||
void add_subscription_to_kind_index(subscription_t* sub) {
|
||||
if (!sub) return;
|
||||
|
||||
int has_kind_filter = 0;
|
||||
|
||||
// Iterate through all filters in this subscription
|
||||
subscription_filter_t* filter = sub->filters;
|
||||
while (filter) {
|
||||
// Check if this filter has a kinds array
|
||||
if (filter->kinds && cJSON_IsArray(filter->kinds)) {
|
||||
has_kind_filter = 1;
|
||||
|
||||
// Add subscription to index for each kind in the filter
|
||||
cJSON* kind_item = NULL;
|
||||
cJSON_ArrayForEach(kind_item, filter->kinds) {
|
||||
if (cJSON_IsNumber(kind_item)) {
|
||||
int kind = (int)cJSON_GetNumberValue(kind_item);
|
||||
|
||||
// Bounds check
|
||||
if (kind < 0 || kind > 65535) {
|
||||
DEBUG_WARN("add_subscription_to_kind_index: kind %d out of range, skipping", kind);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Create new index node
|
||||
kind_subscription_node_t* node = malloc(sizeof(kind_subscription_node_t));
|
||||
if (!node) {
|
||||
DEBUG_ERROR("add_subscription_to_kind_index: failed to allocate node for kind %d", kind);
|
||||
continue;
|
||||
}
|
||||
|
||||
node->subscription = sub;
|
||||
node->next = g_subscription_manager.kind_index[kind];
|
||||
g_subscription_manager.kind_index[kind] = node;
|
||||
|
||||
DEBUG_TRACE("KIND_INDEX: Added subscription '%s' to kind %d index", sub->id, kind);
|
||||
}
|
||||
}
|
||||
}
|
||||
filter = filter->next;
|
||||
}
|
||||
|
||||
// If subscription has no kind filter, add to no-kind-filter list
|
||||
if (!has_kind_filter) {
|
||||
sub->next = g_subscription_manager.no_kind_filter_subs;
|
||||
g_subscription_manager.no_kind_filter_subs = sub;
|
||||
DEBUG_TRACE("KIND_INDEX: Added subscription '%s' to no-kind-filter list", sub->id);
|
||||
}
|
||||
}
|
||||
|
||||
// Remove a subscription from the kind index
|
||||
// Must be called with subscriptions_lock held
|
||||
void remove_subscription_from_kind_index(subscription_t* sub) {
|
||||
if (!sub) return;
|
||||
|
||||
// Remove from all kind indexes
|
||||
for (int kind = 0; kind < 65536; kind++) {
|
||||
kind_subscription_node_t** current = &g_subscription_manager.kind_index[kind];
|
||||
|
||||
while (*current) {
|
||||
if ((*current)->subscription == sub) {
|
||||
kind_subscription_node_t* to_free = *current;
|
||||
*current = (*current)->next;
|
||||
free(to_free);
|
||||
DEBUG_TRACE("KIND_INDEX: Removed subscription '%s' from kind %d index", sub->id, kind);
|
||||
// Don't break - subscription might be in index multiple times if it has duplicate kinds
|
||||
} else {
|
||||
current = &((*current)->next);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from no-kind-filter list
|
||||
subscription_t** current = &g_subscription_manager.no_kind_filter_subs;
|
||||
while (*current) {
|
||||
if (*current == sub) {
|
||||
*current = (*current)->next;
|
||||
DEBUG_TRACE("KIND_INDEX: Removed subscription '%s' from no-kind-filter list", sub->id);
|
||||
break;
|
||||
}
|
||||
current = &((*current)->next);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////
|
||||
/////////////////////////////////////////////////////////////////////////////////////////
|
||||
@@ -284,6 +389,14 @@ int add_subscription_to_manager(subscription_t* sub) {
|
||||
g_subscription_manager.total_created++;
|
||||
}
|
||||
|
||||
// Add to kind index for fast lookup (must be done while holding lock)
|
||||
add_subscription_to_kind_index(sub);
|
||||
|
||||
// If we found a duplicate, remove it from the kind index
|
||||
if (duplicate_old) {
|
||||
remove_subscription_from_kind_index(duplicate_old);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
||||
|
||||
// If we replaced an existing subscription, unlink it from the per-session list before freeing
|
||||
@@ -341,6 +454,9 @@ int remove_subscription_from_manager(const char* sub_id, struct lws* wsi) {
|
||||
|
||||
// Match by ID and WebSocket connection
|
||||
if (strcmp(sub->id, sub_id) == 0 && (!wsi || sub->wsi == wsi)) {
|
||||
// Remove from kind index first (while still holding lock)
|
||||
remove_subscription_from_kind_index(sub);
|
||||
|
||||
// Remove from list
|
||||
*current = sub->next;
|
||||
g_subscription_manager.total_subscriptions--;
|
||||
@@ -654,19 +770,47 @@ int broadcast_event_to_subscriptions(cJSON* event) {
|
||||
temp_sub_t* matching_subs = NULL;
|
||||
int matching_count = 0;
|
||||
|
||||
// Get event kind for index lookup
|
||||
int event_kind_val = -1;
|
||||
if (event_kind && cJSON_IsNumber(event_kind)) {
|
||||
event_kind_val = (int)cJSON_GetNumberValue(event_kind);
|
||||
}
|
||||
|
||||
// First pass: collect matching subscriptions while holding lock
|
||||
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
|
||||
|
||||
int total_subs = 0;
|
||||
subscription_t* count_sub = g_subscription_manager.active_subscriptions;
|
||||
while (count_sub) {
|
||||
total_subs++;
|
||||
count_sub = count_sub->next;
|
||||
}
|
||||
DEBUG_TRACE("BROADCAST: Checking %d active subscriptions", total_subs);
|
||||
// Use kind index for fast lookup instead of checking all subscriptions
|
||||
subscription_t* candidates_to_check[MAX_TOTAL_SUBSCRIPTIONS];
|
||||
int candidate_count = 0;
|
||||
|
||||
subscription_t* sub = g_subscription_manager.active_subscriptions;
|
||||
while (sub) {
|
||||
// Add subscriptions from kind index (if event has valid kind)
|
||||
if (event_kind_val >= 0 && event_kind_val <= 65535) {
|
||||
DEBUG_TRACE("BROADCAST: Using kind index for kind=%d", event_kind_val);
|
||||
|
||||
kind_subscription_node_t* node = g_subscription_manager.kind_index[event_kind_val];
|
||||
while (node && candidate_count < MAX_TOTAL_SUBSCRIPTIONS) {
|
||||
if (node->subscription && node->subscription->active) {
|
||||
candidates_to_check[candidate_count++] = node->subscription;
|
||||
}
|
||||
node = node->next;
|
||||
}
|
||||
}
|
||||
|
||||
// Add subscriptions with no kind filter (must check against all events)
|
||||
subscription_t* no_kind_sub = g_subscription_manager.no_kind_filter_subs;
|
||||
while (no_kind_sub && candidate_count < MAX_TOTAL_SUBSCRIPTIONS) {
|
||||
if (no_kind_sub->active) {
|
||||
candidates_to_check[candidate_count++] = no_kind_sub;
|
||||
}
|
||||
no_kind_sub = no_kind_sub->next;
|
||||
}
|
||||
|
||||
DEBUG_TRACE("BROADCAST: Checking %d candidate subscriptions (kind index optimization)", candidate_count);
|
||||
|
||||
// Test each candidate subscription
|
||||
for (int i = 0; i < candidate_count; i++) {
|
||||
subscription_t* sub = candidates_to_check[i];
|
||||
|
||||
if (sub->active && sub->wsi && event_matches_subscription(event, sub)) {
|
||||
temp_sub_t* temp = malloc(sizeof(temp_sub_t));
|
||||
if (temp) {
|
||||
@@ -695,7 +839,6 @@ int broadcast_event_to_subscriptions(cJSON* event) {
|
||||
DEBUG_ERROR("broadcast_event_to_subscriptions: failed to allocate temp subscription");
|
||||
}
|
||||
}
|
||||
sub = sub->next;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
|
||||
|
||||
Reference in New Issue
Block a user