From e7fbcf75a95da3c39a3a7a85a2da481c3bf7d93d Mon Sep 17 00:00:00 2001 From: lemoer Date: Tue, 6 Dec 2016 19:46:40 +0100 Subject: [PATCH] respondd: add support for delayed replies to multicast requests (#144) --- net/respondd/src/respondd.c | 302 ++++++++++++++++++++++++++++++++---- 1 file changed, 271 insertions(+), 31 deletions(-) diff --git a/net/respondd/src/respondd.c b/net/respondd/src/respondd.c index e3242da..9e91cec 100644 --- a/net/respondd/src/respondd.c +++ b/net/respondd/src/respondd.c @@ -1,6 +1,7 @@ /* Copyright (c) 2014-2015, Nils Schneider Copyright (c) 2015-2016, Matthias Schiffer + Copyright (c) 2016 Leonardo Mörlein All rights reserved. Redistribution and use in source and binary forms, with or without @@ -41,6 +42,7 @@ #include #include #include +#include #include #include @@ -48,6 +50,16 @@ #include #include +#define SCHEDULE_LEN 8 +#define REQUEST_MAXLEN 256 +#define MAX_MULTICAST_DELAY_DEFAULT 0 + +struct interface_delay_info { + struct interface_delay_info *next; + + unsigned int ifindex; + uint64_t max_multicast_delay; +}; struct provider_list { struct provider_list *next; @@ -64,6 +76,18 @@ struct request_type { int64_t cache_timeout; }; +struct request_task { + struct request_task *next; + int64_t scheduled_time; + + struct sockaddr_in6 client_addr; + char request[REQUEST_MAXLEN]; +}; + +struct request_schedule { + size_t length; + struct request_task *list_head; +}; static int64_t now; static struct hsearch_data htab; @@ -79,15 +103,18 @@ static void usage() { puts(" -p port number to listen on"); puts(" -g multicast group, e.g. ff02::2:1001"); puts(" -i interface on which the group is joined"); + puts(" -t maximum delay seconds before multicast responses"); + puts(" for the last specified multicast interface (default: 0)"); puts(" -d data provider directory (default: current directory)"); puts(" -h this help\n"); } -static void join_mcast(const int sock, const struct in6_addr addr, const char *iface) { +// returns true on success +static bool join_mcast(const int sock, const struct in6_addr addr, unsigned int ifindex) { struct ipv6_mreq mreq; mreq.ipv6mr_multiaddr = addr; - mreq.ipv6mr_interface = if_nametoindex(iface); + mreq.ipv6mr_interface = ifindex; if (mreq.ipv6mr_interface == 0) goto error; @@ -95,11 +122,11 @@ static void join_mcast(const int sock, const struct in6_addr addr, const char *i if (setsockopt(sock, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq, sizeof(mreq)) == -1) goto error; - return; + return true; error: - fprintf(stderr, "Could not join multicast group on %s: ", iface); perror(NULL); + return false; } @@ -165,6 +192,55 @@ static const struct respondd_provider_info * get_providers(const char *filename) return ret; } +bool schedule_push_request(struct request_schedule *s, struct request_task *new_task) { + if (s->length >= SCHEDULE_LEN) + // schedule is full + return false; + + // insert into sorted list + struct request_task **pos; + for (pos = &s->list_head; *pos; pos = &((*pos)->next)) { + if ((*pos)->scheduled_time > new_task->scheduled_time) + break; + } + // insert before *pos + new_task->next = *pos; + *pos = new_task; + + s->length++; + return true; +} + +int64_t schedule_idle_time(struct request_schedule *s) { + if (!s->list_head) + // nothing to do yet (0 = infinite time) + return 0; + + int64_t result = s->list_head->scheduled_time - now; + + if (result <= 0) + return -1; // zero is infinity + else + return result; +} + +struct request_task * schedule_pop_request(struct request_schedule *s) { + if (!s->list_head) + // schedule is empty + return NULL; + + if (schedule_idle_time(s) >= 0) { + // nothing to do yet + return NULL; + } + + struct request_task *result = s->list_head; + s->list_head = s->list_head->next; + s->length--; + + return result; +} + static void load_cache_time(struct request_type *r, const char *name) { r->cache = NULL; r->cache_time = 0; @@ -308,8 +384,6 @@ static struct json_object * handle_request(char *request, bool *compress) { if (!*request) return NULL; - update_time(); - if (!strncmp(request, "GET ", 4)) { *compress = true; return multi_request(request+4); @@ -320,27 +394,10 @@ static struct json_object * handle_request(char *request, bool *compress) { } } - -static void serve(int sock) { - char input[256]; +void send_response(int sock, struct json_object *result, bool compress, + struct sockaddr_in6 *addr) { const char *output = NULL; - ssize_t input_bytes, output_bytes; - struct sockaddr_in6 addr; - socklen_t addrlen = sizeof(addr); - bool compress; - - input_bytes = recvfrom(sock, input, sizeof(input)-1, 0, (struct sockaddr *)&addr, &addrlen); - - if (input_bytes < 0) { - perror("recvfrom failed"); - exit(EXIT_FAILURE); - } - - input[input_bytes] = 0; - - struct json_object *result = handle_request(input, &compress); - if (!result) - return; + size_t output_bytes; const char *str = json_object_to_json_string_ext(result, JSON_C_TO_STRING_PLAIN); @@ -361,13 +418,143 @@ static void serve(int sock) { } if (output) { - if (sendto(sock, output, output_bytes, 0, (struct sockaddr *)&addr, addrlen) < 0) + if (sendto(sock, output, output_bytes, 0, (struct sockaddr *) addr, sizeof(*addr)) < 0) perror("sendto failed"); } json_object_put(result); } +void serve_request(struct request_task *task, int sock) { + bool compress; + struct json_object *result = handle_request(task->request, &compress); + + if (!result) + return; + + send_response( + sock, + result, + compress, + &task->client_addr + ); +} + +/** + * Wait for an incoming request and schedule it. + * + * 1a. If the schedule is empty, we wait infinite time. + * 1b. If we have scheduled requests, we only wait for incoming requests + * until we reach the scheduling deadline. + * 1c. If there is no request incomming in the above time, the fuction will + * return. + * 2a. If the incoming request was sent to a multicast destination IPv6, + * check whether there was set a max multicast delay for the incomming iface + * in if_delay_info_list. + * 2b. If so choose a random delay between 0 and max_multicast_delay milliseconds + * and schedule the request. + * 2c. If not, send the request immediately. + * 2d. If the schedule is full, send the reply immediately. + * 3a. If the incoming request was sent to a unicast destination, the response + * will be also sent immediately. + */ +static void accept_request(struct request_schedule *schedule, int sock, + struct interface_delay_info *if_delay_info_list) { + char input[REQUEST_MAXLEN]; + ssize_t input_bytes; + struct sockaddr_in6 addr; + char control[256]; + struct in6_addr destaddr = {}; + struct cmsghdr *cmsg; + unsigned int ifindex = 0; + int recv_errno; + + int64_t timeout = schedule_idle_time(schedule); + if (timeout < 0) + return; + + // set timeout to the socket + struct timeval t; + t.tv_sec = ((uint64_t) timeout) / 1000; + t.tv_usec = (((uint64_t) timeout) % 1000) * 1000; + + if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &t, sizeof(t)) < 0) + perror("setsockopt failed\n"); + + struct iovec iv = { + .iov_base = input, + .iov_len = sizeof(input) - 1 + }; + + struct msghdr mh = { + .msg_name = &addr, + .msg_namelen = sizeof(addr), + .msg_iov = &iv, + .msg_iovlen = 1, + .msg_control = control, + .msg_controllen = sizeof(control) + }; + + input_bytes = recvmsg(sock, &mh, 0); + recv_errno = errno; + update_time(); + + // Timeout + errno = recv_errno; + if (input_bytes < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + return; + + if (input_bytes < 0) { + perror("recvmsg failed"); + exit(EXIT_FAILURE); + } + + // determine destination address + for (cmsg = CMSG_FIRSTHDR(&mh); cmsg != NULL; cmsg = CMSG_NXTHDR(&mh, cmsg)) + { + // skip other packet headers + if (cmsg->cmsg_level != IPPROTO_IPV6 || cmsg->cmsg_type != IPV6_PKTINFO) + continue; + + struct in6_pktinfo *pi = (struct in6_pktinfo *) CMSG_DATA(cmsg); + destaddr = pi->ipi6_addr; + ifindex = pi->ipi6_ifindex; + break; + } + + input[input_bytes] = 0; + + // get the max delay + uint64_t max_multicast_delay = MAX_MULTICAST_DELAY_DEFAULT; + struct interface_delay_info *tmp = if_delay_info_list; + for (; tmp; tmp = tmp->next) { + if (tmp->ifindex == ifindex) { + max_multicast_delay = tmp->max_multicast_delay; + break; + } + } + + struct request_task *new_task = malloc(sizeof(*new_task)); + // the ternary operator avoids division by 0 + new_task->scheduled_time = max_multicast_delay ? now + rand() % max_multicast_delay : 0; + strncpy(new_task->request, input, input_bytes + 1); + new_task->request[input_bytes] = 0; + new_task->client_addr = addr; + + bool is_scheduled; + if(new_task->scheduled_time && IN6_IS_ADDR_MULTICAST(&destaddr)) + // scheduling could fail because the schedule is full + is_scheduled = schedule_push_request(schedule, new_task); + else + // unicast packets are always sent directly + is_scheduled = false; + + if (!is_scheduled) { + // reply immediately + serve_request(new_task, sock); + free(new_task); + } +} int main(int argc, char **argv) { const int one = 1; @@ -376,6 +563,8 @@ int main(int argc, char **argv) { struct sockaddr_in6 server_addr = {}; struct in6_addr mgroup_addr; + srand(time(NULL)); + sock = socket(PF_INET6, SOCK_DGRAM, 0); if (sock < 0) { @@ -388,15 +577,24 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } + if (setsockopt(sock, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one))) { + perror("can't set socket to deliver IPV6_PKTINFO control message"); + exit(EXIT_FAILURE); + } + server_addr.sin6_family = AF_INET6; server_addr.sin6_addr = in6addr_any; + char *endptr; opterr = 0; int group_set = 0; + bool iface_set = false; + unsigned int last_ifindex = 0; + struct interface_delay_info *if_delay_info_list = NULL; int c; - while ((c = getopt(argc, argv, "p:g:i:d:h")) != -1) { + while ((c = getopt(argc, argv, "p:g:t:i:d:h")) != -1) { switch (c) { case 'p': server_addr.sin6_port = htons(atoi(optarg)); @@ -416,7 +614,38 @@ int main(int argc, char **argv) { fprintf(stderr, "Multicast group must be given before interface.\n"); exit(EXIT_FAILURE); } - join_mcast(sock, mgroup_addr, optarg); + iface_set = true; + last_ifindex = if_nametoindex(optarg); + if(!join_mcast(sock, mgroup_addr, last_ifindex)) { + fprintf(stderr, "Could not join multicast group on %s: ", optarg); + last_ifindex = 0; + } + break; + + case 't': + if (!iface_set) { + fprintf(stderr, "Interface must be given before max response delay.\n"); + exit(EXIT_FAILURE); + } + + uint64_t max_multicast_delay = UINT64_C(1000) * strtoul(optarg, &endptr, 10); + if (!*optarg || *endptr || max_multicast_delay > INT64_MAX) { + fprintf(stderr, "Invalid multicast delay\n"); + exit(EXIT_FAILURE); + } + + if (last_ifindex) { + // insert the interface delay info at the beginning of the list + struct interface_delay_info **head = &if_delay_info_list; + struct interface_delay_info *old_head = if_delay_info_list; + + *head = malloc(sizeof(*if_delay_info_list)); + (*head)->ifindex = last_ifindex; + (*head)->max_multicast_delay = max_multicast_delay; + (*head)->next = old_head; + } + + break; case 'd': @@ -443,8 +672,19 @@ int main(int argc, char **argv) { load_providers(); - while (true) - serve(sock); + struct request_schedule schedule = {}; + + while (true) { + accept_request(&schedule, sock, if_delay_info_list); + + struct request_task *task = schedule_pop_request(&schedule); + + if (!task) + continue; + + serve_request(task, sock); + free(task); + } return EXIT_FAILURE; }