sse-multiplex: don't use stdio FILE I/O for nonblocking sockets

The behaviour is not defined; musl loses parts of long data lines. Change
code to work with file descriptors directly and handle buffering ourselves.
This commit is contained in:
Matthias Schiffer 2018-02-13 21:23:03 +01:00
parent 9a6ad5ce84
commit 7abd688e6a
No known key found for this signature in database
GPG Key ID: 16EF3F64CB201D9C
2 changed files with 294 additions and 159 deletions

View File

@ -1,7 +1,7 @@
include $(TOPDIR)/rules.mk
PKG_NAME:=sse-multiplex
PKG_VERSION:=1
PKG_VERSION:=2
PKG_LICENSE:=BSD-2-Clause

View File

@ -1,6 +1,6 @@
/*
Copyright (c) 2015, Matthias Schiffer <mschiffer@universe-factory.net>
Copyright (c) 2015-2018, Matthias Schiffer <mschiffer@universe-factory.net>
All rights reserved.
Redistribution and use in source and binary forms, with or without
@ -48,6 +48,8 @@
typedef struct client client_t;
typedef struct provider provider_t;
typedef void (*handler_t)(provider_t *, void *buffer, size_t len);
static volatile bool running = true;
@ -58,11 +60,19 @@ static struct epoll_event listen_event = {};
static provider_t *providers = NULL;
typedef enum {
CLIENT_STATE_NEW = 0,
CLIENT_STATE_HEADER_SENT,
CLIENT_STATE_ACTIVE,
CLIENT_STATE_CLOSE,
} client_state_t;
struct client {
struct client *next;
FILE *file;
bool active;
int fd;
client_state_t state;
};
struct provider {
@ -70,47 +80,26 @@ struct provider {
struct provider *next;
char *command;
FILE *file;
int fd;
struct epoll_event event;
size_t header_buflen;
size_t header_len;
char *header;
bool preclean;
char last;
bool clean;
handler_t handler;
client_t *clients;
};
static char * read_header(FILE *file) {
size_t buflen = 256, content_len = 0;
char *buffer = malloc(buflen);
while (true) {
size_t space = buflen - content_len;
if (space < 128) {
buflen += 256;
buffer = realloc(buffer, buflen);
space = buflen - content_len;
}
bool ok = fgets(buffer+content_len, space, file);
if (!ok) {
free(buffer);
return NULL;
}
content_len += strlen(buffer+content_len);
if (content_len >= 2 && buffer[content_len-2] == '\n' && buffer[content_len-1] == '\n')
return buffer;
}
}
static FILE * run_command(const char *command) {
static int run_command(const char *command) {
int pipefd[2];
if (pipe(pipefd) < 0) {
syslog(LOG_ERR, "pipe: %s", strerror(errno));
return NULL;
return -1;
}
pid_t pid = fork();
@ -118,19 +107,12 @@ static FILE * run_command(const char *command) {
syslog(LOG_ERR, "fork: %s", strerror(errno));
close(pipefd[0]);
close(pipefd[1]);
return NULL;
return -1;
}
if (pid > 0) {
close(pipefd[1]);
FILE *file = fdopen(pipefd[0], "r");
if (!file) {
close(pipefd[0]);
return NULL;
}
return file;
return pipefd[0];
}
else {
close(pipefd[0]);
@ -151,32 +133,131 @@ static FILE * run_command(const char *command) {
}
}
static provider_t * new_provider(const char *command) {
FILE *file = run_command(command);
if (!file) {
syslog(LOG_WARNING, "unable to start provider `%s'", command);
/** Write a whole buffer to a FD, retrying on partial writes */
static bool feed(int fd, void *buffer, size_t len) {
while (len) {
ssize_t w = write(fd, buffer, len);
if (w == 0)
return false;
if (w < 0) {
if (errno == EINTR)
continue;
return false;
}
buffer += w;
len -= w;
}
return true;
}
/**
Creates a new client for a given socket FD and adds
it to a provider's client list
*/
static void client_add(provider_t *p, int fd) {
client_t *c = calloc(1, sizeof(*c));
c->fd = fd;
c->next = p->clients;
p->clients = c;
}
/** Writes a buffer to a client's FD if the client is active */
static void client_feed(client_t *c, void *buffer, size_t len) {
if (!feed(c->fd, buffer, len))
c->state = CLIENT_STATE_CLOSE;
}
static void client_free(client_t *c) {
close(c->fd);
free(c);
}
/** Writes a buffer to all active clients of a given provider */
static void provider_handle_data(provider_t *provider, void *buffer, size_t len) {
if (!len)
return;
for (client_t *c = provider->clients; c; c = c->next) {
if (c->state == CLIENT_STATE_ACTIVE)
client_feed(c, buffer, len);
}
}
/**
Adds a buffer to the header buffer of a provider
The HTTP header generated by the provider is reproduced for each new
client connecting for the same provider, so it must be stored in a
buffer. New providers are using this hander. As soon as the input is
clean (a double newline terminating the header has been received), this
handler replaces itself with provider_handle_data().
*/
static void provider_handle_header(provider_t *provider, void *buffer, size_t len) {
if (provider->clean)
provider->handler = provider_handle_data;
if (!len)
return;
size_t new_len = provider->header_len + len;
if (new_len < provider->header_len)
goto overflow;
if (new_len > provider->header_buflen) {
if (!provider->header_buflen)
provider->header_buflen = 128;
while (new_len > provider->header_buflen) {
provider->header_buflen <<= 1;
if (!provider->header_buflen)
goto overflow;
}
provider->header = realloc(provider->header, provider->header_buflen);
}
memcpy(provider->header+provider->header_len, buffer, len);
provider->header_len = new_len;
return;
overflow:
/* Just a safeguard, we're OOM long before
* an overflow
*
* We can't really do anything useful here,
* so we just drop the buffer ¯\_()_/¯
*/
free(provider->header);
provider->header_len = 0;
provider->header_buflen = 0;
return;
}
/** Runs the given command, creating a new provider for the command's stdout pipe */
static provider_t * provider_new(const char *command) {
int fd = run_command(command);
if (fd < 0) {
syslog(LOG_WARNING, "unable to run command `%s'", command);
return NULL;
}
char *header = read_header(file);
if (!header) {
fclose(file);
return NULL;
}
fcntl(fileno(file), F_SETFL, fcntl(fileno(file), F_GETFL) | O_NONBLOCK);
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
provider_t *p = calloc(1, sizeof(*p));
p->command = strdup(command);
p->file = file;
p->header = header;
p->preclean = true;
p->clean = true;
p->fd = fd;
p->handler = provider_handle_header;
p->event.events = EPOLLIN|EPOLLRDHUP;
p->event.data.ptr = p;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fileno(file), &p->event) < 0) {
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &p->event) < 0) {
fprintf(stderr, "epoll_ctl: %s\n", strerror(errno));
exit(1);
}
@ -189,41 +270,24 @@ static provider_t * new_provider(const char *command) {
return p;
}
static provider_t * get_provider(const char *command) {
/**
Either retrieves an existing provider for the given command or
creates a new one if none exists
*/
static provider_t * provider_get(const char *command) {
provider_t *p;
for (p = providers; p; p = p->next) {
if (!strcmp(p->command, command))
return p;
}
return new_provider(command);
return provider_new(command);
}
static void free_clients(client_t *clients) {
while (clients) {
client_t *next = clients->next;
fclose(clients->file);
free(clients);
clients = next;
}
}
static void free_provider(provider_t *p) {
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fileno(p->file), NULL) < 0) {
fprintf(stderr, "epoll_ctl: %s\n", strerror(errno));
exit(1);
}
free(p->command);
fclose(p->file);
free(p->header);
free_clients(p->clients);
free(p);
}
static void remove_provider(provider_t *p) {
/**
Cleans up behind a provider, removes it from the global provider list
and frees the provider */
static void provider_del(provider_t *p) {
if (p->next)
p->next->prev = p->prev;
@ -232,31 +296,86 @@ static void remove_provider(provider_t *p) {
else
providers = p->next;
free_provider(p);
}
static void add_client(provider_t *p, FILE *file) {
if (fputs(p->header, file) == EOF || fflush(file) == EOF || ferror(file)) {
fclose(file);
return;
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, p->fd, NULL) < 0) {
fprintf(stderr, "epoll_ctl: %s\n", strerror(errno));
exit(1);
}
client_t *c = calloc(1, sizeof(*c));
c->file = file;
c->active = p->clean;
for (client_t *c = p->clients; c; c = p->clients) {
p->clients = c->next;
client_free(c);
}
c->next = p->clients;
p->clients = c;
free(p->command);
close(p->fd);
free(p->header);
free(p);
}
static void remove_client(client_t **client) {
client_t *c = *client;
*client = c->next;
/**
Periodic maintenance
fclose(c->file);
free(c);
New clients are activated as soon as the input is clean (we have just
read a double newline); clients that have failed are deleted.
When all clients have been removed, the provider itself is deleted and
false is returned.
*/
static bool provider_maintain(provider_t *p) {
for (client_t **cp = &p->clients, *c = *cp; c; c = *cp) {
switch (c->state) {
case CLIENT_STATE_NEW:
if (p->handler == provider_handle_header)
break;
/*
Set state first, in case client_feed()
sets the state to CLIENT_STATE_CLOSE
*/
c->state = CLIENT_STATE_HEADER_SENT;
client_feed(c, p->header, p->header_len);
continue;
case CLIENT_STATE_HEADER_SENT:
if (p->clean)
c->state = CLIENT_STATE_ACTIVE;
break;
case CLIENT_STATE_ACTIVE:
break;
case CLIENT_STATE_CLOSE:
*cp = c->next;
client_free(c);
continue;
}
cp = &c->next;
}
if (!p->clients) {
provider_del(p);
return false;
}
return true;
}
/**
Handles input data from a provider
clean must be set to true if we are at the end of the header or an SSE
record (a double newline).
Returns false when the provider has been deleted because all clients
disappeared.
*/
static bool provider_data(provider_t *provider, void *buf, size_t len, bool clean) {
provider->clean = clean;
provider->handler(provider, buf, len);
return provider_maintain(provider);
}
static void init_epoll(void) {
epoll_fd = epoll_create1(EPOLL_CLOEXEC);
@ -345,51 +464,65 @@ static void setup_signals(void) {
sigaction(SIGPIPE, &action, NULL);
}
static void handle_data(provider_t *provider) {
while (true) {
if (feof(provider->file)) {
remove_provider(provider);
return;
}
/** Handles input from a provider */
static void epoll_handle_provider(provider_t *provider) {
/*
The "last" field contains the last character from the previously
read buffer. This allows us to search for double newlines that
span two reads.
*/
struct {
char last;
char buf[1024];
bool ok = fgets(buf, sizeof(buf), provider->file);
if (!ok)
} data;
data.last = provider->last;
ssize_t r = read(provider->fd, data.buf, sizeof(data.buf));
if (r <= 0) {
if (r < 0 && errno == EINTR)
return;
provider->clean = provider->preclean && (buf[0] == '\n');
provider->preclean = (buf[strlen(buf)-1] == '\n');
client_t **c = &provider->clients;
while (*c) {
if ((*c)->active && fputs(buf, (*c)->file) == EOF) {
remove_client(c);
continue;
}
if (provider->clean) {
/* The ferror check should be redundant, as flush
* should already return EOF on errors; on uClibc,
* it sometimes doesn't... */
if (fflush((*c)->file) == EOF || ferror((*c)->file)) {
remove_client(c);
continue;
}
(*c)->active = true;
}
c = &(*c)->next;
}
if (!provider->clients) {
remove_provider(provider);
/*
EOF before header end: just output the whole block
by pretending a clean state and delete the provider
*/
if (!provider_data(provider, NULL, 0, true))
return;
}
provider_del(provider);
return;
}
provider->last = data.buf[r-1];
char *sep = memmem(&data, 1 + r, "\n\n", 2);
/*
When we found a separator, split the message, so
provider_maintain() is run as soon as possible, in case a new
client needs to be activated.
*/
size_t len1 = sep ? (sep + 2) - data.buf : r;
if (!provider_data(provider, data.buf, len1, sep))
return;
if (!sep)
return;
/*
If there is a second chunk after the first double newline,
we don't need to split it further: All new clients have already
been enabled
*/
size_t len2 = r - len1;
if (len2)
provider_data(
provider, data.buf + len1, len2,
data.buf[r-2] == '\n' && data.buf[r-1] == '\n'
);
}
static void handle_accept(uint32_t events) {
static void epoll_handle_accept(uint32_t events) {
if (events != EPOLLIN) {
syslog(LOG_ERR, "unexpected event on listening socket: %u\n", (unsigned)events);
exit(1);
@ -401,38 +534,40 @@ static void handle_accept(uint32_t events) {
return;
}
FILE *file = fdopen(fd, "r+");
if (!file) {
syslog(LOG_WARNING, "fdopen: %s\n", strerror(errno));
close(fd);
return;
}
char command[1024];
bool ok = fgets(command, sizeof(command), file);
if (!ok || !command[0] || !feof(file)) {
fclose(file);
return;
char *c = command;
while (true) {
ssize_t r = read(fd, c, command + (sizeof(command)-1) - c);
if (r < 0) {
close(fd);
return;
}
if (r == 0) {
*c = 0;
break;
}
c += r;
}
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
provider_t *p = get_provider(command);
provider_t *p = provider_get(command);
if (!p) {
fclose(file);
close(fd);
return;
}
add_client(p, file);
handle_data(p);
client_add(p, fd);
provider_maintain(p);
}
void cleanup(void) {
while (providers)
remove_provider(providers);
provider_del(providers);
}
int main() {
init_epoll();
create_socket();
@ -453,9 +588,9 @@ int main() {
}
if (event.data.ptr == &listen_event)
handle_accept(event.events);
epoll_handle_accept(event.events);
else
handle_data(event.data.ptr);
epoll_handle_provider(event.data.ptr);
}
cleanup();