aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/charon/plugins/ha_sync/ha_sync_dispatcher.c31
-rw-r--r--src/charon/plugins/ha_sync/ha_sync_dispatcher.h1
-rw-r--r--src/charon/plugins/ha_sync/ha_sync_kernel.c31
-rw-r--r--src/charon/plugins/ha_sync/ha_sync_kernel.h3
-rw-r--r--src/charon/plugins/ha_sync/ha_sync_message.h2
-rw-r--r--src/charon/plugins/ha_sync/ha_sync_plugin.c30
-rw-r--r--src/charon/plugins/ha_sync/ha_sync_segments.c136
-rw-r--r--src/charon/plugins/ha_sync/ha_sync_segments.h12
-rw-r--r--src/charon/plugins/ha_sync/ha_sync_socket.c1
9 files changed, 177 insertions, 70 deletions
diff --git a/src/charon/plugins/ha_sync/ha_sync_dispatcher.c b/src/charon/plugins/ha_sync/ha_sync_dispatcher.c
index 7a79fc907..f5d3e288f 100644
--- a/src/charon/plugins/ha_sync/ha_sync_dispatcher.c
+++ b/src/charon/plugins/ha_sync/ha_sync_dispatcher.c
@@ -607,6 +607,34 @@ static void process_segment(private_ha_sync_dispatcher_t *this,
}
/**
+ * Process messages of type STATUS
+ */
+static void process_status(private_ha_sync_dispatcher_t *this,
+ ha_sync_message_t *message)
+{
+ ha_sync_message_attribute_t attribute;
+ ha_sync_message_value_t value;
+ enumerator_t *enumerator;
+ segment_mask_t mask = 0;
+
+ enumerator = message->create_attribute_enumerator(message);
+ while (enumerator->enumerate(enumerator, &attribute, &value))
+ {
+ switch (attribute)
+ {
+ case HA_SYNC_SEGMENT:
+ mask |= SEGMENTS_BIT(value.u16);
+ break;
+ default:
+ break;
+ }
+ }
+ enumerator->destroy(enumerator);
+
+ this->segments->handle_status(this->segments, mask);
+}
+
+/**
* Dispatcher job function
*/
static job_requeue_t dispatch(private_ha_sync_dispatcher_t *this)
@@ -637,6 +665,9 @@ static job_requeue_t dispatch(private_ha_sync_dispatcher_t *this)
case HA_SYNC_SEGMENT_TAKE:
process_segment(this, message, TRUE);
break;
+ case HA_SYNC_STATUS:
+ process_status(this, message);
+ break;
default:
DBG1(DBG_CFG, "received unknown HA sync message type %d",
message->get_type(message));
diff --git a/src/charon/plugins/ha_sync/ha_sync_dispatcher.h b/src/charon/plugins/ha_sync/ha_sync_dispatcher.h
index e9c92b8ca..a34b1f971 100644
--- a/src/charon/plugins/ha_sync/ha_sync_dispatcher.h
+++ b/src/charon/plugins/ha_sync/ha_sync_dispatcher.h
@@ -42,6 +42,7 @@ struct ha_sync_dispatcher_t {
*
* @param socket socket to pull messages from
* @param segments segments to control based on received messages
+ * @param manager distributed management logic for segment control
* @return dispatcher object
*/
ha_sync_dispatcher_t *ha_sync_dispatcher_create(ha_sync_socket_t *socket,
diff --git a/src/charon/plugins/ha_sync/ha_sync_kernel.c b/src/charon/plugins/ha_sync/ha_sync_kernel.c
index caba2b0be..199182022 100644
--- a/src/charon/plugins/ha_sync/ha_sync_kernel.c
+++ b/src/charon/plugins/ha_sync/ha_sync_kernel.c
@@ -46,7 +46,7 @@ struct private_ha_sync_kernel_t {
/**
* Total number of ClusterIP segments
*/
- u_int segment_count;
+ u_int count;
/**
* List of virtual addresses, as host_t*
@@ -68,7 +68,7 @@ static bool in_segment(private_ha_sync_kernel_t *this,
addr = *(u_int32_t*)host->get_address(host).ptr;
hash = jhash_1word(ntohl(addr), this->initval);
- if ((((u_int64_t)hash * this->segment_count) >> 32) + 1 == segment)
+ if ((((u_int64_t)hash * this->count) >> 32) + 1 == segment)
{
return TRUE;
}
@@ -128,8 +128,7 @@ static void deactivate(private_ha_sync_kernel_t *this, u_int segment)
/**
* Mangle IPtable rules for virtual addresses
*/
-static bool mangle_rules(private_ha_sync_kernel_t *this, bool add,
- segment_mask_t active)
+static bool mangle_rules(private_ha_sync_kernel_t *this, bool add)
{
enumerator_t *enumerator;
host_t *host;
@@ -148,13 +147,12 @@ static bool mangle_rules(private_ha_sync_kernel_t *this, bool add,
host->destroy(host);
continue;
}
- /* iptables insists of a local node specification. We add '1' but drop
- * it afterwards. */
+ /* iptables insists of a local node specification, enable node 1 */
snprintf(buf, sizeof(buf),
"/sbin/iptables -%c INPUT -i %s -d %H -j CLUSTERIP --new "
"--hashmode sourceip --clustermac 01:00:5e:00:00:%2x "
"--total-nodes %d --local-node 1",
- add ? 'A' : 'D', iface, host, mac++, this->segment_count);
+ add ? 'A' : 'D', iface, host, mac++, this->count);
free(iface);
if (system(buf) != 0)
{
@@ -165,13 +163,9 @@ static bool mangle_rules(private_ha_sync_kernel_t *this, bool add,
if (add)
{
- deactivate(this, 1);
- for (i = 0; i < SEGMENTS_MAX; i++)
+ for (i = 2; i <= this->count; i++)
{
- if (active & SEGMENTS_BIT(i))
- {
- activate(this, i);
- }
+ activate(this, i);
}
}
return TRUE;
@@ -207,7 +201,7 @@ static void parse_virtuals(private_ha_sync_kernel_t *this, char *virtual)
*/
static void destroy(private_ha_sync_kernel_t *this)
{
- mangle_rules(this, FALSE, 0);
+ mangle_rules(this, FALSE);
this->virtuals->destroy_offset(this->virtuals, offsetof(host_t, destroy));
free(this);
}
@@ -215,10 +209,11 @@ static void destroy(private_ha_sync_kernel_t *this)
/**
* See header
*/
-ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active,
- char *virtuals)
+ha_sync_kernel_t *ha_sync_kernel_create(u_int count, char *virtuals)
{
private_ha_sync_kernel_t *this = malloc_thing(private_ha_sync_kernel_t);
+ segment_mask_t active;
+ int i;
this->public.in_segment = (bool(*)(ha_sync_kernel_t*, host_t *host, u_int segment))in_segment;
this->public.activate = (void(*)(ha_sync_kernel_t*, u_int segment))activate;
@@ -226,12 +221,12 @@ ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active,
this->public.destroy = (void(*)(ha_sync_kernel_t*))destroy;
this->initval = 0;
- this->segment_count = count;
+ this->count = count;
this->virtuals = linked_list_create();
parse_virtuals(this, virtuals);
- if (!mangle_rules(this, TRUE, active))
+ if (!mangle_rules(this, TRUE))
{
destroy(this);
return NULL;
diff --git a/src/charon/plugins/ha_sync/ha_sync_kernel.h b/src/charon/plugins/ha_sync/ha_sync_kernel.h
index 87ee02aec..6803e58ea 100644
--- a/src/charon/plugins/ha_sync/ha_sync_kernel.h
+++ b/src/charon/plugins/ha_sync/ha_sync_kernel.h
@@ -66,7 +66,6 @@ struct ha_sync_kernel_t {
* @param active bitmask of initially active segments
* @param virtuals comma separated list of virtual cluster addresses
*/
-ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active,
- char *virtuals);
+ha_sync_kernel_t *ha_sync_kernel_create(u_int count, char *virtuals);
#endif /* HA_SYNC_KERNEL_ @}*/
diff --git a/src/charon/plugins/ha_sync/ha_sync_message.h b/src/charon/plugins/ha_sync/ha_sync_message.h
index 75f9b946e..20eb7eab2 100644
--- a/src/charon/plugins/ha_sync/ha_sync_message.h
+++ b/src/charon/plugins/ha_sync/ha_sync_message.h
@@ -55,6 +55,8 @@ enum ha_sync_message_type_t {
HA_SYNC_SEGMENT_DROP,
/** segments the sending node is taking over */
HA_SYNC_SEGMENT_TAKE,
+ /** status with the segments the sending node is currently serving */
+ HA_SYNC_STATUS,
};
/**
diff --git a/src/charon/plugins/ha_sync/ha_sync_plugin.c b/src/charon/plugins/ha_sync/ha_sync_plugin.c
index ff4341e57..5827b39af 100644
--- a/src/charon/plugins/ha_sync/ha_sync_plugin.c
+++ b/src/charon/plugins/ha_sync/ha_sync_plugin.c
@@ -97,29 +97,6 @@ static void destroy(private_ha_sync_plugin_t *this)
free(this);
}
-/**
- * Convert segment string to mask
- */
-static segment_mask_t parse_active(char *active)
-{
- enumerator_t *enumerator;
- u_int segment;
- segment_mask_t mask = 0;
-
- enumerator = enumerator_create_token(active, ",", " ");
- while (enumerator->enumerate(enumerator, &active))
- {
- segment = atoi(active);
- if (segment > 0 && segment < SEGMENTS_MAX)
- {
- mask |= SEGMENTS_BIT(segment);
- }
- }
- enumerator->destroy(enumerator);
-
- return mask;
-}
-
/*
* see header file
*/
@@ -127,7 +104,6 @@ plugin_t *plugin_create()
{
private_ha_sync_plugin_t *this;
char *local, *remote, *secret, *virtuals;
- segment_mask_t active;
u_int count;
bool fifo;
@@ -143,8 +119,6 @@ plugin_t *plugin_create()
"charon.plugins.ha_sync.fifo_interface", FALSE);
count = min(SEGMENTS_MAX, lib->settings->get_int(lib->settings,
"charon.plugins.ha_sync.segment_count", 1));
- active = parse_active(lib->settings->get_str(lib->settings,
- "charon.plugins.ha_sync.active_segments", "1"));
if (!local || !remote)
{
DBG1(DBG_CFG, "HA sync config misses local/remote address");
@@ -163,7 +137,7 @@ plugin_t *plugin_create()
free(this);
return NULL;
}
- this->kernel = ha_sync_kernel_create(count, active, virtuals);
+ this->kernel = ha_sync_kernel_create(count, virtuals);
if (!this->kernel)
{
this->socket->destroy(this->socket);
@@ -176,7 +150,7 @@ plugin_t *plugin_create()
this->tunnel = ha_sync_tunnel_create(local, remote, secret);
}
this->segments = ha_sync_segments_create(this->socket, this->kernel,
- this->tunnel, count, active);
+ this->tunnel, local, remote, count);
if (fifo)
{
this->ctl = ha_sync_ctl_create(this->segments);
diff --git a/src/charon/plugins/ha_sync/ha_sync_segments.c b/src/charon/plugins/ha_sync/ha_sync_segments.c
index 4d458038c..002061396 100644
--- a/src/charon/plugins/ha_sync/ha_sync_segments.c
+++ b/src/charon/plugins/ha_sync/ha_sync_segments.c
@@ -17,6 +17,7 @@
#include <utils/mutex.h>
#include <utils/linked_list.h>
+#include <processing/jobs/callback_job.h>
typedef struct private_ha_sync_segments_t private_ha_sync_segments_t;
@@ -53,12 +54,17 @@ struct private_ha_sync_segments_t {
/**
* Total number of ClusterIP segments
*/
- u_int segment_count;
+ u_int count;
/**
* mask of active segments
*/
segment_mask_t active;
+
+ /**
+ * Are we the master node handling segment assignement?
+ */
+ bool master;
};
/**
@@ -71,9 +77,9 @@ static void log_segments(private_ha_sync_segments_t *this, bool activated,
int i;
bool first = TRUE;
- for (i = 0; i < this->segment_count; i++)
+ for (i = 1; i <= this->count; i++)
{
- if (this->active & 0x01 << i)
+ if (this->active & SEGMENTS_BIT(i))
{
if (first)
{
@@ -83,7 +89,7 @@ static void log_segments(private_ha_sync_segments_t *this, bool activated,
{
pos += snprintf(pos, buf + sizeof(buf) - pos, ",");
}
- pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i+1);
+ pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i);
}
}
DBG1(DBG_CFG, "HA sync segment %d %sactivated, now active: %s",
@@ -98,19 +104,20 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
{
ike_sa_t *ike_sa;
enumerator_t *enumerator;
- u_int i, limit;
+ u_int i, from, to;
this->lock->write_lock(this->lock);
- if (segment == 0 || segment <= this->segment_count)
+ if (segment == 0 || segment <= this->count)
{
if (segment)
{ /* loop once for single segment ... */
- limit = segment + 1;
+ from = to = segment;
}
else
- { /* or segment_count times for all segments */
- limit = this->segment_count;
+ { /* or count times for all segments */
+ from = 1;
+ to = this->count;
}
enumerator = charon->ike_sa_manager->create_enumerator(charon->ike_sa_manager);
while (enumerator->enumerate(enumerator, &ike_sa))
@@ -123,7 +130,7 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
{
continue;
}
- for (i = segment; i < limit; i++)
+ for (i = from; i <= to; i++)
{
if (this->kernel->in_segment(this->kernel,
ike_sa->get_other_host(ike_sa), i))
@@ -133,7 +140,7 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
}
}
enumerator->destroy(enumerator);
- for (i = segment; i < limit; i++)
+ for (i = from; i <= to; i++)
{
if (enable)
{
@@ -152,7 +159,6 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
}
}
}
-
log_segments(this, enable, segment);
}
@@ -233,7 +239,7 @@ static void resync(private_ha_sync_segments_t *this, u_int segment)
list = linked_list_create();
this->lock->read_lock(this->lock);
- if (segment > 0 && segment <= this->segment_count && (this->active & mask))
+ if (segment > 0 && segment <= this->count && (this->active & mask))
{
this->active &= ~mask;
@@ -290,7 +296,7 @@ static bool alert_hook(private_ha_sync_segments_t *this, ike_sa_t *ike_sa,
{
int i;
- for (i = 0; i < SEGMENTS_MAX; i++)
+ for (i = 1; i <= this->count; i++)
{
if (this->active & SEGMENTS_BIT(i))
{
@@ -302,6 +308,88 @@ static bool alert_hook(private_ha_sync_segments_t *this, ike_sa_t *ike_sa,
}
/**
+ * Implementation of ha_sync_segments_t.handle_status
+ */
+static void handle_status(private_ha_sync_segments_t *this, segment_mask_t mask)
+{
+ segment_mask_t missing, overlap;
+ int i, active = 0;
+
+ this->lock->read_lock(this->lock);
+ missing = ~(this->active | mask);
+ overlap = this->active & mask;
+ for (i = 1; i <= this->count; i++)
+ {
+ if (this->active & SEGMENTS_BIT(i))
+ {
+ active++;
+ }
+ }
+ this->lock->unlock(this->lock);
+
+ /* Activate any missing segment. The master will disable overlapping
+ * segments if both nodes activate the missing segments simultaneously. */
+ for (i = 1; i <= this->count; i++)
+ {
+ if (missing & SEGMENTS_BIT(i))
+ {
+ DBG1(DBG_CFG, "HA segment %d was not handled", i);
+ activate(this, i, TRUE);
+ }
+ }
+ if (this->master && overlap)
+ {
+ /* Disable overlapping segment on one node, controlled by master */
+ for (i = 1; i <= this->count; i++)
+ {
+ if (overlap & SEGMENTS_BIT(i))
+ {
+ DBG1(DBG_CFG, "HA segment %d handled twice", i);
+ if (active > this->count)
+ {
+ deactivate(this, i, TRUE);
+ active--;
+ }
+ else
+ {
+ activate(this, i, TRUE);
+ active++;
+ }
+ }
+ }
+ }
+}
+
+/**
+ * Send a status message with our active segments
+ */
+static job_requeue_t send_status(private_ha_sync_segments_t *this)
+{
+ ha_sync_message_t *message;
+ int i;
+
+ message = ha_sync_message_create(HA_SYNC_STATUS);
+
+ for (i = 1; i <= this->count; i++)
+ {
+ if (this->active & SEGMENTS_BIT(i))
+ {
+ message->add_attribute(message, HA_SYNC_SEGMENT, i);
+ }
+ }
+
+ this->socket->push(this->socket, message);
+
+ /* schedule next invocation */
+ charon->scheduler->schedule_job_ms(charon->scheduler, (job_t*)
+ callback_job_create((callback_job_cb_t)
+ send_status, this, NULL, NULL),
+ 1000);
+
+ return JOB_REQUEUE_NONE;
+}
+
+/**
* Implementation of ha_sync_segments_t.destroy.
*/
static void destroy(private_ha_sync_segments_t *this)
@@ -314,25 +402,35 @@ static void destroy(private_ha_sync_segments_t *this)
* See header
*/
ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket,
- ha_sync_kernel_t *kernel,
- ha_sync_tunnel_t *tunnel,
- u_int count, segment_mask_t active)
+ ha_sync_kernel_t *kernel, ha_sync_tunnel_t *tunnel,
+ char *local, char *remote, u_int count)
{
private_ha_sync_segments_t *this = malloc_thing(private_ha_sync_segments_t);
+ int i;
memset(&this->public.listener, 0, sizeof(listener_t));
this->public.listener.alert = (bool(*)(listener_t*, ike_sa_t *, alert_t, va_list))alert_hook;
this->public.activate = (void(*)(ha_sync_segments_t*, u_int segment,bool))activate;
this->public.deactivate = (void(*)(ha_sync_segments_t*, u_int segment,bool))deactivate;
this->public.resync = (void(*)(ha_sync_segments_t*, u_int segment))resync;
+ this->public.handle_status = (void(*)(ha_sync_segments_t*, segment_mask_t mask))handle_status;
this->public.destroy = (void(*)(ha_sync_segments_t*))destroy;
this->socket = socket;
this->tunnel = tunnel;
this->kernel = kernel;
this->lock = rwlock_create(RWLOCK_TYPE_DEFAULT);
- this->active = active;
- this->segment_count = count;
+ this->count = count;
+ this->master = strcmp(local, remote) > 0;
+
+ /* initially all segments are active */
+ this->active = 0;
+ for (i = 1; i <= count; i++)
+ {
+ this->active |= SEGMENTS_BIT(i);
+ }
+
+ send_status(this);
return &this->public;
}
diff --git a/src/charon/plugins/ha_sync/ha_sync_segments.h b/src/charon/plugins/ha_sync/ha_sync_segments.h
index cf119a8e0..5f795db1e 100644
--- a/src/charon/plugins/ha_sync/ha_sync_segments.h
+++ b/src/charon/plugins/ha_sync/ha_sync_segments.h
@@ -80,6 +80,13 @@ struct ha_sync_segments_t {
void (*resync)(ha_sync_segments_t *this, u_int segment);
/**
+ * Handle a status message from the remote node.
+ *
+ * @param mask segments the remote node is serving actively
+ */
+ void (*handle_status)(ha_sync_segments_t *this, segment_mask_t mask);
+
+ /**
* Destroy a ha_sync_segments_t.
*/
void (*destroy)(ha_sync_segments_t *this);
@@ -95,8 +102,7 @@ struct ha_sync_segments_t {
* @return segment object
*/
ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket,
- ha_sync_kernel_t *kernel,
- ha_sync_tunnel_t *tunnel,
- u_int count, segment_mask_t active);
+ ha_sync_kernel_t *kernel, ha_sync_tunnel_t *tunnel,
+ char *local, char *remote, u_int count);
#endif /* HA_SYNC_SEGMENTS_ @}*/
diff --git a/src/charon/plugins/ha_sync/ha_sync_socket.c b/src/charon/plugins/ha_sync/ha_sync_socket.c
index 41e818528..bc010b76d 100644
--- a/src/charon/plugins/ha_sync/ha_sync_socket.c
+++ b/src/charon/plugins/ha_sync/ha_sync_socket.c
@@ -152,6 +152,7 @@ static void push(private_ha_sync_socket_t *this, ha_sync_message_t *message)
job = callback_job_create((callback_job_cb_t)send_message,
data, (void*)job_data_destroy, NULL);
charon->processor->queue_job(charon->processor, (job_t*)job);
+ sched_yield();
}
/**