Adding async publish to relay pool

This commit is contained in:
2025-10-09 09:19:11 -04:00
parent c0784fc890
commit 45fb6d061d
12 changed files with 929 additions and 1626 deletions

View File

@@ -41,6 +41,7 @@
#define NOSTR_POOL_SUBSCRIPTION_ID_SIZE 32
#define NOSTR_POOL_PING_INTERVAL 59 // 59 seconds - keeps connections alive
#define NOSTR_POOL_MAX_PENDING_SUBSCRIPTIONS 8 // Max concurrent subscription timings per relay
#define NOSTR_POOL_MAX_PENDING_PUBLISHES 32 // Max concurrent publish operations
// High-resolution timing helper
static double get_current_time_ms(void) {
@@ -60,6 +61,17 @@ typedef struct subscription_timing {
int active;
} subscription_timing_t;
// Publish operation tracking for async callbacks
typedef struct publish_operation {
char event_id[65]; // Event ID being published
publish_response_callback_t callback; // User callback function
void* user_data; // User data for callback
time_t publish_time; // When publish was initiated
int pending_relay_count; // Number of relays still pending response
char** pending_relay_urls; // URLs of relays still pending
int total_relay_count; // Total number of relays for this publish
} publish_operation_t;
// Internal structures
typedef struct relay_connection {
char* url;
@@ -147,6 +159,10 @@ struct nostr_relay_pool {
nostr_pool_subscription_t* subscriptions[NOSTR_POOL_MAX_SUBSCRIPTIONS];
int subscription_count;
// Active publish operations for async callbacks
publish_operation_t* publish_operations[NOSTR_POOL_MAX_PENDING_PUBLISHES];
int publish_operation_count;
// Pool-wide settings
int default_timeout_ms;
};
@@ -217,6 +233,122 @@ static double remove_subscription_timing(relay_connection_t* relay, const char*
return -1.0; // Not found
}
// Helper functions for managing publish operations
static int add_publish_operation(nostr_relay_pool_t* pool, const char* event_id,
const char** relay_urls, int relay_count,
publish_response_callback_t callback, void* user_data) {
if (!pool || !event_id || !relay_urls || relay_count <= 0) return -1;
// Check if we have space for another operation
if (pool->publish_operation_count >= NOSTR_POOL_MAX_PENDING_PUBLISHES) {
return -1; // No space available
}
// Create new publish operation
publish_operation_t* op = calloc(1, sizeof(publish_operation_t));
if (!op) return -1;
// Copy event ID
strncpy(op->event_id, event_id, sizeof(op->event_id) - 1);
op->event_id[sizeof(op->event_id) - 1] = '\0';
// Set callback and user data
op->callback = callback;
op->user_data = user_data;
op->publish_time = time(NULL);
op->total_relay_count = relay_count;
op->pending_relay_count = relay_count;
// Copy relay URLs
op->pending_relay_urls = malloc(relay_count * sizeof(char*));
if (!op->pending_relay_urls) {
free(op);
return -1;
}
for (int i = 0; i < relay_count; i++) {
op->pending_relay_urls[i] = strdup(relay_urls[i]);
if (!op->pending_relay_urls[i]) {
// Cleanup on failure
for (int j = 0; j < i; j++) {
free(op->pending_relay_urls[j]);
}
free(op->pending_relay_urls);
free(op);
return -1;
}
}
// Add to pool
pool->publish_operations[pool->publish_operation_count++] = op;
return 0;
}
static publish_operation_t* find_publish_operation(nostr_relay_pool_t* pool, const char* event_id) {
if (!pool || !event_id) return NULL;
for (int i = 0; i < pool->publish_operation_count; i++) {
if (pool->publish_operations[i] &&
strcmp(pool->publish_operations[i]->event_id, event_id) == 0) {
return pool->publish_operations[i];
}
}
return NULL;
}
static void remove_publish_operation(nostr_relay_pool_t* pool, const char* event_id) {
if (!pool || !event_id) return;
for (int i = 0; i < pool->publish_operation_count; i++) {
if (pool->publish_operations[i] &&
strcmp(pool->publish_operations[i]->event_id, event_id) == 0) {
publish_operation_t* op = pool->publish_operations[i];
// Free relay URLs (only non-NULL ones)
if (op->pending_relay_urls) {
for (int j = 0; j < op->total_relay_count; j++) {
if (op->pending_relay_urls[j]) {
free(op->pending_relay_urls[j]);
op->pending_relay_urls[j] = NULL;
}
}
free(op->pending_relay_urls);
op->pending_relay_urls = NULL;
}
free(op);
// Shift remaining operations
for (int j = i; j < pool->publish_operation_count - 1; j++) {
pool->publish_operations[j] = pool->publish_operations[j + 1];
}
pool->publish_operations[--pool->publish_operation_count] = NULL;
break;
}
}
}
static int remove_relay_from_publish_operation(publish_operation_t* op, const char* relay_url) {
if (!op || !relay_url) return -1;
for (int i = 0; i < op->pending_relay_count; i++) {
if (op->pending_relay_urls[i] && strcmp(op->pending_relay_urls[i], relay_url) == 0) {
// Free this relay URL
free(op->pending_relay_urls[i]);
op->pending_relay_urls[i] = NULL; // Mark as freed
// Shift remaining URLs
for (int j = i; j < op->pending_relay_count - 1; j++) {
op->pending_relay_urls[j] = op->pending_relay_urls[j + 1];
}
op->pending_relay_urls[op->pending_relay_count - 1] = NULL; // Clear the last slot
op->pending_relay_count--;
return op->pending_relay_count; // Return remaining count
}
}
return -1; // Relay not found
}
// Helper function to ensure relay connection
static int ensure_relay_connection(relay_connection_t* relay) {
if (!relay) {
@@ -559,6 +691,20 @@ void nostr_relay_pool_destroy(nostr_relay_pool_t* pool) {
}
}
// Clean up all pending publish operations
for (int i = 0; i < pool->publish_operation_count; i++) {
if (pool->publish_operations[i]) {
publish_operation_t* op = pool->publish_operations[i];
// Free relay URLs
for (int j = 0; j < op->total_relay_count; j++) {
free(op->pending_relay_urls[j]);
}
free(op->pending_relay_urls);
free(op);
}
}
// Close all relay connections
for (int i = 0; i < pool->relay_count; i++) {
if (pool->relays[i]) {
@@ -902,25 +1048,48 @@ static void process_relay_message(nostr_relay_pool_t* pool, relay_connection_t*
} else if (strcmp(msg_type, "OK") == 0) {
// Handle OK response: ["OK", event_id, true/false, message]
if (cJSON_IsArray(parsed) && cJSON_GetArraySize(parsed) >= 3) {
cJSON* event_id_json = cJSON_GetArrayItem(parsed, 1);
cJSON* success_flag = cJSON_GetArrayItem(parsed, 2);
if (cJSON_IsBool(success_flag)) {
if (cJSON_IsTrue(success_flag)) {
if (cJSON_IsString(event_id_json) && cJSON_IsBool(success_flag)) {
const char* event_id = cJSON_GetStringValue(event_id_json);
int success = cJSON_IsTrue(success_flag);
const char* error_message = NULL;
// Extract error message if available
if (!success && cJSON_GetArraySize(parsed) >= 4) {
cJSON* error_msg = cJSON_GetArrayItem(parsed, 3);
if (cJSON_IsString(error_msg)) {
error_message = cJSON_GetStringValue(error_msg);
}
}
// Update relay statistics
if (success) {
relay->stats.events_published_ok++;
} else {
relay->stats.events_published_failed++;
// Store error message if available
if (cJSON_GetArraySize(parsed) >= 4) {
cJSON* error_msg = cJSON_GetArrayItem(parsed, 3);
if (cJSON_IsString(error_msg)) {
const char* msg = cJSON_GetStringValue(error_msg);
if (msg) {
strncpy(relay->last_publish_error, msg,
sizeof(relay->last_publish_error) - 1);
relay->last_publish_error[sizeof(relay->last_publish_error) - 1] = '\0';
relay->last_publish_error_time = time(NULL);
}
}
// Store error message for legacy API
if (error_message) {
strncpy(relay->last_publish_error, error_message,
sizeof(relay->last_publish_error) - 1);
relay->last_publish_error[sizeof(relay->last_publish_error) - 1] = '\0';
relay->last_publish_error_time = time(NULL);
}
}
// Check for async publish operation callback
publish_operation_t* op = find_publish_operation(pool, event_id);
if (op && op->callback) {
// Call the user's callback
op->callback(relay->url, event_id, success, error_message, op->user_data);
// Remove this relay from the pending list
int remaining = remove_relay_from_publish_operation(op, relay->url);
// If no more relays pending, remove the operation
if (remaining == 0) {
remove_publish_operation(pool, event_id);
}
}
}
@@ -1113,16 +1282,35 @@ cJSON* nostr_relay_pool_get_event(
return result;
}
int nostr_relay_pool_publish(
int nostr_relay_pool_publish_async(
nostr_relay_pool_t* pool,
const char** relay_urls,
int relay_count,
cJSON* event) {
cJSON* event,
publish_response_callback_t callback,
void* user_data) {
if (!pool || !relay_urls || relay_count <= 0 || !event) {
return -1;
}
// Extract event ID for tracking
cJSON* event_id_json = cJSON_GetObjectItem(event, "id");
if (!event_id_json || !cJSON_IsString(event_id_json)) {
return -1; // Event must have an ID
}
const char* event_id = cJSON_GetStringValue(event_id_json);
// Add publish operation for tracking (only if callback provided)
publish_operation_t* op = NULL;
if (callback) {
if (add_publish_operation(pool, event_id, relay_urls, relay_count, callback, user_data) != 0) {
return -1; // Failed to add operation
}
op = find_publish_operation(pool, event_id);
}
int success_count = 0;
for (int i = 0; i < relay_count; i++) {
@@ -1135,55 +1323,35 @@ int nostr_relay_pool_publish(
}
if (relay && ensure_relay_connection(relay) == 0) {
double start_time_ms = get_current_time_ms();
// Send EVENT message
if (nostr_relay_send_event(relay->ws_client, event) >= 0) {
relay->stats.events_published++;
// Wait for OK response
char buffer[1024];
time_t wait_start = time(NULL);
int got_response = 0;
while (time(NULL) - wait_start < 5 && !got_response) { // 5 second timeout
int len = nostr_ws_receive(relay->ws_client, buffer, sizeof(buffer) - 1, 1000);
if (len > 0) {
buffer[len] = '\0';
char* msg_type = NULL;
cJSON* parsed = NULL;
if (nostr_parse_relay_message(buffer, &msg_type, &parsed) == 0) {
if (msg_type && strcmp(msg_type, "OK") == 0) {
// Handle OK response
if (cJSON_IsArray(parsed) && cJSON_GetArraySize(parsed) >= 3) {
cJSON* success_flag = cJSON_GetArrayItem(parsed, 2);
if (cJSON_IsBool(success_flag) && cJSON_IsTrue(success_flag)) {
success_count++;
relay->stats.events_published_ok++;
// Update publish latency statistics
double latency_ms = get_current_time_ms() - start_time_ms;
if (relay->stats.publish_samples == 0) {
relay->stats.publish_latency_avg = latency_ms;
} else {
relay->stats.publish_latency_avg =
(relay->stats.publish_latency_avg * relay->stats.publish_samples + latency_ms) /
(relay->stats.publish_samples + 1);
}
relay->stats.publish_samples++;
} else {
relay->stats.events_published_failed++;
}
}
got_response = 1;
}
if (msg_type) free(msg_type);
if (parsed) cJSON_Delete(parsed);
}
success_count++;
} else {
// If send failed and we have a callback, notify immediately
if (callback && op) {
callback(relay_urls[i], event_id, 0, "Failed to send event to relay", user_data);
// Remove this relay from the pending operation
int remaining = remove_relay_from_publish_operation(op, relay_urls[i]);
if (remaining == 0) {
remove_publish_operation(pool, event_id);
op = NULL; // Mark as removed to prevent double-free
}
}
}
} else {
// Connection failed - notify callback immediately if provided
if (callback && op) {
callback(relay_urls[i], event_id, 0, "Failed to connect to relay", user_data);
// Remove this relay from the pending operation
int remaining = remove_relay_from_publish_operation(op, relay_urls[i]);
if (remaining == 0) {
remove_publish_operation(pool, event_id);
op = NULL; // Mark as removed to prevent double-free
}
}
}
}

View File

@@ -288,11 +288,23 @@ cJSON* nostr_relay_pool_get_event(
int relay_count,
cJSON* filter,
int timeout_ms);
int nostr_relay_pool_publish(
// Async publish callback typedef
typedef void (*publish_response_callback_t)(
const char* relay_url,
const char* event_id,
int success, // 1 for OK, 0 for rejection
const char* message, // Error message if rejected, NULL if success
void* user_data
);
// Async publish function (only async version available)
int nostr_relay_pool_publish_async(
nostr_relay_pool_t* pool,
const char** relay_urls,
int relay_count,
cJSON* event);
cJSON* event,
publish_response_callback_t callback,
void* user_data);
// Status and statistics functions
nostr_pool_relay_status_t nostr_relay_pool_get_relay_status(