diff options
author | Martin Willi <martin@revosec.ch> | 2013-06-24 14:58:01 +0200 |
---|---|---|
committer | Martin Willi <martin@revosec.ch> | 2013-07-18 16:00:27 +0200 |
commit | 32b2a5e04b075655564f72a902ee67a69c18ef2a (patch) | |
tree | 60f4e87ecad202c66be10e4545f20c7a76bdf2d2 /src/libstrongswan/processing | |
parent | e5b5a66712a81f3cbe5f84c0f8980a5f6daa4129 (diff) | |
download | strongswan-32b2a5e04b075655564f72a902ee67a69c18ef2a.tar.bz2 strongswan-32b2a5e04b075655564f72a902ee67a69c18ef2a.tar.xz |
watcher: add a centralized an generic facility to monitor file descriptors
Diffstat (limited to 'src/libstrongswan/processing')
-rw-r--r-- | src/libstrongswan/processing/watcher.c | 396 | ||||
-rw-r--r-- | src/libstrongswan/processing/watcher.h | 97 |
2 files changed, 493 insertions, 0 deletions
diff --git a/src/libstrongswan/processing/watcher.c b/src/libstrongswan/processing/watcher.c new file mode 100644 index 000000000..7ccac72bc --- /dev/null +++ b/src/libstrongswan/processing/watcher.c @@ -0,0 +1,396 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "watcher.h" + +#include <library.h> +#include <threading/thread.h> +#include <threading/mutex.h> +#include <threading/condvar.h> +#include <collections/linked_list.h> +#include <processing/jobs/callback_job.h> + +#include <unistd.h> +#include <errno.h> +#include <sys/select.h> + +typedef struct private_watcher_t private_watcher_t; + +/** + * Private data of an watcher_t object. + */ +struct private_watcher_t { + + /** + * Public watcher_t interface. + */ + watcher_t public; + + /** + * List of registered FDs, as entry_t + */ + linked_list_t *fds; + + /** + * Lock to access FD list + */ + mutex_t *mutex; + + /** + * Condvar to signal completion of callback + */ + condvar_t *condvar; + + /** + * Notification pipe to signal watcher thread + */ + int notify[2]; +}; + +/** + * Entry for a registered file descriptor + */ +typedef struct { + /** file descriptor */ + int fd; + /** events to watch */ + watcher_event_t events; + /** registered callback function */ + watcher_cb_t cb; + /** user data to pass to callback */ + void *data; + /** callback currently active? */ + bool active; +} entry_t; + +/** + * Data we pass on for an async notification + */ +typedef struct { + /** file descriptor */ + int fd; + /** event type */ + watcher_event_t event; + /** registered callback function */ + watcher_cb_t cb; + /** user data to pass to callback */ + void *data; + /** keep registered? */ + bool keep; + /** reference to watcher */ + private_watcher_t *this; +} notify_data_t; + +/** + * Notify watcher thread about changes + */ +static void update(private_watcher_t *this) +{ + char buf[1] = { 'u' }; + + if (this->notify[1] != -1) + { + ignore_result(write(this->notify[1], buf, sizeof(buf))); + } +} + + /** + * Execute callback of registered FD, asynchronous + */ +static job_requeue_t notify_async(notify_data_t *data) +{ + data->keep = data->cb(data->data, data->fd, data->event); + return JOB_REQUEUE_NONE; +} + +/** + * Clean up notification data, reactivate FD + */ +static void notify_end(notify_data_t *data) +{ + private_watcher_t *this = data->this; + enumerator_t *enumerator; + entry_t *entry; + + /* reactivate the disabled entry */ + this->mutex->lock(this->mutex); + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (entry->fd == data->fd) + { + if (!data->keep) + { + entry->events &= ~data->event; + if (!entry->events) + { + this->fds->remove_at(this->fds, enumerator); + free(entry); + break; + } + } + entry->active = TRUE; + break; + } + } + enumerator->destroy(enumerator); + + update(this); + this->condvar->broadcast(this->condvar); + this->mutex->unlock(this->mutex); + + free(data); +} + +/** + * Execute the callback for a registered FD + */ +static bool notify(private_watcher_t *this, entry_t *entry, + watcher_event_t event) +{ + notify_data_t *data; + + /* get a copy of entry for async job, but with specific event */ + INIT(data, + .fd = entry->fd, + .event = event, + .cb = entry->cb, + .data = entry->data, + .keep = TRUE, + .this = this, + ); + + /* deactivate entry, so we can select() other FDs even if the async + * processing did not handle the event yet */ + entry->active = FALSE; + + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((void*)notify_async, data, + (void*)notify_end, (callback_job_cancel_t)return_false, + JOB_PRIO_CRITICAL)); + return TRUE; +} + +/** + * Dispatching function + */ +static job_requeue_t watch(private_watcher_t *this) +{ + enumerator_t *enumerator; + entry_t *entry; + fd_set rd, wr, ex; + int maxfd = 0, res; + + FD_ZERO(&rd); + FD_ZERO(&wr); + FD_ZERO(&ex); + + this->mutex->lock(this->mutex); + if (this->fds->get_count(this->fds) == 0) + { + this->mutex->unlock(this->mutex); + return JOB_REQUEUE_NONE; + } + + if (this->notify[0] != -1) + { + FD_SET(this->notify[0], &rd); + maxfd = this->notify[0]; + } + + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (entry->active) + { + if (entry->events & WATCHER_READ) + { + FD_SET(entry->fd, &rd); + } + if (entry->events & WATCHER_WRITE) + { + FD_SET(entry->fd, &wr); + } + if (entry->events & WATCHER_EXCEPT) + { + FD_SET(entry->fd, &ex); + } + maxfd = max(maxfd, entry->fd); + } + } + enumerator->destroy(enumerator); + this->mutex->unlock(this->mutex); + + while (TRUE) + { + char buf[1]; + bool old, notified = FALSE; + + old = thread_cancelability(TRUE); + res = select(maxfd + 1, &rd, &wr, &ex, NULL); + thread_cancelability(old); + if (res > 0) + { + if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd)) + { + ignore_result(read(this->notify[0], buf, sizeof(buf))); + return JOB_REQUEUE_DIRECT; + } + + this->mutex->lock(this->mutex); + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (FD_ISSET(entry->fd, &rd)) + { + notified = notify(this, entry, WATCHER_READ); + break; + } + if (FD_ISSET(entry->fd, &wr)) + { + notified = notify(this, entry, WATCHER_WRITE); + break; + } + if (FD_ISSET(entry->fd, &ex)) + { + notified = notify(this, entry, WATCHER_EXCEPT); + break; + } + } + enumerator->destroy(enumerator); + this->mutex->unlock(this->mutex); + + if (notified) + { + /* we temporarily disable a notified FD, rebuild FDSET */ + return JOB_REQUEUE_DIRECT; + } + } + } +} + +METHOD(watcher_t, add, void, + private_watcher_t *this, int fd, watcher_event_t events, + watcher_cb_t cb, void *data) +{ + entry_t *entry; + + INIT(entry, + .fd = fd, + .events = events, + .cb = cb, + .data = data, + .active = TRUE, + ); + + this->mutex->lock(this->mutex); + this->fds->insert_last(this->fds, entry); + if (this->fds->get_count(this->fds) == 1) + { + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((void*)watch, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); + } + else + { + update(this); + } + this->mutex->unlock(this->mutex); +} + +METHOD(watcher_t, remove_, void, + private_watcher_t *this, int fd) +{ + enumerator_t *enumerator; + entry_t *entry; + + this->mutex->lock(this->mutex); + while (TRUE) + { + bool is_in_callback = FALSE; + + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (entry->fd == fd) + { + if (entry->active) + { + this->fds->remove_at(this->fds, enumerator); + free(entry); + } + else + { + is_in_callback = TRUE; + break; + } + } + } + enumerator->destroy(enumerator); + if (!is_in_callback) + { + break; + } + this->condvar->wait(this->condvar, this->mutex); + } + + update(this); + this->mutex->unlock(this->mutex); +} + +METHOD(watcher_t, destroy, void, + private_watcher_t *this) +{ + this->mutex->destroy(this->mutex); + this->condvar->destroy(this->condvar); + this->fds->destroy(this->fds); + if (this->notify[0] != -1) + { + close(this->notify[0]); + } + if (this->notify[1] != -1) + { + close(this->notify[1]); + } + free(this); +} + +/** + * See header + */ +watcher_t *watcher_create() +{ + private_watcher_t *this; + + INIT(this, + .public = { + .add = _add, + .remove = _remove_, + .destroy = _destroy, + }, + .fds = linked_list_create(), + .mutex = mutex_create(MUTEX_TYPE_DEFAULT), + .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), + .notify[0] = -1, + .notify[1] = -1, + ); + + if (pipe(this->notify) != 0) + { + DBG1(DBG_LIB, "creating watcher notify pipe failed: %s", + strerror(errno)); + } + return &this->public; +} diff --git a/src/libstrongswan/processing/watcher.h b/src/libstrongswan/processing/watcher.h new file mode 100644 index 000000000..db7dd4fa8 --- /dev/null +++ b/src/libstrongswan/processing/watcher.h @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +/** + * @defgroup watcher watcher + * @{ @ingroup processor + */ + +#ifndef WATCHER_H_ +#define WATCHER_H_ + +typedef struct watcher_t watcher_t; +typedef enum watcher_event_t watcher_event_t; + +#include <library.h> + +/** + * Callback function to register for file descriptor events. + * + * The callback is executed asynchronously using a thread from the pool. + * Monitoring of fd is temporarily suspended to avoid additional events while + * it is processed asynchronously. To allow concurrent events, one can quickly + * process it (using a read/write) and return from the callback. This will + * re-enable the event, while the data read can be processed in another + * asynchronous job. + * + * On Linux, even if select() marks an FD as "ready", a subsequent read/write + * can block. It is therefore highly recommended to use non-blocking I/O + * and handle EAGAIN/EWOULDBLOCK gracefully. + * + * @param data user data passed during registration + * @param fd file descriptor the event occured on + * @param event type of event + * @return TRUE to keep watching event, FALSE to unregister fd for event + */ +typedef bool (*watcher_cb_t)(void *data, int fd, watcher_event_t event); + +/** + * What events to watch for a file descriptor. + */ +enum watcher_event_t { + WATCHER_READ = (1<<0), + WATCHER_WRITE = (1<<1), + WATCHER_EXCEPT = (1<<2), +}; + +/** + * Watch multiple file descriptors using select(). + */ +struct watcher_t { + + /** + * Start watching a new file descriptor. + * + * @param fd file descriptor to start watching + * @param events ORed set of events to watch + * @param cb callback function to invoke on events + * @param data data to pass to cb() + */ + void (*add)(watcher_t *this, int fd, watcher_event_t events, + watcher_cb_t cb, void *data); + + /** + * Stop watching a previously registered file descriptor. + * + * This call blocks until any active callback for this FD returns. + * + * @param fd file descriptor to stop watching + */ + void (*remove)(watcher_t *this, int fd); + + /** + * Destroy a watcher_t. + */ + void (*destroy)(watcher_t *this); +}; + +/** + * Create a watcher instance. + * + * @return watcher + */ +watcher_t *watcher_create(); + +#endif /** WATCHER_H_ @}*/ |