respondd: add support for delayed replies to multicast requests (#144)
This commit is contained in:
parent
738d8a2365
commit
e7fbcf75a9
|
@ -1,6 +1,7 @@
|
||||||
/*
|
/*
|
||||||
Copyright (c) 2014-2015, Nils Schneider <nils@nilsschneider.net>
|
Copyright (c) 2014-2015, Nils Schneider <nils@nilsschneider.net>
|
||||||
Copyright (c) 2015-2016, Matthias Schiffer <mschiffer@universe-factory.net>
|
Copyright (c) 2015-2016, Matthias Schiffer <mschiffer@universe-factory.net>
|
||||||
|
Copyright (c) 2016 Leonardo Mörlein <me@irrelefant.net>
|
||||||
All rights reserved.
|
All rights reserved.
|
||||||
|
|
||||||
Redistribution and use in source and binary forms, with or without
|
Redistribution and use in source and binary forms, with or without
|
||||||
|
@ -41,6 +42,7 @@
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
#include <net/if.h>
|
#include <net/if.h>
|
||||||
|
@ -48,6 +50,16 @@
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
|
|
||||||
|
#define SCHEDULE_LEN 8
|
||||||
|
#define REQUEST_MAXLEN 256
|
||||||
|
#define MAX_MULTICAST_DELAY_DEFAULT 0
|
||||||
|
|
||||||
|
struct interface_delay_info {
|
||||||
|
struct interface_delay_info *next;
|
||||||
|
|
||||||
|
unsigned int ifindex;
|
||||||
|
uint64_t max_multicast_delay;
|
||||||
|
};
|
||||||
|
|
||||||
struct provider_list {
|
struct provider_list {
|
||||||
struct provider_list *next;
|
struct provider_list *next;
|
||||||
|
@ -64,6 +76,18 @@ struct request_type {
|
||||||
int64_t cache_timeout;
|
int64_t cache_timeout;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct request_task {
|
||||||
|
struct request_task *next;
|
||||||
|
int64_t scheduled_time;
|
||||||
|
|
||||||
|
struct sockaddr_in6 client_addr;
|
||||||
|
char request[REQUEST_MAXLEN];
|
||||||
|
};
|
||||||
|
|
||||||
|
struct request_schedule {
|
||||||
|
size_t length;
|
||||||
|
struct request_task *list_head;
|
||||||
|
};
|
||||||
|
|
||||||
static int64_t now;
|
static int64_t now;
|
||||||
static struct hsearch_data htab;
|
static struct hsearch_data htab;
|
||||||
|
@ -79,15 +103,18 @@ static void usage() {
|
||||||
puts(" -p <int> port number to listen on");
|
puts(" -p <int> port number to listen on");
|
||||||
puts(" -g <ip6> multicast group, e.g. ff02::2:1001");
|
puts(" -g <ip6> multicast group, e.g. ff02::2:1001");
|
||||||
puts(" -i <string> interface on which the group is joined");
|
puts(" -i <string> interface on which the group is joined");
|
||||||
|
puts(" -t <int> maximum delay seconds before multicast responses");
|
||||||
|
puts(" for the last specified multicast interface (default: 0)");
|
||||||
puts(" -d <string> data provider directory (default: current directory)");
|
puts(" -d <string> data provider directory (default: current directory)");
|
||||||
puts(" -h this help\n");
|
puts(" -h this help\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
static void join_mcast(const int sock, const struct in6_addr addr, const char *iface) {
|
// returns true on success
|
||||||
|
static bool join_mcast(const int sock, const struct in6_addr addr, unsigned int ifindex) {
|
||||||
struct ipv6_mreq mreq;
|
struct ipv6_mreq mreq;
|
||||||
|
|
||||||
mreq.ipv6mr_multiaddr = addr;
|
mreq.ipv6mr_multiaddr = addr;
|
||||||
mreq.ipv6mr_interface = if_nametoindex(iface);
|
mreq.ipv6mr_interface = ifindex;
|
||||||
|
|
||||||
if (mreq.ipv6mr_interface == 0)
|
if (mreq.ipv6mr_interface == 0)
|
||||||
goto error;
|
goto error;
|
||||||
|
@ -95,11 +122,11 @@ static void join_mcast(const int sock, const struct in6_addr addr, const char *i
|
||||||
if (setsockopt(sock, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq, sizeof(mreq)) == -1)
|
if (setsockopt(sock, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq, sizeof(mreq)) == -1)
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
return;
|
return true;
|
||||||
|
|
||||||
error:
|
error:
|
||||||
fprintf(stderr, "Could not join multicast group on %s: ", iface);
|
|
||||||
perror(NULL);
|
perror(NULL);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -165,6 +192,55 @@ static const struct respondd_provider_info * get_providers(const char *filename)
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool schedule_push_request(struct request_schedule *s, struct request_task *new_task) {
|
||||||
|
if (s->length >= SCHEDULE_LEN)
|
||||||
|
// schedule is full
|
||||||
|
return false;
|
||||||
|
|
||||||
|
// insert into sorted list
|
||||||
|
struct request_task **pos;
|
||||||
|
for (pos = &s->list_head; *pos; pos = &((*pos)->next)) {
|
||||||
|
if ((*pos)->scheduled_time > new_task->scheduled_time)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// insert before *pos
|
||||||
|
new_task->next = *pos;
|
||||||
|
*pos = new_task;
|
||||||
|
|
||||||
|
s->length++;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t schedule_idle_time(struct request_schedule *s) {
|
||||||
|
if (!s->list_head)
|
||||||
|
// nothing to do yet (0 = infinite time)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
int64_t result = s->list_head->scheduled_time - now;
|
||||||
|
|
||||||
|
if (result <= 0)
|
||||||
|
return -1; // zero is infinity
|
||||||
|
else
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct request_task * schedule_pop_request(struct request_schedule *s) {
|
||||||
|
if (!s->list_head)
|
||||||
|
// schedule is empty
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
if (schedule_idle_time(s) >= 0) {
|
||||||
|
// nothing to do yet
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct request_task *result = s->list_head;
|
||||||
|
s->list_head = s->list_head->next;
|
||||||
|
s->length--;
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
static void load_cache_time(struct request_type *r, const char *name) {
|
static void load_cache_time(struct request_type *r, const char *name) {
|
||||||
r->cache = NULL;
|
r->cache = NULL;
|
||||||
r->cache_time = 0;
|
r->cache_time = 0;
|
||||||
|
@ -308,8 +384,6 @@ static struct json_object * handle_request(char *request, bool *compress) {
|
||||||
if (!*request)
|
if (!*request)
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
update_time();
|
|
||||||
|
|
||||||
if (!strncmp(request, "GET ", 4)) {
|
if (!strncmp(request, "GET ", 4)) {
|
||||||
*compress = true;
|
*compress = true;
|
||||||
return multi_request(request+4);
|
return multi_request(request+4);
|
||||||
|
@ -320,27 +394,10 @@ static struct json_object * handle_request(char *request, bool *compress) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void send_response(int sock, struct json_object *result, bool compress,
|
||||||
static void serve(int sock) {
|
struct sockaddr_in6 *addr) {
|
||||||
char input[256];
|
|
||||||
const char *output = NULL;
|
const char *output = NULL;
|
||||||
ssize_t input_bytes, output_bytes;
|
size_t output_bytes;
|
||||||
struct sockaddr_in6 addr;
|
|
||||||
socklen_t addrlen = sizeof(addr);
|
|
||||||
bool compress;
|
|
||||||
|
|
||||||
input_bytes = recvfrom(sock, input, sizeof(input)-1, 0, (struct sockaddr *)&addr, &addrlen);
|
|
||||||
|
|
||||||
if (input_bytes < 0) {
|
|
||||||
perror("recvfrom failed");
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
input[input_bytes] = 0;
|
|
||||||
|
|
||||||
struct json_object *result = handle_request(input, &compress);
|
|
||||||
if (!result)
|
|
||||||
return;
|
|
||||||
|
|
||||||
const char *str = json_object_to_json_string_ext(result, JSON_C_TO_STRING_PLAIN);
|
const char *str = json_object_to_json_string_ext(result, JSON_C_TO_STRING_PLAIN);
|
||||||
|
|
||||||
|
@ -361,13 +418,143 @@ static void serve(int sock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (output) {
|
if (output) {
|
||||||
if (sendto(sock, output, output_bytes, 0, (struct sockaddr *)&addr, addrlen) < 0)
|
if (sendto(sock, output, output_bytes, 0, (struct sockaddr *) addr, sizeof(*addr)) < 0)
|
||||||
perror("sendto failed");
|
perror("sendto failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
json_object_put(result);
|
json_object_put(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void serve_request(struct request_task *task, int sock) {
|
||||||
|
bool compress;
|
||||||
|
struct json_object *result = handle_request(task->request, &compress);
|
||||||
|
|
||||||
|
if (!result)
|
||||||
|
return;
|
||||||
|
|
||||||
|
send_response(
|
||||||
|
sock,
|
||||||
|
result,
|
||||||
|
compress,
|
||||||
|
&task->client_addr
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for an incoming request and schedule it.
|
||||||
|
*
|
||||||
|
* 1a. If the schedule is empty, we wait infinite time.
|
||||||
|
* 1b. If we have scheduled requests, we only wait for incoming requests
|
||||||
|
* until we reach the scheduling deadline.
|
||||||
|
* 1c. If there is no request incomming in the above time, the fuction will
|
||||||
|
* return.
|
||||||
|
* 2a. If the incoming request was sent to a multicast destination IPv6,
|
||||||
|
* check whether there was set a max multicast delay for the incomming iface
|
||||||
|
* in if_delay_info_list.
|
||||||
|
* 2b. If so choose a random delay between 0 and max_multicast_delay milliseconds
|
||||||
|
* and schedule the request.
|
||||||
|
* 2c. If not, send the request immediately.
|
||||||
|
* 2d. If the schedule is full, send the reply immediately.
|
||||||
|
* 3a. If the incoming request was sent to a unicast destination, the response
|
||||||
|
* will be also sent immediately.
|
||||||
|
*/
|
||||||
|
static void accept_request(struct request_schedule *schedule, int sock,
|
||||||
|
struct interface_delay_info *if_delay_info_list) {
|
||||||
|
char input[REQUEST_MAXLEN];
|
||||||
|
ssize_t input_bytes;
|
||||||
|
struct sockaddr_in6 addr;
|
||||||
|
char control[256];
|
||||||
|
struct in6_addr destaddr = {};
|
||||||
|
struct cmsghdr *cmsg;
|
||||||
|
unsigned int ifindex = 0;
|
||||||
|
int recv_errno;
|
||||||
|
|
||||||
|
int64_t timeout = schedule_idle_time(schedule);
|
||||||
|
if (timeout < 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// set timeout to the socket
|
||||||
|
struct timeval t;
|
||||||
|
t.tv_sec = ((uint64_t) timeout) / 1000;
|
||||||
|
t.tv_usec = (((uint64_t) timeout) % 1000) * 1000;
|
||||||
|
|
||||||
|
if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &t, sizeof(t)) < 0)
|
||||||
|
perror("setsockopt failed\n");
|
||||||
|
|
||||||
|
struct iovec iv = {
|
||||||
|
.iov_base = input,
|
||||||
|
.iov_len = sizeof(input) - 1
|
||||||
|
};
|
||||||
|
|
||||||
|
struct msghdr mh = {
|
||||||
|
.msg_name = &addr,
|
||||||
|
.msg_namelen = sizeof(addr),
|
||||||
|
.msg_iov = &iv,
|
||||||
|
.msg_iovlen = 1,
|
||||||
|
.msg_control = control,
|
||||||
|
.msg_controllen = sizeof(control)
|
||||||
|
};
|
||||||
|
|
||||||
|
input_bytes = recvmsg(sock, &mh, 0);
|
||||||
|
recv_errno = errno;
|
||||||
|
update_time();
|
||||||
|
|
||||||
|
// Timeout
|
||||||
|
errno = recv_errno;
|
||||||
|
if (input_bytes < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (input_bytes < 0) {
|
||||||
|
perror("recvmsg failed");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// determine destination address
|
||||||
|
for (cmsg = CMSG_FIRSTHDR(&mh); cmsg != NULL; cmsg = CMSG_NXTHDR(&mh, cmsg))
|
||||||
|
{
|
||||||
|
// skip other packet headers
|
||||||
|
if (cmsg->cmsg_level != IPPROTO_IPV6 || cmsg->cmsg_type != IPV6_PKTINFO)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
struct in6_pktinfo *pi = (struct in6_pktinfo *) CMSG_DATA(cmsg);
|
||||||
|
destaddr = pi->ipi6_addr;
|
||||||
|
ifindex = pi->ipi6_ifindex;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
input[input_bytes] = 0;
|
||||||
|
|
||||||
|
// get the max delay
|
||||||
|
uint64_t max_multicast_delay = MAX_MULTICAST_DELAY_DEFAULT;
|
||||||
|
struct interface_delay_info *tmp = if_delay_info_list;
|
||||||
|
for (; tmp; tmp = tmp->next) {
|
||||||
|
if (tmp->ifindex == ifindex) {
|
||||||
|
max_multicast_delay = tmp->max_multicast_delay;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct request_task *new_task = malloc(sizeof(*new_task));
|
||||||
|
// the ternary operator avoids division by 0
|
||||||
|
new_task->scheduled_time = max_multicast_delay ? now + rand() % max_multicast_delay : 0;
|
||||||
|
strncpy(new_task->request, input, input_bytes + 1);
|
||||||
|
new_task->request[input_bytes] = 0;
|
||||||
|
new_task->client_addr = addr;
|
||||||
|
|
||||||
|
bool is_scheduled;
|
||||||
|
if(new_task->scheduled_time && IN6_IS_ADDR_MULTICAST(&destaddr))
|
||||||
|
// scheduling could fail because the schedule is full
|
||||||
|
is_scheduled = schedule_push_request(schedule, new_task);
|
||||||
|
else
|
||||||
|
// unicast packets are always sent directly
|
||||||
|
is_scheduled = false;
|
||||||
|
|
||||||
|
if (!is_scheduled) {
|
||||||
|
// reply immediately
|
||||||
|
serve_request(new_task, sock);
|
||||||
|
free(new_task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char **argv) {
|
int main(int argc, char **argv) {
|
||||||
const int one = 1;
|
const int one = 1;
|
||||||
|
@ -376,6 +563,8 @@ int main(int argc, char **argv) {
|
||||||
struct sockaddr_in6 server_addr = {};
|
struct sockaddr_in6 server_addr = {};
|
||||||
struct in6_addr mgroup_addr;
|
struct in6_addr mgroup_addr;
|
||||||
|
|
||||||
|
srand(time(NULL));
|
||||||
|
|
||||||
sock = socket(PF_INET6, SOCK_DGRAM, 0);
|
sock = socket(PF_INET6, SOCK_DGRAM, 0);
|
||||||
|
|
||||||
if (sock < 0) {
|
if (sock < 0) {
|
||||||
|
@ -388,15 +577,24 @@ int main(int argc, char **argv) {
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (setsockopt(sock, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one))) {
|
||||||
|
perror("can't set socket to deliver IPV6_PKTINFO control message");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
server_addr.sin6_family = AF_INET6;
|
server_addr.sin6_family = AF_INET6;
|
||||||
server_addr.sin6_addr = in6addr_any;
|
server_addr.sin6_addr = in6addr_any;
|
||||||
|
|
||||||
|
char *endptr;
|
||||||
opterr = 0;
|
opterr = 0;
|
||||||
|
|
||||||
int group_set = 0;
|
int group_set = 0;
|
||||||
|
bool iface_set = false;
|
||||||
|
unsigned int last_ifindex = 0;
|
||||||
|
struct interface_delay_info *if_delay_info_list = NULL;
|
||||||
|
|
||||||
int c;
|
int c;
|
||||||
while ((c = getopt(argc, argv, "p:g:i:d:h")) != -1) {
|
while ((c = getopt(argc, argv, "p:g:t:i:d:h")) != -1) {
|
||||||
switch (c) {
|
switch (c) {
|
||||||
case 'p':
|
case 'p':
|
||||||
server_addr.sin6_port = htons(atoi(optarg));
|
server_addr.sin6_port = htons(atoi(optarg));
|
||||||
|
@ -416,7 +614,38 @@ int main(int argc, char **argv) {
|
||||||
fprintf(stderr, "Multicast group must be given before interface.\n");
|
fprintf(stderr, "Multicast group must be given before interface.\n");
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
join_mcast(sock, mgroup_addr, optarg);
|
iface_set = true;
|
||||||
|
last_ifindex = if_nametoindex(optarg);
|
||||||
|
if(!join_mcast(sock, mgroup_addr, last_ifindex)) {
|
||||||
|
fprintf(stderr, "Could not join multicast group on %s: ", optarg);
|
||||||
|
last_ifindex = 0;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 't':
|
||||||
|
if (!iface_set) {
|
||||||
|
fprintf(stderr, "Interface must be given before max response delay.\n");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t max_multicast_delay = UINT64_C(1000) * strtoul(optarg, &endptr, 10);
|
||||||
|
if (!*optarg || *endptr || max_multicast_delay > INT64_MAX) {
|
||||||
|
fprintf(stderr, "Invalid multicast delay\n");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (last_ifindex) {
|
||||||
|
// insert the interface delay info at the beginning of the list
|
||||||
|
struct interface_delay_info **head = &if_delay_info_list;
|
||||||
|
struct interface_delay_info *old_head = if_delay_info_list;
|
||||||
|
|
||||||
|
*head = malloc(sizeof(*if_delay_info_list));
|
||||||
|
(*head)->ifindex = last_ifindex;
|
||||||
|
(*head)->max_multicast_delay = max_multicast_delay;
|
||||||
|
(*head)->next = old_head;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case 'd':
|
case 'd':
|
||||||
|
@ -443,8 +672,19 @@ int main(int argc, char **argv) {
|
||||||
|
|
||||||
load_providers();
|
load_providers();
|
||||||
|
|
||||||
while (true)
|
struct request_schedule schedule = {};
|
||||||
serve(sock);
|
|
||||||
|
while (true) {
|
||||||
|
accept_request(&schedule, sock, if_delay_info_list);
|
||||||
|
|
||||||
|
struct request_task *task = schedule_pop_request(&schedule);
|
||||||
|
|
||||||
|
if (!task)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
serve_request(task, sock);
|
||||||
|
free(task);
|
||||||
|
}
|
||||||
|
|
||||||
return EXIT_FAILURE;
|
return EXIT_FAILURE;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue