From 6b99c8d9cff7b3e8ae8f3204b99e7ea40f791349 Mon Sep 17 00:00:00 2001 From: Yves-Alexis Perez Date: Sun, 25 Aug 2013 15:37:26 +0200 Subject: Imported Upstream version 5.1.0 --- src/libstrongswan/networking/streams/stream.c | 426 +++++++++++++++++++++ src/libstrongswan/networking/streams/stream.h | 199 ++++++++++ .../networking/streams/stream_manager.c | 235 ++++++++++++ .../networking/streams/stream_manager.h | 96 +++++ .../networking/streams/stream_service.c | 332 ++++++++++++++++ .../networking/streams/stream_service.h | 104 +++++ 6 files changed, 1392 insertions(+) create mode 100644 src/libstrongswan/networking/streams/stream.c create mode 100644 src/libstrongswan/networking/streams/stream.h create mode 100644 src/libstrongswan/networking/streams/stream_manager.c create mode 100644 src/libstrongswan/networking/streams/stream_manager.h create mode 100644 src/libstrongswan/networking/streams/stream_service.c create mode 100644 src/libstrongswan/networking/streams/stream_service.h (limited to 'src/libstrongswan/networking/streams') diff --git a/src/libstrongswan/networking/streams/stream.c b/src/libstrongswan/networking/streams/stream.c new file mode 100644 index 000000000..8ecb89fc9 --- /dev/null +++ b/src/libstrongswan/networking/streams/stream.c @@ -0,0 +1,426 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See . + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include +#include +#include +#include + +typedef struct private_stream_t private_stream_t; + +/** + * Private data of an stream_t object. + */ +struct private_stream_t { + + /** + * Public stream_t interface. + */ + stream_t public; + + /** + * Underlying socket + */ + int fd; + + /** + * Callback if data is ready to read + */ + stream_cb_t read_cb; + + /** + * Data for read-ready callback + */ + void *read_data; + + /** + * Callback if write is non-blocking + */ + stream_cb_t write_cb; + + /** + * Data for write-ready callback + */ + void *write_data; +}; + +METHOD(stream_t, read_, ssize_t, + private_stream_t *this, void *buf, size_t len, bool block) +{ + while (TRUE) + { + ssize_t ret; + + if (block) + { + ret = read(this->fd, buf, len); + } + else + { + ret = recv(this->fd, buf, len, MSG_DONTWAIT); + if (ret == -1 && errno == EAGAIN) + { + /* unify EGAIN and EWOULDBLOCK */ + errno = EWOULDBLOCK; + } + } + if (ret == -1 && errno == EINTR) + { /* interrupted, try again */ + continue; + } + return ret; + } +} + +METHOD(stream_t, read_all, bool, + private_stream_t *this, void *buf, size_t len) +{ + ssize_t ret; + + while (len) + { + ret = read_(this, buf, len, TRUE); + if (ret < 0) + { + return FALSE; + } + if (ret == 0) + { + errno = ECONNRESET; + return FALSE; + } + len -= ret; + buf += ret; + } + return TRUE; +} + +METHOD(stream_t, write_, ssize_t, + private_stream_t *this, void *buf, size_t len, bool block) +{ + ssize_t ret; + + while (TRUE) + { + if (block) + { + ret = write(this->fd, buf, len); + } + else + { + ret = send(this->fd, buf, len, MSG_DONTWAIT); + if (ret == -1 && errno == EAGAIN) + { + /* unify EGAIN and EWOULDBLOCK */ + errno = EWOULDBLOCK; + } + } + if (ret == -1 && errno == EINTR) + { /* interrupted, try again */ + continue; + } + return ret; + } +} + +METHOD(stream_t, write_all, bool, + private_stream_t *this, void *buf, size_t len) +{ + ssize_t ret; + + while (len) + { + ret = write_(this, buf, len, TRUE); + if (ret < 0) + { + return FALSE; + } + if (ret == 0) + { + errno = ECONNRESET; + return FALSE; + } + len -= ret; + buf += ret; + } + return TRUE; +} + +/** + * Remove a registered watcher + */ +static void remove_watcher(private_stream_t *this) +{ + if (this->read_cb || this->write_cb) + { + lib->watcher->remove(lib->watcher, this->fd); + } +} + +/** + * Watcher callback + */ +static bool watch(private_stream_t *this, int fd, watcher_event_t event) +{ + bool keep = FALSE; + stream_cb_t cb; + + switch (event) + { + case WATCHER_READ: + cb = this->read_cb; + this->read_cb = NULL; + keep = cb(this->read_data, &this->public); + if (keep) + { + this->read_cb = cb; + } + break; + case WATCHER_WRITE: + cb = this->write_cb; + this->write_cb = NULL; + keep = cb(this->write_data, &this->public); + if (keep) + { + this->write_cb = cb; + } + break; + case WATCHER_EXCEPT: + break; + } + return keep; +} + +/** + * Register watcher for stream callbacks + */ +static void add_watcher(private_stream_t *this) +{ + watcher_event_t events = 0; + + if (this->read_cb) + { + events |= WATCHER_READ; + } + if (this->write_cb) + { + events |= WATCHER_WRITE; + } + if (events) + { + lib->watcher->add(lib->watcher, this->fd, events, + (watcher_cb_t)watch, this); + } +} + +METHOD(stream_t, on_read, void, + private_stream_t *this, stream_cb_t cb, void *data) +{ + remove_watcher(this); + + this->read_cb = cb; + this->read_data = data; + + add_watcher(this); +} + +METHOD(stream_t, on_write, void, + private_stream_t *this, stream_cb_t cb, void *data) +{ + remove_watcher(this); + + this->write_cb = cb; + this->write_data = data; + + add_watcher(this); +} + +METHOD(stream_t, get_file, FILE*, + private_stream_t *this) +{ + FILE *file; + int fd; + + /* fclose() closes the FD passed to fdopen(), so dup() it */ + fd = dup(this->fd); + if (fd == -1) + { + return NULL; + } + file = fdopen(fd, "w+"); + if (!file) + { + close(fd); + } + return file; +} + +METHOD(stream_t, destroy, void, + private_stream_t *this) +{ + remove_watcher(this); + close(this->fd); + free(this); +} + +/** + * See header + */ +stream_t *stream_create_from_fd(int fd) +{ + private_stream_t *this; + + INIT(this, + .public = { + .read = _read_, + .read_all = _read_all, + .on_read = _on_read, + .write = _write_, + .write_all = _write_all, + .on_write = _on_write, + .get_file = _get_file, + .destroy = _destroy, + }, + .fd = fd, + ); + + return &this->public; +} + +/** + * See header + */ +int stream_parse_uri_unix(char *uri, struct sockaddr_un *addr) +{ + if (!strpfx(uri, "unix://")) + { + return -1; + } + uri += strlen("unix://"); + + memset(addr, 0, sizeof(*addr)); + addr->sun_family = AF_UNIX; + strncpy(addr->sun_path, uri, sizeof(addr->sun_path)); + addr->sun_path[sizeof(addr->sun_path)-1] = '\0'; + + return offsetof(struct sockaddr_un, sun_path) + strlen(addr->sun_path); +} + +/** + * See header + */ +stream_t *stream_create_unix(char *uri) +{ + struct sockaddr_un addr; + int len, fd; + + len = stream_parse_uri_unix(uri, &addr); + if (len == -1) + { + DBG1(DBG_NET, "invalid stream URI: '%s'", uri); + return NULL; + } + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd < 0) + { + DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno)); + return NULL; + } + if (connect(fd, (struct sockaddr*)&addr, len) < 0) + { + DBG1(DBG_NET, "connecting to '%s' failed: %s", uri, strerror(errno)); + close(fd); + return NULL; + } + return stream_create_from_fd(fd); +} + +/** + * See header. + */ +int stream_parse_uri_tcp(char *uri, struct sockaddr *addr) +{ + char *pos, buf[128]; + host_t *host; + u_long port; + int len; + + if (!strpfx(uri, "tcp://")) + { + return -1; + } + uri += strlen("tcp://"); + pos = strrchr(uri, ':'); + if (!pos) + { + return -1; + } + if (*uri == '[' && pos > uri && *(pos - 1) == ']') + { + /* IPv6 URI */ + snprintf(buf, sizeof(buf), "%.*s", (int)(pos - uri - 2), uri + 1); + } + else + { + snprintf(buf, sizeof(buf), "%.*s", (int)(pos - uri), uri); + } + port = strtoul(pos + 1, &pos, 10); + if (port == ULONG_MAX || *pos || port > 65535) + { + return -1; + } + host = host_create_from_dns(buf, AF_UNSPEC, port); + if (!host) + { + return -1; + } + len = *host->get_sockaddr_len(host); + memcpy(addr, host->get_sockaddr(host), len); + host->destroy(host); + return len; +} + +/** + * See header + */ +stream_t *stream_create_tcp(char *uri) +{ + union { + struct sockaddr_in in; + struct sockaddr_in6 in6; + struct sockaddr sa; + } addr; + int fd, len; + + len = stream_parse_uri_tcp(uri, &addr.sa); + if (len == -1) + { + DBG1(DBG_NET, "invalid stream URI: '%s'", uri); + return NULL; + } + fd = socket(addr.sa.sa_family, SOCK_STREAM, 0); + if (fd < 0) + { + DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno)); + return NULL; + } + if (connect(fd, &addr.sa, len)) + { + DBG1(DBG_NET, "connecting to '%s' failed: %s", uri, strerror(errno)); + close(fd); + return NULL; + } + return stream_create_from_fd(fd); +} diff --git a/src/libstrongswan/networking/streams/stream.h b/src/libstrongswan/networking/streams/stream.h new file mode 100644 index 000000000..810514da9 --- /dev/null +++ b/src/libstrongswan/networking/streams/stream.h @@ -0,0 +1,199 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See . + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +/** + * @defgroup stream stream + * @{ @ingroup streams + */ + +#ifndef STREAM_H_ +#define STREAM_H_ + +typedef struct stream_t stream_t; + +#include + +#include +#include + +/** + * Constructor function prototype for stream_t. + * + * @param uri URI to create a stream for + * @return stream instance, NULL on error + */ +typedef stream_t*(*stream_constructor_t)(char *uri); + +/** + * Callback function prototype, called when stream is ready. + * + * It is allowed to destroy the stream during the callback, but only if it has + * no other active on_read()/on_write() callback and returns FALSE. It is not + * allowed to to call on_read()/on_write/() during the callback. + * + * As select() may return even if a read()/write() would actually block, it is + * recommended to use the non-blocking calls and handle return values + * appropriately. + * + * @param data data passed during callback registration + * @param stream associated stream + * @return FALSE unregisters the invoked callback, TRUE keeps it + */ +typedef bool (*stream_cb_t)(void *data, stream_t *stream); + +/** + * Abstraction of a Berkley socket using stream semantics. + */ +struct stream_t { + + /** + * Read data from the stream. + * + * If "block" is FALSE and no data is available, the function returns -1 + * and sets errno to EWOULDBLOCK. + * + * @param buf data buffer to read into + * @param len number of bytes to read + * @param block TRUE to use a blocking read + * @return number of bytes read, -1 on error + */ + ssize_t (*read)(stream_t *this, void *buf, size_t len, bool block); + + /** + * Read data from the stream, avoiding short reads. + * + * This call is always blocking, and reads until len has been read + * completely. If the connection is closed before enough bytes could be + * returned, errno is set to ECONNRESET. + * + * @param buf data buffer to read into + * @param len number of bytes to read + * @return TRUE if len bytes read, FALSE on error + */ + bool (*read_all)(stream_t *this, void *buf, size_t len); + + /** + * Register a callback to invoke when stream has data to read. + * + * @param cb callback function, NULL to unregister + * @param data data to pass to callback + */ + void (*on_read)(stream_t *this, stream_cb_t cb, void *data); + + /** + * Write data to the stream. + * + * If "block" is FALSE and the write would block, the function returns -1 + * and sets errno to EWOULDBLOCK. + * + * @param buf data buffer to write + * @param len number of bytes to write + * @param block TRUE to use a blocking write + * @return number of bytes written, -1 on error + */ + ssize_t (*write)(stream_t *this, void *buf, size_t len, bool block); + + /** + * Write data to the stream, avoiding short writes. + * + * This call is always blocking, and writes until len bytes has been + * written. + * + * @param buf data buffer to write + * @param len number of bytes to write + * @return TRUE if len bytes written, FALSE on error + */ + bool (*write_all)(stream_t *this, void *buf, size_t len); + + /** + * Register a callback to invoke when a write would not block. + * + * @param cb callback function, NULL to unregister + * @param data data to pass to callback + */ + void (*on_write)(stream_t *this, stream_cb_t cb, void *data); + + /** + * Get a FILE reference for this stream. + * + * @return FILE*, must be fclose()d, NULL on error + */ + FILE* (*get_file)(stream_t *this); + + /** + * Destroy a stream_t. + */ + void (*destroy)(stream_t *this); +}; + +/** + * Create a stream for UNIX sockets. + * + * UNIX URIs start with unix://, followed by the socket path. For absolute + * paths, an URI looks something like: + * + * unix:///path/to/socket + * + * @param uri UNIX socket specific URI, must start with "unix://" + * @return stream instance, NULL on failure + */ +stream_t *stream_create_unix(char *uri); + +/** + * Helper function to parse a unix:// URI to a sockaddr + * + * @param uri URI + * @param addr sockaddr + * @return length of sockaddr, -1 on error + */ +int stream_parse_uri_unix(char *uri, struct sockaddr_un *addr); + +/** + * Create a stream for TCP sockets. + * + * TCP URIs start with tcp://, followed by a hostname (FQDN or IP), followed + * by a colon separated port. A full TCP uri looks something like: + * + * tcp://srv.example.com:5555 + * tcp://0.0.0.0:1234 + * tcp://[fec2::1]:7654 + * + * There is no default port, so a colon after tcp:// is mandatory. + * + * @param uri TCP socket specific URI, must start with "tcp://" + * @return stream instance, NULL on failure + */ +stream_t *stream_create_tcp(char *uri); + +/** + * Helper function to parse a tcp:// URI to a sockaddr + * + * @param uri URI + * @param addr sockaddr, large enough for URI + * @return length of sockaddr, -1 on error + */ +int stream_parse_uri_tcp(char *uri, struct sockaddr *addr); + +/** + * Create a stream from a file descriptor. + * + * The file descriptor MUST be a socket for non-blocking operation. + * + * @param fd file descriptor to wrap into a stream_t + * @return stream instance + */ +stream_t *stream_create_from_fd(int fd); + +#endif /** STREAM_H_ @}*/ diff --git a/src/libstrongswan/networking/streams/stream_manager.c b/src/libstrongswan/networking/streams/stream_manager.c new file mode 100644 index 000000000..2cbd6127e --- /dev/null +++ b/src/libstrongswan/networking/streams/stream_manager.c @@ -0,0 +1,235 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See . + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "stream_manager.h" + +#include + +typedef struct private_stream_manager_t private_stream_manager_t; + +/** + * Private data of an stream_manager_t object. + */ +struct private_stream_manager_t { + + /** + * Public stream_manager_t interface. + */ + stream_manager_t public; + + /** + * List of registered stream constructors, as stream_entry_t + */ + linked_list_t *streams; + + /** + * List of registered service constructors, as service_entry_t + */ + linked_list_t *services; + + /** + * Lock for all lists + */ + rwlock_t *lock; +}; + +/** + * Registered stream backend + */ +typedef struct { + /** URI prefix */ + char *prefix; + /** constructor function */ + stream_constructor_t create; +} stream_entry_t; + +/** + * Registered service backend + */ +typedef struct { + /** URI prefix */ + char *prefix; + /** constructor function */ + stream_service_constructor_t create; +} service_entry_t; + +METHOD(stream_manager_t, connect_, stream_t*, + private_stream_manager_t *this, char *uri) +{ + enumerator_t *enumerator; + stream_entry_t *entry; + stream_t *stream = NULL; + + this->lock->read_lock(this->lock); + enumerator = this->streams->create_enumerator(this->streams); + while (enumerator->enumerate(enumerator, &entry)) + { + if (strpfx(uri, entry->prefix)) + { + stream = entry->create(uri); + if (stream) + { + break; + } + } + } + enumerator->destroy(enumerator); + this->lock->unlock(this->lock); + + return stream; +} + +METHOD(stream_manager_t, create_service, stream_service_t*, + private_stream_manager_t *this, char *uri, int backlog) +{ + enumerator_t *enumerator; + service_entry_t *entry; + stream_service_t *service = NULL; + + this->lock->read_lock(this->lock); + enumerator = this->services->create_enumerator(this->services); + while (enumerator->enumerate(enumerator, &entry)) + { + if (strpfx(uri, entry->prefix)) + { + service = entry->create(uri, backlog); + if (service) + { + break; + } + } + } + enumerator->destroy(enumerator); + this->lock->unlock(this->lock); + + return service; +} + +METHOD(stream_manager_t, add_stream, void, + private_stream_manager_t *this, char *prefix, stream_constructor_t create) +{ + stream_entry_t *entry; + + INIT(entry, + .prefix = strdup(prefix), + .create = create, + ); + + this->lock->write_lock(this->lock); + this->streams->insert_last(this->streams, entry); + this->lock->unlock(this->lock); +} + +METHOD(stream_manager_t, remove_stream, void, + private_stream_manager_t *this, stream_constructor_t create) +{ + enumerator_t *enumerator; + stream_entry_t *entry; + + this->lock->write_lock(this->lock); + enumerator = this->streams->create_enumerator(this->streams); + while (enumerator->enumerate(enumerator, &entry)) + { + if (entry->create == create) + { + this->streams->remove_at(this->streams, enumerator); + free(entry->prefix); + free(entry); + } + } + enumerator->destroy(enumerator); + this->lock->unlock(this->lock); +} + +METHOD(stream_manager_t, add_service, void, + private_stream_manager_t *this, char *prefix, + stream_service_constructor_t create) +{ + service_entry_t *entry; + + INIT(entry, + .prefix = strdup(prefix), + .create = create, + ); + + this->lock->write_lock(this->lock); + this->services->insert_last(this->services, entry); + this->lock->unlock(this->lock); +} + +METHOD(stream_manager_t, remove_service, void, + private_stream_manager_t *this, stream_service_constructor_t create) +{ + enumerator_t *enumerator; + service_entry_t *entry; + + this->lock->write_lock(this->lock); + enumerator = this->services->create_enumerator(this->services); + while (enumerator->enumerate(enumerator, &entry)) + { + if (entry->create == create) + { + this->services->remove_at(this->services, enumerator); + free(entry->prefix); + free(entry); + } + } + enumerator->destroy(enumerator); + this->lock->unlock(this->lock); +} + +METHOD(stream_manager_t, destroy, void, + private_stream_manager_t *this) +{ + remove_stream(this, stream_create_unix); + remove_stream(this, stream_create_tcp); + remove_service(this, stream_service_create_unix); + remove_service(this, stream_service_create_tcp); + + this->streams->destroy(this->streams); + this->services->destroy(this->services); + this->lock->destroy(this->lock); + free(this); +} + +/** + * See header + */ +stream_manager_t *stream_manager_create() +{ + private_stream_manager_t *this; + + INIT(this, + .public = { + .connect = _connect_, + .create_service = _create_service, + .add_stream = _add_stream, + .remove_stream = _remove_stream, + .add_service = _add_service, + .remove_service = _remove_service, + .destroy = _destroy, + }, + .streams = linked_list_create(), + .services = linked_list_create(), + .lock = rwlock_create(RWLOCK_TYPE_DEFAULT), + ); + + add_stream(this, "unix://", stream_create_unix); + add_stream(this, "tcp://", stream_create_tcp); + add_service(this, "unix://", stream_service_create_unix); + add_service(this, "tcp://", stream_service_create_tcp); + + return &this->public; +} diff --git a/src/libstrongswan/networking/streams/stream_manager.h b/src/libstrongswan/networking/streams/stream_manager.h new file mode 100644 index 000000000..352d93e2b --- /dev/null +++ b/src/libstrongswan/networking/streams/stream_manager.h @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See . + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +/** + * @defgroup stream_manager stream_manager + * @{ @ingroup streams + */ + +#ifndef STREAM_MANAGER_H_ +#define STREAM_MANAGER_H_ + +typedef struct stream_manager_t stream_manager_t; + +#include +#include + +/** + * Manages client-server connections and services using stream_t backends. + */ +struct stream_manager_t { + + /** + * Create a client-server connection to a service. + * + * @param uri URI of service to connect to + * @return stream instance, NULL on error + */ + stream_t* (*connect)(stream_manager_t *this, char *uri); + + /** + * Create a new service under an URI to accept() client connections. + * + * @param uri URI of service to provide + * @param backlog size of the backlog queue, as passed to listen() + * @return service, NULL on error + */ + stream_service_t* (*create_service)(stream_manager_t *this, char *uri, + int backlog); + + /** + * Register a stream backend to the manager. + * + * @param prefix prefix of URIs to use the backend for + * @param create constructor function for the stream + */ + void (*add_stream)(stream_manager_t *this, char *prefix, + stream_constructor_t create); + + /** + * Unregister stream backends from the manager. + * + * @param create constructor function passed to add_stream() + */ + void (*remove_stream)(stream_manager_t *this, stream_constructor_t create); + + /** + * Register a stream service backend to the manager. + * + * @param prefix prefix of URIs to use the backend for + * @param create constructor function for the stream service + */ + void (*add_service)(stream_manager_t *this, char *prefix, + stream_service_constructor_t create); + + /** + * Unregister stream service backends from the manager. + * + * @param create constructor function passed to add_service() + */ + void (*remove_service)(stream_manager_t *this, + stream_service_constructor_t create); + + /** + * Destroy a stream_manager_t. + */ + void (*destroy)(stream_manager_t *this); +}; + +/** + * Create a stream_manager instance. + */ +stream_manager_t *stream_manager_create(); + +#endif /** STREAM_MANAGER_H_ @}*/ diff --git a/src/libstrongswan/networking/streams/stream_service.c b/src/libstrongswan/networking/streams/stream_service.c new file mode 100644 index 000000000..ece17b41f --- /dev/null +++ b/src/libstrongswan/networking/streams/stream_service.c @@ -0,0 +1,332 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See . + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +typedef struct private_stream_service_t private_stream_service_t; + +/** + * Private data of an stream_service_t object. + */ +struct private_stream_service_t { + + /** + * Public stream_service_t interface. + */ + stream_service_t public; + + /** + * Underlying socket + */ + int fd; + + /** + * Accept callback + */ + stream_service_cb_t cb; + + /** + * Accept callback data + */ + void *data; + + /** + * Job priority to invoke callback with + */ + job_priority_t prio; + + /** + * Maximum number of parallel callback invocations + */ + u_int cncrncy; + + /** + * Currently active jobs + */ + u_int active; + + /** + * mutex to lock active counter + */ + mutex_t *mutex; + + /** + * Condvar to wait for callback termination + */ + condvar_t *condvar; +}; + +/** + * Data to pass to async accept job + */ +typedef struct { + /** callback function */ + stream_service_cb_t cb; + /** callback data */ + void *data; + /** accepted connection */ + int fd; + /** reference to stream service */ + private_stream_service_t *this; +} async_data_t; + +/** + * Clean up accept data + */ +static void destroy_async_data(async_data_t *data) +{ + private_stream_service_t *this = data->this; + + this->mutex->lock(this->mutex); + if (this->active-- == this->cncrncy) + { + /* leaving concurrency limit, restart accept()ing. */ + this->public.on_accept(&this->public, this->cb, this->data, + this->prio, this->cncrncy); + } + this->condvar->signal(this->condvar); + this->mutex->unlock(this->mutex); + + if (data->fd != -1) + { + close(data->fd); + } + free(data); +} + +/** + * Async processing of accepted connection + */ +static job_requeue_t accept_async(async_data_t *data) +{ + stream_t *stream; + + stream = stream_create_from_fd(data->fd); + if (stream) + { + /* FD is now owned by stream, don't close it during cleanup */ + data->fd = -1; + thread_cleanup_push((void*)stream->destroy, stream); + thread_cleanup_pop(!data->cb(data->data, stream)); + } + return JOB_REQUEUE_NONE; +} + +/** + * Watcher callback function + */ +static bool watch(private_stream_service_t *this, int fd, watcher_event_t event) +{ + async_data_t *data; + bool keep = TRUE; + + INIT(data, + .cb = this->cb, + .data = this->data, + .fd = accept(fd, NULL, NULL), + .this = this, + ); + + if (data->fd != -1) + { + this->mutex->lock(this->mutex); + if (++this->active == this->cncrncy) + { + /* concurrency limit reached, stop accept()ing new connections */ + keep = FALSE; + } + this->mutex->unlock(this->mutex); + + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((void*)accept_async, data, + (void*)destroy_async_data, (callback_job_cancel_t)return_false, + this->prio)); + } + else + { + free(data); + } + return keep; +} + +METHOD(stream_service_t, on_accept, void, + private_stream_service_t *this, stream_service_cb_t cb, void *data, + job_priority_t prio, u_int cncrncy) +{ + this->mutex->lock(this->mutex); + + /* wait for all callbacks to return */ + while (this->active) + { + this->condvar->wait(this->condvar, this->mutex); + } + + if (this->cb) + { + lib->watcher->remove(lib->watcher, this->fd); + } + + this->cb = cb; + this->data = data; + if (prio <= JOB_PRIO_MAX) + { + this->prio = prio; + } + this->cncrncy = cncrncy; + + if (this->cb) + { + lib->watcher->add(lib->watcher, this->fd, + WATCHER_READ, (watcher_cb_t)watch, this); + } + + this->mutex->unlock(this->mutex); +} + +METHOD(stream_service_t, destroy, void, + private_stream_service_t *this) +{ + on_accept(this, NULL, NULL, this->prio, this->cncrncy); + close(this->fd); + this->mutex->destroy(this->mutex); + this->condvar->destroy(this->condvar); + free(this); +} + +/** + * See header + */ +stream_service_t *stream_service_create_from_fd(int fd) +{ + private_stream_service_t *this; + + INIT(this, + .public = { + .on_accept = _on_accept, + .destroy = _destroy, + }, + .fd = fd, + .prio = JOB_PRIO_MEDIUM, + .mutex = mutex_create(MUTEX_TYPE_RECURSIVE), + .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), + ); + + return &this->public; +} + +/** + * See header + */ +stream_service_t *stream_service_create_unix(char *uri, int backlog) +{ + struct sockaddr_un addr; + mode_t old; + int fd, len; + + len = stream_parse_uri_unix(uri, &addr); + if (len == -1) + { + DBG1(DBG_NET, "invalid stream URI: '%s'", uri); + return NULL; + } + if (!lib->caps->check(lib->caps, CAP_CHOWN)) + { /* required to chown(2) service socket */ + DBG1(DBG_NET, "socket '%s' requires CAP_CHOWN capability", uri); + return NULL; + } + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd == -1) + { + DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno)); + return NULL; + } + unlink(addr.sun_path); + + old = umask(~(S_IRWXU | S_IRWXG)); + if (bind(fd, (struct sockaddr*)&addr, len) < 0) + { + DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno)); + close(fd); + return NULL; + } + umask(old); + if (chown(addr.sun_path, lib->caps->get_uid(lib->caps), + lib->caps->get_gid(lib->caps)) != 0) + { + DBG1(DBG_NET, "changing socket permissions for '%s' failed: %s", + uri, strerror(errno)); + } + if (listen(fd, backlog) < 0) + { + DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno)); + unlink(addr.sun_path); + close(fd); + return NULL; + } + return stream_service_create_from_fd(fd); +} + +/** + * See header + */ +stream_service_t *stream_service_create_tcp(char *uri, int backlog) +{ + union { + struct sockaddr_in in; + struct sockaddr_in6 in6; + struct sockaddr sa; + } addr; + int fd, len, on = 1; + + len = stream_parse_uri_tcp(uri, &addr.sa); + if (len == -1) + { + DBG1(DBG_NET, "invalid stream URI: '%s'", uri); + return NULL; + } + fd = socket(addr.sa.sa_family, SOCK_STREAM, 0); + if (fd < 0) + { + DBG1(DBG_NET, "opening socket '%s' failed: %s", uri, strerror(errno)); + return NULL; + } + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0) + { + DBG1(DBG_NET, "SO_REUSADDR on '%s' failed: %s", uri, strerror(errno)); + } + if (bind(fd, &addr.sa, len) < 0) + { + DBG1(DBG_NET, "binding socket '%s' failed: %s", uri, strerror(errno)); + close(fd); + return NULL; + } + if (listen(fd, backlog) < 0) + { + DBG1(DBG_NET, "listen on socket '%s' failed: %s", uri, strerror(errno)); + close(fd); + return NULL; + } + return stream_service_create_from_fd(fd); +} diff --git a/src/libstrongswan/networking/streams/stream_service.h b/src/libstrongswan/networking/streams/stream_service.h new file mode 100644 index 000000000..c8faba323 --- /dev/null +++ b/src/libstrongswan/networking/streams/stream_service.h @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See . + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +/** + * @defgroup stream_service stream_service + * @{ @ingroup streams + */ + +#ifndef STREAM_SERVICE_H_ +#define STREAM_SERVICE_H_ + +typedef struct stream_service_t stream_service_t; + +#include +#include +#include + +/** + * Constructor function prototype for stream_service_t. + * + * @param uri URI to create a stream for + * @param backlog size of the backlog queue, as passed to listen() + * @return stream instance, NULL on error + */ +typedef stream_service_t*(*stream_service_constructor_t)(char *uri, int backlog); + +/** + * Service callback routine for accepting client connections. + * + * The passed stream gets closed/destroyed by the callback caller, unless + * TRUE is returned. + * + * @param data user data, as passed during registration + * @param stream accept()ed client connection + * @return TRUE to keep stream alive, FALSE to destroy it + */ +typedef bool (*stream_service_cb_t)(void *data, stream_t *stream); + +/** + * A service accepting client connection streams. + */ +struct stream_service_t { + + /** + * Start accepting client connections on this stream service. + * + * To stop accepting connections, pass a NULL callback function. + * + * @param cb callback function to call for accepted client streams + * @param data data to pass to callback function + * @param prio job priority to run callback with + * @param cncrncy maximum number of parallel callback invocations + */ + void (*on_accept)(stream_service_t *this, + stream_service_cb_t cb, void *data, + job_priority_t prio, u_int cncrncy); + + /** + * Destroy a stream_service_t. + */ + void (*destroy)(stream_service_t *this); +}; + +/** + * Create a service from a file descriptor. + * + * The file descriptor MUST be a socket. + * + * @param fd file descriptor to wrap into a stream_service_t + * @return stream_service instance + */ +stream_service_t *stream_service_create_from_fd(int fd); + +/** + * Create a service instance for UNIX sockets. + * + * @param uri UNIX socket specific URI, must start with "unix://" + * @param backlog size of the backlog queue, as passed to listen() + * @return stream_service instance, NULL on failure + */ +stream_service_t *stream_service_create_unix(char *uri, int backlog); + +/** + * Create a service instance for TCP sockets. + * + * @param uri TCP socket specific URI, must start with "tcp://" + * @param backlog size of the backlog queue, as passed to listen() + * @return stream_service instance, NULL on failure + */ +stream_service_t *stream_service_create_tcp(char *uri, int backlog); + +#endif /** STREAM_SERVICE_H_ @}*/ -- cgit v1.2.3