aboutsummaryrefslogtreecommitdiffstats
path: root/testing/netdisco
diff options
context:
space:
mode:
authorTimo Teräs <timo.teras@iki.fi>2017-11-24 11:51:14 +0200
committerTimo Teräs <timo.teras@iki.fi>2017-11-24 11:51:24 +0200
commitf360bd4cb9f4a224052e085f16ce23b8910ef51d (patch)
treef31fe8735e1631b7db797514808cbd3d53d19ea5 /testing/netdisco
parent6d31d2e0083814abf5a013599566cd8e5e9e2a8c (diff)
downloadaports-f360bd4cb9f4a224052e085f16ce23b8910ef51d.tar.bz2
aports-f360bd4cb9f4a224052e085f16ce23b8910ef51d.tar.xz
testing/netdisco: upgrade to 2.036012_002 + git changes
Diffstat (limited to 'testing/netdisco')
-rw-r--r--testing/netdisco/APKBUILD12
-rw-r--r--testing/netdisco/git-20171124.patch902
2 files changed, 910 insertions, 4 deletions
diff --git a/testing/netdisco/APKBUILD b/testing/netdisco/APKBUILD
index 64bee33c86..b124a846ba 100644
--- a/testing/netdisco/APKBUILD
+++ b/testing/netdisco/APKBUILD
@@ -3,7 +3,8 @@
# Maintainer: Timo Teräs <timo.teras@iki.fi>
pkgname=netdisco
_pkgreal=App-Netdisco
-pkgver=2.036011
+_pkgver=2.036012_002
+pkgver=${_pkgver/_/p}
pkgrel=0
pkgdesc="An open source web-based network management tool."
url="http://search.cpan.org/dist/App-Netdisco/"
@@ -28,8 +29,10 @@ depends="perl-term-ui perl-archive-extract $cpandepends"
makedepends="perl-dev perl-module-build $cpanmakedepends"
checkdepends="$cpancheckdepends"
subpackages="$pkgname-doc"
-source="http://search.cpan.org/CPAN/authors/id/O/OL/OLIVER/$_pkgreal-$pkgver.tar.gz"
-builddir="$srcdir/$_pkgreal-$pkgver"
+source="http://search.cpan.org/CPAN/authors/id/O/OL/OLIVER/$_pkgreal-$_pkgver.tar.gz
+ git-20171124.patch
+ "
+builddir="$srcdir/$_pkgreal-$_pkgver"
prepare() {
default_prepare
@@ -56,4 +59,5 @@ check() {
./Build test
}
-sha512sums="27a3fc60290e8081bedfb8628502916797ba8c68c030a97091723a6923187484260044587f980efd3d3b0c757129951769a5ecf1b3549d73b08e524dac19a540 App-Netdisco-2.036011.tar.gz"
+sha512sums="72a399ad204dd34d8f7ffb7b00b50e36b7622bf1402972875166fee650c6c3b3f6980a94c7cbdf5793e90a0a59079c813e1a7e76f6793f806ba0fab42c94f90a App-Netdisco-2.036012_002.tar.gz
+cd7e47db82b9371af1e68086c95c70ea9493c274b90818edf50da880a50bdf1f3f06cadda5357ba01980fdffcf642984cb69c0699dc6fc227ad33d1264d65337 git-20171124.patch"
diff --git a/testing/netdisco/git-20171124.patch b/testing/netdisco/git-20171124.patch
new file mode 100644
index 0000000000..dfef8140b6
--- /dev/null
+++ b/testing/netdisco/git-20171124.patch
@@ -0,0 +1,902 @@
+commit 4a51f83efc89412f1954b3cb32a3ee83ff354095
+Author: Oliver Gorwits <oliver@cpan.org>
+Date: Sun Nov 19 22:06:15 2017 +0000
+
+ fix detection of unknown action in netdisco-do
+
+diff --git a/bin/netdisco-do b/bin/netdisco-do
+index 1c5b0b43..ab60f98e 100755
+--- a/bin/netdisco-do
++++ b/bin/netdisco-do
+@@ -139,7 +139,7 @@ foreach my $host (@hostlist) {
+ $job->log("error running job: $_");
+ };
+
+- if ($job->log eq 'no worker succeeded during main phase') {
++ if ($job->log eq 'failed to report from any worker!') {
+ pod2usage(
+ -msg => (sprintf 'error: %s is not a valid action', $action),
+ -verbose => 2,
+
+commit c576a755af61186ec46a0dc2576b43aff0ed903a
+Author: Oliver Gorwits <oliver@cpan.org>
+Date: Tue Nov 21 10:00:53 2017 +0000
+
+ tweak log message
+
+diff --git a/lib/App/Netdisco/Worker/Runner.pm b/lib/App/Netdisco/Worker/Runner.pm
+index 2a81ab0c..334e9e97 100644
+--- a/lib/App/Netdisco/Worker/Runner.pm
++++ b/lib/App/Netdisco/Worker/Runner.pm
+@@ -76,7 +76,7 @@ sub run_workers {
+ foreach my $worker (@{ $self->$set }) {
+ try { $job->add_status( $worker->($job) ) }
+ catch {
+- debug "=> $_" if $_;
++ debug "-> $_" if $_;
+ $job->add_status( Status->error($_) );
+ };
+ }
+
+commit b694258a6580382c5a66864468e4e1b136709f79
+Author: Oliver Gorwits <oliver@cpan.org>
+Date: Tue Nov 21 10:01:09 2017 +0000
+
+ leave community rows in place
+
+diff --git a/lib/App/Netdisco/DB/Result/Device.pm b/lib/App/Netdisco/DB/Result/Device.pm
+index 6352e444..3d3e0a74 100644
+--- a/lib/App/Netdisco/DB/Result/Device.pm
++++ b/lib/App/Netdisco/DB/Result/Device.pm
+@@ -249,7 +249,6 @@ sub renumber {
+ DevicePortSsid
+ DevicePortVlan
+ DevicePortWireless
+- Community
+ /) {
+ $schema->resultset($set)
+ ->search({ip => $old_ip})
+
+commit 1bbe8c916481b9e5dc0c2ec26c4d922a6dabd5f0
+Merge: b694258a 4a51f83e
+Author: Oliver Gorwits <oliver@cpan.org>
+Date: Tue Nov 21 10:02:57 2017 +0000
+
+ Merge branch 'master' of github.com:netdisco/netdisco
+
+commit 0bb15f36b9e8f374f995b5cfeaf493b74fb458e6
+Author: Oliver Gorwits <oliver@cpan.org>
+Date: Thu Nov 23 19:23:55 2017 +0000
+
+ fixes for race conditions and dupes in job queue
+
+ we had situations where the manager would start workers on the same job,
+ either because of race conditions or because at the time of queueing it wasn't
+ known that the jobs were targeting the same device (due to device aliases).
+
+ this commit removes duplicate jobs, reduces the need for locking on the job
+ queue, and makes use of lldpRemChassisId to try to deduplicate jobs before
+ they are started. in effect we have several goes to prevent duplicate jobs:
+
+ 1. at neighbor discovery time we try to skip queueing same lldpRemChassisId
+ 2. at job selection we 'error out' jobs with same profile as job selected
+ 3. at job selection we check for running job with same profile as selected
+ 4. the job manager process also checks for duplicate job profiles
+ 5. at job lock we abort if the job was 'errored out'
+
+ all together this seems to work well. a test on a large university network of
+ 303 devices (four core routers and the rest edge routers, runing VRF with many
+ duplicate identities), ~1200 subnets, ~50k hosts, resulted in no DB deadlock
+ or contention and a complete discover+arpnip+macsuck (909 jobs) in ~3 minutes
+ (with ~150 duplicate jobs identified and skipped).
+
+diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm
+index 7811b53f..72850e8f 100644
+--- a/lib/App/Netdisco/Backend/Job.pm
++++ b/lib/App/Netdisco/Backend/Job.pm
+@@ -19,6 +19,7 @@ foreach my $slot (qw/
+ username
+ userip
+ log
++ device_key
+
+ _current_phase
+ _last_namespace
+diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm
+index 25d13ba7..d594df4d 100644
+--- a/lib/App/Netdisco/Backend/Role/Manager.pm
++++ b/lib/App/Netdisco/Backend/Role/Manager.pm
+@@ -34,6 +34,16 @@ sub worker_begin {
+ }
+ }
+
++# creates a 'signature' for each job so that we can check for duplicates ...
++# it happens from time to time due to the distributed nature of the job queue
++# and manager(s) - also kinder to the DB to skip here rather than jq_lock()
++my $memoize = sub {
++ no warnings 'uninitialized';
++ my $job = shift;
++ return join chr(28), map {$job->{$_}}
++ (qw/action port subaction/, ($job->{device_key} ? 'device_key' : 'device'));
++};
++
+ sub worker_body {
+ my $self = shift;
+ my $wid = $self->wid;
+@@ -46,6 +56,7 @@ sub worker_body {
+ while (1) {
+ prctl sprintf 'nd2: #%s mgr: gathering', $wid;
+ my $num_slots = 0;
++ my %seen_job = ();
+
+ $num_slots = parse_max_workers( setting('workers')->{tasks} )
+ - $self->{queue}->pending();
+@@ -54,6 +65,7 @@ sub worker_body {
+ # get some high priority jobs
+ # TODO also check for stale jobs in Netdisco DB
+ foreach my $job ( jq_getsomep($num_slots) ) {
++ next if $seen_job{ $memoize->($job) }++;
+
+ # mark job as running
+ next unless jq_lock($job);
+@@ -71,6 +83,7 @@ sub worker_body {
+ # get some normal priority jobs
+ # TODO also check for stale jobs in Netdisco DB
+ foreach my $job ( jq_getsome($num_slots) ) {
++ next if $seen_job{ $memoize->($job) }++;
+
+ # mark job as running
+ next unless jq_lock($job);
+@@ -81,6 +94,11 @@ sub worker_body {
+ $self->{queue}->enqueue($job);
+ }
+
++ #if (scalar grep {$_ > 1} values %seen_job) {
++ # debug 'WARNING: saw duplicate jobs after getsome()';
++ # use DDP; debug p %seen_job;
++ #}
++
+ debug "mgr ($wid): sleeping now...";
+ prctl sprintf 'nd2: #%s mgr: idle', $wid;
+ sleep( setting('workers')->{sleep_time} || 1 );
+diff --git a/lib/App/Netdisco/DB.pm b/lib/App/Netdisco/DB.pm
+index 808b45b6..7e864c36 100644
+--- a/lib/App/Netdisco/DB.pm
++++ b/lib/App/Netdisco/DB.pm
+@@ -11,7 +11,7 @@ __PACKAGE__->load_namespaces(
+ );
+
+ our # try to hide from kwalitee
+- $VERSION = 44; # schema version used for upgrades, keep as integer
++ $VERSION = 45; # schema version used for upgrades, keep as integer
+
+ use Path::Class;
+ use File::ShareDir 'dist_dir';
+diff --git a/lib/App/Netdisco/DB/Result/Admin.pm b/lib/App/Netdisco/DB/Result/Admin.pm
+index 27e0cf9f..c3075c08 100644
+--- a/lib/App/Netdisco/DB/Result/Admin.pm
++++ b/lib/App/Netdisco/DB/Result/Admin.pm
+@@ -46,6 +46,8 @@ __PACKAGE__->add_columns(
+ { data_type => "text", is_nullable => 1 },
+ "debug",
+ { data_type => "boolean", is_nullable => 1 },
++ "device_key",
++ { data_type => "text", is_nullable => 1 },
+ );
+
+
+diff --git a/lib/App/Netdisco/DB/Result/Device.pm b/lib/App/Netdisco/DB/Result/Device.pm
+index 3d3e0a74..e3a794d8 100644
+--- a/lib/App/Netdisco/DB/Result/Device.pm
++++ b/lib/App/Netdisco/DB/Result/Device.pm
+@@ -238,6 +238,7 @@ sub renumber {
+ if $new_ip eq '0.0.0.0'
+ or $new_ip eq '127.0.0.1';
+
++ # Community is not included as SNMP::test_connection will take care of it
+ foreach my $set (qw/
+ DeviceIp
+ DeviceModule
+@@ -259,10 +260,6 @@ sub renumber {
+ ->search({remote_ip => $old_ip})
+ ->update({remote_ip => $new_ip});
+
+- $schema->resultset('Admin')
+- ->search({device => $old_ip})
+- ->update({device => $new_ip});
+-
+ $schema->resultset('Node')
+ ->search({switch => $old_ip})
+ ->update({switch => $new_ip});
+diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm
+index d7b73d53..859e9380 100644
+--- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm
++++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm
+@@ -13,11 +13,11 @@ use Try::Tiny;
+ use base 'Exporter';
+ our @EXPORT = ();
+ our @EXPORT_OK = qw/
++ jq_warm_thrusters
+ jq_getsome
+ jq_getsomep
+ jq_locked
+ jq_queued
+- jq_warm_thrusters
+ jq_lock
+ jq_defer
+ jq_complete
+@@ -28,6 +28,47 @@ our @EXPORT_OK = qw/
+ /;
+ our %EXPORT_TAGS = ( all => \@EXPORT_OK );
+
++# given a device, tests if any of the primary acls applies
++# returns a list of job actions to be denied/skipped on this host.
++sub _get_denied_actions {
++ my $device = shift;
++ my @badactions = ();
++ return @badactions unless $device;
++
++ push @badactions, ('discover', @{ setting('job_prio')->{high} })
++ if not is_discoverable($device);
++
++ push @badactions, (qw/macsuck nbtstat/)
++ if not is_macsuckable($device);
++
++ push @badactions, 'arpnip'
++ if not is_arpnipable($device);
++
++ return @badactions;
++}
++
++sub jq_warm_thrusters {
++ my @devices = schema('netdisco')->resultset('Device')->all;
++ my $rs = schema('netdisco')->resultset('DeviceSkip');
++ my %actionset = ();
++
++ foreach my $d (@devices) {
++ my @badactions = _get_denied_actions($d);
++ $actionset{$d->ip} = \@badactions if scalar @badactions;
++ }
++
++ schema('netdisco')->txn_do(sub {
++ $rs->search({ backend => setting('workers')->{'BACKEND'} })->delete;
++ $rs->populate([
++ map {{
++ backend => setting('workers')->{'BACKEND'},
++ device => $_,
++ actionset => $actionset{$_},
++ }} keys %actionset
++ ]);
++ });
++}
++
+ sub _getsome {
+ my ($num_slots, $where) = @_;
+ return () if ((!defined $num_slots) or ($num_slots < 1));
+@@ -46,8 +87,56 @@ sub _getsome {
+
+ my @returned = ();
+ while (my $job = $rs->next) {
+- push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
++ if ($job->device) {
++ # need to handle device discovered since backend daemon started
++ # and the skiplist was primed. these should be checked against
++ # the various acls and have device_skip entry added if needed,
++ # and return false if it should have been skipped.
++ my @badactions = _get_denied_actions($job->device);
++ if (scalar @badactions) {
++ schema('netdisco')->resultset('DeviceSkip')->find_or_create({
++ backend => setting('workers')->{'BACKEND'}, device => $job->device,
++ },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);
++
++ # will now not be selected in a future _getsome()
++ next if scalar grep {$_ eq $job->action} @badactions;
++ }
++ }
++
++ # remove any duplicate jobs, incuding possibly this job if there
++ # is already an equivalent job running
++
++ my %job_properties = (
++ action => $job->action,
++ port => $job->port,
++ subaction => $job->subaction,
++ -or => [
++ { device => $job->device },
++ ($job->device_key ? ({ device_key => $job->device_key }) : ()),
++ ],
++ );
++
++ my $gone = $jobs->search({
++ status => 'queued',
++ -and => [
++ %job_properties,
++ -or => [{
++ job => { '!=' => $job->id },
++ },{
++ job => $job->id,
++ -exists => $jobs->search({
++ status => { -like => 'queued-%' },
++ %job_properties,
++ })->as_query,
++ }],
++ ],
++ }, {for => 'update'})
++ ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) });
++
++ debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id;
++ push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
+ }
++
+ return @returned;
+ }
+
+@@ -89,90 +178,17 @@ sub jq_queued {
+ })->get_column('device')->all;
+ }
+
+-# given a device, tests if any of the primary acls applies
+-# returns a list of job actions to be denied/skipped on this host.
+-sub _get_denied_actions {
+- my $device = shift;
+- my @badactions = ();
+- return @badactions unless $device;
+-
+- push @badactions, ('discover', @{ setting('job_prio')->{high} })
+- if not is_discoverable($device);
+-
+- push @badactions, (qw/macsuck nbtstat/)
+- if not is_macsuckable($device);
+-
+- push @badactions, 'arpnip'
+- if not is_arpnipable($device);
+-
+- return @badactions;
+-}
+-
+-sub jq_warm_thrusters {
+- my @devices = schema('netdisco')->resultset('Device')->all;
+- my $rs = schema('netdisco')->resultset('DeviceSkip');
+- my %actionset = ();
+-
+- foreach my $d (@devices) {
+- my @badactions = _get_denied_actions($d);
+- $actionset{$d->ip} = \@badactions if scalar @badactions;
+- }
+-
+- schema('netdisco')->txn_do(sub {
+- $rs->search({ backend => setting('workers')->{'BACKEND'} })->delete;
+- $rs->populate([
+- map {{
+- backend => setting('workers')->{'BACKEND'},
+- device => $_,
+- actionset => $actionset{$_},
+- }} keys %actionset
+- ]);
+- });
+-}
+-
+ sub jq_lock {
+ my $job = shift;
+ my $happy = false;
+
+- if ($job->device) {
+- # need to handle device discovered since backend daemon started
+- # and the skiplist was primed. these should be checked against
+- # the various acls and have device_skip entry added if needed,
+- # and return false if it should have been skipped.
+- my @badactions = _get_denied_actions($job->device);
+- if (scalar @badactions) {
+- schema('netdisco')->resultset('DeviceSkip')->find_or_create({
+- backend => setting('workers')->{'BACKEND'}, device => $job->device,
+- },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);
+-
+- return false if scalar grep {$_ eq $job->action} @badactions;
+- }
+- }
+-
+ # lock db row and update to show job has been picked
+ try {
+- schema('netdisco')->txn_do(sub {
+- schema('netdisco')->resultset('Admin')
+- ->search({ job => $job->id }, { for => 'update' })
+- ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) });
+-
+- return unless
+- schema('netdisco')->resultset('Admin')
+- ->count({ job => $job->id,
+- status => ('queued-'. setting('workers')->{'BACKEND'}) });
+-
+- # remove any duplicate jobs, needed because we have race conditions
+- # when queueing jobs of a type for all devices
+- schema('netdisco')->resultset('Admin')->search({
+- status => 'queued',
+- device => $job->device,
+- port => $job->port,
+- action => $job->action,
+- subaction => $job->subaction,
+- }, {for => 'update'})->delete();
+-
+- $happy = true;
+- });
++ my $updated = schema('netdisco')->resultset('Admin')
++ ->search({ job => $job->id, status => 'queued' }, { for => 'update' })
++ ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) });
++
++ $happy = true if $updated > 0;
+ }
+ catch {
+ error $_;
+@@ -243,6 +259,7 @@ sub jq_complete {
+ $happy = true;
+ }
+ catch {
++ # use DDP; p $job;
+ error $_;
+ };
+
+@@ -274,13 +291,14 @@ sub jq_insert {
+ schema('netdisco')->txn_do(sub {
+ schema('netdisco')->resultset('Admin')->populate([
+ map {{
+- device => $_->{device},
+- port => $_->{port},
+- action => $_->{action},
+- subaction => ($_->{extra} || $_->{subaction}),
+- username => $_->{username},
+- userip => $_->{userip},
+- status => 'queued',
++ device => $_->{device},
++ device_key => $_->{device_key},
++ port => $_->{port},
++ action => $_->{action},
++ subaction => ($_->{extra} || $_->{subaction}),
++ username => $_->{username},
++ userip => $_->{userip},
++ status => 'queued',
+ }} @$jobs
+ ]);
+ });
+diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm
+index 105f4185..c6335187 100644
+--- a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm
++++ b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm
+@@ -72,6 +72,10 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub {
+ $device->renumber($new_ip)
+ or die "cannot renumber to: $new_ip"; # rollback
+
++ # is not done in renumber but required otherwise confusing at job end!
++ schema('netdisco')->resultset('Admin')
++ ->find({job => $job->id})->update({device => $new_ip});
++
+ return Status->noop(sprintf ' [%s] device - changed IP to %s (%s)',
+ $old_ip, $device->ip, ($device->dns || ''));
+ });
+diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm
+index aecec0e9..a3b6b20a 100644
+--- a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm
++++ b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm
+@@ -39,11 +39,13 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub {
+ or return Status->defer("discover failed: could not SNMP connect to $device");
+
+ my @to_discover = store_neighbors($device);
++ my %seen_id = ();
+
+ # only enqueue if device is not already discovered,
+ # discover_* config permits the discovery
+ foreach my $neighbor (@to_discover) {
+- my ($ip, $remote_type) = @$neighbor;
++ my ($ip, $remote_type, $remote_id) = @$neighbor;
++ next if $remote_id and $seen_id{ $remote_id }++;
+
+ my $device = get_device($ip);
+ next if $device->in_storage;
+@@ -55,10 +57,14 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub {
+ next;
+ }
+
++ # risk of things going wrong...?
++ # https://quickview.cloudapps.cisco.com/quickview/bug/CSCur12254
++
+ jq_insert({
+ device => $ip,
+ action => 'discover',
+ subaction => 'with-nodes',
++ ($remote_id ? (device_key => $remote_id) : ()),
+ });
+ }
+ });
+@@ -171,7 +177,7 @@ sub store_neighbors {
+ # useable remote IP...
+
+ if ($remote_ip eq '0.0.0.0' or
+- check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) {
++ check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) {
+
+ if ($remote_id) {
+ my $devices = schema('netdisco')->resultset('Device');
+@@ -228,7 +234,7 @@ sub store_neighbors {
+ debug sprintf
+ ' [%s] neigh - adding neighbor %s, type [%s], on %s to discovery queue',
+ $device->ip, $remote_ip, ($remote_type || ''), $port;
+- push @to_discover, [$remote_ip, $remote_type];
++ push @to_discover, [$remote_ip, $remote_type, $remote_id];
+
+ $remote_port = $c_port->{$entry};
+ if (defined $remote_port) {
+diff --git a/lib/App/Netdisco/Worker/Plugin/Expire.pm b/lib/App/Netdisco/Worker/Plugin/Expire.pm
+index 28c93cc4..0625338d 100644
+--- a/lib/App/Netdisco/Worker/Plugin/Expire.pm
++++ b/lib/App/Netdisco/Worker/Plugin/Expire.pm
+@@ -6,6 +6,7 @@ use aliased 'App::Netdisco::Worker::Status';
+
+ use Dancer::Plugin::DBIC 'schema';
+ use App::Netdisco::Util::Statistics 'update_stats';
++use App::Netdisco::DB::ExplicitLocking ':modes';
+
+ register_worker({ phase => 'main' }, sub {
+ my ($job, $workerconf) = @_;
+@@ -40,7 +41,7 @@ register_worker({ phase => 'main' }, sub {
+ }
+
+ if (setting('expire_jobs') and setting('expire_jobs') > 0) {
+- schema('netdisco')->txn_do(sub {
++ schema('netdisco')->txn_do_locked('admin', 'EXCLUSIVE', sub {
+ schema('netdisco')->resultset('Admin')->search({
+ entered => \[q/< (now() - ?::interval)/,
+ (setting('expire_jobs') * 86400)],
+diff --git a/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql
+new file mode 100644
+index 00000000..31e258e8
+--- /dev/null
++++ b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql
+@@ -0,0 +1,5 @@
++BEGIN;
++
++ALTER TABLE "admin" ADD "device_key" text;
++
++COMMIT;
+
+commit 3db242cbe868e0672cba3b8ba1756e55cf46980c
+Author: Oliver Gorwits <oliver@cpan.org>
+Date: Thu Nov 23 22:16:50 2017 +0000
+
+ support action::namespace for netdisco-do
+
+diff --git a/bin/netdisco-do b/bin/netdisco-do
+index ab60f98e..859de29d 100755
+--- a/bin/netdisco-do
++++ b/bin/netdisco-do
+@@ -109,7 +109,7 @@ my $exitstatus = 0;
+
+ foreach my $host (@hostlist) {
+ my $dev = $host ? get_device($host->addr) : undef;
+- if ($dev and not (blessed $dev and $dev->in_storage) and $action ne 'discover') {
++ if ($dev and not (blessed $dev and $dev->in_storage) and $action !~ m/^discover/) {
+ info sprintf "%s: error - Don't know device: %s", $action, $host->addr;
+ next;
+ }
+@@ -139,7 +139,7 @@ foreach my $host (@hostlist) {
+ $job->log("error running job: $_");
+ };
+
+- if ($job->log eq 'failed to report from any worker!') {
++ if ($job->log eq 'failed to report from any worker!' and not $job->only_namespace) {
+ pod2usage(
+ -msg => (sprintf 'error: %s is not a valid action', $action),
+ -verbose => 2,
+diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm
+index 72850e8f..ddbfd034 100644
+--- a/lib/App/Netdisco/Backend/Job.pm
++++ b/lib/App/Netdisco/Backend/Job.pm
+@@ -14,6 +14,7 @@ foreach my $slot (qw/
+ device
+ port
+ action
++ only_namespace
+ subaction
+ status
+ username
+@@ -36,6 +37,15 @@ has '_statuslist' => (
+ default => sub { [] },
+ );
+
++sub BUILD {
++ my ($job, $args) = @_;
++
++ if ($job->action =~ m/^(\w+)::(\w+)$/i) {
++ $job->action($1);
++ $job->only_namespace($2);
++ }
++}
++
+ =head1 METHODS
+
+ =head2 summary
+diff --git a/lib/App/Netdisco/Worker/Plugin.pm b/lib/App/Netdisco/Worker/Plugin.pm
+index 68440f52..746d8de0 100644
+--- a/lib/App/Netdisco/Worker/Plugin.pm
++++ b/lib/App/Netdisco/Worker/Plugin.pm
+@@ -36,6 +36,13 @@ register 'register_worker' => sub {
+ # check to see if this namespace has already passed at higher priority
+ return if $job->namespace_passed($workerconf);
+
++ # support part-actions via action::namespace
++ if ($job->only_namespace and $workerconf->{phase} ne 'check') {
++ return unless $workerconf->{namespace} eq lc( $job->only_namespace )
++ or (($workerconf->{phase} eq 'early')
++ and ($job->device and not $job->device->in_storage));
++ }
++
+ my @newuserconf = ();
+ my @userconf = @{ setting('device_auth') || [] };
+
+
+commit de594c647ff3e8d43afa69a1ce1bdfc54442e5c0 (HEAD -> master, origin/master, origin/HEAD)
+Author: Oliver Gorwits <oliver@cpan.org>
+Date: Fri Nov 24 06:31:34 2017 +0000
+
+ single DB poll for new jobs both high and normal priority
+
+diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm
+index ddbfd034..9eef998e 100644
+--- a/lib/App/Netdisco/Backend/Job.pm
++++ b/lib/App/Netdisco/Backend/Job.pm
+@@ -21,6 +21,7 @@ foreach my $slot (qw/
+ userip
+ log
+ device_key
++ job_priority
+
+ _current_phase
+ _last_namespace
+diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm
+index d594df4d..3c6e7e00 100644
+--- a/lib/App/Netdisco/Backend/Role/Manager.pm
++++ b/lib/App/Netdisco/Backend/Role/Manager.pm
+@@ -6,7 +6,7 @@ use List::Util 'sum';
+ use App::Netdisco::Util::MCE;
+
+ use App::Netdisco::JobQueue
+- qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/;
++ qw/jq_locked jq_getsome jq_lock jq_warm_thrusters/;
+
+ use Role::Tiny;
+ use namespace::clean;
+@@ -60,28 +60,8 @@ sub worker_body {
+
+ $num_slots = parse_max_workers( setting('workers')->{tasks} )
+ - $self->{queue}->pending();
+- debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)";
++ debug "mgr ($wid): getting potential jobs for $num_slots workers";
+
+- # get some high priority jobs
+- # TODO also check for stale jobs in Netdisco DB
+- foreach my $job ( jq_getsomep($num_slots) ) {
+- next if $seen_job{ $memoize->($job) }++;
+-
+- # mark job as running
+- next unless jq_lock($job);
+- info sprintf "mgr (%s): job %s booked out for this processing node",
+- $wid, $job->id;
+-
+- # copy job to local queue
+- $self->{queue}->enqueuep(100, $job);
+- }
+-
+- $num_slots = parse_max_workers( setting('workers')->{tasks} )
+- - $self->{queue}->pending();
+- debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)";
+-
+- # get some normal priority jobs
+- # TODO also check for stale jobs in Netdisco DB
+ foreach my $job ( jq_getsome($num_slots) ) {
+ next if $seen_job{ $memoize->($job) }++;
+
+@@ -91,7 +71,7 @@ sub worker_body {
+ $wid, $job->id;
+
+ # copy job to local queue
+- $self->{queue}->enqueue($job);
++ $self->{queue}->enqueuep($job->job_priority, $job);
+ }
+
+ #if (scalar grep {$_ > 1} values %seen_job) {
+diff --git a/lib/App/Netdisco/DB/ResultSet.pm b/lib/App/Netdisco/DB/ResultSet.pm
+index 953c8e80..22c25cf8 100644
+--- a/lib/App/Netdisco/DB/ResultSet.pm
++++ b/lib/App/Netdisco/DB/ResultSet.pm
+@@ -6,7 +6,7 @@ use warnings;
+ use base 'DBIx::Class::ResultSet';
+
+ __PACKAGE__->load_components(qw/
+- Helper::ResultSet::SetOperations
++ +App::Netdisco::DB::SetOperations
+ Helper::ResultSet::Shortcut
+ Helper::ResultSet::CorrelateRelationship
+ /);
+diff --git a/lib/App/Netdisco/DB/SetOperations.pm b/lib/App/Netdisco/DB/SetOperations.pm
+new file mode 100644
+index 00000000..fef5efb4
+--- /dev/null
++++ b/lib/App/Netdisco/DB/SetOperations.pm
+@@ -0,0 +1,50 @@
++package App::Netdisco::DB::SetOperations;
++
++use strict;
++use warnings;
++
++use parent 'DBIx::Class::Helper::ResultSet::SetOperations';
++
++sub _set_operation {
++ my ( $self, $operation, $other ) = @_;
++
++ my @sql;
++ my @params;
++
++ my $as = $self->_resolved_attrs->{as};
++
++ my @operands = ( $self, ref $other eq 'ARRAY' ? @$other : $other );
++
++ for (@operands) {
++ $self->throw_exception("ResultClass of ResultSets do not match!")
++ unless $self->result_class eq $_->result_class;
++
++ my $attrs = $_->_resolved_attrs;
++
++ $self->throw_exception('ResultSets do not all have the same selected columns!')
++ unless $self->_compare_arrays($as, $attrs->{as});
++
++ my ($sql, @bind) = @{${$_->as_query}};
++ # $sql =~ s/^\s*\((.*)\)\s*$/$1/;
++ $sql = q<(> . $sql . q<)>;
++
++ push @sql, $sql;
++ push @params, @bind;
++ }
++
++ my $query = q<(> . join(" $operation ", @sql). q<)>;
++
++ my $attrs = $self->_resolved_attrs;
++ return $self->result_source->resultset->search(undef, {
++ alias => $self->current_source_alias,
++ from => [{
++ $self->current_source_alias => \[ $query, @params ],
++ -alias => $self->current_source_alias,
++ -source_handle => $self->result_source->handle,
++ }],
++ columns => $attrs->{as},
++ result_class => $self->result_class,
++ });
++}
++
++1;
+diff --git a/lib/App/Netdisco/JobQueue.pm b/lib/App/Netdisco/JobQueue.pm
+index 875ec468..733cf4bb 100644
+--- a/lib/App/Netdisco/JobQueue.pm
++++ b/lib/App/Netdisco/JobQueue.pm
+@@ -9,16 +9,15 @@ Module::Load::load
+ use base 'Exporter';
+ our @EXPORT = ();
+ our @EXPORT_OK = qw/
++ jq_warm_thrusters
+ jq_getsome
+- jq_getsomep
+ jq_locked
+ jq_queued
+- jq_warm_thrusters
+- jq_log
+- jq_userlog
+ jq_lock
+ jq_defer
+ jq_complete
++ jq_log
++ jq_userlog
+ jq_insert
+ jq_delete
+ /;
+@@ -43,10 +42,6 @@ Returns a list of randomly selected queued jobs. Default is to return one job,
+ unless C<$num> is provided. Jobs are returned as objects which implement the
+ Netdisco job instance interface (see below).
+
+-=head2 jq_getsomep( $num? )
+-
+-Same as C<jq_getsome> but for high priority jobs.
+-
+ =head2 jq_locked()
+
+ Returns the list of jobs currently booked out to this processing node (denoted
+diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm
+index 859e9380..80405701 100644
+--- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm
++++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm
+@@ -15,7 +15,6 @@ our @EXPORT = ();
+ our @EXPORT_OK = qw/
+ jq_warm_thrusters
+ jq_getsome
+- jq_getsomep
+ jq_locked
+ jq_queued
+ jq_lock
+@@ -69,23 +68,49 @@ sub jq_warm_thrusters {
+ });
+ }
+
+-sub _getsome {
+- my ($num_slots, $where) = @_;
+- return () if ((!defined $num_slots) or ($num_slots < 1));
+- return () if ((!defined $where) or (ref {} ne ref $where));
++sub jq_getsome {
++ my $num_slots = shift;
++ return () unless $num_slots and $num_slots > 0;
+
+ my $jobs = schema('netdisco')->resultset('Admin');
+- my $rs = $jobs->search({
++ my @returned = ();
++
++ my %jobsearch = (
+ status => 'queued',
+ device => { '-not_in' =>
+ $jobs->skipped(setting('workers')->{'BACKEND'},
+ setting('workers')->{'max_deferrals'},
+ setting('workers')->{'retry_after'})
+ ->columns('device')->as_query },
+- %$where,
+- }, { order_by => 'random()', rows => $num_slots });
++ );
++ my %randoms = (order_by => 'random()', rows => $num_slots );
++
++ my $hiprio = $jobs->search({
++ %jobsearch,
++ -or => [{
++ username => { '!=' => undef },
++ action => { -in => setting('job_prio')->{'normal'} },
++ },{
++ action => { -in => setting('job_prio')->{'high'} },
++ }],
++ }, {
++ %randoms,
++ '+select' => [\'100 as job_priority'], '+as' => ['me.job_priority'],
++ });
++
++ my $loprio = $jobs->search({
++ %jobsearch,
++ action => { -in => setting('job_prio')->{'normal'} },
++ }, {
++ %randoms,
++ '+select' => [\'0 as job_priority'], '+as' => ['me.job_priority'],
++ });
++
++ my $rs = $hiprio->union($loprio)->search(undef, {
++ order_by => { '-desc' => 'job_priority' },
++ rows => $num_slots,
++ });
+
+- my @returned = ();
+ while (my $job = $rs->next) {
+ if ($job->device) {
+ # need to handle device discovered since backend daemon started
+@@ -140,23 +165,6 @@ sub _getsome {
+ return @returned;
+ }
+
+-sub jq_getsome {
+- return _getsome(shift,
+- { action => { -in => setting('job_prio')->{'normal'} } }
+- );
+-}
+-
+-sub jq_getsomep {
+- return _getsome(shift, {
+- -or => [{
+- username => { '!=' => undef },
+- action => { -in => setting('job_prio')->{'normal'} },
+- },{
+- action => { -in => setting('job_prio')->{'high'} },
+- }],
+- });
+-}
+-
+ sub jq_locked {
+ my @returned = ();
+ my $rs = schema('netdisco')->resultset('Admin')