diff options
Diffstat (limited to 'testing')
-rw-r--r-- | testing/netdisco/APKBUILD | 12 | ||||
-rw-r--r-- | testing/netdisco/git-20171124.patch | 902 |
2 files changed, 910 insertions, 4 deletions
diff --git a/testing/netdisco/APKBUILD b/testing/netdisco/APKBUILD index 64bee33c86..b124a846ba 100644 --- a/testing/netdisco/APKBUILD +++ b/testing/netdisco/APKBUILD @@ -3,7 +3,8 @@ # Maintainer: Timo Teräs <timo.teras@iki.fi> pkgname=netdisco _pkgreal=App-Netdisco -pkgver=2.036011 +_pkgver=2.036012_002 +pkgver=${_pkgver/_/p} pkgrel=0 pkgdesc="An open source web-based network management tool." url="http://search.cpan.org/dist/App-Netdisco/" @@ -28,8 +29,10 @@ depends="perl-term-ui perl-archive-extract $cpandepends" makedepends="perl-dev perl-module-build $cpanmakedepends" checkdepends="$cpancheckdepends" subpackages="$pkgname-doc" -source="http://search.cpan.org/CPAN/authors/id/O/OL/OLIVER/$_pkgreal-$pkgver.tar.gz" -builddir="$srcdir/$_pkgreal-$pkgver" +source="http://search.cpan.org/CPAN/authors/id/O/OL/OLIVER/$_pkgreal-$_pkgver.tar.gz + git-20171124.patch + " +builddir="$srcdir/$_pkgreal-$_pkgver" prepare() { default_prepare @@ -56,4 +59,5 @@ check() { ./Build test } -sha512sums="27a3fc60290e8081bedfb8628502916797ba8c68c030a97091723a6923187484260044587f980efd3d3b0c757129951769a5ecf1b3549d73b08e524dac19a540 App-Netdisco-2.036011.tar.gz" +sha512sums="72a399ad204dd34d8f7ffb7b00b50e36b7622bf1402972875166fee650c6c3b3f6980a94c7cbdf5793e90a0a59079c813e1a7e76f6793f806ba0fab42c94f90a App-Netdisco-2.036012_002.tar.gz +cd7e47db82b9371af1e68086c95c70ea9493c274b90818edf50da880a50bdf1f3f06cadda5357ba01980fdffcf642984cb69c0699dc6fc227ad33d1264d65337 git-20171124.patch" diff --git a/testing/netdisco/git-20171124.patch b/testing/netdisco/git-20171124.patch new file mode 100644 index 0000000000..dfef8140b6 --- /dev/null +++ b/testing/netdisco/git-20171124.patch @@ -0,0 +1,902 @@ +commit 4a51f83efc89412f1954b3cb32a3ee83ff354095 +Author: Oliver Gorwits <oliver@cpan.org> +Date: Sun Nov 19 22:06:15 2017 +0000 + + fix detection of unknown action in netdisco-do + +diff --git a/bin/netdisco-do b/bin/netdisco-do +index 1c5b0b43..ab60f98e 100755 +--- a/bin/netdisco-do ++++ b/bin/netdisco-do +@@ -139,7 +139,7 @@ foreach my $host (@hostlist) { + $job->log("error running job: $_"); + }; + +- if ($job->log eq 'no worker succeeded during main phase') { ++ if ($job->log eq 'failed to report from any worker!') { + pod2usage( + -msg => (sprintf 'error: %s is not a valid action', $action), + -verbose => 2, + +commit c576a755af61186ec46a0dc2576b43aff0ed903a +Author: Oliver Gorwits <oliver@cpan.org> +Date: Tue Nov 21 10:00:53 2017 +0000 + + tweak log message + +diff --git a/lib/App/Netdisco/Worker/Runner.pm b/lib/App/Netdisco/Worker/Runner.pm +index 2a81ab0c..334e9e97 100644 +--- a/lib/App/Netdisco/Worker/Runner.pm ++++ b/lib/App/Netdisco/Worker/Runner.pm +@@ -76,7 +76,7 @@ sub run_workers { + foreach my $worker (@{ $self->$set }) { + try { $job->add_status( $worker->($job) ) } + catch { +- debug "=> $_" if $_; ++ debug "-> $_" if $_; + $job->add_status( Status->error($_) ); + }; + } + +commit b694258a6580382c5a66864468e4e1b136709f79 +Author: Oliver Gorwits <oliver@cpan.org> +Date: Tue Nov 21 10:01:09 2017 +0000 + + leave community rows in place + +diff --git a/lib/App/Netdisco/DB/Result/Device.pm b/lib/App/Netdisco/DB/Result/Device.pm +index 6352e444..3d3e0a74 100644 +--- a/lib/App/Netdisco/DB/Result/Device.pm ++++ b/lib/App/Netdisco/DB/Result/Device.pm +@@ -249,7 +249,6 @@ sub renumber { + DevicePortSsid + DevicePortVlan + DevicePortWireless +- Community + /) { + $schema->resultset($set) + ->search({ip => $old_ip}) + +commit 1bbe8c916481b9e5dc0c2ec26c4d922a6dabd5f0 +Merge: b694258a 4a51f83e +Author: Oliver Gorwits <oliver@cpan.org> +Date: Tue Nov 21 10:02:57 2017 +0000 + + Merge branch 'master' of github.com:netdisco/netdisco + +commit 0bb15f36b9e8f374f995b5cfeaf493b74fb458e6 +Author: Oliver Gorwits <oliver@cpan.org> +Date: Thu Nov 23 19:23:55 2017 +0000 + + fixes for race conditions and dupes in job queue + + we had situations where the manager would start workers on the same job, + either because of race conditions or because at the time of queueing it wasn't + known that the jobs were targeting the same device (due to device aliases). + + this commit removes duplicate jobs, reduces the need for locking on the job + queue, and makes use of lldpRemChassisId to try to deduplicate jobs before + they are started. in effect we have several goes to prevent duplicate jobs: + + 1. at neighbor discovery time we try to skip queueing same lldpRemChassisId + 2. at job selection we 'error out' jobs with same profile as job selected + 3. at job selection we check for running job with same profile as selected + 4. the job manager process also checks for duplicate job profiles + 5. at job lock we abort if the job was 'errored out' + + all together this seems to work well. a test on a large university network of + 303 devices (four core routers and the rest edge routers, runing VRF with many + duplicate identities), ~1200 subnets, ~50k hosts, resulted in no DB deadlock + or contention and a complete discover+arpnip+macsuck (909 jobs) in ~3 minutes + (with ~150 duplicate jobs identified and skipped). + +diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm +index 7811b53f..72850e8f 100644 +--- a/lib/App/Netdisco/Backend/Job.pm ++++ b/lib/App/Netdisco/Backend/Job.pm +@@ -19,6 +19,7 @@ foreach my $slot (qw/ + username + userip + log ++ device_key + + _current_phase + _last_namespace +diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm +index 25d13ba7..d594df4d 100644 +--- a/lib/App/Netdisco/Backend/Role/Manager.pm ++++ b/lib/App/Netdisco/Backend/Role/Manager.pm +@@ -34,6 +34,16 @@ sub worker_begin { + } + } + ++# creates a 'signature' for each job so that we can check for duplicates ... ++# it happens from time to time due to the distributed nature of the job queue ++# and manager(s) - also kinder to the DB to skip here rather than jq_lock() ++my $memoize = sub { ++ no warnings 'uninitialized'; ++ my $job = shift; ++ return join chr(28), map {$job->{$_}} ++ (qw/action port subaction/, ($job->{device_key} ? 'device_key' : 'device')); ++}; ++ + sub worker_body { + my $self = shift; + my $wid = $self->wid; +@@ -46,6 +56,7 @@ sub worker_body { + while (1) { + prctl sprintf 'nd2: #%s mgr: gathering', $wid; + my $num_slots = 0; ++ my %seen_job = (); + + $num_slots = parse_max_workers( setting('workers')->{tasks} ) + - $self->{queue}->pending(); +@@ -54,6 +65,7 @@ sub worker_body { + # get some high priority jobs + # TODO also check for stale jobs in Netdisco DB + foreach my $job ( jq_getsomep($num_slots) ) { ++ next if $seen_job{ $memoize->($job) }++; + + # mark job as running + next unless jq_lock($job); +@@ -71,6 +83,7 @@ sub worker_body { + # get some normal priority jobs + # TODO also check for stale jobs in Netdisco DB + foreach my $job ( jq_getsome($num_slots) ) { ++ next if $seen_job{ $memoize->($job) }++; + + # mark job as running + next unless jq_lock($job); +@@ -81,6 +94,11 @@ sub worker_body { + $self->{queue}->enqueue($job); + } + ++ #if (scalar grep {$_ > 1} values %seen_job) { ++ # debug 'WARNING: saw duplicate jobs after getsome()'; ++ # use DDP; debug p %seen_job; ++ #} ++ + debug "mgr ($wid): sleeping now..."; + prctl sprintf 'nd2: #%s mgr: idle', $wid; + sleep( setting('workers')->{sleep_time} || 1 ); +diff --git a/lib/App/Netdisco/DB.pm b/lib/App/Netdisco/DB.pm +index 808b45b6..7e864c36 100644 +--- a/lib/App/Netdisco/DB.pm ++++ b/lib/App/Netdisco/DB.pm +@@ -11,7 +11,7 @@ __PACKAGE__->load_namespaces( + ); + + our # try to hide from kwalitee +- $VERSION = 44; # schema version used for upgrades, keep as integer ++ $VERSION = 45; # schema version used for upgrades, keep as integer + + use Path::Class; + use File::ShareDir 'dist_dir'; +diff --git a/lib/App/Netdisco/DB/Result/Admin.pm b/lib/App/Netdisco/DB/Result/Admin.pm +index 27e0cf9f..c3075c08 100644 +--- a/lib/App/Netdisco/DB/Result/Admin.pm ++++ b/lib/App/Netdisco/DB/Result/Admin.pm +@@ -46,6 +46,8 @@ __PACKAGE__->add_columns( + { data_type => "text", is_nullable => 1 }, + "debug", + { data_type => "boolean", is_nullable => 1 }, ++ "device_key", ++ { data_type => "text", is_nullable => 1 }, + ); + + +diff --git a/lib/App/Netdisco/DB/Result/Device.pm b/lib/App/Netdisco/DB/Result/Device.pm +index 3d3e0a74..e3a794d8 100644 +--- a/lib/App/Netdisco/DB/Result/Device.pm ++++ b/lib/App/Netdisco/DB/Result/Device.pm +@@ -238,6 +238,7 @@ sub renumber { + if $new_ip eq '0.0.0.0' + or $new_ip eq '127.0.0.1'; + ++ # Community is not included as SNMP::test_connection will take care of it + foreach my $set (qw/ + DeviceIp + DeviceModule +@@ -259,10 +260,6 @@ sub renumber { + ->search({remote_ip => $old_ip}) + ->update({remote_ip => $new_ip}); + +- $schema->resultset('Admin') +- ->search({device => $old_ip}) +- ->update({device => $new_ip}); +- + $schema->resultset('Node') + ->search({switch => $old_ip}) + ->update({switch => $new_ip}); +diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm +index d7b73d53..859e9380 100644 +--- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm ++++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm +@@ -13,11 +13,11 @@ use Try::Tiny; + use base 'Exporter'; + our @EXPORT = (); + our @EXPORT_OK = qw/ ++ jq_warm_thrusters + jq_getsome + jq_getsomep + jq_locked + jq_queued +- jq_warm_thrusters + jq_lock + jq_defer + jq_complete +@@ -28,6 +28,47 @@ our @EXPORT_OK = qw/ + /; + our %EXPORT_TAGS = ( all => \@EXPORT_OK ); + ++# given a device, tests if any of the primary acls applies ++# returns a list of job actions to be denied/skipped on this host. ++sub _get_denied_actions { ++ my $device = shift; ++ my @badactions = (); ++ return @badactions unless $device; ++ ++ push @badactions, ('discover', @{ setting('job_prio')->{high} }) ++ if not is_discoverable($device); ++ ++ push @badactions, (qw/macsuck nbtstat/) ++ if not is_macsuckable($device); ++ ++ push @badactions, 'arpnip' ++ if not is_arpnipable($device); ++ ++ return @badactions; ++} ++ ++sub jq_warm_thrusters { ++ my @devices = schema('netdisco')->resultset('Device')->all; ++ my $rs = schema('netdisco')->resultset('DeviceSkip'); ++ my %actionset = (); ++ ++ foreach my $d (@devices) { ++ my @badactions = _get_denied_actions($d); ++ $actionset{$d->ip} = \@badactions if scalar @badactions; ++ } ++ ++ schema('netdisco')->txn_do(sub { ++ $rs->search({ backend => setting('workers')->{'BACKEND'} })->delete; ++ $rs->populate([ ++ map {{ ++ backend => setting('workers')->{'BACKEND'}, ++ device => $_, ++ actionset => $actionset{$_}, ++ }} keys %actionset ++ ]); ++ }); ++} ++ + sub _getsome { + my ($num_slots, $where) = @_; + return () if ((!defined $num_slots) or ($num_slots < 1)); +@@ -46,8 +87,56 @@ sub _getsome { + + my @returned = (); + while (my $job = $rs->next) { +- push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); ++ if ($job->device) { ++ # need to handle device discovered since backend daemon started ++ # and the skiplist was primed. these should be checked against ++ # the various acls and have device_skip entry added if needed, ++ # and return false if it should have been skipped. ++ my @badactions = _get_denied_actions($job->device); ++ if (scalar @badactions) { ++ schema('netdisco')->resultset('DeviceSkip')->find_or_create({ ++ backend => setting('workers')->{'BACKEND'}, device => $job->device, ++ },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions); ++ ++ # will now not be selected in a future _getsome() ++ next if scalar grep {$_ eq $job->action} @badactions; ++ } ++ } ++ ++ # remove any duplicate jobs, incuding possibly this job if there ++ # is already an equivalent job running ++ ++ my %job_properties = ( ++ action => $job->action, ++ port => $job->port, ++ subaction => $job->subaction, ++ -or => [ ++ { device => $job->device }, ++ ($job->device_key ? ({ device_key => $job->device_key }) : ()), ++ ], ++ ); ++ ++ my $gone = $jobs->search({ ++ status => 'queued', ++ -and => [ ++ %job_properties, ++ -or => [{ ++ job => { '!=' => $job->id }, ++ },{ ++ job => $job->id, ++ -exists => $jobs->search({ ++ status => { -like => 'queued-%' }, ++ %job_properties, ++ })->as_query, ++ }], ++ ], ++ }, {for => 'update'}) ++ ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) }); ++ ++ debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id; ++ push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); + } ++ + return @returned; + } + +@@ -89,90 +178,17 @@ sub jq_queued { + })->get_column('device')->all; + } + +-# given a device, tests if any of the primary acls applies +-# returns a list of job actions to be denied/skipped on this host. +-sub _get_denied_actions { +- my $device = shift; +- my @badactions = (); +- return @badactions unless $device; +- +- push @badactions, ('discover', @{ setting('job_prio')->{high} }) +- if not is_discoverable($device); +- +- push @badactions, (qw/macsuck nbtstat/) +- if not is_macsuckable($device); +- +- push @badactions, 'arpnip' +- if not is_arpnipable($device); +- +- return @badactions; +-} +- +-sub jq_warm_thrusters { +- my @devices = schema('netdisco')->resultset('Device')->all; +- my $rs = schema('netdisco')->resultset('DeviceSkip'); +- my %actionset = (); +- +- foreach my $d (@devices) { +- my @badactions = _get_denied_actions($d); +- $actionset{$d->ip} = \@badactions if scalar @badactions; +- } +- +- schema('netdisco')->txn_do(sub { +- $rs->search({ backend => setting('workers')->{'BACKEND'} })->delete; +- $rs->populate([ +- map {{ +- backend => setting('workers')->{'BACKEND'}, +- device => $_, +- actionset => $actionset{$_}, +- }} keys %actionset +- ]); +- }); +-} +- + sub jq_lock { + my $job = shift; + my $happy = false; + +- if ($job->device) { +- # need to handle device discovered since backend daemon started +- # and the skiplist was primed. these should be checked against +- # the various acls and have device_skip entry added if needed, +- # and return false if it should have been skipped. +- my @badactions = _get_denied_actions($job->device); +- if (scalar @badactions) { +- schema('netdisco')->resultset('DeviceSkip')->find_or_create({ +- backend => setting('workers')->{'BACKEND'}, device => $job->device, +- },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions); +- +- return false if scalar grep {$_ eq $job->action} @badactions; +- } +- } +- + # lock db row and update to show job has been picked + try { +- schema('netdisco')->txn_do(sub { +- schema('netdisco')->resultset('Admin') +- ->search({ job => $job->id }, { for => 'update' }) +- ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); +- +- return unless +- schema('netdisco')->resultset('Admin') +- ->count({ job => $job->id, +- status => ('queued-'. setting('workers')->{'BACKEND'}) }); +- +- # remove any duplicate jobs, needed because we have race conditions +- # when queueing jobs of a type for all devices +- schema('netdisco')->resultset('Admin')->search({ +- status => 'queued', +- device => $job->device, +- port => $job->port, +- action => $job->action, +- subaction => $job->subaction, +- }, {for => 'update'})->delete(); +- +- $happy = true; +- }); ++ my $updated = schema('netdisco')->resultset('Admin') ++ ->search({ job => $job->id, status => 'queued' }, { for => 'update' }) ++ ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); ++ ++ $happy = true if $updated > 0; + } + catch { + error $_; +@@ -243,6 +259,7 @@ sub jq_complete { + $happy = true; + } + catch { ++ # use DDP; p $job; + error $_; + }; + +@@ -274,13 +291,14 @@ sub jq_insert { + schema('netdisco')->txn_do(sub { + schema('netdisco')->resultset('Admin')->populate([ + map {{ +- device => $_->{device}, +- port => $_->{port}, +- action => $_->{action}, +- subaction => ($_->{extra} || $_->{subaction}), +- username => $_->{username}, +- userip => $_->{userip}, +- status => 'queued', ++ device => $_->{device}, ++ device_key => $_->{device_key}, ++ port => $_->{port}, ++ action => $_->{action}, ++ subaction => ($_->{extra} || $_->{subaction}), ++ username => $_->{username}, ++ userip => $_->{userip}, ++ status => 'queued', + }} @$jobs + ]); + }); +diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm +index 105f4185..c6335187 100644 +--- a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm ++++ b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm +@@ -72,6 +72,10 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { + $device->renumber($new_ip) + or die "cannot renumber to: $new_ip"; # rollback + ++ # is not done in renumber but required otherwise confusing at job end! ++ schema('netdisco')->resultset('Admin') ++ ->find({job => $job->id})->update({device => $new_ip}); ++ + return Status->noop(sprintf ' [%s] device - changed IP to %s (%s)', + $old_ip, $device->ip, ($device->dns || '')); + }); +diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm +index aecec0e9..a3b6b20a 100644 +--- a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm ++++ b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm +@@ -39,11 +39,13 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { + or return Status->defer("discover failed: could not SNMP connect to $device"); + + my @to_discover = store_neighbors($device); ++ my %seen_id = (); + + # only enqueue if device is not already discovered, + # discover_* config permits the discovery + foreach my $neighbor (@to_discover) { +- my ($ip, $remote_type) = @$neighbor; ++ my ($ip, $remote_type, $remote_id) = @$neighbor; ++ next if $remote_id and $seen_id{ $remote_id }++; + + my $device = get_device($ip); + next if $device->in_storage; +@@ -55,10 +57,14 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { + next; + } + ++ # risk of things going wrong...? ++ # https://quickview.cloudapps.cisco.com/quickview/bug/CSCur12254 ++ + jq_insert({ + device => $ip, + action => 'discover', + subaction => 'with-nodes', ++ ($remote_id ? (device_key => $remote_id) : ()), + }); + } + }); +@@ -171,7 +177,7 @@ sub store_neighbors { + # useable remote IP... + + if ($remote_ip eq '0.0.0.0' or +- check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) { ++ check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) { + + if ($remote_id) { + my $devices = schema('netdisco')->resultset('Device'); +@@ -228,7 +234,7 @@ sub store_neighbors { + debug sprintf + ' [%s] neigh - adding neighbor %s, type [%s], on %s to discovery queue', + $device->ip, $remote_ip, ($remote_type || ''), $port; +- push @to_discover, [$remote_ip, $remote_type]; ++ push @to_discover, [$remote_ip, $remote_type, $remote_id]; + + $remote_port = $c_port->{$entry}; + if (defined $remote_port) { +diff --git a/lib/App/Netdisco/Worker/Plugin/Expire.pm b/lib/App/Netdisco/Worker/Plugin/Expire.pm +index 28c93cc4..0625338d 100644 +--- a/lib/App/Netdisco/Worker/Plugin/Expire.pm ++++ b/lib/App/Netdisco/Worker/Plugin/Expire.pm +@@ -6,6 +6,7 @@ use aliased 'App::Netdisco::Worker::Status'; + + use Dancer::Plugin::DBIC 'schema'; + use App::Netdisco::Util::Statistics 'update_stats'; ++use App::Netdisco::DB::ExplicitLocking ':modes'; + + register_worker({ phase => 'main' }, sub { + my ($job, $workerconf) = @_; +@@ -40,7 +41,7 @@ register_worker({ phase => 'main' }, sub { + } + + if (setting('expire_jobs') and setting('expire_jobs') > 0) { +- schema('netdisco')->txn_do(sub { ++ schema('netdisco')->txn_do_locked('admin', 'EXCLUSIVE', sub { + schema('netdisco')->resultset('Admin')->search({ + entered => \[q/< (now() - ?::interval)/, + (setting('expire_jobs') * 86400)], +diff --git a/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql +new file mode 100644 +index 00000000..31e258e8 +--- /dev/null ++++ b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql +@@ -0,0 +1,5 @@ ++BEGIN; ++ ++ALTER TABLE "admin" ADD "device_key" text; ++ ++COMMIT; + +commit 3db242cbe868e0672cba3b8ba1756e55cf46980c +Author: Oliver Gorwits <oliver@cpan.org> +Date: Thu Nov 23 22:16:50 2017 +0000 + + support action::namespace for netdisco-do + +diff --git a/bin/netdisco-do b/bin/netdisco-do +index ab60f98e..859de29d 100755 +--- a/bin/netdisco-do ++++ b/bin/netdisco-do +@@ -109,7 +109,7 @@ my $exitstatus = 0; + + foreach my $host (@hostlist) { + my $dev = $host ? get_device($host->addr) : undef; +- if ($dev and not (blessed $dev and $dev->in_storage) and $action ne 'discover') { ++ if ($dev and not (blessed $dev and $dev->in_storage) and $action !~ m/^discover/) { + info sprintf "%s: error - Don't know device: %s", $action, $host->addr; + next; + } +@@ -139,7 +139,7 @@ foreach my $host (@hostlist) { + $job->log("error running job: $_"); + }; + +- if ($job->log eq 'failed to report from any worker!') { ++ if ($job->log eq 'failed to report from any worker!' and not $job->only_namespace) { + pod2usage( + -msg => (sprintf 'error: %s is not a valid action', $action), + -verbose => 2, +diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm +index 72850e8f..ddbfd034 100644 +--- a/lib/App/Netdisco/Backend/Job.pm ++++ b/lib/App/Netdisco/Backend/Job.pm +@@ -14,6 +14,7 @@ foreach my $slot (qw/ + device + port + action ++ only_namespace + subaction + status + username +@@ -36,6 +37,15 @@ has '_statuslist' => ( + default => sub { [] }, + ); + ++sub BUILD { ++ my ($job, $args) = @_; ++ ++ if ($job->action =~ m/^(\w+)::(\w+)$/i) { ++ $job->action($1); ++ $job->only_namespace($2); ++ } ++} ++ + =head1 METHODS + + =head2 summary +diff --git a/lib/App/Netdisco/Worker/Plugin.pm b/lib/App/Netdisco/Worker/Plugin.pm +index 68440f52..746d8de0 100644 +--- a/lib/App/Netdisco/Worker/Plugin.pm ++++ b/lib/App/Netdisco/Worker/Plugin.pm +@@ -36,6 +36,13 @@ register 'register_worker' => sub { + # check to see if this namespace has already passed at higher priority + return if $job->namespace_passed($workerconf); + ++ # support part-actions via action::namespace ++ if ($job->only_namespace and $workerconf->{phase} ne 'check') { ++ return unless $workerconf->{namespace} eq lc( $job->only_namespace ) ++ or (($workerconf->{phase} eq 'early') ++ and ($job->device and not $job->device->in_storage)); ++ } ++ + my @newuserconf = (); + my @userconf = @{ setting('device_auth') || [] }; + + +commit de594c647ff3e8d43afa69a1ce1bdfc54442e5c0 (HEAD -> master, origin/master, origin/HEAD) +Author: Oliver Gorwits <oliver@cpan.org> +Date: Fri Nov 24 06:31:34 2017 +0000 + + single DB poll for new jobs both high and normal priority + +diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm +index ddbfd034..9eef998e 100644 +--- a/lib/App/Netdisco/Backend/Job.pm ++++ b/lib/App/Netdisco/Backend/Job.pm +@@ -21,6 +21,7 @@ foreach my $slot (qw/ + userip + log + device_key ++ job_priority + + _current_phase + _last_namespace +diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm +index d594df4d..3c6e7e00 100644 +--- a/lib/App/Netdisco/Backend/Role/Manager.pm ++++ b/lib/App/Netdisco/Backend/Role/Manager.pm +@@ -6,7 +6,7 @@ use List::Util 'sum'; + use App::Netdisco::Util::MCE; + + use App::Netdisco::JobQueue +- qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/; ++ qw/jq_locked jq_getsome jq_lock jq_warm_thrusters/; + + use Role::Tiny; + use namespace::clean; +@@ -60,28 +60,8 @@ sub worker_body { + + $num_slots = parse_max_workers( setting('workers')->{tasks} ) + - $self->{queue}->pending(); +- debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)"; ++ debug "mgr ($wid): getting potential jobs for $num_slots workers"; + +- # get some high priority jobs +- # TODO also check for stale jobs in Netdisco DB +- foreach my $job ( jq_getsomep($num_slots) ) { +- next if $seen_job{ $memoize->($job) }++; +- +- # mark job as running +- next unless jq_lock($job); +- info sprintf "mgr (%s): job %s booked out for this processing node", +- $wid, $job->id; +- +- # copy job to local queue +- $self->{queue}->enqueuep(100, $job); +- } +- +- $num_slots = parse_max_workers( setting('workers')->{tasks} ) +- - $self->{queue}->pending(); +- debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)"; +- +- # get some normal priority jobs +- # TODO also check for stale jobs in Netdisco DB + foreach my $job ( jq_getsome($num_slots) ) { + next if $seen_job{ $memoize->($job) }++; + +@@ -91,7 +71,7 @@ sub worker_body { + $wid, $job->id; + + # copy job to local queue +- $self->{queue}->enqueue($job); ++ $self->{queue}->enqueuep($job->job_priority, $job); + } + + #if (scalar grep {$_ > 1} values %seen_job) { +diff --git a/lib/App/Netdisco/DB/ResultSet.pm b/lib/App/Netdisco/DB/ResultSet.pm +index 953c8e80..22c25cf8 100644 +--- a/lib/App/Netdisco/DB/ResultSet.pm ++++ b/lib/App/Netdisco/DB/ResultSet.pm +@@ -6,7 +6,7 @@ use warnings; + use base 'DBIx::Class::ResultSet'; + + __PACKAGE__->load_components(qw/ +- Helper::ResultSet::SetOperations ++ +App::Netdisco::DB::SetOperations + Helper::ResultSet::Shortcut + Helper::ResultSet::CorrelateRelationship + /); +diff --git a/lib/App/Netdisco/DB/SetOperations.pm b/lib/App/Netdisco/DB/SetOperations.pm +new file mode 100644 +index 00000000..fef5efb4 +--- /dev/null ++++ b/lib/App/Netdisco/DB/SetOperations.pm +@@ -0,0 +1,50 @@ ++package App::Netdisco::DB::SetOperations; ++ ++use strict; ++use warnings; ++ ++use parent 'DBIx::Class::Helper::ResultSet::SetOperations'; ++ ++sub _set_operation { ++ my ( $self, $operation, $other ) = @_; ++ ++ my @sql; ++ my @params; ++ ++ my $as = $self->_resolved_attrs->{as}; ++ ++ my @operands = ( $self, ref $other eq 'ARRAY' ? @$other : $other ); ++ ++ for (@operands) { ++ $self->throw_exception("ResultClass of ResultSets do not match!") ++ unless $self->result_class eq $_->result_class; ++ ++ my $attrs = $_->_resolved_attrs; ++ ++ $self->throw_exception('ResultSets do not all have the same selected columns!') ++ unless $self->_compare_arrays($as, $attrs->{as}); ++ ++ my ($sql, @bind) = @{${$_->as_query}}; ++ # $sql =~ s/^\s*\((.*)\)\s*$/$1/; ++ $sql = q<(> . $sql . q<)>; ++ ++ push @sql, $sql; ++ push @params, @bind; ++ } ++ ++ my $query = q<(> . join(" $operation ", @sql). q<)>; ++ ++ my $attrs = $self->_resolved_attrs; ++ return $self->result_source->resultset->search(undef, { ++ alias => $self->current_source_alias, ++ from => [{ ++ $self->current_source_alias => \[ $query, @params ], ++ -alias => $self->current_source_alias, ++ -source_handle => $self->result_source->handle, ++ }], ++ columns => $attrs->{as}, ++ result_class => $self->result_class, ++ }); ++} ++ ++1; +diff --git a/lib/App/Netdisco/JobQueue.pm b/lib/App/Netdisco/JobQueue.pm +index 875ec468..733cf4bb 100644 +--- a/lib/App/Netdisco/JobQueue.pm ++++ b/lib/App/Netdisco/JobQueue.pm +@@ -9,16 +9,15 @@ Module::Load::load + use base 'Exporter'; + our @EXPORT = (); + our @EXPORT_OK = qw/ ++ jq_warm_thrusters + jq_getsome +- jq_getsomep + jq_locked + jq_queued +- jq_warm_thrusters +- jq_log +- jq_userlog + jq_lock + jq_defer + jq_complete ++ jq_log ++ jq_userlog + jq_insert + jq_delete + /; +@@ -43,10 +42,6 @@ Returns a list of randomly selected queued jobs. Default is to return one job, + unless C<$num> is provided. Jobs are returned as objects which implement the + Netdisco job instance interface (see below). + +-=head2 jq_getsomep( $num? ) +- +-Same as C<jq_getsome> but for high priority jobs. +- + =head2 jq_locked() + + Returns the list of jobs currently booked out to this processing node (denoted +diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm +index 859e9380..80405701 100644 +--- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm ++++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm +@@ -15,7 +15,6 @@ our @EXPORT = (); + our @EXPORT_OK = qw/ + jq_warm_thrusters + jq_getsome +- jq_getsomep + jq_locked + jq_queued + jq_lock +@@ -69,23 +68,49 @@ sub jq_warm_thrusters { + }); + } + +-sub _getsome { +- my ($num_slots, $where) = @_; +- return () if ((!defined $num_slots) or ($num_slots < 1)); +- return () if ((!defined $where) or (ref {} ne ref $where)); ++sub jq_getsome { ++ my $num_slots = shift; ++ return () unless $num_slots and $num_slots > 0; + + my $jobs = schema('netdisco')->resultset('Admin'); +- my $rs = $jobs->search({ ++ my @returned = (); ++ ++ my %jobsearch = ( + status => 'queued', + device => { '-not_in' => + $jobs->skipped(setting('workers')->{'BACKEND'}, + setting('workers')->{'max_deferrals'}, + setting('workers')->{'retry_after'}) + ->columns('device')->as_query }, +- %$where, +- }, { order_by => 'random()', rows => $num_slots }); ++ ); ++ my %randoms = (order_by => 'random()', rows => $num_slots ); ++ ++ my $hiprio = $jobs->search({ ++ %jobsearch, ++ -or => [{ ++ username => { '!=' => undef }, ++ action => { -in => setting('job_prio')->{'normal'} }, ++ },{ ++ action => { -in => setting('job_prio')->{'high'} }, ++ }], ++ }, { ++ %randoms, ++ '+select' => [\'100 as job_priority'], '+as' => ['me.job_priority'], ++ }); ++ ++ my $loprio = $jobs->search({ ++ %jobsearch, ++ action => { -in => setting('job_prio')->{'normal'} }, ++ }, { ++ %randoms, ++ '+select' => [\'0 as job_priority'], '+as' => ['me.job_priority'], ++ }); ++ ++ my $rs = $hiprio->union($loprio)->search(undef, { ++ order_by => { '-desc' => 'job_priority' }, ++ rows => $num_slots, ++ }); + +- my @returned = (); + while (my $job = $rs->next) { + if ($job->device) { + # need to handle device discovered since backend daemon started +@@ -140,23 +165,6 @@ sub _getsome { + return @returned; + } + +-sub jq_getsome { +- return _getsome(shift, +- { action => { -in => setting('job_prio')->{'normal'} } } +- ); +-} +- +-sub jq_getsomep { +- return _getsome(shift, { +- -or => [{ +- username => { '!=' => undef }, +- action => { -in => setting('job_prio')->{'normal'} }, +- },{ +- action => { -in => setting('job_prio')->{'high'} }, +- }], +- }); +-} +- + sub jq_locked { + my @returned = (); + my $rs = schema('netdisco')->resultset('Admin') |