diff --git a/net/sse-multiplex/Makefile b/net/sse-multiplex/Makefile index a7e834a..9a33f6a 100644 --- a/net/sse-multiplex/Makefile +++ b/net/sse-multiplex/Makefile @@ -1,7 +1,7 @@ include $(TOPDIR)/rules.mk PKG_NAME:=sse-multiplex -PKG_VERSION:=1 +PKG_VERSION:=2 PKG_LICENSE:=BSD-2-Clause diff --git a/net/sse-multiplex/src/sse-multiplexd.c b/net/sse-multiplex/src/sse-multiplexd.c index 131943b..0679369 100644 --- a/net/sse-multiplex/src/sse-multiplexd.c +++ b/net/sse-multiplex/src/sse-multiplexd.c @@ -1,6 +1,6 @@ /* - Copyright (c) 2015, Matthias Schiffer + Copyright (c) 2015-2018, Matthias Schiffer All rights reserved. Redistribution and use in source and binary forms, with or without @@ -48,6 +48,8 @@ typedef struct client client_t; typedef struct provider provider_t; +typedef void (*handler_t)(provider_t *, void *buffer, size_t len); + static volatile bool running = true; @@ -58,11 +60,19 @@ static struct epoll_event listen_event = {}; static provider_t *providers = NULL; +typedef enum { + CLIENT_STATE_NEW = 0, + CLIENT_STATE_HEADER_SENT, + CLIENT_STATE_ACTIVE, + CLIENT_STATE_CLOSE, +} client_state_t; + + struct client { struct client *next; - FILE *file; - bool active; + int fd; + client_state_t state; }; struct provider { @@ -70,47 +80,26 @@ struct provider { struct provider *next; char *command; - FILE *file; + int fd; struct epoll_event event; + size_t header_buflen; + size_t header_len; char *header; - bool preclean; + + char last; bool clean; + handler_t handler; client_t *clients; }; -static char * read_header(FILE *file) { - size_t buflen = 256, content_len = 0; - char *buffer = malloc(buflen); - - while (true) { - size_t space = buflen - content_len; - if (space < 128) { - buflen += 256; - buffer = realloc(buffer, buflen); - space = buflen - content_len; - } - - bool ok = fgets(buffer+content_len, space, file); - if (!ok) { - free(buffer); - return NULL; - } - - content_len += strlen(buffer+content_len); - - if (content_len >= 2 && buffer[content_len-2] == '\n' && buffer[content_len-1] == '\n') - return buffer; - } -} - -static FILE * run_command(const char *command) { +static int run_command(const char *command) { int pipefd[2]; if (pipe(pipefd) < 0) { syslog(LOG_ERR, "pipe: %s", strerror(errno)); - return NULL; + return -1; } pid_t pid = fork(); @@ -118,19 +107,12 @@ static FILE * run_command(const char *command) { syslog(LOG_ERR, "fork: %s", strerror(errno)); close(pipefd[0]); close(pipefd[1]); - return NULL; + return -1; } if (pid > 0) { close(pipefd[1]); - - FILE *file = fdopen(pipefd[0], "r"); - if (!file) { - close(pipefd[0]); - return NULL; - } - - return file; + return pipefd[0]; } else { close(pipefd[0]); @@ -151,32 +133,131 @@ static FILE * run_command(const char *command) { } } -static provider_t * new_provider(const char *command) { - FILE *file = run_command(command); - if (!file) { - syslog(LOG_WARNING, "unable to start provider `%s'", command); +/** Write a whole buffer to a FD, retrying on partial writes */ +static bool feed(int fd, void *buffer, size_t len) { + while (len) { + ssize_t w = write(fd, buffer, len); + if (w == 0) + return false; + if (w < 0) { + if (errno == EINTR) + continue; + + return false; + } + + buffer += w; + len -= w; + } + + return true; +} + +/** + Creates a new client for a given socket FD and adds + it to a provider's client list +*/ +static void client_add(provider_t *p, int fd) { + client_t *c = calloc(1, sizeof(*c)); + c->fd = fd; + + c->next = p->clients; + p->clients = c; +} + +/** Writes a buffer to a client's FD if the client is active */ +static void client_feed(client_t *c, void *buffer, size_t len) { + if (!feed(c->fd, buffer, len)) + c->state = CLIENT_STATE_CLOSE; +} + +static void client_free(client_t *c) { + close(c->fd); + free(c); +} + +/** Writes a buffer to all active clients of a given provider */ +static void provider_handle_data(provider_t *provider, void *buffer, size_t len) { + if (!len) + return; + + for (client_t *c = provider->clients; c; c = c->next) { + if (c->state == CLIENT_STATE_ACTIVE) + client_feed(c, buffer, len); + } +} + +/** + Adds a buffer to the header buffer of a provider + + The HTTP header generated by the provider is reproduced for each new + client connecting for the same provider, so it must be stored in a + buffer. New providers are using this hander. As soon as the input is + clean (a double newline terminating the header has been received), this + handler replaces itself with provider_handle_data(). +*/ +static void provider_handle_header(provider_t *provider, void *buffer, size_t len) { + if (provider->clean) + provider->handler = provider_handle_data; + + if (!len) + return; + + size_t new_len = provider->header_len + len; + if (new_len < provider->header_len) + goto overflow; + + if (new_len > provider->header_buflen) { + if (!provider->header_buflen) + provider->header_buflen = 128; + + while (new_len > provider->header_buflen) { + provider->header_buflen <<= 1; + if (!provider->header_buflen) + goto overflow; + } + + provider->header = realloc(provider->header, provider->header_buflen); + } + + memcpy(provider->header+provider->header_len, buffer, len); + provider->header_len = new_len; + + return; + +overflow: + /* Just a safeguard, we're OOM long before + * an overflow + * + * We can't really do anything useful here, + * so we just drop the buffer ¯\_(ツ)_/¯ + */ + free(provider->header); + provider->header_len = 0; + provider->header_buflen = 0; + return; +} + +/** Runs the given command, creating a new provider for the command's stdout pipe */ +static provider_t * provider_new(const char *command) { + int fd = run_command(command); + if (fd < 0) { + syslog(LOG_WARNING, "unable to run command `%s'", command); return NULL; } - char *header = read_header(file); - if (!header) { - fclose(file); - return NULL; - } - - fcntl(fileno(file), F_SETFL, fcntl(fileno(file), F_GETFL) | O_NONBLOCK); + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK); provider_t *p = calloc(1, sizeof(*p)); p->command = strdup(command); - p->file = file; - p->header = header; - p->preclean = true; - p->clean = true; + p->fd = fd; + + p->handler = provider_handle_header; p->event.events = EPOLLIN|EPOLLRDHUP; p->event.data.ptr = p; - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fileno(file), &p->event) < 0) { + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &p->event) < 0) { fprintf(stderr, "epoll_ctl: %s\n", strerror(errno)); exit(1); } @@ -189,41 +270,24 @@ static provider_t * new_provider(const char *command) { return p; } -static provider_t * get_provider(const char *command) { +/** + Either retrieves an existing provider for the given command or + creates a new one if none exists +*/ +static provider_t * provider_get(const char *command) { provider_t *p; for (p = providers; p; p = p->next) { if (!strcmp(p->command, command)) return p; } - return new_provider(command); + return provider_new(command); } -static void free_clients(client_t *clients) { - while (clients) { - client_t *next = clients->next; - - fclose(clients->file); - free(clients); - - clients = next; - } -} - -static void free_provider(provider_t *p) { - if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fileno(p->file), NULL) < 0) { - fprintf(stderr, "epoll_ctl: %s\n", strerror(errno)); - exit(1); - } - - free(p->command); - fclose(p->file); - free(p->header); - free_clients(p->clients); - free(p); -} - -static void remove_provider(provider_t *p) { +/** + Cleans up behind a provider, removes it from the global provider list + and frees the provider */ +static void provider_del(provider_t *p) { if (p->next) p->next->prev = p->prev; @@ -232,31 +296,86 @@ static void remove_provider(provider_t *p) { else providers = p->next; - free_provider(p); -} - -static void add_client(provider_t *p, FILE *file) { - if (fputs(p->header, file) == EOF || fflush(file) == EOF || ferror(file)) { - fclose(file); - return; + if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, p->fd, NULL) < 0) { + fprintf(stderr, "epoll_ctl: %s\n", strerror(errno)); + exit(1); } - client_t *c = calloc(1, sizeof(*c)); - c->file = file; - c->active = p->clean; + for (client_t *c = p->clients; c; c = p->clients) { + p->clients = c->next; + client_free(c); + } - c->next = p->clients; - p->clients = c; + free(p->command); + close(p->fd); + free(p->header); + free(p); } -static void remove_client(client_t **client) { - client_t *c = *client; - *client = c->next; +/** + Periodic maintenance - fclose(c->file); - free(c); + New clients are activated as soon as the input is clean (we have just + read a double newline); clients that have failed are deleted. + + When all clients have been removed, the provider itself is deleted and + false is returned. +*/ +static bool provider_maintain(provider_t *p) { + for (client_t **cp = &p->clients, *c = *cp; c; c = *cp) { + switch (c->state) { + case CLIENT_STATE_NEW: + if (p->handler == provider_handle_header) + break; + + /* + Set state first, in case client_feed() + sets the state to CLIENT_STATE_CLOSE + */ + c->state = CLIENT_STATE_HEADER_SENT; + client_feed(c, p->header, p->header_len); + continue; + + case CLIENT_STATE_HEADER_SENT: + if (p->clean) + c->state = CLIENT_STATE_ACTIVE; + + break; + + case CLIENT_STATE_ACTIVE: + break; + + case CLIENT_STATE_CLOSE: + *cp = c->next; + client_free(c); + continue; + } + + cp = &c->next; + } + + if (!p->clients) { + provider_del(p); + return false; + } + + return true; } +/** + Handles input data from a provider + + clean must be set to true if we are at the end of the header or an SSE + record (a double newline). + + Returns false when the provider has been deleted because all clients + disappeared. +*/ +static bool provider_data(provider_t *provider, void *buf, size_t len, bool clean) { + provider->clean = clean; + provider->handler(provider, buf, len); + return provider_maintain(provider); +} static void init_epoll(void) { epoll_fd = epoll_create1(EPOLL_CLOEXEC); @@ -345,51 +464,65 @@ static void setup_signals(void) { sigaction(SIGPIPE, &action, NULL); } -static void handle_data(provider_t *provider) { - while (true) { - if (feof(provider->file)) { - remove_provider(provider); - return; - } - +/** Handles input from a provider */ +static void epoll_handle_provider(provider_t *provider) { + /* + The "last" field contains the last character from the previously + read buffer. This allows us to search for double newlines that + span two reads. + */ + struct { + char last; char buf[1024]; - bool ok = fgets(buf, sizeof(buf), provider->file); - if (!ok) + } data; + + data.last = provider->last; + + ssize_t r = read(provider->fd, data.buf, sizeof(data.buf)); + if (r <= 0) { + if (r < 0 && errno == EINTR) return; - provider->clean = provider->preclean && (buf[0] == '\n'); - provider->preclean = (buf[strlen(buf)-1] == '\n'); - - client_t **c = &provider->clients; - while (*c) { - if ((*c)->active && fputs(buf, (*c)->file) == EOF) { - remove_client(c); - continue; - } - - if (provider->clean) { - /* The ferror check should be redundant, as flush - * should already return EOF on errors; on uClibc, - * it sometimes doesn't... */ - if (fflush((*c)->file) == EOF || ferror((*c)->file)) { - remove_client(c); - continue; - } - - (*c)->active = true; - } - - c = &(*c)->next; - } - - if (!provider->clients) { - remove_provider(provider); + /* + EOF before header end: just output the whole block + by pretending a clean state and delete the provider + */ + if (!provider_data(provider, NULL, 0, true)) return; - } + provider_del(provider); + return; } + + provider->last = data.buf[r-1]; + + char *sep = memmem(&data, 1 + r, "\n\n", 2); + + /* + When we found a separator, split the message, so + provider_maintain() is run as soon as possible, in case a new + client needs to be activated. + */ + size_t len1 = sep ? (sep + 2) - data.buf : r; + if (!provider_data(provider, data.buf, len1, sep)) + return; + + if (!sep) + return; + + /* + If there is a second chunk after the first double newline, + we don't need to split it further: All new clients have already + been enabled + */ + size_t len2 = r - len1; + if (len2) + provider_data( + provider, data.buf + len1, len2, + data.buf[r-2] == '\n' && data.buf[r-1] == '\n' + ); } -static void handle_accept(uint32_t events) { +static void epoll_handle_accept(uint32_t events) { if (events != EPOLLIN) { syslog(LOG_ERR, "unexpected event on listening socket: %u\n", (unsigned)events); exit(1); @@ -401,38 +534,40 @@ static void handle_accept(uint32_t events) { return; } - FILE *file = fdopen(fd, "r+"); - if (!file) { - syslog(LOG_WARNING, "fdopen: %s\n", strerror(errno)); - close(fd); - return; - } - char command[1024]; - bool ok = fgets(command, sizeof(command), file); - if (!ok || !command[0] || !feof(file)) { - fclose(file); - return; + char *c = command; + while (true) { + ssize_t r = read(fd, c, command + (sizeof(command)-1) - c); + if (r < 0) { + close(fd); + return; + } + + if (r == 0) { + *c = 0; + break; + } + + c += r; } fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK); - provider_t *p = get_provider(command); + provider_t *p = provider_get(command); if (!p) { - fclose(file); + close(fd); return; } - add_client(p, file); - handle_data(p); + client_add(p, fd); + provider_maintain(p); } void cleanup(void) { while (providers) - remove_provider(providers); + provider_del(providers); } - int main() { init_epoll(); create_socket(); @@ -453,9 +588,9 @@ int main() { } if (event.data.ptr == &listen_event) - handle_accept(event.events); + epoll_handle_accept(event.events); else - handle_data(event.data.ptr); + epoll_handle_provider(event.data.ptr); } cleanup();