From 7b05249a9069022d447b09e0211f5d82f3057bad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20Ter=C3=A4s?= Date: Thu, 14 Dec 2017 06:19:59 +0000 Subject: testing/netdisco: upgrade to 2.036012_003 --- testing/netdisco/APKBUILD | 8 +- testing/netdisco/git-20171124.patch | 902 ------------------------------------ 2 files changed, 3 insertions(+), 907 deletions(-) delete mode 100644 testing/netdisco/git-20171124.patch (limited to 'testing/netdisco') diff --git a/testing/netdisco/APKBUILD b/testing/netdisco/APKBUILD index fa04107a65..1508391cbd 100644 --- a/testing/netdisco/APKBUILD +++ b/testing/netdisco/APKBUILD @@ -3,9 +3,9 @@ # Maintainer: Timo Teräs pkgname=netdisco _pkgreal=App-Netdisco -_pkgver=2.036012_002 +_pkgver=2.036012_003 pkgver=${_pkgver/_/p} -pkgrel=1 +pkgrel=0 pkgdesc="An open source web-based network management tool." url="http://search.cpan.org/dist/App-Netdisco/" arch="noarch" @@ -30,7 +30,6 @@ makedepends="perl-dev perl-module-build $cpanmakedepends" checkdepends="$cpancheckdepends" subpackages="$pkgname-doc" source="http://search.cpan.org/CPAN/authors/id/O/OL/OLIVER/$_pkgreal-$_pkgver.tar.gz - git-20171124.patch " builddir="$srcdir/$_pkgreal-$_pkgver" @@ -59,5 +58,4 @@ check() { ./Build test } -sha512sums="72a399ad204dd34d8f7ffb7b00b50e36b7622bf1402972875166fee650c6c3b3f6980a94c7cbdf5793e90a0a59079c813e1a7e76f6793f806ba0fab42c94f90a App-Netdisco-2.036012_002.tar.gz -cd7e47db82b9371af1e68086c95c70ea9493c274b90818edf50da880a50bdf1f3f06cadda5357ba01980fdffcf642984cb69c0699dc6fc227ad33d1264d65337 git-20171124.patch" +sha512sums="4c0f5dcd6f6174f5531399f1bc6aafb3a650e4abe4ff97c39fdeb6ce982e7a7c95cd205e53f871b56aafa98a9cd4d5b6812aea8415e3ef5a2ca06c1f9576ae3f App-Netdisco-2.036012_003.tar.gz" diff --git a/testing/netdisco/git-20171124.patch b/testing/netdisco/git-20171124.patch deleted file mode 100644 index dfef8140b6..0000000000 --- a/testing/netdisco/git-20171124.patch +++ /dev/null @@ -1,902 +0,0 @@ -commit 4a51f83efc89412f1954b3cb32a3ee83ff354095 -Author: Oliver Gorwits -Date: Sun Nov 19 22:06:15 2017 +0000 - - fix detection of unknown action in netdisco-do - -diff --git a/bin/netdisco-do b/bin/netdisco-do -index 1c5b0b43..ab60f98e 100755 ---- a/bin/netdisco-do -+++ b/bin/netdisco-do -@@ -139,7 +139,7 @@ foreach my $host (@hostlist) { - $job->log("error running job: $_"); - }; - -- if ($job->log eq 'no worker succeeded during main phase') { -+ if ($job->log eq 'failed to report from any worker!') { - pod2usage( - -msg => (sprintf 'error: %s is not a valid action', $action), - -verbose => 2, - -commit c576a755af61186ec46a0dc2576b43aff0ed903a -Author: Oliver Gorwits -Date: Tue Nov 21 10:00:53 2017 +0000 - - tweak log message - -diff --git a/lib/App/Netdisco/Worker/Runner.pm b/lib/App/Netdisco/Worker/Runner.pm -index 2a81ab0c..334e9e97 100644 ---- a/lib/App/Netdisco/Worker/Runner.pm -+++ b/lib/App/Netdisco/Worker/Runner.pm -@@ -76,7 +76,7 @@ sub run_workers { - foreach my $worker (@{ $self->$set }) { - try { $job->add_status( $worker->($job) ) } - catch { -- debug "=> $_" if $_; -+ debug "-> $_" if $_; - $job->add_status( Status->error($_) ); - }; - } - -commit b694258a6580382c5a66864468e4e1b136709f79 -Author: Oliver Gorwits -Date: Tue Nov 21 10:01:09 2017 +0000 - - leave community rows in place - -diff --git a/lib/App/Netdisco/DB/Result/Device.pm b/lib/App/Netdisco/DB/Result/Device.pm -index 6352e444..3d3e0a74 100644 ---- a/lib/App/Netdisco/DB/Result/Device.pm -+++ b/lib/App/Netdisco/DB/Result/Device.pm -@@ -249,7 +249,6 @@ sub renumber { - DevicePortSsid - DevicePortVlan - DevicePortWireless -- Community - /) { - $schema->resultset($set) - ->search({ip => $old_ip}) - -commit 1bbe8c916481b9e5dc0c2ec26c4d922a6dabd5f0 -Merge: b694258a 4a51f83e -Author: Oliver Gorwits -Date: Tue Nov 21 10:02:57 2017 +0000 - - Merge branch 'master' of github.com:netdisco/netdisco - -commit 0bb15f36b9e8f374f995b5cfeaf493b74fb458e6 -Author: Oliver Gorwits -Date: Thu Nov 23 19:23:55 2017 +0000 - - fixes for race conditions and dupes in job queue - - we had situations where the manager would start workers on the same job, - either because of race conditions or because at the time of queueing it wasn't - known that the jobs were targeting the same device (due to device aliases). - - this commit removes duplicate jobs, reduces the need for locking on the job - queue, and makes use of lldpRemChassisId to try to deduplicate jobs before - they are started. in effect we have several goes to prevent duplicate jobs: - - 1. at neighbor discovery time we try to skip queueing same lldpRemChassisId - 2. at job selection we 'error out' jobs with same profile as job selected - 3. at job selection we check for running job with same profile as selected - 4. the job manager process also checks for duplicate job profiles - 5. at job lock we abort if the job was 'errored out' - - all together this seems to work well. a test on a large university network of - 303 devices (four core routers and the rest edge routers, runing VRF with many - duplicate identities), ~1200 subnets, ~50k hosts, resulted in no DB deadlock - or contention and a complete discover+arpnip+macsuck (909 jobs) in ~3 minutes - (with ~150 duplicate jobs identified and skipped). - -diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm -index 7811b53f..72850e8f 100644 ---- a/lib/App/Netdisco/Backend/Job.pm -+++ b/lib/App/Netdisco/Backend/Job.pm -@@ -19,6 +19,7 @@ foreach my $slot (qw/ - username - userip - log -+ device_key - - _current_phase - _last_namespace -diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm -index 25d13ba7..d594df4d 100644 ---- a/lib/App/Netdisco/Backend/Role/Manager.pm -+++ b/lib/App/Netdisco/Backend/Role/Manager.pm -@@ -34,6 +34,16 @@ sub worker_begin { - } - } - -+# creates a 'signature' for each job so that we can check for duplicates ... -+# it happens from time to time due to the distributed nature of the job queue -+# and manager(s) - also kinder to the DB to skip here rather than jq_lock() -+my $memoize = sub { -+ no warnings 'uninitialized'; -+ my $job = shift; -+ return join chr(28), map {$job->{$_}} -+ (qw/action port subaction/, ($job->{device_key} ? 'device_key' : 'device')); -+}; -+ - sub worker_body { - my $self = shift; - my $wid = $self->wid; -@@ -46,6 +56,7 @@ sub worker_body { - while (1) { - prctl sprintf 'nd2: #%s mgr: gathering', $wid; - my $num_slots = 0; -+ my %seen_job = (); - - $num_slots = parse_max_workers( setting('workers')->{tasks} ) - - $self->{queue}->pending(); -@@ -54,6 +65,7 @@ sub worker_body { - # get some high priority jobs - # TODO also check for stale jobs in Netdisco DB - foreach my $job ( jq_getsomep($num_slots) ) { -+ next if $seen_job{ $memoize->($job) }++; - - # mark job as running - next unless jq_lock($job); -@@ -71,6 +83,7 @@ sub worker_body { - # get some normal priority jobs - # TODO also check for stale jobs in Netdisco DB - foreach my $job ( jq_getsome($num_slots) ) { -+ next if $seen_job{ $memoize->($job) }++; - - # mark job as running - next unless jq_lock($job); -@@ -81,6 +94,11 @@ sub worker_body { - $self->{queue}->enqueue($job); - } - -+ #if (scalar grep {$_ > 1} values %seen_job) { -+ # debug 'WARNING: saw duplicate jobs after getsome()'; -+ # use DDP; debug p %seen_job; -+ #} -+ - debug "mgr ($wid): sleeping now..."; - prctl sprintf 'nd2: #%s mgr: idle', $wid; - sleep( setting('workers')->{sleep_time} || 1 ); -diff --git a/lib/App/Netdisco/DB.pm b/lib/App/Netdisco/DB.pm -index 808b45b6..7e864c36 100644 ---- a/lib/App/Netdisco/DB.pm -+++ b/lib/App/Netdisco/DB.pm -@@ -11,7 +11,7 @@ __PACKAGE__->load_namespaces( - ); - - our # try to hide from kwalitee -- $VERSION = 44; # schema version used for upgrades, keep as integer -+ $VERSION = 45; # schema version used for upgrades, keep as integer - - use Path::Class; - use File::ShareDir 'dist_dir'; -diff --git a/lib/App/Netdisco/DB/Result/Admin.pm b/lib/App/Netdisco/DB/Result/Admin.pm -index 27e0cf9f..c3075c08 100644 ---- a/lib/App/Netdisco/DB/Result/Admin.pm -+++ b/lib/App/Netdisco/DB/Result/Admin.pm -@@ -46,6 +46,8 @@ __PACKAGE__->add_columns( - { data_type => "text", is_nullable => 1 }, - "debug", - { data_type => "boolean", is_nullable => 1 }, -+ "device_key", -+ { data_type => "text", is_nullable => 1 }, - ); - - -diff --git a/lib/App/Netdisco/DB/Result/Device.pm b/lib/App/Netdisco/DB/Result/Device.pm -index 3d3e0a74..e3a794d8 100644 ---- a/lib/App/Netdisco/DB/Result/Device.pm -+++ b/lib/App/Netdisco/DB/Result/Device.pm -@@ -238,6 +238,7 @@ sub renumber { - if $new_ip eq '0.0.0.0' - or $new_ip eq '127.0.0.1'; - -+ # Community is not included as SNMP::test_connection will take care of it - foreach my $set (qw/ - DeviceIp - DeviceModule -@@ -259,10 +260,6 @@ sub renumber { - ->search({remote_ip => $old_ip}) - ->update({remote_ip => $new_ip}); - -- $schema->resultset('Admin') -- ->search({device => $old_ip}) -- ->update({device => $new_ip}); -- - $schema->resultset('Node') - ->search({switch => $old_ip}) - ->update({switch => $new_ip}); -diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm -index d7b73d53..859e9380 100644 ---- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm -+++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm -@@ -13,11 +13,11 @@ use Try::Tiny; - use base 'Exporter'; - our @EXPORT = (); - our @EXPORT_OK = qw/ -+ jq_warm_thrusters - jq_getsome - jq_getsomep - jq_locked - jq_queued -- jq_warm_thrusters - jq_lock - jq_defer - jq_complete -@@ -28,6 +28,47 @@ our @EXPORT_OK = qw/ - /; - our %EXPORT_TAGS = ( all => \@EXPORT_OK ); - -+# given a device, tests if any of the primary acls applies -+# returns a list of job actions to be denied/skipped on this host. -+sub _get_denied_actions { -+ my $device = shift; -+ my @badactions = (); -+ return @badactions unless $device; -+ -+ push @badactions, ('discover', @{ setting('job_prio')->{high} }) -+ if not is_discoverable($device); -+ -+ push @badactions, (qw/macsuck nbtstat/) -+ if not is_macsuckable($device); -+ -+ push @badactions, 'arpnip' -+ if not is_arpnipable($device); -+ -+ return @badactions; -+} -+ -+sub jq_warm_thrusters { -+ my @devices = schema('netdisco')->resultset('Device')->all; -+ my $rs = schema('netdisco')->resultset('DeviceSkip'); -+ my %actionset = (); -+ -+ foreach my $d (@devices) { -+ my @badactions = _get_denied_actions($d); -+ $actionset{$d->ip} = \@badactions if scalar @badactions; -+ } -+ -+ schema('netdisco')->txn_do(sub { -+ $rs->search({ backend => setting('workers')->{'BACKEND'} })->delete; -+ $rs->populate([ -+ map {{ -+ backend => setting('workers')->{'BACKEND'}, -+ device => $_, -+ actionset => $actionset{$_}, -+ }} keys %actionset -+ ]); -+ }); -+} -+ - sub _getsome { - my ($num_slots, $where) = @_; - return () if ((!defined $num_slots) or ($num_slots < 1)); -@@ -46,8 +87,56 @@ sub _getsome { - - my @returned = (); - while (my $job = $rs->next) { -- push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); -+ if ($job->device) { -+ # need to handle device discovered since backend daemon started -+ # and the skiplist was primed. these should be checked against -+ # the various acls and have device_skip entry added if needed, -+ # and return false if it should have been skipped. -+ my @badactions = _get_denied_actions($job->device); -+ if (scalar @badactions) { -+ schema('netdisco')->resultset('DeviceSkip')->find_or_create({ -+ backend => setting('workers')->{'BACKEND'}, device => $job->device, -+ },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions); -+ -+ # will now not be selected in a future _getsome() -+ next if scalar grep {$_ eq $job->action} @badactions; -+ } -+ } -+ -+ # remove any duplicate jobs, incuding possibly this job if there -+ # is already an equivalent job running -+ -+ my %job_properties = ( -+ action => $job->action, -+ port => $job->port, -+ subaction => $job->subaction, -+ -or => [ -+ { device => $job->device }, -+ ($job->device_key ? ({ device_key => $job->device_key }) : ()), -+ ], -+ ); -+ -+ my $gone = $jobs->search({ -+ status => 'queued', -+ -and => [ -+ %job_properties, -+ -or => [{ -+ job => { '!=' => $job->id }, -+ },{ -+ job => $job->id, -+ -exists => $jobs->search({ -+ status => { -like => 'queued-%' }, -+ %job_properties, -+ })->as_query, -+ }], -+ ], -+ }, {for => 'update'}) -+ ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) }); -+ -+ debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id; -+ push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); - } -+ - return @returned; - } - -@@ -89,90 +178,17 @@ sub jq_queued { - })->get_column('device')->all; - } - --# given a device, tests if any of the primary acls applies --# returns a list of job actions to be denied/skipped on this host. --sub _get_denied_actions { -- my $device = shift; -- my @badactions = (); -- return @badactions unless $device; -- -- push @badactions, ('discover', @{ setting('job_prio')->{high} }) -- if not is_discoverable($device); -- -- push @badactions, (qw/macsuck nbtstat/) -- if not is_macsuckable($device); -- -- push @badactions, 'arpnip' -- if not is_arpnipable($device); -- -- return @badactions; --} -- --sub jq_warm_thrusters { -- my @devices = schema('netdisco')->resultset('Device')->all; -- my $rs = schema('netdisco')->resultset('DeviceSkip'); -- my %actionset = (); -- -- foreach my $d (@devices) { -- my @badactions = _get_denied_actions($d); -- $actionset{$d->ip} = \@badactions if scalar @badactions; -- } -- -- schema('netdisco')->txn_do(sub { -- $rs->search({ backend => setting('workers')->{'BACKEND'} })->delete; -- $rs->populate([ -- map {{ -- backend => setting('workers')->{'BACKEND'}, -- device => $_, -- actionset => $actionset{$_}, -- }} keys %actionset -- ]); -- }); --} -- - sub jq_lock { - my $job = shift; - my $happy = false; - -- if ($job->device) { -- # need to handle device discovered since backend daemon started -- # and the skiplist was primed. these should be checked against -- # the various acls and have device_skip entry added if needed, -- # and return false if it should have been skipped. -- my @badactions = _get_denied_actions($job->device); -- if (scalar @badactions) { -- schema('netdisco')->resultset('DeviceSkip')->find_or_create({ -- backend => setting('workers')->{'BACKEND'}, device => $job->device, -- },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions); -- -- return false if scalar grep {$_ eq $job->action} @badactions; -- } -- } -- - # lock db row and update to show job has been picked - try { -- schema('netdisco')->txn_do(sub { -- schema('netdisco')->resultset('Admin') -- ->search({ job => $job->id }, { for => 'update' }) -- ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); -- -- return unless -- schema('netdisco')->resultset('Admin') -- ->count({ job => $job->id, -- status => ('queued-'. setting('workers')->{'BACKEND'}) }); -- -- # remove any duplicate jobs, needed because we have race conditions -- # when queueing jobs of a type for all devices -- schema('netdisco')->resultset('Admin')->search({ -- status => 'queued', -- device => $job->device, -- port => $job->port, -- action => $job->action, -- subaction => $job->subaction, -- }, {for => 'update'})->delete(); -- -- $happy = true; -- }); -+ my $updated = schema('netdisco')->resultset('Admin') -+ ->search({ job => $job->id, status => 'queued' }, { for => 'update' }) -+ ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); -+ -+ $happy = true if $updated > 0; - } - catch { - error $_; -@@ -243,6 +259,7 @@ sub jq_complete { - $happy = true; - } - catch { -+ # use DDP; p $job; - error $_; - }; - -@@ -274,13 +291,14 @@ sub jq_insert { - schema('netdisco')->txn_do(sub { - schema('netdisco')->resultset('Admin')->populate([ - map {{ -- device => $_->{device}, -- port => $_->{port}, -- action => $_->{action}, -- subaction => ($_->{extra} || $_->{subaction}), -- username => $_->{username}, -- userip => $_->{userip}, -- status => 'queued', -+ device => $_->{device}, -+ device_key => $_->{device_key}, -+ port => $_->{port}, -+ action => $_->{action}, -+ subaction => ($_->{extra} || $_->{subaction}), -+ username => $_->{username}, -+ userip => $_->{userip}, -+ status => 'queued', - }} @$jobs - ]); - }); -diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm -index 105f4185..c6335187 100644 ---- a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm -+++ b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm -@@ -72,6 +72,10 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { - $device->renumber($new_ip) - or die "cannot renumber to: $new_ip"; # rollback - -+ # is not done in renumber but required otherwise confusing at job end! -+ schema('netdisco')->resultset('Admin') -+ ->find({job => $job->id})->update({device => $new_ip}); -+ - return Status->noop(sprintf ' [%s] device - changed IP to %s (%s)', - $old_ip, $device->ip, ($device->dns || '')); - }); -diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm -index aecec0e9..a3b6b20a 100644 ---- a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm -+++ b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm -@@ -39,11 +39,13 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { - or return Status->defer("discover failed: could not SNMP connect to $device"); - - my @to_discover = store_neighbors($device); -+ my %seen_id = (); - - # only enqueue if device is not already discovered, - # discover_* config permits the discovery - foreach my $neighbor (@to_discover) { -- my ($ip, $remote_type) = @$neighbor; -+ my ($ip, $remote_type, $remote_id) = @$neighbor; -+ next if $remote_id and $seen_id{ $remote_id }++; - - my $device = get_device($ip); - next if $device->in_storage; -@@ -55,10 +57,14 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { - next; - } - -+ # risk of things going wrong...? -+ # https://quickview.cloudapps.cisco.com/quickview/bug/CSCur12254 -+ - jq_insert({ - device => $ip, - action => 'discover', - subaction => 'with-nodes', -+ ($remote_id ? (device_key => $remote_id) : ()), - }); - } - }); -@@ -171,7 +177,7 @@ sub store_neighbors { - # useable remote IP... - - if ($remote_ip eq '0.0.0.0' or -- check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) { -+ check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) { - - if ($remote_id) { - my $devices = schema('netdisco')->resultset('Device'); -@@ -228,7 +234,7 @@ sub store_neighbors { - debug sprintf - ' [%s] neigh - adding neighbor %s, type [%s], on %s to discovery queue', - $device->ip, $remote_ip, ($remote_type || ''), $port; -- push @to_discover, [$remote_ip, $remote_type]; -+ push @to_discover, [$remote_ip, $remote_type, $remote_id]; - - $remote_port = $c_port->{$entry}; - if (defined $remote_port) { -diff --git a/lib/App/Netdisco/Worker/Plugin/Expire.pm b/lib/App/Netdisco/Worker/Plugin/Expire.pm -index 28c93cc4..0625338d 100644 ---- a/lib/App/Netdisco/Worker/Plugin/Expire.pm -+++ b/lib/App/Netdisco/Worker/Plugin/Expire.pm -@@ -6,6 +6,7 @@ use aliased 'App::Netdisco::Worker::Status'; - - use Dancer::Plugin::DBIC 'schema'; - use App::Netdisco::Util::Statistics 'update_stats'; -+use App::Netdisco::DB::ExplicitLocking ':modes'; - - register_worker({ phase => 'main' }, sub { - my ($job, $workerconf) = @_; -@@ -40,7 +41,7 @@ register_worker({ phase => 'main' }, sub { - } - - if (setting('expire_jobs') and setting('expire_jobs') > 0) { -- schema('netdisco')->txn_do(sub { -+ schema('netdisco')->txn_do_locked('admin', 'EXCLUSIVE', sub { - schema('netdisco')->resultset('Admin')->search({ - entered => \[q/< (now() - ?::interval)/, - (setting('expire_jobs') * 86400)], -diff --git a/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql -new file mode 100644 -index 00000000..31e258e8 ---- /dev/null -+++ b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql -@@ -0,0 +1,5 @@ -+BEGIN; -+ -+ALTER TABLE "admin" ADD "device_key" text; -+ -+COMMIT; - -commit 3db242cbe868e0672cba3b8ba1756e55cf46980c -Author: Oliver Gorwits -Date: Thu Nov 23 22:16:50 2017 +0000 - - support action::namespace for netdisco-do - -diff --git a/bin/netdisco-do b/bin/netdisco-do -index ab60f98e..859de29d 100755 ---- a/bin/netdisco-do -+++ b/bin/netdisco-do -@@ -109,7 +109,7 @@ my $exitstatus = 0; - - foreach my $host (@hostlist) { - my $dev = $host ? get_device($host->addr) : undef; -- if ($dev and not (blessed $dev and $dev->in_storage) and $action ne 'discover') { -+ if ($dev and not (blessed $dev and $dev->in_storage) and $action !~ m/^discover/) { - info sprintf "%s: error - Don't know device: %s", $action, $host->addr; - next; - } -@@ -139,7 +139,7 @@ foreach my $host (@hostlist) { - $job->log("error running job: $_"); - }; - -- if ($job->log eq 'failed to report from any worker!') { -+ if ($job->log eq 'failed to report from any worker!' and not $job->only_namespace) { - pod2usage( - -msg => (sprintf 'error: %s is not a valid action', $action), - -verbose => 2, -diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm -index 72850e8f..ddbfd034 100644 ---- a/lib/App/Netdisco/Backend/Job.pm -+++ b/lib/App/Netdisco/Backend/Job.pm -@@ -14,6 +14,7 @@ foreach my $slot (qw/ - device - port - action -+ only_namespace - subaction - status - username -@@ -36,6 +37,15 @@ has '_statuslist' => ( - default => sub { [] }, - ); - -+sub BUILD { -+ my ($job, $args) = @_; -+ -+ if ($job->action =~ m/^(\w+)::(\w+)$/i) { -+ $job->action($1); -+ $job->only_namespace($2); -+ } -+} -+ - =head1 METHODS - - =head2 summary -diff --git a/lib/App/Netdisco/Worker/Plugin.pm b/lib/App/Netdisco/Worker/Plugin.pm -index 68440f52..746d8de0 100644 ---- a/lib/App/Netdisco/Worker/Plugin.pm -+++ b/lib/App/Netdisco/Worker/Plugin.pm -@@ -36,6 +36,13 @@ register 'register_worker' => sub { - # check to see if this namespace has already passed at higher priority - return if $job->namespace_passed($workerconf); - -+ # support part-actions via action::namespace -+ if ($job->only_namespace and $workerconf->{phase} ne 'check') { -+ return unless $workerconf->{namespace} eq lc( $job->only_namespace ) -+ or (($workerconf->{phase} eq 'early') -+ and ($job->device and not $job->device->in_storage)); -+ } -+ - my @newuserconf = (); - my @userconf = @{ setting('device_auth') || [] }; - - -commit de594c647ff3e8d43afa69a1ce1bdfc54442e5c0 (HEAD -> master, origin/master, origin/HEAD) -Author: Oliver Gorwits -Date: Fri Nov 24 06:31:34 2017 +0000 - - single DB poll for new jobs both high and normal priority - -diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm -index ddbfd034..9eef998e 100644 ---- a/lib/App/Netdisco/Backend/Job.pm -+++ b/lib/App/Netdisco/Backend/Job.pm -@@ -21,6 +21,7 @@ foreach my $slot (qw/ - userip - log - device_key -+ job_priority - - _current_phase - _last_namespace -diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm -index d594df4d..3c6e7e00 100644 ---- a/lib/App/Netdisco/Backend/Role/Manager.pm -+++ b/lib/App/Netdisco/Backend/Role/Manager.pm -@@ -6,7 +6,7 @@ use List::Util 'sum'; - use App::Netdisco::Util::MCE; - - use App::Netdisco::JobQueue -- qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/; -+ qw/jq_locked jq_getsome jq_lock jq_warm_thrusters/; - - use Role::Tiny; - use namespace::clean; -@@ -60,28 +60,8 @@ sub worker_body { - - $num_slots = parse_max_workers( setting('workers')->{tasks} ) - - $self->{queue}->pending(); -- debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)"; -+ debug "mgr ($wid): getting potential jobs for $num_slots workers"; - -- # get some high priority jobs -- # TODO also check for stale jobs in Netdisco DB -- foreach my $job ( jq_getsomep($num_slots) ) { -- next if $seen_job{ $memoize->($job) }++; -- -- # mark job as running -- next unless jq_lock($job); -- info sprintf "mgr (%s): job %s booked out for this processing node", -- $wid, $job->id; -- -- # copy job to local queue -- $self->{queue}->enqueuep(100, $job); -- } -- -- $num_slots = parse_max_workers( setting('workers')->{tasks} ) -- - $self->{queue}->pending(); -- debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)"; -- -- # get some normal priority jobs -- # TODO also check for stale jobs in Netdisco DB - foreach my $job ( jq_getsome($num_slots) ) { - next if $seen_job{ $memoize->($job) }++; - -@@ -91,7 +71,7 @@ sub worker_body { - $wid, $job->id; - - # copy job to local queue -- $self->{queue}->enqueue($job); -+ $self->{queue}->enqueuep($job->job_priority, $job); - } - - #if (scalar grep {$_ > 1} values %seen_job) { -diff --git a/lib/App/Netdisco/DB/ResultSet.pm b/lib/App/Netdisco/DB/ResultSet.pm -index 953c8e80..22c25cf8 100644 ---- a/lib/App/Netdisco/DB/ResultSet.pm -+++ b/lib/App/Netdisco/DB/ResultSet.pm -@@ -6,7 +6,7 @@ use warnings; - use base 'DBIx::Class::ResultSet'; - - __PACKAGE__->load_components(qw/ -- Helper::ResultSet::SetOperations -+ +App::Netdisco::DB::SetOperations - Helper::ResultSet::Shortcut - Helper::ResultSet::CorrelateRelationship - /); -diff --git a/lib/App/Netdisco/DB/SetOperations.pm b/lib/App/Netdisco/DB/SetOperations.pm -new file mode 100644 -index 00000000..fef5efb4 ---- /dev/null -+++ b/lib/App/Netdisco/DB/SetOperations.pm -@@ -0,0 +1,50 @@ -+package App::Netdisco::DB::SetOperations; -+ -+use strict; -+use warnings; -+ -+use parent 'DBIx::Class::Helper::ResultSet::SetOperations'; -+ -+sub _set_operation { -+ my ( $self, $operation, $other ) = @_; -+ -+ my @sql; -+ my @params; -+ -+ my $as = $self->_resolved_attrs->{as}; -+ -+ my @operands = ( $self, ref $other eq 'ARRAY' ? @$other : $other ); -+ -+ for (@operands) { -+ $self->throw_exception("ResultClass of ResultSets do not match!") -+ unless $self->result_class eq $_->result_class; -+ -+ my $attrs = $_->_resolved_attrs; -+ -+ $self->throw_exception('ResultSets do not all have the same selected columns!') -+ unless $self->_compare_arrays($as, $attrs->{as}); -+ -+ my ($sql, @bind) = @{${$_->as_query}}; -+ # $sql =~ s/^\s*\((.*)\)\s*$/$1/; -+ $sql = q<(> . $sql . q<)>; -+ -+ push @sql, $sql; -+ push @params, @bind; -+ } -+ -+ my $query = q<(> . join(" $operation ", @sql). q<)>; -+ -+ my $attrs = $self->_resolved_attrs; -+ return $self->result_source->resultset->search(undef, { -+ alias => $self->current_source_alias, -+ from => [{ -+ $self->current_source_alias => \[ $query, @params ], -+ -alias => $self->current_source_alias, -+ -source_handle => $self->result_source->handle, -+ }], -+ columns => $attrs->{as}, -+ result_class => $self->result_class, -+ }); -+} -+ -+1; -diff --git a/lib/App/Netdisco/JobQueue.pm b/lib/App/Netdisco/JobQueue.pm -index 875ec468..733cf4bb 100644 ---- a/lib/App/Netdisco/JobQueue.pm -+++ b/lib/App/Netdisco/JobQueue.pm -@@ -9,16 +9,15 @@ Module::Load::load - use base 'Exporter'; - our @EXPORT = (); - our @EXPORT_OK = qw/ -+ jq_warm_thrusters - jq_getsome -- jq_getsomep - jq_locked - jq_queued -- jq_warm_thrusters -- jq_log -- jq_userlog - jq_lock - jq_defer - jq_complete -+ jq_log -+ jq_userlog - jq_insert - jq_delete - /; -@@ -43,10 +42,6 @@ Returns a list of randomly selected queued jobs. Default is to return one job, - unless C<$num> is provided. Jobs are returned as objects which implement the - Netdisco job instance interface (see below). - --=head2 jq_getsomep( $num? ) -- --Same as C but for high priority jobs. -- - =head2 jq_locked() - - Returns the list of jobs currently booked out to this processing node (denoted -diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm -index 859e9380..80405701 100644 ---- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm -+++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm -@@ -15,7 +15,6 @@ our @EXPORT = (); - our @EXPORT_OK = qw/ - jq_warm_thrusters - jq_getsome -- jq_getsomep - jq_locked - jq_queued - jq_lock -@@ -69,23 +68,49 @@ sub jq_warm_thrusters { - }); - } - --sub _getsome { -- my ($num_slots, $where) = @_; -- return () if ((!defined $num_slots) or ($num_slots < 1)); -- return () if ((!defined $where) or (ref {} ne ref $where)); -+sub jq_getsome { -+ my $num_slots = shift; -+ return () unless $num_slots and $num_slots > 0; - - my $jobs = schema('netdisco')->resultset('Admin'); -- my $rs = $jobs->search({ -+ my @returned = (); -+ -+ my %jobsearch = ( - status => 'queued', - device => { '-not_in' => - $jobs->skipped(setting('workers')->{'BACKEND'}, - setting('workers')->{'max_deferrals'}, - setting('workers')->{'retry_after'}) - ->columns('device')->as_query }, -- %$where, -- }, { order_by => 'random()', rows => $num_slots }); -+ ); -+ my %randoms = (order_by => 'random()', rows => $num_slots ); -+ -+ my $hiprio = $jobs->search({ -+ %jobsearch, -+ -or => [{ -+ username => { '!=' => undef }, -+ action => { -in => setting('job_prio')->{'normal'} }, -+ },{ -+ action => { -in => setting('job_prio')->{'high'} }, -+ }], -+ }, { -+ %randoms, -+ '+select' => [\'100 as job_priority'], '+as' => ['me.job_priority'], -+ }); -+ -+ my $loprio = $jobs->search({ -+ %jobsearch, -+ action => { -in => setting('job_prio')->{'normal'} }, -+ }, { -+ %randoms, -+ '+select' => [\'0 as job_priority'], '+as' => ['me.job_priority'], -+ }); -+ -+ my $rs = $hiprio->union($loprio)->search(undef, { -+ order_by => { '-desc' => 'job_priority' }, -+ rows => $num_slots, -+ }); - -- my @returned = (); - while (my $job = $rs->next) { - if ($job->device) { - # need to handle device discovered since backend daemon started -@@ -140,23 +165,6 @@ sub _getsome { - return @returned; - } - --sub jq_getsome { -- return _getsome(shift, -- { action => { -in => setting('job_prio')->{'normal'} } } -- ); --} -- --sub jq_getsomep { -- return _getsome(shift, { -- -or => [{ -- username => { '!=' => undef }, -- action => { -in => setting('job_prio')->{'normal'} }, -- },{ -- action => { -in => setting('job_prio')->{'high'} }, -- }], -- }); --} -- - sub jq_locked { - my @returned = (); - my $rs = schema('netdisco')->resultset('Admin') -- cgit v1.2.3