From 3f78449b9d902cf9435bf582cc01aff2f3011a26 Mon Sep 17 00:00:00 2001 From: Matthias Schiffer Date: Sun, 13 Dec 2015 04:09:17 +0100 Subject: [PATCH] Add new package sse-multiplex --- net/sse-multiplex/Makefile | 36 ++ net/sse-multiplex/files/sse-multiplexd.init | 15 + net/sse-multiplex/src/CMakeLists.txt | 18 + net/sse-multiplex/src/sse-multiplex.c | 99 +++++ net/sse-multiplex/src/sse-multiplex.h.in | 4 + net/sse-multiplex/src/sse-multiplexd.c | 460 ++++++++++++++++++++ 6 files changed, 632 insertions(+) create mode 100644 net/sse-multiplex/Makefile create mode 100755 net/sse-multiplex/files/sse-multiplexd.init create mode 100644 net/sse-multiplex/src/CMakeLists.txt create mode 100644 net/sse-multiplex/src/sse-multiplex.c create mode 100644 net/sse-multiplex/src/sse-multiplex.h.in create mode 100644 net/sse-multiplex/src/sse-multiplexd.c diff --git a/net/sse-multiplex/Makefile b/net/sse-multiplex/Makefile new file mode 100644 index 0000000..a7e834a --- /dev/null +++ b/net/sse-multiplex/Makefile @@ -0,0 +1,36 @@ +include $(TOPDIR)/rules.mk + +PKG_NAME:=sse-multiplex +PKG_VERSION:=1 + +PKG_LICENSE:=BSD-2-Clause + +PKG_BUILD_DIR := $(BUILD_DIR)/$(PKG_NAME) + +include $(INCLUDE_DIR)/package.mk +include $(INCLUDE_DIR)/cmake.mk + +define Package/sse-multiplex + SECTION:=net + CATEGORY:=Network + DEPENDS:= + TITLE:=Allows multiple clients to receive the same Server-Sent Event stream +endef + +CMAKE_OPTIONS += -DCMAKE_BUILD_TYPE:STRING=MINSIZEREL -DSSE_MULTIPLEX_SOCKET:STRING=/var/run/sse-multiplex.sock + +define Build/Prepare + mkdir -p $(PKG_BUILD_DIR) + $(CP) ./src/* $(PKG_BUILD_DIR)/ +endef + +define Package/sse-multiplex/install + $(INSTALL_DIR) $(1)/usr/sbin + $(INSTALL_BIN) $(PKG_INSTALL_DIR)/usr/sbin/sse-multiplexd $(1)/usr/sbin/ + $(INSTALL_BIN) $(PKG_INSTALL_DIR)/usr/sbin/sse-multiplex $(1)/usr/sbin/ + + $(INSTALL_DIR) $(1)/etc/init.d + $(INSTALL_BIN) ./files/sse-multiplexd.init $(1)/etc/init.d/sse-multiplexd +endef + +$(eval $(call BuildPackage,sse-multiplex)) diff --git a/net/sse-multiplex/files/sse-multiplexd.init b/net/sse-multiplex/files/sse-multiplexd.init new file mode 100755 index 0000000..6ebaf2b --- /dev/null +++ b/net/sse-multiplex/files/sse-multiplexd.init @@ -0,0 +1,15 @@ +#!/bin/sh /etc/rc.common + +START=50 + +SERVICE_WRITE_PID=1 +SERVICE_DAEMONIZE=1 + + +start() { + service_start /usr/sbin/sse-multiplexd +} + +stop() { + service_stop /usr/sbin/sse-multiplexd +} diff --git a/net/sse-multiplex/src/CMakeLists.txt b/net/sse-multiplex/src/CMakeLists.txt new file mode 100644 index 0000000..3e83212 --- /dev/null +++ b/net/sse-multiplex/src/CMakeLists.txt @@ -0,0 +1,18 @@ +cmake_minimum_required(VERSION 2.6) + +project(sse-multiplex C) + +set(SSE_MULTIPLEX_SOCKET "/run/sse-multiplex.sock" CACHE STRING "Path of the socket to use") + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/sse-multiplex.h.in ${CMAKE_BINARY_DIR}/gen/generated/sse-multiplex.h) + +set_property(DIRECTORY PROPERTY COMPILE_DEFINITIONS _GNU_SOURCE) +include_directories(${CMAKE_BINARY_DIR}/gen) + +add_executable(sse-multiplexd sse-multiplexd.c) +set_property(TARGET sse-multiplexd PROPERTY COMPILE_FLAGS "-Wall -std=c99") + +add_executable(sse-multiplex sse-multiplex.c) +set_property(TARGET sse-multiplex PROPERTY COMPILE_FLAGS "-Wall -std=c99") + +install(TARGETS sse-multiplexd sse-multiplex RUNTIME DESTINATION sbin) diff --git a/net/sse-multiplex/src/sse-multiplex.c b/net/sse-multiplex/src/sse-multiplex.c new file mode 100644 index 0000000..b027ec6 --- /dev/null +++ b/net/sse-multiplex/src/sse-multiplex.c @@ -0,0 +1,99 @@ + +/* + Copyright (c) 2015, Matthias Schiffer + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + + +int main(int argc, char *argv[]) { + if (argc != 2) { + fprintf(stderr, "Usage: %s \n", argv[0]); + return 1; + } + + size_t socket_len = strlen(SSE_MULTIPLEX_SOCKET); + size_t len = offsetof(struct sockaddr_un, sun_path) + socket_len + 1; + uint8_t addrbuf[len]; + memset(addrbuf, 0, len); + + struct sockaddr_un *sa = (void*)addrbuf; + + sa->sun_family = AF_UNIX; + memcpy(sa->sun_path, SSE_MULTIPLEX_SOCKET, socket_len+1); + + int fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd < 0) { + fprintf(stderr, "Failed to create socket: %s\n", strerror(errno)); + return 1; + } + + if (connect(fd, (struct sockaddr*)sa, sizeof(addrbuf)) < 0) { + fprintf(stderr, "Can't connect to `%s': %s\n", SSE_MULTIPLEX_SOCKET, strerror(errno)); + return 1; + } + + char *command = argv[1]; + while (command[0]) { + ssize_t w = write(fd, command, strlen(command)); + if (w < 0) { + fprintf(stderr, "Can't write command: %s\n", strerror(errno)); + return 1; + } + + command += w; + } + + if (shutdown(fd, SHUT_WR) < 0) { + fprintf(stderr, "shutdown: %s\n", strerror(errno)); + return 1; + } + + setlinebuf(stdout); + + char buf[1024]; + ssize_t r; + while (1) { + r = recv(fd, buf, sizeof(buf), 0); + if (r < 0) { + fprintf(stderr, "read: %s\n", strerror(errno)); + return 1; + } + + if (r == 0) + return 0; + + fwrite(buf, r, 1, stdout); + } +} diff --git a/net/sse-multiplex/src/sse-multiplex.h.in b/net/sse-multiplex/src/sse-multiplex.h.in new file mode 100644 index 0000000..a155735 --- /dev/null +++ b/net/sse-multiplex/src/sse-multiplex.h.in @@ -0,0 +1,4 @@ +#pragma once + + +#define SSE_MULTIPLEX_SOCKET "@SSE_MULTIPLEX_SOCKET@" diff --git a/net/sse-multiplex/src/sse-multiplexd.c b/net/sse-multiplex/src/sse-multiplexd.c new file mode 100644 index 0000000..a9dc129 --- /dev/null +++ b/net/sse-multiplex/src/sse-multiplexd.c @@ -0,0 +1,460 @@ + +/* + Copyright (c) 2015, Matthias Schiffer + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + + +typedef struct client client_t; +typedef struct provider provider_t; + + +static volatile bool running = true; + +static int epoll_fd = -1; +static int listen_fd = -1; +static struct epoll_event listen_event = {}; + +static provider_t *providers = NULL; + + +struct client { + struct client *next; + + FILE *file; + bool active; +}; + +struct provider { + struct provider *prev; + struct provider *next; + + char *command; + FILE *file; + struct epoll_event event; + + char *header; + bool preclean; + bool clean; + + client_t *clients; +}; + + +static char * read_header(FILE *file) { + size_t buflen = 256, content_len = 0; + char *buffer = malloc(buflen); + + while (true) { + size_t space = buflen - content_len; + if (space < 128) { + buflen += 256; + buffer = realloc(buffer, buflen); + space = buflen - content_len; + } + + bool ok = fgets(buffer+content_len, space, file); + if (!ok) { + free(buffer); + return NULL; + } + + content_len += strlen(buffer+content_len); + + if (content_len >= 2 && buffer[content_len-2] == '\n' && buffer[content_len-1] == '\n') + return buffer; + } +} + +static FILE * run_command(const char *command) { + int pipefd[2]; + if (pipe(pipefd) < 0) { + syslog(LOG_ERR, "pipe: %s", strerror(errno)); + return NULL; + } + + pid_t pid = fork(); + if (pid < 0) { + syslog(LOG_ERR, "fork: %s", strerror(errno)); + close(pipefd[0]); + close(pipefd[1]); + return NULL; + } + + if (pid > 0) { + close(pipefd[1]); + + FILE *file = fdopen(pipefd[0], "r"); + if (!file) { + close(pipefd[0]); + return NULL; + } + + return file; + } + else { + close(pipefd[0]); + dup2(pipefd[1], STDOUT_FILENO); + + if (pipefd[1] != STDOUT_FILENO) + close(pipefd[1]); + + struct sigaction action = {}; + sigemptyset(&action.sa_mask); + + action.sa_handler = SIG_DFL; + sigaction(SIGCHLD, &action, NULL); + sigaction(SIGPIPE, &action, NULL); + + execl("/bin/sh", "/bin/sh", "-c", command, NULL); + _exit(127); + } +} + +static provider_t * new_provider(const char *command) { + FILE *file = run_command(command); + if (!file) { + syslog(LOG_WARNING, "unable to start provider `%s'", command); + return NULL; + } + + char *header = read_header(file); + if (!header) { + fclose(file); + return NULL; + } + + fcntl(fileno(file), F_SETFL, fcntl(fileno(file), F_GETFL) | O_NONBLOCK); + + provider_t *p = calloc(1, sizeof(*p)); + p->command = strdup(command); + p->file = file; + p->header = header; + p->preclean = true; + p->clean = true; + + p->event.events = EPOLLIN|EPOLLRDHUP; + p->event.data.ptr = p; + + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fileno(file), &p->event) < 0) { + fprintf(stderr, "epoll_ctl: %s\n", strerror(errno)); + exit(1); + } + + if (providers) + providers->prev = p; + p->next = providers; + providers = p; + + return p; +} + +static provider_t * get_provider(const char *command) { + provider_t *p; + for (p = providers; p; p = p->next) { + if (!strcmp(p->command, command)) + return p; + } + + return new_provider(command); +} + +static void free_clients(client_t *clients) { + while (clients) { + client_t *next = clients->next; + + fclose(clients->file); + free(clients); + + clients = next; + } +} + +static void free_provider(provider_t *p) { + if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fileno(p->file), NULL) < 0) { + fprintf(stderr, "epoll_ctl: %s\n", strerror(errno)); + exit(1); + } + + free(p->command); + fclose(p->file); + free(p->header); + free_clients(p->clients); + free(p); +} + +static void remove_provider(provider_t *p) { + if (p->next) + p->next->prev = p->prev; + + if (p->prev) + p->prev->next = p->next; + else + providers = p->next; + + free_provider(p); +} + +static void add_client(provider_t *p, FILE *file) { + if (fputs(p->header, file) == EOF || fflush(file) == EOF) { + fclose(file); + return; + } + + client_t *c = calloc(1, sizeof(*c)); + c->file = file; + c->active = p->clean; + + c->next = p->clients; + p->clients = c; +} + +static void remove_client(client_t **client) { + client_t *c = *client; + *client = c->next; + + fclose(c->file); + free(c); +} + + +static void init_epoll(void) { + epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (epoll_fd < 0) { + fprintf(stderr, "Unable initialize epoll: %s\n", strerror(errno)); + exit(1); + } +} + +static void unlink_socket(void) { + if (listen_fd >= 0) { + unlink(SSE_MULTIPLEX_SOCKET); + listen_fd = -1; + } +} + +static void create_socket(void) { + listen_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0); + if (listen_fd < 0) { + fprintf(stderr, "socket: %s\n", strerror(errno)); + exit(1); + } + + size_t socket_len = strlen(SSE_MULTIPLEX_SOCKET); + size_t len = offsetof(struct sockaddr_un, sun_path) + socket_len + 1; + uint8_t buf[len]; + memset(buf, 0, len); + + struct sockaddr_un *sa = (void*)buf; + + sa->sun_family = AF_UNIX; + memcpy(sa->sun_path, SSE_MULTIPLEX_SOCKET, socket_len+1); + + mode_t old_umask = umask(077); + + if (bind(listen_fd, (struct sockaddr*)sa, len)) { + switch (errno) { + case EADDRINUSE: + fprintf(stderr, "Unable to bind socket: the path `%s' already exists\n", SSE_MULTIPLEX_SOCKET); + break; + + default: + fprintf(stderr, "Unable to bind socket: %s\n", strerror(errno)); + } + + exit(1); + } + + umask(old_umask); + + if (atexit(unlink_socket)) { + fprintf(stderr, "atexit: %s", strerror(errno)); + unlink_socket(); + exit(1); + } + + if (listen(listen_fd, 16) < 0) { + fprintf(stderr, "listen: %s\n", strerror(errno)); + exit(1); + } + + listen_event.events = EPOLLIN; + listen_event.data.ptr = &listen_event; + + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &listen_event) < 0) { + fprintf(stderr, "epoll_ctl: %s\n", strerror(errno)); + exit(1); + } +} + +static void signal_exit(int signal __attribute__((unused))) { + running = false; +} + +static void setup_signals(void) { + struct sigaction action = {}; + sigemptyset(&action.sa_mask); + + action.sa_handler = signal_exit; + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); + sigaction(SIGQUIT, &action, NULL); + + action.sa_handler = SIG_IGN; + sigaction(SIGCHLD, &action, NULL); + sigaction(SIGPIPE, &action, NULL); +} + +static void handle_data(provider_t *provider) { + while (true) { + if (feof(provider->file)) { + remove_provider(provider); + return; + } + + char buf[1024]; + bool ok = fgets(buf, sizeof(buf), provider->file); + if (!ok) + return; + + provider->clean = provider->preclean && (buf[0] == '\n'); + provider->preclean = (buf[strlen(buf)-1] == '\n'); + + client_t **c = &provider->clients; + while (*c) { + if ((*c)->active && fputs(buf, (*c)->file) == EOF) { + remove_client(c); + continue; + } + + if (provider->clean) { + /* The ferror check should be redundant, as flush + * should already return EOF on errors; on uClibc, + * it sometimes doesn't... */ + if (fflush((*c)->file) == EOF || ferror((*c)->file)) { + remove_client(c); + continue; + } + + (*c)->active = true; + } + + c = &(*c)->next; + } + + if (!provider->clients) { + remove_provider(provider); + return; + } + } +} + +static void handle_accept(uint32_t events) { + if (events != EPOLLIN) { + syslog(LOG_ERR, "unexpected event on listening socket: %u\n", (unsigned)events); + exit(1); + } + + int fd = accept4(listen_fd, NULL, NULL, SOCK_CLOEXEC); + if (fd < 0) { + syslog(LOG_WARNING, "accept4: %s\n", strerror(errno)); + return; + } + + FILE *file = fdopen(fd, "r+"); + if (!file) { + syslog(LOG_WARNING, "fdopen: %s\n", strerror(errno)); + close(fd); + return; + } + + char command[1024]; + bool ok = fgets(command, sizeof(command), file); + if (!ok || !command[0] || !feof(file)) { + fclose(file); + return; + } + + provider_t *p = get_provider(command); + if (!p) { + fclose(file); + return; + } + + add_client(p, file); + handle_data(p); +} + +void cleanup(void) { + while (providers) + remove_provider(providers); +} + + +int main() { + init_epoll(); + create_socket(); + setup_signals(); + + while (running) { + struct epoll_event event; + int ret = epoll_wait(epoll_fd, &event, 1, -1); + if (ret == 0) + continue; + + if (ret < 0) { + if (errno == EINTR) + continue; + + syslog(LOG_ERR, "epoll_wait: %s\n", strerror(errno)); + exit(1); + } + + if (event.data.ptr == &listen_event) + handle_accept(event.events); + else + handle_data(event.data.ptr); + } + + cleanup(); +}