diff options
Diffstat (limited to 'src/libcharon/plugins/lookip/lookip_socket.c')
-rw-r--r-- | src/libcharon/plugins/lookip/lookip_socket.c | 429 |
1 files changed, 189 insertions, 240 deletions
diff --git a/src/libcharon/plugins/lookip/lookip_socket.c b/src/libcharon/plugins/lookip/lookip_socket.c index f2a469e92..d25573bf4 100644 --- a/src/libcharon/plugins/lookip/lookip_socket.c +++ b/src/libcharon/plugins/lookip/lookip_socket.c @@ -48,17 +48,12 @@ struct private_lookip_socket_t { lookip_listener_t *listener; /** - * lookip unix socket file descriptor + * stream service accepting connections */ - int socket; + stream_service_t *service; /** - * List of registered listeners, as entry_t - */ - linked_list_t *registered; - - /** - * List of connected clients, as uintptr_t FD + * List of connected clients, as entry_t */ linked_list_t *connected; @@ -69,88 +64,80 @@ struct private_lookip_socket_t { }; /** - * Open lookip unix socket - */ -static bool open_socket(private_lookip_socket_t *this) -{ - struct sockaddr_un addr; - mode_t old; - - addr.sun_family = AF_UNIX; - strcpy(addr.sun_path, LOOKIP_SOCKET); - - this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0); - if (this->socket == -1) - { - DBG1(DBG_CFG, "creating lookip socket failed"); - return FALSE; - } - unlink(addr.sun_path); - old = umask(~(S_IRWXU | S_IRWXG)); - if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0) - { - DBG1(DBG_CFG, "binding lookip socket failed: %s", strerror(errno)); - close(this->socket); - return FALSE; - } - umask(old); - if (chown(addr.sun_path, charon->caps->get_uid(charon->caps), - charon->caps->get_gid(charon->caps)) != 0) - { - DBG1(DBG_CFG, "changing lookip socket permissions failed: %s", - strerror(errno)); - } - if (listen(this->socket, 10) < 0) - { - DBG1(DBG_CFG, "listening on lookip socket failed: %s", strerror(errno)); - close(this->socket); - unlink(addr.sun_path); - return FALSE; - } - return TRUE; -} - -/** - * Listener callback entry + * List entry for a connected stream */ typedef struct { - /* FD to write to */ - int fd; - /* message type to send */ - int type; - /* back pointer to socket, only for subscriptions */ + /* stream to write to */ + stream_t *stream; + /* registered for up events? */ + bool up; + /* registered for down events? */ + bool down; + /** backref to this for unregistration */ private_lookip_socket_t *this; } entry_t; /** - * Destroy entry + * Clean up a connection entry */ -static void entry_destroy(entry_t *this) +static void entry_destroy(entry_t *entry) { - close(this->fd); - free(this); + entry->stream->destroy(entry->stream); + free(entry); +} + +/** + * Disconnect a stream, remove connection entry + */ +static void disconnect(private_lookip_socket_t *this, stream_t *stream) +{ + enumerator_t *enumerator; + entry_t *entry; + + this->mutex->lock(this->mutex); + enumerator = this->connected->create_enumerator(this->connected); + while (enumerator->enumerate(enumerator, &entry)) + { + if (entry->stream == stream) + { + this->connected->remove_at(this->connected, enumerator); + if (entry->up || entry->down) + { + this->listener->remove_listener(this->listener, entry); + } + entry_destroy(entry); + break; + } + } + enumerator->destroy(enumerator); + this->mutex->unlock(this->mutex); } /** - * Callback function for listener + * Callback function for listener up/down events */ -static bool listener_cb(entry_t *entry, bool up, host_t *vip, - host_t *other, identification_t *id, - char *name, u_int unique_id) +static bool event_cb(entry_t *entry, bool up, host_t *vip, host_t *other, + identification_t *id, char *name, u_int unique_id) { lookip_response_t resp = { - .type = entry->type, - .unique_id = unique_id, + .unique_id = htonl(unique_id), }; - /* filter events */ - if (up && entry->type == LOOKIP_NOTIFY_DOWN) + if (up) { - return TRUE; + if (!entry->up) + { + return TRUE; + } + resp.type = htonl(LOOKIP_NOTIFY_UP); } - if (!up && entry->type == LOOKIP_NOTIFY_UP) + else { - return TRUE; + if (!entry->down) + { + return TRUE; + } + resp.type = htonl(LOOKIP_NOTIFY_DOWN); } snprintf(resp.vip, sizeof(resp.vip), "%H", vip); @@ -158,37 +145,66 @@ static bool listener_cb(entry_t *entry, bool up, host_t *vip, snprintf(resp.id, sizeof(resp.id), "%Y", id); snprintf(resp.name, sizeof(resp.name), "%s", name); - switch (send(entry->fd, &resp, sizeof(resp), 0)) + if (entry->stream->write_all(entry->stream, &resp, sizeof(resp))) { - case sizeof(resp): - return TRUE; - case 0: + return TRUE; + } + switch (errno) + { + case ECONNRESET: + case EPIPE: /* client disconnected, adios */ break; default: - DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno)); + DBG1(DBG_CFG, "sending lookip event failed: %s", strerror(errno)); break; } - if (entry->this) - { /* unregister listener */ - entry->this->mutex->lock(entry->this->mutex); - entry->this->registered->remove(entry->this->registered, entry, NULL); - entry->this->mutex->unlock(entry->this->mutex); + /* don't unregister, as we return FALSE */ + entry->up = entry->down = FALSE; + disconnect(entry->this, entry->stream); + return FALSE; +} + +/** + * Callback function for queries + */ +static bool query_cb(stream_t *stream, bool up, host_t *vip, host_t *other, + identification_t *id, char *name, u_int unique_id) +{ + lookip_response_t resp = { + .type = htonl(LOOKIP_ENTRY), + .unique_id = htonl(unique_id), + }; - entry_destroy(entry); + snprintf(resp.vip, sizeof(resp.vip), "%H", vip); + snprintf(resp.ip, sizeof(resp.ip), "%H", other); + snprintf(resp.id, sizeof(resp.id), "%Y", id); + snprintf(resp.name, sizeof(resp.name), "%s", name); + + if (stream->write_all(stream, &resp, sizeof(resp))) + { + return TRUE; + } + switch (errno) + { + case ECONNRESET: + case EPIPE: + /* client disconnected, adios */ + break; + default: + DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno)); + break; } return FALSE; } /** - * Perform a entry lookup + * Perform a lookup */ -static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req) +static void query(private_lookip_socket_t *this, stream_t *stream, + lookip_request_t *req) { - entry_t entry = { - .fd = fd, - .type = LOOKIP_ENTRY, - }; + host_t *vip = NULL; int matches = 0; @@ -199,17 +215,17 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req) if (vip) { matches = this->listener->lookup(this->listener, vip, - (void*)listener_cb, &entry); + (void*)query_cb, stream); vip->destroy(vip); } if (matches == 0) { lookip_response_t resp = { - .type = LOOKIP_NOT_FOUND, + .type = htonl(LOOKIP_NOT_FOUND), }; snprintf(resp.vip, sizeof(resp.vip), "%s", req->vip); - if (send(fd, &resp, sizeof(resp), 0) < 0) + if (!stream->write_all(stream, &resp, sizeof(resp))) { DBG1(DBG_CFG, "sending lookip not-found failed: %s", strerror(errno)); @@ -219,214 +235,143 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req) else { /* dump */ this->listener->lookup(this->listener, NULL, - (void*)listener_cb, &entry); + (void*)query_cb, stream); } } /** * Subscribe to virtual IP events */ -static void subscribe(private_lookip_socket_t *this, int fd, int type) -{ - entry_t *entry; - - INIT(entry, - .fd = fd, - .type = type, - .this = this, - ); - - this->mutex->lock(this->mutex); - this->registered->insert_last(this->registered, entry); - this->mutex->unlock(this->mutex); - - this->listener->add_listener(this->listener, (void*)listener_cb, entry); -} - -/** - * Check if a client is subscribed for notifications - */ -static bool subscribed(private_lookip_socket_t *this, int fd) +static void subscribe(private_lookip_socket_t *this, stream_t *stream, bool up) { enumerator_t *enumerator; - bool subscribed = FALSE; entry_t *entry; this->mutex->lock(this->mutex); - enumerator = this->registered->create_enumerator(this->registered); + enumerator = this->connected->create_enumerator(this->connected); while (enumerator->enumerate(enumerator, &entry)) { - if (entry->fd == fd) + if (entry->stream == stream) { - subscribed = TRUE; - break; + if (!entry->up && !entry->down) + { /* newly registered */ + this->listener->add_listener(this->listener, + (void*)event_cb, entry); + } + if (up) + { + entry->up = TRUE; + } + else + { + entry->down = TRUE; + } } } enumerator->destroy(enumerator); this->mutex->unlock(this->mutex); - - return subscribed; } /** - * Create a fd_set from all bound sockets - */ -static int build_fds(private_lookip_socket_t *this, fd_set *fds) -{ - enumerator_t *enumerator; - uintptr_t fd; - int maxfd; - - FD_ZERO(fds); - FD_SET(this->socket, fds); - maxfd = this->socket; - - this->mutex->lock(this->mutex); - enumerator = this->connected->create_enumerator(this->connected); - while (enumerator->enumerate(enumerator, &fd)) - { - FD_SET(fd, fds); - maxfd = max(maxfd, fd); - } - enumerator->destroy(enumerator); - this->mutex->unlock(this->mutex); - - return maxfd + 1; -} - -/** - * Find the socket select()ed + * Check if a client is subscribed for notifications */ -static int scan_fds(private_lookip_socket_t *this, fd_set *fds) +static bool subscribed(private_lookip_socket_t *this, stream_t *stream) { enumerator_t *enumerator; - uintptr_t fd; - int selected = -1; + bool subscribed = FALSE; + entry_t *entry; this->mutex->lock(this->mutex); enumerator = this->connected->create_enumerator(this->connected); - while (enumerator->enumerate(enumerator, &fd)) + while (enumerator->enumerate(enumerator, &entry)) { - if (FD_ISSET(fd, fds)) + if (entry->stream == stream) { - selected = fd; + subscribed = entry->up || entry->down; break; } } enumerator->destroy(enumerator); this->mutex->unlock(this->mutex); - return selected; + return subscribed; } /** - * Dispatch from a socket, return TRUE to end communication + * Dispatch from a socket, on-read callback */ -static bool dispatch(private_lookip_socket_t *this, int fd) +static bool on_read(private_lookip_socket_t *this, stream_t *stream) { lookip_request_t req; - int len; - len = recv(fd, &req, sizeof(req), 0); - if (len != sizeof(req)) + if (stream->read_all(stream, &req, sizeof(req))) { - if (len != 0) + switch (ntohl(req.type)) + { + case LOOKIP_LOOKUP: + query(this, stream, &req); + return TRUE; + case LOOKIP_DUMP: + query(this, stream, NULL); + return TRUE; + case LOOKIP_REGISTER_UP: + subscribe(this, stream, TRUE); + return TRUE; + case LOOKIP_REGISTER_DOWN: + subscribe(this, stream, FALSE); + return TRUE; + case LOOKIP_END: + break; + default: + DBG1(DBG_CFG, "received unknown lookip command"); + break; + } + } + else + { + if (errno != ECONNRESET) { DBG1(DBG_CFG, "receiving lookip request failed: %s", strerror(errno)); } - return TRUE; + disconnect(this, stream); + return FALSE; } - switch (req.type) + if (subscribed(this, stream)) { - case LOOKIP_LOOKUP: - query(this, fd, &req); - return FALSE; - case LOOKIP_DUMP: - query(this, fd, NULL); - return FALSE; - case LOOKIP_REGISTER_UP: - subscribe(this, fd, LOOKIP_NOTIFY_UP); - return FALSE; - case LOOKIP_REGISTER_DOWN: - subscribe(this, fd, LOOKIP_NOTIFY_DOWN); - return FALSE; - case LOOKIP_END: - return TRUE; - default: - DBG1(DBG_CFG, "received unknown lookip command"); - return TRUE; + return TRUE; } + disconnect(this, stream); + return FALSE; } /** * Accept client connections, dispatch */ -static job_requeue_t receive(private_lookip_socket_t *this) +static bool on_accept(private_lookip_socket_t *this, stream_t *stream) { - struct sockaddr_un addr; - int fd, maxfd, len; - bool oldstate; - fd_set fds; + entry_t *entry; - while (TRUE) - { - maxfd = build_fds(this, &fds); - oldstate = thread_cancelability(TRUE); - if (select(maxfd, &fds, NULL, NULL, NULL) <= 0) - { - thread_cancelability(oldstate); - DBG1(DBG_CFG, "selecting lookip sockets failed: %s", - strerror(errno)); - break; - } - thread_cancelability(oldstate); + INIT(entry, + .stream = stream, + .this = this, + ); - if (FD_ISSET(this->socket, &fds)) - { /* new connection, accept() */ - len = sizeof(addr); - fd = accept(this->socket, (struct sockaddr*)&addr, &len); - if (fd != -1) - { - this->mutex->lock(this->mutex); - this->connected->insert_last(this->connected, - (void*)(uintptr_t)fd); - this->mutex->unlock(this->mutex); - } - else - { - DBG1(DBG_CFG, "accepting lookip connection failed: %s", - strerror(errno)); - } - continue; - } + this->mutex->lock(this->mutex); + this->connected->insert_last(this->connected, entry); + this->mutex->unlock(this->mutex); - fd = scan_fds(this, &fds); - if (fd == -1) - { - continue; - } - if (dispatch(this, fd)) - { - this->mutex->lock(this->mutex); - this->connected->remove(this->connected, (void*)(uintptr_t)fd, NULL); - this->mutex->unlock(this->mutex); - if (!subscribed(this, fd)) - { - close(fd); - } - } - } - return JOB_REQUEUE_FAIR; + stream->on_read(stream, (void*)on_read, this); + + return TRUE; } METHOD(lookip_socket_t, destroy, void, private_lookip_socket_t *this) { - this->registered->destroy_function(this->registered, (void*)entry_destroy); - this->connected->destroy(this->connected); + DESTROY_IF(this->service); + this->connected->destroy_function(this->connected, (void*)entry_destroy); this->mutex->destroy(this->mutex); - close(this->socket); free(this); } @@ -436,26 +381,30 @@ METHOD(lookip_socket_t, destroy, void, lookip_socket_t *lookip_socket_create(lookip_listener_t *listener) { private_lookip_socket_t *this; + char *uri; INIT(this, .public = { .destroy = _destroy, }, .listener = listener, - .registered = linked_list_create(), .connected = linked_list_create(), .mutex = mutex_create(MUTEX_TYPE_DEFAULT), ); - if (!open_socket(this)) + uri = lib->settings->get_str(lib->settings, + "%s.plugins.lookip.socket", "unix://" LOOKIP_SOCKET, + charon->name); + this->service = lib->streams->create_service(lib->streams, uri, 10); + if (!this->service) { - free(this); + DBG1(DBG_CFG, "creating lookip socket failed"); + destroy(this); return NULL; } - lib->processor->queue_job(lib->processor, - (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this, - NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); + this->service->on_accept(this->service, (stream_service_cb_t)on_accept, + this, JOB_PRIO_CRITICAL, 1); return &this->public; } |