summaryrefslogtreecommitdiff
path: root/src/libcharon/plugins/lookip/lookip_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libcharon/plugins/lookip/lookip_socket.c')
-rw-r--r--src/libcharon/plugins/lookip/lookip_socket.c429
1 files changed, 189 insertions, 240 deletions
diff --git a/src/libcharon/plugins/lookip/lookip_socket.c b/src/libcharon/plugins/lookip/lookip_socket.c
index f2a469e92..d25573bf4 100644
--- a/src/libcharon/plugins/lookip/lookip_socket.c
+++ b/src/libcharon/plugins/lookip/lookip_socket.c
@@ -48,17 +48,12 @@ struct private_lookip_socket_t {
lookip_listener_t *listener;
/**
- * lookip unix socket file descriptor
+ * stream service accepting connections
*/
- int socket;
+ stream_service_t *service;
/**
- * List of registered listeners, as entry_t
- */
- linked_list_t *registered;
-
- /**
- * List of connected clients, as uintptr_t FD
+ * List of connected clients, as entry_t
*/
linked_list_t *connected;
@@ -69,88 +64,80 @@ struct private_lookip_socket_t {
};
/**
- * Open lookip unix socket
- */
-static bool open_socket(private_lookip_socket_t *this)
-{
- struct sockaddr_un addr;
- mode_t old;
-
- addr.sun_family = AF_UNIX;
- strcpy(addr.sun_path, LOOKIP_SOCKET);
-
- this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
- if (this->socket == -1)
- {
- DBG1(DBG_CFG, "creating lookip socket failed");
- return FALSE;
- }
- unlink(addr.sun_path);
- old = umask(~(S_IRWXU | S_IRWXG));
- if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
- {
- DBG1(DBG_CFG, "binding lookip socket failed: %s", strerror(errno));
- close(this->socket);
- return FALSE;
- }
- umask(old);
- if (chown(addr.sun_path, charon->caps->get_uid(charon->caps),
- charon->caps->get_gid(charon->caps)) != 0)
- {
- DBG1(DBG_CFG, "changing lookip socket permissions failed: %s",
- strerror(errno));
- }
- if (listen(this->socket, 10) < 0)
- {
- DBG1(DBG_CFG, "listening on lookip socket failed: %s", strerror(errno));
- close(this->socket);
- unlink(addr.sun_path);
- return FALSE;
- }
- return TRUE;
-}
-
-/**
- * Listener callback entry
+ * List entry for a connected stream
*/
typedef struct {
- /* FD to write to */
- int fd;
- /* message type to send */
- int type;
- /* back pointer to socket, only for subscriptions */
+ /* stream to write to */
+ stream_t *stream;
+ /* registered for up events? */
+ bool up;
+ /* registered for down events? */
+ bool down;
+ /** backref to this for unregistration */
private_lookip_socket_t *this;
} entry_t;
/**
- * Destroy entry
+ * Clean up a connection entry
*/
-static void entry_destroy(entry_t *this)
+static void entry_destroy(entry_t *entry)
{
- close(this->fd);
- free(this);
+ entry->stream->destroy(entry->stream);
+ free(entry);
+}
+
+/**
+ * Disconnect a stream, remove connection entry
+ */
+static void disconnect(private_lookip_socket_t *this, stream_t *stream)
+{
+ enumerator_t *enumerator;
+ entry_t *entry;
+
+ this->mutex->lock(this->mutex);
+ enumerator = this->connected->create_enumerator(this->connected);
+ while (enumerator->enumerate(enumerator, &entry))
+ {
+ if (entry->stream == stream)
+ {
+ this->connected->remove_at(this->connected, enumerator);
+ if (entry->up || entry->down)
+ {
+ this->listener->remove_listener(this->listener, entry);
+ }
+ entry_destroy(entry);
+ break;
+ }
+ }
+ enumerator->destroy(enumerator);
+ this->mutex->unlock(this->mutex);
}
/**
- * Callback function for listener
+ * Callback function for listener up/down events
*/
-static bool listener_cb(entry_t *entry, bool up, host_t *vip,
- host_t *other, identification_t *id,
- char *name, u_int unique_id)
+static bool event_cb(entry_t *entry, bool up, host_t *vip, host_t *other,
+ identification_t *id, char *name, u_int unique_id)
{
lookip_response_t resp = {
- .type = entry->type,
- .unique_id = unique_id,
+ .unique_id = htonl(unique_id),
};
- /* filter events */
- if (up && entry->type == LOOKIP_NOTIFY_DOWN)
+ if (up)
{
- return TRUE;
+ if (!entry->up)
+ {
+ return TRUE;
+ }
+ resp.type = htonl(LOOKIP_NOTIFY_UP);
}
- if (!up && entry->type == LOOKIP_NOTIFY_UP)
+ else
{
- return TRUE;
+ if (!entry->down)
+ {
+ return TRUE;
+ }
+ resp.type = htonl(LOOKIP_NOTIFY_DOWN);
}
snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
@@ -158,37 +145,66 @@ static bool listener_cb(entry_t *entry, bool up, host_t *vip,
snprintf(resp.id, sizeof(resp.id), "%Y", id);
snprintf(resp.name, sizeof(resp.name), "%s", name);
- switch (send(entry->fd, &resp, sizeof(resp), 0))
+ if (entry->stream->write_all(entry->stream, &resp, sizeof(resp)))
{
- case sizeof(resp):
- return TRUE;
- case 0:
+ return TRUE;
+ }
+ switch (errno)
+ {
+ case ECONNRESET:
+ case EPIPE:
/* client disconnected, adios */
break;
default:
- DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno));
+ DBG1(DBG_CFG, "sending lookip event failed: %s", strerror(errno));
break;
}
- if (entry->this)
- { /* unregister listener */
- entry->this->mutex->lock(entry->this->mutex);
- entry->this->registered->remove(entry->this->registered, entry, NULL);
- entry->this->mutex->unlock(entry->this->mutex);
+ /* don't unregister, as we return FALSE */
+ entry->up = entry->down = FALSE;
+ disconnect(entry->this, entry->stream);
+ return FALSE;
+}
+
+/**
+ * Callback function for queries
+ */
+static bool query_cb(stream_t *stream, bool up, host_t *vip, host_t *other,
+ identification_t *id, char *name, u_int unique_id)
+{
+ lookip_response_t resp = {
+ .type = htonl(LOOKIP_ENTRY),
+ .unique_id = htonl(unique_id),
+ };
- entry_destroy(entry);
+ snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
+ snprintf(resp.ip, sizeof(resp.ip), "%H", other);
+ snprintf(resp.id, sizeof(resp.id), "%Y", id);
+ snprintf(resp.name, sizeof(resp.name), "%s", name);
+
+ if (stream->write_all(stream, &resp, sizeof(resp)))
+ {
+ return TRUE;
+ }
+ switch (errno)
+ {
+ case ECONNRESET:
+ case EPIPE:
+ /* client disconnected, adios */
+ break;
+ default:
+ DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno));
+ break;
}
return FALSE;
}
/**
- * Perform a entry lookup
+ * Perform a lookup
*/
-static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
+static void query(private_lookip_socket_t *this, stream_t *stream,
+ lookip_request_t *req)
{
- entry_t entry = {
- .fd = fd,
- .type = LOOKIP_ENTRY,
- };
+
host_t *vip = NULL;
int matches = 0;
@@ -199,17 +215,17 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
if (vip)
{
matches = this->listener->lookup(this->listener, vip,
- (void*)listener_cb, &entry);
+ (void*)query_cb, stream);
vip->destroy(vip);
}
if (matches == 0)
{
lookip_response_t resp = {
- .type = LOOKIP_NOT_FOUND,
+ .type = htonl(LOOKIP_NOT_FOUND),
};
snprintf(resp.vip, sizeof(resp.vip), "%s", req->vip);
- if (send(fd, &resp, sizeof(resp), 0) < 0)
+ if (!stream->write_all(stream, &resp, sizeof(resp)))
{
DBG1(DBG_CFG, "sending lookip not-found failed: %s",
strerror(errno));
@@ -219,214 +235,143 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
else
{ /* dump */
this->listener->lookup(this->listener, NULL,
- (void*)listener_cb, &entry);
+ (void*)query_cb, stream);
}
}
/**
* Subscribe to virtual IP events
*/
-static void subscribe(private_lookip_socket_t *this, int fd, int type)
-{
- entry_t *entry;
-
- INIT(entry,
- .fd = fd,
- .type = type,
- .this = this,
- );
-
- this->mutex->lock(this->mutex);
- this->registered->insert_last(this->registered, entry);
- this->mutex->unlock(this->mutex);
-
- this->listener->add_listener(this->listener, (void*)listener_cb, entry);
-}
-
-/**
- * Check if a client is subscribed for notifications
- */
-static bool subscribed(private_lookip_socket_t *this, int fd)
+static void subscribe(private_lookip_socket_t *this, stream_t *stream, bool up)
{
enumerator_t *enumerator;
- bool subscribed = FALSE;
entry_t *entry;
this->mutex->lock(this->mutex);
- enumerator = this->registered->create_enumerator(this->registered);
+ enumerator = this->connected->create_enumerator(this->connected);
while (enumerator->enumerate(enumerator, &entry))
{
- if (entry->fd == fd)
+ if (entry->stream == stream)
{
- subscribed = TRUE;
- break;
+ if (!entry->up && !entry->down)
+ { /* newly registered */
+ this->listener->add_listener(this->listener,
+ (void*)event_cb, entry);
+ }
+ if (up)
+ {
+ entry->up = TRUE;
+ }
+ else
+ {
+ entry->down = TRUE;
+ }
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
-
- return subscribed;
}
/**
- * Create a fd_set from all bound sockets
- */
-static int build_fds(private_lookip_socket_t *this, fd_set *fds)
-{
- enumerator_t *enumerator;
- uintptr_t fd;
- int maxfd;
-
- FD_ZERO(fds);
- FD_SET(this->socket, fds);
- maxfd = this->socket;
-
- this->mutex->lock(this->mutex);
- enumerator = this->connected->create_enumerator(this->connected);
- while (enumerator->enumerate(enumerator, &fd))
- {
- FD_SET(fd, fds);
- maxfd = max(maxfd, fd);
- }
- enumerator->destroy(enumerator);
- this->mutex->unlock(this->mutex);
-
- return maxfd + 1;
-}
-
-/**
- * Find the socket select()ed
+ * Check if a client is subscribed for notifications
*/
-static int scan_fds(private_lookip_socket_t *this, fd_set *fds)
+static bool subscribed(private_lookip_socket_t *this, stream_t *stream)
{
enumerator_t *enumerator;
- uintptr_t fd;
- int selected = -1;
+ bool subscribed = FALSE;
+ entry_t *entry;
this->mutex->lock(this->mutex);
enumerator = this->connected->create_enumerator(this->connected);
- while (enumerator->enumerate(enumerator, &fd))
+ while (enumerator->enumerate(enumerator, &entry))
{
- if (FD_ISSET(fd, fds))
+ if (entry->stream == stream)
{
- selected = fd;
+ subscribed = entry->up || entry->down;
break;
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
- return selected;
+ return subscribed;
}
/**
- * Dispatch from a socket, return TRUE to end communication
+ * Dispatch from a socket, on-read callback
*/
-static bool dispatch(private_lookip_socket_t *this, int fd)
+static bool on_read(private_lookip_socket_t *this, stream_t *stream)
{
lookip_request_t req;
- int len;
- len = recv(fd, &req, sizeof(req), 0);
- if (len != sizeof(req))
+ if (stream->read_all(stream, &req, sizeof(req)))
{
- if (len != 0)
+ switch (ntohl(req.type))
+ {
+ case LOOKIP_LOOKUP:
+ query(this, stream, &req);
+ return TRUE;
+ case LOOKIP_DUMP:
+ query(this, stream, NULL);
+ return TRUE;
+ case LOOKIP_REGISTER_UP:
+ subscribe(this, stream, TRUE);
+ return TRUE;
+ case LOOKIP_REGISTER_DOWN:
+ subscribe(this, stream, FALSE);
+ return TRUE;
+ case LOOKIP_END:
+ break;
+ default:
+ DBG1(DBG_CFG, "received unknown lookip command");
+ break;
+ }
+ }
+ else
+ {
+ if (errno != ECONNRESET)
{
DBG1(DBG_CFG, "receiving lookip request failed: %s",
strerror(errno));
}
- return TRUE;
+ disconnect(this, stream);
+ return FALSE;
}
- switch (req.type)
+ if (subscribed(this, stream))
{
- case LOOKIP_LOOKUP:
- query(this, fd, &req);
- return FALSE;
- case LOOKIP_DUMP:
- query(this, fd, NULL);
- return FALSE;
- case LOOKIP_REGISTER_UP:
- subscribe(this, fd, LOOKIP_NOTIFY_UP);
- return FALSE;
- case LOOKIP_REGISTER_DOWN:
- subscribe(this, fd, LOOKIP_NOTIFY_DOWN);
- return FALSE;
- case LOOKIP_END:
- return TRUE;
- default:
- DBG1(DBG_CFG, "received unknown lookip command");
- return TRUE;
+ return TRUE;
}
+ disconnect(this, stream);
+ return FALSE;
}
/**
* Accept client connections, dispatch
*/
-static job_requeue_t receive(private_lookip_socket_t *this)
+static bool on_accept(private_lookip_socket_t *this, stream_t *stream)
{
- struct sockaddr_un addr;
- int fd, maxfd, len;
- bool oldstate;
- fd_set fds;
+ entry_t *entry;
- while (TRUE)
- {
- maxfd = build_fds(this, &fds);
- oldstate = thread_cancelability(TRUE);
- if (select(maxfd, &fds, NULL, NULL, NULL) <= 0)
- {
- thread_cancelability(oldstate);
- DBG1(DBG_CFG, "selecting lookip sockets failed: %s",
- strerror(errno));
- break;
- }
- thread_cancelability(oldstate);
+ INIT(entry,
+ .stream = stream,
+ .this = this,
+ );
- if (FD_ISSET(this->socket, &fds))
- { /* new connection, accept() */
- len = sizeof(addr);
- fd = accept(this->socket, (struct sockaddr*)&addr, &len);
- if (fd != -1)
- {
- this->mutex->lock(this->mutex);
- this->connected->insert_last(this->connected,
- (void*)(uintptr_t)fd);
- this->mutex->unlock(this->mutex);
- }
- else
- {
- DBG1(DBG_CFG, "accepting lookip connection failed: %s",
- strerror(errno));
- }
- continue;
- }
+ this->mutex->lock(this->mutex);
+ this->connected->insert_last(this->connected, entry);
+ this->mutex->unlock(this->mutex);
- fd = scan_fds(this, &fds);
- if (fd == -1)
- {
- continue;
- }
- if (dispatch(this, fd))
- {
- this->mutex->lock(this->mutex);
- this->connected->remove(this->connected, (void*)(uintptr_t)fd, NULL);
- this->mutex->unlock(this->mutex);
- if (!subscribed(this, fd))
- {
- close(fd);
- }
- }
- }
- return JOB_REQUEUE_FAIR;
+ stream->on_read(stream, (void*)on_read, this);
+
+ return TRUE;
}
METHOD(lookip_socket_t, destroy, void,
private_lookip_socket_t *this)
{
- this->registered->destroy_function(this->registered, (void*)entry_destroy);
- this->connected->destroy(this->connected);
+ DESTROY_IF(this->service);
+ this->connected->destroy_function(this->connected, (void*)entry_destroy);
this->mutex->destroy(this->mutex);
- close(this->socket);
free(this);
}
@@ -436,26 +381,30 @@ METHOD(lookip_socket_t, destroy, void,
lookip_socket_t *lookip_socket_create(lookip_listener_t *listener)
{
private_lookip_socket_t *this;
+ char *uri;
INIT(this,
.public = {
.destroy = _destroy,
},
.listener = listener,
- .registered = linked_list_create(),
.connected = linked_list_create(),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
);
- if (!open_socket(this))
+ uri = lib->settings->get_str(lib->settings,
+ "%s.plugins.lookip.socket", "unix://" LOOKIP_SOCKET,
+ charon->name);
+ this->service = lib->streams->create_service(lib->streams, uri, 10);
+ if (!this->service)
{
- free(this);
+ DBG1(DBG_CFG, "creating lookip socket failed");
+ destroy(this);
return NULL;
}
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
- NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+ this, JOB_PRIO_CRITICAL, 1);
return &this->public;
}