v0.7.33 - Refactor monitoring system to use subscription-based activation with ephemeral events - fixes recursive crash bug

This commit is contained in:
Your Name
2025-10-19 10:26:09 -04:00
parent 57a0089664
commit 9cb9b746d8
11 changed files with 112 additions and 260 deletions

139
src/api.c
View File

@@ -40,28 +40,14 @@ const char* get_config_value(const char* key);
int get_config_bool(const char* key, int default_value);
int update_config_in_table(const char* key, const char* value);
// Monitoring system state
static time_t last_report_time = 0;
// Monitoring system state (throttling now handled per-function)
// Forward declaration for monitoring helper function
int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_func)(void));
// Monitoring system helper functions
int is_monitoring_enabled(void) {
return get_config_bool("kind_34567_reporting_enabled", 0);
}
int get_monitoring_throttle_seconds(void) {
return get_config_int("kind_34567_reporting_throttling_sec", 5);
}
int set_monitoring_enabled(int enabled) {
const char* value = enabled ? "1" : "0";
if (update_config_in_table("kind_34567_reporting_enabled", value) == 0) {
DEBUG_INFO("Monitoring enabled state changed");
return 0;
}
return -1;
return get_config_int("kind_24567_reporting_throttle_sec", 5);
}
// Query event kind distribution from database
@@ -459,12 +445,12 @@ int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_f
}
free(relay_privkey_hex);
// Create monitoring event (kind 34567)
// Create monitoring event (kind 24567 - ephemeral)
cJSON* monitoring_event = cJSON_CreateObject();
cJSON_AddStringToObject(monitoring_event, "id", ""); // Will be set by signing
cJSON_AddStringToObject(monitoring_event, "pubkey", relay_pubkey);
cJSON_AddNumberToObject(monitoring_event, "created_at", (double)time(NULL));
cJSON_AddNumberToObject(monitoring_event, "kind", 34567);
cJSON_AddNumberToObject(monitoring_event, "kind", 24567);
cJSON_AddStringToObject(monitoring_event, "content", content_json);
// Create tags array with d tag for identification
@@ -480,7 +466,7 @@ int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_f
// Use the library function to create and sign the event
cJSON* signed_event = nostr_create_and_sign_event(
34567, // kind
24567, // kind (ephemeral)
cJSON_GetStringValue(cJSON_GetObjectItem(monitoring_event, "content")), // content
tags, // tags
relay_privkey, // private key
@@ -498,55 +484,36 @@ int generate_monitoring_event_for_type(const char* d_tag_value, cJSON* (*query_f
cJSON_Delete(monitoring_event);
monitoring_event = signed_event;
// Broadcast the event to active subscriptions
// Broadcast the ephemeral event to active subscriptions (no database storage)
broadcast_event_to_subscriptions(monitoring_event);
// Store in database
int store_result = store_event(monitoring_event);
cJSON_Delete(monitoring_event);
free(content_json);
if (store_result != 0) {
DEBUG_ERROR("Failed to store monitoring event (%s)", d_tag_value);
return -1;
}
DEBUG_LOG("Monitoring event broadcast (ephemeral kind 24567, type: %s)", d_tag_value);
return 0;
}
// Monitoring hook called when an event is stored
void monitoring_on_event_stored(void) {
// Check if monitoring is enabled
if (!is_monitoring_enabled()) {
// Check throttling first (cheapest check)
static time_t last_monitoring_time = 0;
time_t current_time = time(NULL);
int throttle_seconds = get_monitoring_throttle_seconds();
if (current_time - last_monitoring_time < throttle_seconds) {
return;
}
// Check throttling
time_t now = time(NULL);
int throttle_seconds = get_monitoring_throttle_seconds();
if (now - last_report_time < throttle_seconds) {
return; // Too soon, skip this report
// Check if anyone is subscribed to monitoring events (kind 24567)
// This is the ONLY activation check needed - if someone subscribes, they want monitoring
if (!has_subscriptions_for_kind(24567)) {
return; // No subscribers = no expensive operations
}
// Generate and broadcast monitoring event
if (generate_monitoring_event() == 0) {
last_report_time = now;
}
}
// Initialize monitoring system
int init_monitoring_system(void) {
last_report_time = 0;
DEBUG_INFO("Monitoring system initialized");
return 0;
}
// Cleanup monitoring system
void cleanup_monitoring_system(void) {
// No cleanup needed for monitoring system
DEBUG_INFO("Monitoring system cleaned up");
// Generate monitoring events only when someone is listening
last_monitoring_time = current_time;
generate_monitoring_event();
}
// Forward declaration for known_configs (defined in config.c)
@@ -2265,24 +2232,8 @@ int handle_monitoring_command(cJSON* event, const char* command, char* error_mes
if (*p >= 'A' && *p <= 'Z') *p = *p + 32;
}
// Handle commands
if (strcmp(cmd, "enable_monitoring") == 0) {
if (set_monitoring_enabled(1) == 0) {
char* response_content = "✅ Monitoring enabled\n\nReal-time monitoring events will now be generated.";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
} else {
char* response_content = "❌ Failed to enable monitoring";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
}
} else if (strcmp(cmd, "disable_monitoring") == 0) {
if (set_monitoring_enabled(0) == 0) {
char* response_content = "✅ Monitoring disabled\n\nReal-time monitoring events will no longer be generated.";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
} else {
char* response_content = "❌ Failed to disable monitoring";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
}
} else if (strcmp(cmd, "set_monitoring_throttle") == 0) {
// Handle set_monitoring_throttle command (only remaining monitoring command)
if (strcmp(cmd, "set_monitoring_throttle") == 0) {
if (arg[0] == '\0') {
char* response_content = "❌ Missing throttle value\n\nUsage: set_monitoring_throttle <seconds>";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
@@ -2298,44 +2249,28 @@ int handle_monitoring_command(cJSON* event, const char* command, char* error_mes
char throttle_str[16];
snprintf(throttle_str, sizeof(throttle_str), "%ld", throttle_seconds);
if (update_config_in_table("kind_34567_reporting_throttling_sec", throttle_str) == 0) {
if (update_config_in_table("kind_24567_reporting_throttle_sec", throttle_str) == 0) {
char response_content[256];
snprintf(response_content, sizeof(response_content),
"✅ Monitoring throttle updated\n\nMinimum interval between monitoring events: %ld seconds", throttle_seconds);
"✅ Monitoring throttle updated\n\n"
"Minimum interval between monitoring events: %ld seconds\n\n"
" Monitoring activates automatically when you subscribe to kind 24567 events.",
throttle_seconds);
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
} else {
char* response_content = "❌ Failed to update monitoring throttle";
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
}
} else if (strcmp(cmd, "monitoring_status") == 0) {
int enabled = is_monitoring_enabled();
int throttle = get_monitoring_throttle_seconds();
} else {
char response_content[512];
snprintf(response_content, sizeof(response_content),
"📊 Monitoring Status\n"
"━━━━━━━━━━━━━━━━━━━━\n"
"\n"
"Enabled: %s\n"
"Throttle: %d seconds\n"
"\n"
"Commands:\n"
"• enable_monitoring\n"
"• disable_monitoring\n"
"• set_monitoring_throttle <seconds>\n"
"• monitoring_status",
enabled ? "Yes" : "No", throttle);
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
} else {
char response_content[256];
snprintf(response_content, sizeof(response_content),
"❌ Unknown monitoring command: %s\n\n"
"Available commands:\n"
"enable_monitoring\n"
"• disable_monitoring\n"
"• set_monitoring_throttle <seconds>\n"
"• monitoring_status", cmd);
"Available command:\n"
"set_monitoring_throttle <seconds>\n\n"
" Monitoring is now subscription-based:\n"
"Subscribe to kind 24567 events to receive real-time monitoring data.\n"
"Monitoring automatically activates when subscriptions exist and deactivates when they close.",
cmd);
return send_admin_response(sender_pubkey, response_content, request_id, error_message, error_size, wsi);
}
}
}

View File

@@ -60,11 +60,7 @@ char* execute_sql_query(const char* query, const char* request_id, char* error_m
int handle_sql_query_unified(cJSON* event, const char* query, char* error_message, size_t error_size, struct lws* wsi);
// Monitoring system functions
int init_monitoring_system(void);
void cleanup_monitoring_system(void);
void monitoring_on_event_stored(void);
int set_monitoring_enabled(int enabled);
int is_monitoring_enabled(void);
int get_monitoring_throttle_seconds(void);
#endif // API_H

View File

@@ -4112,32 +4112,18 @@ int populate_all_config_values_atomic(const char* admin_pubkey, const char* rela
return -1;
}
// Insert monitoring system config entries
// Insert monitoring system config entry (ephemeral kind 24567)
// Note: Monitoring is automatically activated when clients subscribe to kind 24567
sqlite3_reset(stmt);
sqlite3_bind_text(stmt, 1, "kind_34567_reporting_enabled", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 2, "false", -1, SQLITE_STATIC); // boolean, default false
sqlite3_bind_text(stmt, 3, "boolean", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 4, "Enable real-time monitoring event generation", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 5, "monitoring", -1, SQLITE_STATIC);
sqlite3_bind_int(stmt, 6, 0); // does not require restart
rc = sqlite3_step(stmt);
if (rc != SQLITE_DONE) {
DEBUG_ERROR("Failed to insert kind_34567_reporting_enabled: %s", sqlite3_errmsg(g_db));
sqlite3_finalize(stmt);
sqlite3_exec(g_db, "ROLLBACK;", NULL, NULL, NULL);
return -1;
}
sqlite3_reset(stmt);
sqlite3_bind_text(stmt, 1, "kind_34567_reporting_throttling_sec", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 1, "kind_24567_reporting_throttle_sec", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 2, "5", -1, SQLITE_STATIC); // integer, default 5 seconds
sqlite3_bind_text(stmt, 3, "integer", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 4, "Minimum seconds between monitoring event reports", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 4, "Minimum seconds between monitoring event reports (ephemeral kind 24567)", -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 5, "monitoring", -1, SQLITE_STATIC);
sqlite3_bind_int(stmt, 6, 0); // does not require restart
rc = sqlite3_step(stmt);
if (rc != SQLITE_DONE) {
DEBUG_ERROR("Failed to insert kind_34567_reporting_throttling_sec: %s", sqlite3_errmsg(g_db));
DEBUG_ERROR("Failed to insert kind_24567_reporting_throttle_sec: %s", sqlite3_errmsg(g_db));
sqlite3_finalize(stmt);
sqlite3_exec(g_db, "ROLLBACK;", NULL, NULL, NULL);
return -1;

File diff suppressed because one or more lines are too long

View File

@@ -149,9 +149,7 @@ int mark_event_as_deleted(const char* event_id, const char* deletion_event_id, c
// Forward declaration for database functions
int store_event(cJSON* event);
// Forward declarations for monitoring system
void init_monitoring_system(void);
void cleanup_monitoring_system(void);
// Forward declaration for monitoring system
void monitoring_on_event_stored(void);
// Forward declarations for NIP-11 relay information handling
@@ -1989,9 +1987,6 @@ int main(int argc, char* argv[]) {
// Initialize NIP-40 expiration configuration
init_expiration_config();
// Initialize monitoring system
init_monitoring_system();
// Update subscription manager configuration
update_subscription_manager_config();
@@ -2023,9 +2018,6 @@ int main(int argc, char* argv[]) {
ginxsom_request_validator_cleanup();
cleanup_configuration_system();
// Cleanup monitoring system
cleanup_monitoring_system();
// Cleanup subscription manager mutexes
pthread_mutex_destroy(&g_subscription_manager.subscriptions_lock);
pthread_mutex_destroy(&g_subscription_manager.ip_tracking_lock);

View File

@@ -10,10 +10,10 @@
#define MAIN_H
// Version information (auto-updated by build system)
#define VERSION "v0.7.32"
#define VERSION "v0.7.33"
#define VERSION_MAJOR 0
#define VERSION_MINOR 7
#define VERSION_PATCH 32
#define VERSION_PATCH 33
// Relay metadata (authoritative source for NIP-11 information)
#define RELAY_NAME "C-Relay"

View File

@@ -664,6 +664,38 @@ int broadcast_event_to_subscriptions(cJSON* event) {
return broadcasts;
}
// Check if any active subscription exists for a specific event kind (thread-safe)
int has_subscriptions_for_kind(int event_kind) {
pthread_mutex_lock(&g_subscription_manager.subscriptions_lock);
subscription_t* sub = g_subscription_manager.active_subscriptions;
while (sub) {
if (sub->active && sub->filters) {
subscription_filter_t* filter = sub->filters;
while (filter) {
// Check if this filter includes our event kind
if (filter->kinds && cJSON_IsArray(filter->kinds)) {
cJSON* kind_item = NULL;
cJSON_ArrayForEach(kind_item, filter->kinds) {
if (cJSON_IsNumber(kind_item)) {
int filter_kind = (int)cJSON_GetNumberValue(kind_item);
if (filter_kind == event_kind) {
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
return 1; // Found matching subscription
}
}
}
}
filter = filter->next;
}
}
sub = sub->next;
}
pthread_mutex_unlock(&g_subscription_manager.subscriptions_lock);
return 0; // No matching subscriptions
}
/////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////

View File

@@ -118,4 +118,7 @@ void log_subscription_disconnected(const char* client_ip);
void log_event_broadcast(const char* event_id, const char* sub_id, const char* client_ip);
void update_subscription_events_sent(const char* sub_id, int events_sent);
// Subscription query functions
int has_subscriptions_for_kind(int event_kind);
#endif // SUBSCRIPTIONS_H