diff --git a/net/respondd/src/respondd.c b/net/respondd/src/respondd.c index e3242da..b16cd4d 100644 --- a/net/respondd/src/respondd.c +++ b/net/respondd/src/respondd.c @@ -41,6 +41,7 @@ #include #include #include +#include #include #include @@ -48,6 +49,8 @@ #include #include +#define QUEUE_RING_LEN 8 +#define REQUEST_MAXLEN 256 struct provider_list { struct provider_list *next; @@ -57,6 +60,7 @@ struct provider_list { }; struct request_type { + // points to the corresponding provider_list for this request in the chained list struct provider_list *providers; struct json_object *cache; @@ -64,10 +68,24 @@ struct request_type { int64_t cache_timeout; }; +struct request_task { + struct sockaddr_storage client_addr; + socklen_t client_addrlen; + char request[REQUEST_MAXLEN]; + bool unprocessed; +}; + +struct request_queue { + struct request_task *pop_task; + struct request_task *push_task; + struct request_task task_ring[QUEUE_RING_LEN]; +}; static int64_t now; static struct hsearch_data htab; +void process_request(int sock, char *input, struct sockaddr_in *addr, socklen_t addrlen); + static struct json_object * merge_json(struct json_object *a, struct json_object *b); @@ -165,6 +183,45 @@ static const struct respondd_provider_info * get_providers(const char *filename) return ret; } +bool queue_push_request(struct request_queue *q, char* req, + struct sockaddr *addr, socklen_t addrlen) { + + // prevent ringbuffer overflow + if (q->push_task->unprocessed) + return false; + + // store the request + strcpy(q->push_task->request, req); + memcpy(&q->push_task->client_addr, addr, addrlen); + q->push_task->client_addrlen = addrlen; + q->push_task->unprocessed = true; + + if (q->push_task++ > &q->task_ring[QUEUE_RING_LEN-1]) + q->push_task = &q->task_ring[0]; + + return true; +} + +void queue_process_request(struct request_queue *q, int sock) { + struct request_task *current_task = q->pop_task; + + if (!current_task->unprocessed) + return; + + process_request( + sock, + current_task->request, + (struct sockaddr_in *) ¤t_task->client_addr, + current_task->client_addrlen + ); + + current_task->unprocessed = false; + + // go on to next task + if (q->pop_task++ > &q->task_ring[QUEUE_RING_LEN-1]) + q->pop_task = &q->task_ring[0]; +} + static void load_cache_time(struct request_type *r, const char *name) { r->cache = NULL; r->cache_time = 0; @@ -320,17 +377,19 @@ static struct json_object * handle_request(char *request, bool *compress) { } } - -static void serve(int sock) { - char input[256]; - const char *output = NULL; - ssize_t input_bytes, output_bytes; +// the return value indicates whether there was a request +static bool serve(struct request_queue *queue, int sock) { + char input[REQUEST_MAXLEN]; + ssize_t input_bytes; struct sockaddr_in6 addr; socklen_t addrlen = sizeof(addr); - bool compress; input_bytes = recvfrom(sock, input, sizeof(input)-1, 0, (struct sockaddr *)&addr, &addrlen); + // Timeout + if (input_bytes < 0 && errno == EWOULDBLOCK) + return false; + if (input_bytes < 0) { perror("recvfrom failed"); exit(EXIT_FAILURE); @@ -338,6 +397,16 @@ static void serve(int sock) { input[input_bytes] = 0; + // TODO: only multicast requests should be queued + queue_push_request(queue, input, (struct sockaddr *)&addr, addrlen); + return true; +} + +void process_request(int sock, char *input, struct sockaddr_in *addr, socklen_t addrlen) { + bool compress; + const char *output = NULL; + size_t output_bytes; + struct json_object *result = handle_request(input, &compress); if (!result) return; @@ -361,14 +430,13 @@ static void serve(int sock) { } if (output) { - if (sendto(sock, output, output_bytes, 0, (struct sockaddr *)&addr, addrlen) < 0) + if (sendto(sock, output, output_bytes, 0, addr, addrlen) < 0) perror("sendto failed"); } json_object_put(result); } - int main(int argc, char **argv) { const int one = 1; @@ -388,6 +456,14 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } + struct timeval timeout; + timeout.tv_sec = 2; + timeout.tv_usec = 0; + + if (setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, + sizeof(timeout)) < 0) + error("setsockopt failed\n"); + server_addr.sin6_family = AF_INET6; server_addr.sin6_addr = in6addr_any; @@ -443,8 +519,14 @@ int main(int argc, char **argv) { load_providers(); - while (true) - serve(sock); + struct request_queue queue = {}; + queue.pop_task = &queue.task_ring[0]; + queue.push_task = &queue.task_ring[0]; + + while (true) { + while(serve(&queue, sock)) {} + queue_process_request(&queue, sock); + } return EXIT_FAILURE; }