respondd: first implementation of request queue

This commit is contained in:
lemoer 2016-08-12 01:13:04 +02:00
parent 0a6411b56b
commit d05c2e3813
1 changed files with 92 additions and 10 deletions

View File

@ -41,6 +41,7 @@
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <errno.h>
#include <arpa/inet.h>
#include <net/if.h>
@ -48,6 +49,8 @@
#include <sys/types.h>
#include <sys/socket.h>
#define QUEUE_RING_LEN 8
#define REQUEST_MAXLEN 256
struct provider_list {
struct provider_list *next;
@ -57,6 +60,7 @@ struct provider_list {
};
struct request_type {
// points to the corresponding provider_list for this request in the chained list
struct provider_list *providers;
struct json_object *cache;
@ -64,10 +68,24 @@ struct request_type {
int64_t cache_timeout;
};
struct request_task {
struct sockaddr_storage client_addr;
socklen_t client_addrlen;
char request[REQUEST_MAXLEN];
bool unprocessed;
};
struct request_queue {
struct request_task *pop_task;
struct request_task *push_task;
struct request_task task_ring[QUEUE_RING_LEN];
};
static int64_t now;
static struct hsearch_data htab;
void process_request(int sock, char *input, struct sockaddr_in *addr, socklen_t addrlen);
static struct json_object * merge_json(struct json_object *a, struct json_object *b);
@ -165,6 +183,45 @@ static const struct respondd_provider_info * get_providers(const char *filename)
return ret;
}
bool queue_push_request(struct request_queue *q, char* req,
struct sockaddr *addr, socklen_t addrlen) {
// prevent ringbuffer overflow
if (q->push_task->unprocessed)
return false;
// store the request
strcpy(q->push_task->request, req);
memcpy(&q->push_task->client_addr, addr, addrlen);
q->push_task->client_addrlen = addrlen;
q->push_task->unprocessed = true;
if (q->push_task++ > &q->task_ring[QUEUE_RING_LEN-1])
q->push_task = &q->task_ring[0];
return true;
}
void queue_process_request(struct request_queue *q, int sock) {
struct request_task *current_task = q->pop_task;
if (!current_task->unprocessed)
return;
process_request(
sock,
current_task->request,
(struct sockaddr_in *) &current_task->client_addr,
current_task->client_addrlen
);
current_task->unprocessed = false;
// go on to next task
if (q->pop_task++ > &q->task_ring[QUEUE_RING_LEN-1])
q->pop_task = &q->task_ring[0];
}
static void load_cache_time(struct request_type *r, const char *name) {
r->cache = NULL;
r->cache_time = 0;
@ -320,17 +377,19 @@ static struct json_object * handle_request(char *request, bool *compress) {
}
}
static void serve(int sock) {
char input[256];
const char *output = NULL;
ssize_t input_bytes, output_bytes;
// the return value indicates whether there was a request
static bool serve(struct request_queue *queue, int sock) {
char input[REQUEST_MAXLEN];
ssize_t input_bytes;
struct sockaddr_in6 addr;
socklen_t addrlen = sizeof(addr);
bool compress;
input_bytes = recvfrom(sock, input, sizeof(input)-1, 0, (struct sockaddr *)&addr, &addrlen);
// Timeout
if (input_bytes < 0 && errno == EWOULDBLOCK)
return false;
if (input_bytes < 0) {
perror("recvfrom failed");
exit(EXIT_FAILURE);
@ -338,6 +397,16 @@ static void serve(int sock) {
input[input_bytes] = 0;
// TODO: only multicast requests should be queued
queue_push_request(queue, input, (struct sockaddr *)&addr, addrlen);
return true;
}
void process_request(int sock, char *input, struct sockaddr_in *addr, socklen_t addrlen) {
bool compress;
const char *output = NULL;
size_t output_bytes;
struct json_object *result = handle_request(input, &compress);
if (!result)
return;
@ -361,14 +430,13 @@ static void serve(int sock) {
}
if (output) {
if (sendto(sock, output, output_bytes, 0, (struct sockaddr *)&addr, addrlen) < 0)
if (sendto(sock, output, output_bytes, 0, addr, addrlen) < 0)
perror("sendto failed");
}
json_object_put(result);
}
int main(int argc, char **argv) {
const int one = 1;
@ -388,6 +456,14 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}
struct timeval timeout;
timeout.tv_sec = 2;
timeout.tv_usec = 0;
if (setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,
sizeof(timeout)) < 0)
error("setsockopt failed\n");
server_addr.sin6_family = AF_INET6;
server_addr.sin6_addr = in6addr_any;
@ -443,8 +519,14 @@ int main(int argc, char **argv) {
load_providers();
while (true)
serve(sock);
struct request_queue queue = {};
queue.pop_task = &queue.task_ring[0];
queue.push_task = &queue.task_ring[0];
while (true) {
while(serve(&queue, sock)) {}
queue_process_request(&queue, sock);
}
return EXIT_FAILURE;
}