diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/charon/plugins/ha_sync/ha_sync_dispatcher.c | 31 | ||||
-rw-r--r-- | src/charon/plugins/ha_sync/ha_sync_dispatcher.h | 1 | ||||
-rw-r--r-- | src/charon/plugins/ha_sync/ha_sync_kernel.c | 31 | ||||
-rw-r--r-- | src/charon/plugins/ha_sync/ha_sync_kernel.h | 3 | ||||
-rw-r--r-- | src/charon/plugins/ha_sync/ha_sync_message.h | 2 | ||||
-rw-r--r-- | src/charon/plugins/ha_sync/ha_sync_plugin.c | 30 | ||||
-rw-r--r-- | src/charon/plugins/ha_sync/ha_sync_segments.c | 136 | ||||
-rw-r--r-- | src/charon/plugins/ha_sync/ha_sync_segments.h | 12 | ||||
-rw-r--r-- | src/charon/plugins/ha_sync/ha_sync_socket.c | 1 |
9 files changed, 177 insertions, 70 deletions
diff --git a/src/charon/plugins/ha_sync/ha_sync_dispatcher.c b/src/charon/plugins/ha_sync/ha_sync_dispatcher.c index 7a79fc907..f5d3e288f 100644 --- a/src/charon/plugins/ha_sync/ha_sync_dispatcher.c +++ b/src/charon/plugins/ha_sync/ha_sync_dispatcher.c @@ -607,6 +607,34 @@ static void process_segment(private_ha_sync_dispatcher_t *this, } /** + * Process messages of type STATUS + */ +static void process_status(private_ha_sync_dispatcher_t *this, + ha_sync_message_t *message) +{ + ha_sync_message_attribute_t attribute; + ha_sync_message_value_t value; + enumerator_t *enumerator; + segment_mask_t mask = 0; + + enumerator = message->create_attribute_enumerator(message); + while (enumerator->enumerate(enumerator, &attribute, &value)) + { + switch (attribute) + { + case HA_SYNC_SEGMENT: + mask |= SEGMENTS_BIT(value.u16); + break; + default: + break; + } + } + enumerator->destroy(enumerator); + + this->segments->handle_status(this->segments, mask); +} + +/** * Dispatcher job function */ static job_requeue_t dispatch(private_ha_sync_dispatcher_t *this) @@ -637,6 +665,9 @@ static job_requeue_t dispatch(private_ha_sync_dispatcher_t *this) case HA_SYNC_SEGMENT_TAKE: process_segment(this, message, TRUE); break; + case HA_SYNC_STATUS: + process_status(this, message); + break; default: DBG1(DBG_CFG, "received unknown HA sync message type %d", message->get_type(message)); diff --git a/src/charon/plugins/ha_sync/ha_sync_dispatcher.h b/src/charon/plugins/ha_sync/ha_sync_dispatcher.h index e9c92b8ca..a34b1f971 100644 --- a/src/charon/plugins/ha_sync/ha_sync_dispatcher.h +++ b/src/charon/plugins/ha_sync/ha_sync_dispatcher.h @@ -42,6 +42,7 @@ struct ha_sync_dispatcher_t { * * @param socket socket to pull messages from * @param segments segments to control based on received messages + * @param manager distributed management logic for segment control * @return dispatcher object */ ha_sync_dispatcher_t *ha_sync_dispatcher_create(ha_sync_socket_t *socket, diff --git a/src/charon/plugins/ha_sync/ha_sync_kernel.c b/src/charon/plugins/ha_sync/ha_sync_kernel.c index caba2b0be..199182022 100644 --- a/src/charon/plugins/ha_sync/ha_sync_kernel.c +++ b/src/charon/plugins/ha_sync/ha_sync_kernel.c @@ -46,7 +46,7 @@ struct private_ha_sync_kernel_t { /** * Total number of ClusterIP segments */ - u_int segment_count; + u_int count; /** * List of virtual addresses, as host_t* @@ -68,7 +68,7 @@ static bool in_segment(private_ha_sync_kernel_t *this, addr = *(u_int32_t*)host->get_address(host).ptr; hash = jhash_1word(ntohl(addr), this->initval); - if ((((u_int64_t)hash * this->segment_count) >> 32) + 1 == segment) + if ((((u_int64_t)hash * this->count) >> 32) + 1 == segment) { return TRUE; } @@ -128,8 +128,7 @@ static void deactivate(private_ha_sync_kernel_t *this, u_int segment) /** * Mangle IPtable rules for virtual addresses */ -static bool mangle_rules(private_ha_sync_kernel_t *this, bool add, - segment_mask_t active) +static bool mangle_rules(private_ha_sync_kernel_t *this, bool add) { enumerator_t *enumerator; host_t *host; @@ -148,13 +147,12 @@ static bool mangle_rules(private_ha_sync_kernel_t *this, bool add, host->destroy(host); continue; } - /* iptables insists of a local node specification. We add '1' but drop - * it afterwards. */ + /* iptables insists of a local node specification, enable node 1 */ snprintf(buf, sizeof(buf), "/sbin/iptables -%c INPUT -i %s -d %H -j CLUSTERIP --new " "--hashmode sourceip --clustermac 01:00:5e:00:00:%2x " "--total-nodes %d --local-node 1", - add ? 'A' : 'D', iface, host, mac++, this->segment_count); + add ? 'A' : 'D', iface, host, mac++, this->count); free(iface); if (system(buf) != 0) { @@ -165,13 +163,9 @@ static bool mangle_rules(private_ha_sync_kernel_t *this, bool add, if (add) { - deactivate(this, 1); - for (i = 0; i < SEGMENTS_MAX; i++) + for (i = 2; i <= this->count; i++) { - if (active & SEGMENTS_BIT(i)) - { - activate(this, i); - } + activate(this, i); } } return TRUE; @@ -207,7 +201,7 @@ static void parse_virtuals(private_ha_sync_kernel_t *this, char *virtual) */ static void destroy(private_ha_sync_kernel_t *this) { - mangle_rules(this, FALSE, 0); + mangle_rules(this, FALSE); this->virtuals->destroy_offset(this->virtuals, offsetof(host_t, destroy)); free(this); } @@ -215,10 +209,11 @@ static void destroy(private_ha_sync_kernel_t *this) /** * See header */ -ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active, - char *virtuals) +ha_sync_kernel_t *ha_sync_kernel_create(u_int count, char *virtuals) { private_ha_sync_kernel_t *this = malloc_thing(private_ha_sync_kernel_t); + segment_mask_t active; + int i; this->public.in_segment = (bool(*)(ha_sync_kernel_t*, host_t *host, u_int segment))in_segment; this->public.activate = (void(*)(ha_sync_kernel_t*, u_int segment))activate; @@ -226,12 +221,12 @@ ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active, this->public.destroy = (void(*)(ha_sync_kernel_t*))destroy; this->initval = 0; - this->segment_count = count; + this->count = count; this->virtuals = linked_list_create(); parse_virtuals(this, virtuals); - if (!mangle_rules(this, TRUE, active)) + if (!mangle_rules(this, TRUE)) { destroy(this); return NULL; diff --git a/src/charon/plugins/ha_sync/ha_sync_kernel.h b/src/charon/plugins/ha_sync/ha_sync_kernel.h index 87ee02aec..6803e58ea 100644 --- a/src/charon/plugins/ha_sync/ha_sync_kernel.h +++ b/src/charon/plugins/ha_sync/ha_sync_kernel.h @@ -66,7 +66,6 @@ struct ha_sync_kernel_t { * @param active bitmask of initially active segments * @param virtuals comma separated list of virtual cluster addresses */ -ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active, - char *virtuals); +ha_sync_kernel_t *ha_sync_kernel_create(u_int count, char *virtuals); #endif /* HA_SYNC_KERNEL_ @}*/ diff --git a/src/charon/plugins/ha_sync/ha_sync_message.h b/src/charon/plugins/ha_sync/ha_sync_message.h index 75f9b946e..20eb7eab2 100644 --- a/src/charon/plugins/ha_sync/ha_sync_message.h +++ b/src/charon/plugins/ha_sync/ha_sync_message.h @@ -55,6 +55,8 @@ enum ha_sync_message_type_t { HA_SYNC_SEGMENT_DROP, /** segments the sending node is taking over */ HA_SYNC_SEGMENT_TAKE, + /** status with the segments the sending node is currently serving */ + HA_SYNC_STATUS, }; /** diff --git a/src/charon/plugins/ha_sync/ha_sync_plugin.c b/src/charon/plugins/ha_sync/ha_sync_plugin.c index ff4341e57..5827b39af 100644 --- a/src/charon/plugins/ha_sync/ha_sync_plugin.c +++ b/src/charon/plugins/ha_sync/ha_sync_plugin.c @@ -97,29 +97,6 @@ static void destroy(private_ha_sync_plugin_t *this) free(this); } -/** - * Convert segment string to mask - */ -static segment_mask_t parse_active(char *active) -{ - enumerator_t *enumerator; - u_int segment; - segment_mask_t mask = 0; - - enumerator = enumerator_create_token(active, ",", " "); - while (enumerator->enumerate(enumerator, &active)) - { - segment = atoi(active); - if (segment > 0 && segment < SEGMENTS_MAX) - { - mask |= SEGMENTS_BIT(segment); - } - } - enumerator->destroy(enumerator); - - return mask; -} - /* * see header file */ @@ -127,7 +104,6 @@ plugin_t *plugin_create() { private_ha_sync_plugin_t *this; char *local, *remote, *secret, *virtuals; - segment_mask_t active; u_int count; bool fifo; @@ -143,8 +119,6 @@ plugin_t *plugin_create() "charon.plugins.ha_sync.fifo_interface", FALSE); count = min(SEGMENTS_MAX, lib->settings->get_int(lib->settings, "charon.plugins.ha_sync.segment_count", 1)); - active = parse_active(lib->settings->get_str(lib->settings, - "charon.plugins.ha_sync.active_segments", "1")); if (!local || !remote) { DBG1(DBG_CFG, "HA sync config misses local/remote address"); @@ -163,7 +137,7 @@ plugin_t *plugin_create() free(this); return NULL; } - this->kernel = ha_sync_kernel_create(count, active, virtuals); + this->kernel = ha_sync_kernel_create(count, virtuals); if (!this->kernel) { this->socket->destroy(this->socket); @@ -176,7 +150,7 @@ plugin_t *plugin_create() this->tunnel = ha_sync_tunnel_create(local, remote, secret); } this->segments = ha_sync_segments_create(this->socket, this->kernel, - this->tunnel, count, active); + this->tunnel, local, remote, count); if (fifo) { this->ctl = ha_sync_ctl_create(this->segments); diff --git a/src/charon/plugins/ha_sync/ha_sync_segments.c b/src/charon/plugins/ha_sync/ha_sync_segments.c index 4d458038c..002061396 100644 --- a/src/charon/plugins/ha_sync/ha_sync_segments.c +++ b/src/charon/plugins/ha_sync/ha_sync_segments.c @@ -17,6 +17,7 @@ #include <utils/mutex.h> #include <utils/linked_list.h> +#include <processing/jobs/callback_job.h> typedef struct private_ha_sync_segments_t private_ha_sync_segments_t; @@ -53,12 +54,17 @@ struct private_ha_sync_segments_t { /** * Total number of ClusterIP segments */ - u_int segment_count; + u_int count; /** * mask of active segments */ segment_mask_t active; + + /** + * Are we the master node handling segment assignement? + */ + bool master; }; /** @@ -71,9 +77,9 @@ static void log_segments(private_ha_sync_segments_t *this, bool activated, int i; bool first = TRUE; - for (i = 0; i < this->segment_count; i++) + for (i = 1; i <= this->count; i++) { - if (this->active & 0x01 << i) + if (this->active & SEGMENTS_BIT(i)) { if (first) { @@ -83,7 +89,7 @@ static void log_segments(private_ha_sync_segments_t *this, bool activated, { pos += snprintf(pos, buf + sizeof(buf) - pos, ","); } - pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i+1); + pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i); } } DBG1(DBG_CFG, "HA sync segment %d %sactivated, now active: %s", @@ -98,19 +104,20 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment, { ike_sa_t *ike_sa; enumerator_t *enumerator; - u_int i, limit; + u_int i, from, to; this->lock->write_lock(this->lock); - if (segment == 0 || segment <= this->segment_count) + if (segment == 0 || segment <= this->count) { if (segment) { /* loop once for single segment ... */ - limit = segment + 1; + from = to = segment; } else - { /* or segment_count times for all segments */ - limit = this->segment_count; + { /* or count times for all segments */ + from = 1; + to = this->count; } enumerator = charon->ike_sa_manager->create_enumerator(charon->ike_sa_manager); while (enumerator->enumerate(enumerator, &ike_sa)) @@ -123,7 +130,7 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment, { continue; } - for (i = segment; i < limit; i++) + for (i = from; i <= to; i++) { if (this->kernel->in_segment(this->kernel, ike_sa->get_other_host(ike_sa), i)) @@ -133,7 +140,7 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment, } } enumerator->destroy(enumerator); - for (i = segment; i < limit; i++) + for (i = from; i <= to; i++) { if (enable) { @@ -152,7 +159,6 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment, } } } - log_segments(this, enable, segment); } @@ -233,7 +239,7 @@ static void resync(private_ha_sync_segments_t *this, u_int segment) list = linked_list_create(); this->lock->read_lock(this->lock); - if (segment > 0 && segment <= this->segment_count && (this->active & mask)) + if (segment > 0 && segment <= this->count && (this->active & mask)) { this->active &= ~mask; @@ -290,7 +296,7 @@ static bool alert_hook(private_ha_sync_segments_t *this, ike_sa_t *ike_sa, { int i; - for (i = 0; i < SEGMENTS_MAX; i++) + for (i = 1; i <= this->count; i++) { if (this->active & SEGMENTS_BIT(i)) { @@ -302,6 +308,88 @@ static bool alert_hook(private_ha_sync_segments_t *this, ike_sa_t *ike_sa, } /** + * Implementation of ha_sync_segments_t.handle_status + */ +static void handle_status(private_ha_sync_segments_t *this, segment_mask_t mask) +{ + segment_mask_t missing, overlap; + int i, active = 0; + + this->lock->read_lock(this->lock); + missing = ~(this->active | mask); + overlap = this->active & mask; + for (i = 1; i <= this->count; i++) + { + if (this->active & SEGMENTS_BIT(i)) + { + active++; + } + } + this->lock->unlock(this->lock); + + /* Activate any missing segment. The master will disable overlapping + * segments if both nodes activate the missing segments simultaneously. */ + for (i = 1; i <= this->count; i++) + { + if (missing & SEGMENTS_BIT(i)) + { + DBG1(DBG_CFG, "HA segment %d was not handled", i); + activate(this, i, TRUE); + } + } + if (this->master && overlap) + { + /* Disable overlapping segment on one node, controlled by master */ + for (i = 1; i <= this->count; i++) + { + if (overlap & SEGMENTS_BIT(i)) + { + DBG1(DBG_CFG, "HA segment %d handled twice", i); + if (active > this->count) + { + deactivate(this, i, TRUE); + active--; + } + else + { + activate(this, i, TRUE); + active++; + } + } + } + } +} + +/** + * Send a status message with our active segments + */ +static job_requeue_t send_status(private_ha_sync_segments_t *this) +{ + ha_sync_message_t *message; + int i; + + message = ha_sync_message_create(HA_SYNC_STATUS); + + for (i = 1; i <= this->count; i++) + { + if (this->active & SEGMENTS_BIT(i)) + { + message->add_attribute(message, HA_SYNC_SEGMENT, i); + } + } + + this->socket->push(this->socket, message); + + /* schedule next invocation */ + charon->scheduler->schedule_job_ms(charon->scheduler, (job_t*) + callback_job_create((callback_job_cb_t) + send_status, this, NULL, NULL), + 1000); + + return JOB_REQUEUE_NONE; +} + +/** * Implementation of ha_sync_segments_t.destroy. */ static void destroy(private_ha_sync_segments_t *this) @@ -314,25 +402,35 @@ static void destroy(private_ha_sync_segments_t *this) * See header */ ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket, - ha_sync_kernel_t *kernel, - ha_sync_tunnel_t *tunnel, - u_int count, segment_mask_t active) + ha_sync_kernel_t *kernel, ha_sync_tunnel_t *tunnel, + char *local, char *remote, u_int count) { private_ha_sync_segments_t *this = malloc_thing(private_ha_sync_segments_t); + int i; memset(&this->public.listener, 0, sizeof(listener_t)); this->public.listener.alert = (bool(*)(listener_t*, ike_sa_t *, alert_t, va_list))alert_hook; this->public.activate = (void(*)(ha_sync_segments_t*, u_int segment,bool))activate; this->public.deactivate = (void(*)(ha_sync_segments_t*, u_int segment,bool))deactivate; this->public.resync = (void(*)(ha_sync_segments_t*, u_int segment))resync; + this->public.handle_status = (void(*)(ha_sync_segments_t*, segment_mask_t mask))handle_status; this->public.destroy = (void(*)(ha_sync_segments_t*))destroy; this->socket = socket; this->tunnel = tunnel; this->kernel = kernel; this->lock = rwlock_create(RWLOCK_TYPE_DEFAULT); - this->active = active; - this->segment_count = count; + this->count = count; + this->master = strcmp(local, remote) > 0; + + /* initially all segments are active */ + this->active = 0; + for (i = 1; i <= count; i++) + { + this->active |= SEGMENTS_BIT(i); + } + + send_status(this); return &this->public; } diff --git a/src/charon/plugins/ha_sync/ha_sync_segments.h b/src/charon/plugins/ha_sync/ha_sync_segments.h index cf119a8e0..5f795db1e 100644 --- a/src/charon/plugins/ha_sync/ha_sync_segments.h +++ b/src/charon/plugins/ha_sync/ha_sync_segments.h @@ -80,6 +80,13 @@ struct ha_sync_segments_t { void (*resync)(ha_sync_segments_t *this, u_int segment); /** + * Handle a status message from the remote node. + * + * @param mask segments the remote node is serving actively + */ + void (*handle_status)(ha_sync_segments_t *this, segment_mask_t mask); + + /** * Destroy a ha_sync_segments_t. */ void (*destroy)(ha_sync_segments_t *this); @@ -95,8 +102,7 @@ struct ha_sync_segments_t { * @return segment object */ ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket, - ha_sync_kernel_t *kernel, - ha_sync_tunnel_t *tunnel, - u_int count, segment_mask_t active); + ha_sync_kernel_t *kernel, ha_sync_tunnel_t *tunnel, + char *local, char *remote, u_int count); #endif /* HA_SYNC_SEGMENTS_ @}*/ diff --git a/src/charon/plugins/ha_sync/ha_sync_socket.c b/src/charon/plugins/ha_sync/ha_sync_socket.c index 41e818528..bc010b76d 100644 --- a/src/charon/plugins/ha_sync/ha_sync_socket.c +++ b/src/charon/plugins/ha_sync/ha_sync_socket.c @@ -152,6 +152,7 @@ static void push(private_ha_sync_socket_t *this, ha_sync_message_t *message) job = callback_job_create((callback_job_cb_t)send_message, data, (void*)job_data_destroy, NULL); charon->processor->queue_job(charon->processor, (job_t*)job); + sched_yield(); } /** |