respondd: replace queue with schedule

This commit is contained in:
lemoer 2016-08-12 23:22:36 +02:00
parent 949e3dd454
commit be55c1926f
1 changed files with 47 additions and 31 deletions

View File

@ -69,16 +69,17 @@ struct request_type {
}; };
struct request_task { struct request_task {
struct request_task *next;
int64_t scheduled_time;
struct sockaddr_storage client_addr; struct sockaddr_storage client_addr;
socklen_t client_addrlen; socklen_t client_addrlen;
char request[REQUEST_MAXLEN]; char request[REQUEST_MAXLEN];
bool unprocessed;
}; };
struct request_queue { struct request_queue {
struct request_task *pop_task; int length;
struct request_task *push_task; struct request_task *list_head;
struct request_task task_ring[QUEUE_RING_LEN];
}; };
static int64_t now; static int64_t now;
@ -184,38 +185,53 @@ static const struct respondd_provider_info * get_providers(const char *filename)
} }
bool queue_push_request(struct request_queue *q, char* req, bool queue_push_request(struct request_queue *q, char* req,
struct sockaddr *addr, socklen_t addrlen) { struct sockaddr *addr, socklen_t addrlen,
int64_t scheduled_time) {
// prevent ringbuffer overflow // TODO: rename QUEUE_RING_LEN
if (q->push_task->unprocessed) if (q->length >= QUEUE_RING_LEN)
// queue is full
return false; return false;
struct request_task *new_task = malloc(sizeof(struct request_task));
// store the request // store the request
strcpy(q->push_task->request, req); new_task->scheduled_time = scheduled_time;
memcpy(&q->push_task->client_addr, addr, addrlen); strcpy(new_task->request, req);
q->push_task->client_addrlen = addrlen; memcpy(&new_task->client_addr, addr, addrlen);
q->push_task->unprocessed = true; new_task->client_addrlen = addrlen;
if (++q->push_task > &q->task_ring[QUEUE_RING_LEN-1]) if (!q->list_head || q->list_head->scheduled_time > new_task->scheduled_time) {
q->push_task = &q->task_ring[0]; new_task->next = q->list_head;
q->list_head = new_task;
} else {
struct request_task *t;
for (t = q->list_head; t && t->next; t = t->next) {
if (t->next->scheduled_time > new_task->scheduled_time) {
break;
}
}
new_task->next = t->next;
t->next = new_task;
}
return true; q->length++;
} }
// the returned task is already set as processed // the returned task is already set as processed
struct request_task * queue_pop_request(struct request_queue *q) { struct request_task * queue_pop_request(struct request_queue *q) {
struct request_task *current_task = q->pop_task; if (!q->list_head)
// queue is empty
if (!current_task->unprocessed)
return NULL; return NULL;
current_task->unprocessed = false; if (q->list_head->scheduled_time > now) {
// nothing to do yet
return NULL;
}
// go on to next task struct request_task *result = q->list_head;
if (++q->pop_task > &q->task_ring[QUEUE_RING_LEN-1]) q->list_head = q->list_head->next;
q->pop_task = &q->task_ring[0]; q->length--;
return current_task; return result;
} }
static void load_cache_time(struct request_type *r, const char *name) { static void load_cache_time(struct request_type *r, const char *name) {
@ -361,8 +377,6 @@ static struct json_object * handle_request(char *request, bool *compress) {
if (!*request) if (!*request)
return NULL; return NULL;
update_time();
if (!strncmp(request, "GET ", 4)) { if (!strncmp(request, "GET ", 4)) {
*compress = true; *compress = true;
return multi_request(request+4); return multi_request(request+4);
@ -394,7 +408,7 @@ static bool accept_request(struct request_queue *queue, int sock) {
input[input_bytes] = 0; input[input_bytes] = 0;
// TODO: only multicast requests should be queued // TODO: only multicast requests should be queued
queue_push_request(queue, input, (struct sockaddr *)&addr, addrlen); queue_push_request(queue, input, (struct sockaddr *)&addr, addrlen, now + 5000);
return true; return true;
} }
@ -448,6 +462,8 @@ void serve_request(struct request_queue *queue, int sock) {
(struct sockaddr_in *) &task->client_addr, (struct sockaddr_in *) &task->client_addr,
task->client_addrlen task->client_addrlen
); );
free(task);
} }
int main(int argc, char **argv) { int main(int argc, char **argv) {
@ -470,8 +486,8 @@ int main(int argc, char **argv) {
} }
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = 2; timeout.tv_sec = 0;
timeout.tv_usec = 0; timeout.tv_usec = 10000;
if (setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, if (setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,
sizeof(timeout)) < 0) sizeof(timeout)) < 0)
@ -533,11 +549,11 @@ int main(int argc, char **argv) {
load_providers(); load_providers();
struct request_queue queue = {}; struct request_queue queue = {};
queue.pop_task = &queue.task_ring[0];
queue.push_task = &queue.task_ring[0];
while (true) { while (true) {
while(accept_request(&queue, sock)) {} update_time();
// TODO: adjust timeout, remove polling
accept_request(&queue, sock);
serve_request(&queue, sock); serve_request(&queue, sock);
} }