commit 4a51f83efc89412f1954b3cb32a3ee83ff354095 Author: Oliver Gorwits Date: Sun Nov 19 22:06:15 2017 +0000 fix detection of unknown action in netdisco-do diff --git a/bin/netdisco-do b/bin/netdisco-do index 1c5b0b43..ab60f98e 100755 --- a/bin/netdisco-do +++ b/bin/netdisco-do @@ -139,7 +139,7 @@ foreach my $host (@hostlist) { $job->log("error running job: $_"); }; - if ($job->log eq 'no worker succeeded during main phase') { + if ($job->log eq 'failed to report from any worker!') { pod2usage( -msg => (sprintf 'error: %s is not a valid action', $action), -verbose => 2, commit c576a755af61186ec46a0dc2576b43aff0ed903a Author: Oliver Gorwits Date: Tue Nov 21 10:00:53 2017 +0000 tweak log message diff --git a/lib/App/Netdisco/Worker/Runner.pm b/lib/App/Netdisco/Worker/Runner.pm index 2a81ab0c..334e9e97 100644 --- a/lib/App/Netdisco/Worker/Runner.pm +++ b/lib/App/Netdisco/Worker/Runner.pm @@ -76,7 +76,7 @@ sub run_workers { foreach my $worker (@{ $self->$set }) { try { $job->add_status( $worker->($job) ) } catch { - debug "=> $_" if $_; + debug "-> $_" if $_; $job->add_status( Status->error($_) ); }; } commit b694258a6580382c5a66864468e4e1b136709f79 Author: Oliver Gorwits Date: Tue Nov 21 10:01:09 2017 +0000 leave community rows in place diff --git a/lib/App/Netdisco/DB/Result/Device.pm b/lib/App/Netdisco/DB/Result/Device.pm index 6352e444..3d3e0a74 100644 --- a/lib/App/Netdisco/DB/Result/Device.pm +++ b/lib/App/Netdisco/DB/Result/Device.pm @@ -249,7 +249,6 @@ sub renumber { DevicePortSsid DevicePortVlan DevicePortWireless - Community /) { $schema->resultset($set) ->search({ip => $old_ip}) commit 1bbe8c916481b9e5dc0c2ec26c4d922a6dabd5f0 Merge: b694258a 4a51f83e Author: Oliver Gorwits Date: Tue Nov 21 10:02:57 2017 +0000 Merge branch 'master' of github.com:netdisco/netdisco commit 0bb15f36b9e8f374f995b5cfeaf493b74fb458e6 Author: Oliver Gorwits Date: Thu Nov 23 19:23:55 2017 +0000 fixes for race conditions and dupes in job queue we had situations where the manager would start workers on the same job, either because of race conditions or because at the time of queueing it wasn't known that the jobs were targeting the same device (due to device aliases). this commit removes duplicate jobs, reduces the need for locking on the job queue, and makes use of lldpRemChassisId to try to deduplicate jobs before they are started. in effect we have several goes to prevent duplicate jobs: 1. at neighbor discovery time we try to skip queueing same lldpRemChassisId 2. at job selection we 'error out' jobs with same profile as job selected 3. at job selection we check for running job with same profile as selected 4. the job manager process also checks for duplicate job profiles 5. at job lock we abort if the job was 'errored out' all together this seems to work well. a test on a large university network of 303 devices (four core routers and the rest edge routers, runing VRF with many duplicate identities), ~1200 subnets, ~50k hosts, resulted in no DB deadlock or contention and a complete discover+arpnip+macsuck (909 jobs) in ~3 minutes (with ~150 duplicate jobs identified and skipped). diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm index 7811b53f..72850e8f 100644 --- a/lib/App/Netdisco/Backend/Job.pm +++ b/lib/App/Netdisco/Backend/Job.pm @@ -19,6 +19,7 @@ foreach my $slot (qw/ username userip log + device_key _current_phase _last_namespace diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm index 25d13ba7..d594df4d 100644 --- a/lib/App/Netdisco/Backend/Role/Manager.pm +++ b/lib/App/Netdisco/Backend/Role/Manager.pm @@ -34,6 +34,16 @@ sub worker_begin { } } +# creates a 'signature' for each job so that we can check for duplicates ... +# it happens from time to time due to the distributed nature of the job queue +# and manager(s) - also kinder to the DB to skip here rather than jq_lock() +my $memoize = sub { + no warnings 'uninitialized'; + my $job = shift; + return join chr(28), map {$job->{$_}} + (qw/action port subaction/, ($job->{device_key} ? 'device_key' : 'device')); +}; + sub worker_body { my $self = shift; my $wid = $self->wid; @@ -46,6 +56,7 @@ sub worker_body { while (1) { prctl sprintf 'nd2: #%s mgr: gathering', $wid; my $num_slots = 0; + my %seen_job = (); $num_slots = parse_max_workers( setting('workers')->{tasks} ) - $self->{queue}->pending(); @@ -54,6 +65,7 @@ sub worker_body { # get some high priority jobs # TODO also check for stale jobs in Netdisco DB foreach my $job ( jq_getsomep($num_slots) ) { + next if $seen_job{ $memoize->($job) }++; # mark job as running next unless jq_lock($job); @@ -71,6 +83,7 @@ sub worker_body { # get some normal priority jobs # TODO also check for stale jobs in Netdisco DB foreach my $job ( jq_getsome($num_slots) ) { + next if $seen_job{ $memoize->($job) }++; # mark job as running next unless jq_lock($job); @@ -81,6 +94,11 @@ sub worker_body { $self->{queue}->enqueue($job); } + #if (scalar grep {$_ > 1} values %seen_job) { + # debug 'WARNING: saw duplicate jobs after getsome()'; + # use DDP; debug p %seen_job; + #} + debug "mgr ($wid): sleeping now..."; prctl sprintf 'nd2: #%s mgr: idle', $wid; sleep( setting('workers')->{sleep_time} || 1 ); diff --git a/lib/App/Netdisco/DB.pm b/lib/App/Netdisco/DB.pm index 808b45b6..7e864c36 100644 --- a/lib/App/Netdisco/DB.pm +++ b/lib/App/Netdisco/DB.pm @@ -11,7 +11,7 @@ __PACKAGE__->load_namespaces( ); our # try to hide from kwalitee - $VERSION = 44; # schema version used for upgrades, keep as integer + $VERSION = 45; # schema version used for upgrades, keep as integer use Path::Class; use File::ShareDir 'dist_dir'; diff --git a/lib/App/Netdisco/DB/Result/Admin.pm b/lib/App/Netdisco/DB/Result/Admin.pm index 27e0cf9f..c3075c08 100644 --- a/lib/App/Netdisco/DB/Result/Admin.pm +++ b/lib/App/Netdisco/DB/Result/Admin.pm @@ -46,6 +46,8 @@ __PACKAGE__->add_columns( { data_type => "text", is_nullable => 1 }, "debug", { data_type => "boolean", is_nullable => 1 }, + "device_key", + { data_type => "text", is_nullable => 1 }, ); diff --git a/lib/App/Netdisco/DB/Result/Device.pm b/lib/App/Netdisco/DB/Result/Device.pm index 3d3e0a74..e3a794d8 100644 --- a/lib/App/Netdisco/DB/Result/Device.pm +++ b/lib/App/Netdisco/DB/Result/Device.pm @@ -238,6 +238,7 @@ sub renumber { if $new_ip eq '0.0.0.0' or $new_ip eq '127.0.0.1'; + # Community is not included as SNMP::test_connection will take care of it foreach my $set (qw/ DeviceIp DeviceModule @@ -259,10 +260,6 @@ sub renumber { ->search({remote_ip => $old_ip}) ->update({remote_ip => $new_ip}); - $schema->resultset('Admin') - ->search({device => $old_ip}) - ->update({device => $new_ip}); - $schema->resultset('Node') ->search({switch => $old_ip}) ->update({switch => $new_ip}); diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm index d7b73d53..859e9380 100644 --- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -13,11 +13,11 @@ use Try::Tiny; use base 'Exporter'; our @EXPORT = (); our @EXPORT_OK = qw/ + jq_warm_thrusters jq_getsome jq_getsomep jq_locked jq_queued - jq_warm_thrusters jq_lock jq_defer jq_complete @@ -28,6 +28,47 @@ our @EXPORT_OK = qw/ /; our %EXPORT_TAGS = ( all => \@EXPORT_OK ); +# given a device, tests if any of the primary acls applies +# returns a list of job actions to be denied/skipped on this host. +sub _get_denied_actions { + my $device = shift; + my @badactions = (); + return @badactions unless $device; + + push @badactions, ('discover', @{ setting('job_prio')->{high} }) + if not is_discoverable($device); + + push @badactions, (qw/macsuck nbtstat/) + if not is_macsuckable($device); + + push @badactions, 'arpnip' + if not is_arpnipable($device); + + return @badactions; +} + +sub jq_warm_thrusters { + my @devices = schema('netdisco')->resultset('Device')->all; + my $rs = schema('netdisco')->resultset('DeviceSkip'); + my %actionset = (); + + foreach my $d (@devices) { + my @badactions = _get_denied_actions($d); + $actionset{$d->ip} = \@badactions if scalar @badactions; + } + + schema('netdisco')->txn_do(sub { + $rs->search({ backend => setting('workers')->{'BACKEND'} })->delete; + $rs->populate([ + map {{ + backend => setting('workers')->{'BACKEND'}, + device => $_, + actionset => $actionset{$_}, + }} keys %actionset + ]); + }); +} + sub _getsome { my ($num_slots, $where) = @_; return () if ((!defined $num_slots) or ($num_slots < 1)); @@ -46,8 +87,56 @@ sub _getsome { my @returned = (); while (my $job = $rs->next) { - push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); + if ($job->device) { + # need to handle device discovered since backend daemon started + # and the skiplist was primed. these should be checked against + # the various acls and have device_skip entry added if needed, + # and return false if it should have been skipped. + my @badactions = _get_denied_actions($job->device); + if (scalar @badactions) { + schema('netdisco')->resultset('DeviceSkip')->find_or_create({ + backend => setting('workers')->{'BACKEND'}, device => $job->device, + },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions); + + # will now not be selected in a future _getsome() + next if scalar grep {$_ eq $job->action} @badactions; + } + } + + # remove any duplicate jobs, incuding possibly this job if there + # is already an equivalent job running + + my %job_properties = ( + action => $job->action, + port => $job->port, + subaction => $job->subaction, + -or => [ + { device => $job->device }, + ($job->device_key ? ({ device_key => $job->device_key }) : ()), + ], + ); + + my $gone = $jobs->search({ + status => 'queued', + -and => [ + %job_properties, + -or => [{ + job => { '!=' => $job->id }, + },{ + job => $job->id, + -exists => $jobs->search({ + status => { -like => 'queued-%' }, + %job_properties, + })->as_query, + }], + ], + }, {for => 'update'}) + ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) }); + + debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id; + push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns }); } + return @returned; } @@ -89,90 +178,17 @@ sub jq_queued { })->get_column('device')->all; } -# given a device, tests if any of the primary acls applies -# returns a list of job actions to be denied/skipped on this host. -sub _get_denied_actions { - my $device = shift; - my @badactions = (); - return @badactions unless $device; - - push @badactions, ('discover', @{ setting('job_prio')->{high} }) - if not is_discoverable($device); - - push @badactions, (qw/macsuck nbtstat/) - if not is_macsuckable($device); - - push @badactions, 'arpnip' - if not is_arpnipable($device); - - return @badactions; -} - -sub jq_warm_thrusters { - my @devices = schema('netdisco')->resultset('Device')->all; - my $rs = schema('netdisco')->resultset('DeviceSkip'); - my %actionset = (); - - foreach my $d (@devices) { - my @badactions = _get_denied_actions($d); - $actionset{$d->ip} = \@badactions if scalar @badactions; - } - - schema('netdisco')->txn_do(sub { - $rs->search({ backend => setting('workers')->{'BACKEND'} })->delete; - $rs->populate([ - map {{ - backend => setting('workers')->{'BACKEND'}, - device => $_, - actionset => $actionset{$_}, - }} keys %actionset - ]); - }); -} - sub jq_lock { my $job = shift; my $happy = false; - if ($job->device) { - # need to handle device discovered since backend daemon started - # and the skiplist was primed. these should be checked against - # the various acls and have device_skip entry added if needed, - # and return false if it should have been skipped. - my @badactions = _get_denied_actions($job->device); - if (scalar @badactions) { - schema('netdisco')->resultset('DeviceSkip')->find_or_create({ - backend => setting('workers')->{'BACKEND'}, device => $job->device, - },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions); - - return false if scalar grep {$_ eq $job->action} @badactions; - } - } - # lock db row and update to show job has been picked try { - schema('netdisco')->txn_do(sub { - schema('netdisco')->resultset('Admin') - ->search({ job => $job->id }, { for => 'update' }) - ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); - - return unless - schema('netdisco')->resultset('Admin') - ->count({ job => $job->id, - status => ('queued-'. setting('workers')->{'BACKEND'}) }); - - # remove any duplicate jobs, needed because we have race conditions - # when queueing jobs of a type for all devices - schema('netdisco')->resultset('Admin')->search({ - status => 'queued', - device => $job->device, - port => $job->port, - action => $job->action, - subaction => $job->subaction, - }, {for => 'update'})->delete(); - - $happy = true; - }); + my $updated = schema('netdisco')->resultset('Admin') + ->search({ job => $job->id, status => 'queued' }, { for => 'update' }) + ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) }); + + $happy = true if $updated > 0; } catch { error $_; @@ -243,6 +259,7 @@ sub jq_complete { $happy = true; } catch { + # use DDP; p $job; error $_; }; @@ -274,13 +291,14 @@ sub jq_insert { schema('netdisco')->txn_do(sub { schema('netdisco')->resultset('Admin')->populate([ map {{ - device => $_->{device}, - port => $_->{port}, - action => $_->{action}, - subaction => ($_->{extra} || $_->{subaction}), - username => $_->{username}, - userip => $_->{userip}, - status => 'queued', + device => $_->{device}, + device_key => $_->{device_key}, + port => $_->{port}, + action => $_->{action}, + subaction => ($_->{extra} || $_->{subaction}), + username => $_->{username}, + userip => $_->{userip}, + status => 'queued', }} @$jobs ]); }); diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm index 105f4185..c6335187 100644 --- a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm +++ b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm @@ -72,6 +72,10 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { $device->renumber($new_ip) or die "cannot renumber to: $new_ip"; # rollback + # is not done in renumber but required otherwise confusing at job end! + schema('netdisco')->resultset('Admin') + ->find({job => $job->id})->update({device => $new_ip}); + return Status->noop(sprintf ' [%s] device - changed IP to %s (%s)', $old_ip, $device->ip, ($device->dns || '')); }); diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm index aecec0e9..a3b6b20a 100644 --- a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm +++ b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm @@ -39,11 +39,13 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { or return Status->defer("discover failed: could not SNMP connect to $device"); my @to_discover = store_neighbors($device); + my %seen_id = (); # only enqueue if device is not already discovered, # discover_* config permits the discovery foreach my $neighbor (@to_discover) { - my ($ip, $remote_type) = @$neighbor; + my ($ip, $remote_type, $remote_id) = @$neighbor; + next if $remote_id and $seen_id{ $remote_id }++; my $device = get_device($ip); next if $device->in_storage; @@ -55,10 +57,14 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub { next; } + # risk of things going wrong...? + # https://quickview.cloudapps.cisco.com/quickview/bug/CSCur12254 + jq_insert({ device => $ip, action => 'discover', subaction => 'with-nodes', + ($remote_id ? (device_key => $remote_id) : ()), }); } }); @@ -171,7 +177,7 @@ sub store_neighbors { # useable remote IP... if ($remote_ip eq '0.0.0.0' or - check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) { + check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) { if ($remote_id) { my $devices = schema('netdisco')->resultset('Device'); @@ -228,7 +234,7 @@ sub store_neighbors { debug sprintf ' [%s] neigh - adding neighbor %s, type [%s], on %s to discovery queue', $device->ip, $remote_ip, ($remote_type || ''), $port; - push @to_discover, [$remote_ip, $remote_type]; + push @to_discover, [$remote_ip, $remote_type, $remote_id]; $remote_port = $c_port->{$entry}; if (defined $remote_port) { diff --git a/lib/App/Netdisco/Worker/Plugin/Expire.pm b/lib/App/Netdisco/Worker/Plugin/Expire.pm index 28c93cc4..0625338d 100644 --- a/lib/App/Netdisco/Worker/Plugin/Expire.pm +++ b/lib/App/Netdisco/Worker/Plugin/Expire.pm @@ -6,6 +6,7 @@ use aliased 'App::Netdisco::Worker::Status'; use Dancer::Plugin::DBIC 'schema'; use App::Netdisco::Util::Statistics 'update_stats'; +use App::Netdisco::DB::ExplicitLocking ':modes'; register_worker({ phase => 'main' }, sub { my ($job, $workerconf) = @_; @@ -40,7 +41,7 @@ register_worker({ phase => 'main' }, sub { } if (setting('expire_jobs') and setting('expire_jobs') > 0) { - schema('netdisco')->txn_do(sub { + schema('netdisco')->txn_do_locked('admin', 'EXCLUSIVE', sub { schema('netdisco')->resultset('Admin')->search({ entered => \[q/< (now() - ?::interval)/, (setting('expire_jobs') * 86400)], diff --git a/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql new file mode 100644 index 00000000..31e258e8 --- /dev/null +++ b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql @@ -0,0 +1,5 @@ +BEGIN; + +ALTER TABLE "admin" ADD "device_key" text; + +COMMIT; commit 3db242cbe868e0672cba3b8ba1756e55cf46980c Author: Oliver Gorwits Date: Thu Nov 23 22:16:50 2017 +0000 support action::namespace for netdisco-do diff --git a/bin/netdisco-do b/bin/netdisco-do index ab60f98e..859de29d 100755 --- a/bin/netdisco-do +++ b/bin/netdisco-do @@ -109,7 +109,7 @@ my $exitstatus = 0; foreach my $host (@hostlist) { my $dev = $host ? get_device($host->addr) : undef; - if ($dev and not (blessed $dev and $dev->in_storage) and $action ne 'discover') { + if ($dev and not (blessed $dev and $dev->in_storage) and $action !~ m/^discover/) { info sprintf "%s: error - Don't know device: %s", $action, $host->addr; next; } @@ -139,7 +139,7 @@ foreach my $host (@hostlist) { $job->log("error running job: $_"); }; - if ($job->log eq 'failed to report from any worker!') { + if ($job->log eq 'failed to report from any worker!' and not $job->only_namespace) { pod2usage( -msg => (sprintf 'error: %s is not a valid action', $action), -verbose => 2, diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm index 72850e8f..ddbfd034 100644 --- a/lib/App/Netdisco/Backend/Job.pm +++ b/lib/App/Netdisco/Backend/Job.pm @@ -14,6 +14,7 @@ foreach my $slot (qw/ device port action + only_namespace subaction status username @@ -36,6 +37,15 @@ has '_statuslist' => ( default => sub { [] }, ); +sub BUILD { + my ($job, $args) = @_; + + if ($job->action =~ m/^(\w+)::(\w+)$/i) { + $job->action($1); + $job->only_namespace($2); + } +} + =head1 METHODS =head2 summary diff --git a/lib/App/Netdisco/Worker/Plugin.pm b/lib/App/Netdisco/Worker/Plugin.pm index 68440f52..746d8de0 100644 --- a/lib/App/Netdisco/Worker/Plugin.pm +++ b/lib/App/Netdisco/Worker/Plugin.pm @@ -36,6 +36,13 @@ register 'register_worker' => sub { # check to see if this namespace has already passed at higher priority return if $job->namespace_passed($workerconf); + # support part-actions via action::namespace + if ($job->only_namespace and $workerconf->{phase} ne 'check') { + return unless $workerconf->{namespace} eq lc( $job->only_namespace ) + or (($workerconf->{phase} eq 'early') + and ($job->device and not $job->device->in_storage)); + } + my @newuserconf = (); my @userconf = @{ setting('device_auth') || [] }; commit de594c647ff3e8d43afa69a1ce1bdfc54442e5c0 (HEAD -> master, origin/master, origin/HEAD) Author: Oliver Gorwits Date: Fri Nov 24 06:31:34 2017 +0000 single DB poll for new jobs both high and normal priority diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm index ddbfd034..9eef998e 100644 --- a/lib/App/Netdisco/Backend/Job.pm +++ b/lib/App/Netdisco/Backend/Job.pm @@ -21,6 +21,7 @@ foreach my $slot (qw/ userip log device_key + job_priority _current_phase _last_namespace diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm index d594df4d..3c6e7e00 100644 --- a/lib/App/Netdisco/Backend/Role/Manager.pm +++ b/lib/App/Netdisco/Backend/Role/Manager.pm @@ -6,7 +6,7 @@ use List::Util 'sum'; use App::Netdisco::Util::MCE; use App::Netdisco::JobQueue - qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/; + qw/jq_locked jq_getsome jq_lock jq_warm_thrusters/; use Role::Tiny; use namespace::clean; @@ -60,28 +60,8 @@ sub worker_body { $num_slots = parse_max_workers( setting('workers')->{tasks} ) - $self->{queue}->pending(); - debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)"; + debug "mgr ($wid): getting potential jobs for $num_slots workers"; - # get some high priority jobs - # TODO also check for stale jobs in Netdisco DB - foreach my $job ( jq_getsomep($num_slots) ) { - next if $seen_job{ $memoize->($job) }++; - - # mark job as running - next unless jq_lock($job); - info sprintf "mgr (%s): job %s booked out for this processing node", - $wid, $job->id; - - # copy job to local queue - $self->{queue}->enqueuep(100, $job); - } - - $num_slots = parse_max_workers( setting('workers')->{tasks} ) - - $self->{queue}->pending(); - debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)"; - - # get some normal priority jobs - # TODO also check for stale jobs in Netdisco DB foreach my $job ( jq_getsome($num_slots) ) { next if $seen_job{ $memoize->($job) }++; @@ -91,7 +71,7 @@ sub worker_body { $wid, $job->id; # copy job to local queue - $self->{queue}->enqueue($job); + $self->{queue}->enqueuep($job->job_priority, $job); } #if (scalar grep {$_ > 1} values %seen_job) { diff --git a/lib/App/Netdisco/DB/ResultSet.pm b/lib/App/Netdisco/DB/ResultSet.pm index 953c8e80..22c25cf8 100644 --- a/lib/App/Netdisco/DB/ResultSet.pm +++ b/lib/App/Netdisco/DB/ResultSet.pm @@ -6,7 +6,7 @@ use warnings; use base 'DBIx::Class::ResultSet'; __PACKAGE__->load_components(qw/ - Helper::ResultSet::SetOperations + +App::Netdisco::DB::SetOperations Helper::ResultSet::Shortcut Helper::ResultSet::CorrelateRelationship /); diff --git a/lib/App/Netdisco/DB/SetOperations.pm b/lib/App/Netdisco/DB/SetOperations.pm new file mode 100644 index 00000000..fef5efb4 --- /dev/null +++ b/lib/App/Netdisco/DB/SetOperations.pm @@ -0,0 +1,50 @@ +package App::Netdisco::DB::SetOperations; + +use strict; +use warnings; + +use parent 'DBIx::Class::Helper::ResultSet::SetOperations'; + +sub _set_operation { + my ( $self, $operation, $other ) = @_; + + my @sql; + my @params; + + my $as = $self->_resolved_attrs->{as}; + + my @operands = ( $self, ref $other eq 'ARRAY' ? @$other : $other ); + + for (@operands) { + $self->throw_exception("ResultClass of ResultSets do not match!") + unless $self->result_class eq $_->result_class; + + my $attrs = $_->_resolved_attrs; + + $self->throw_exception('ResultSets do not all have the same selected columns!') + unless $self->_compare_arrays($as, $attrs->{as}); + + my ($sql, @bind) = @{${$_->as_query}}; + # $sql =~ s/^\s*\((.*)\)\s*$/$1/; + $sql = q<(> . $sql . q<)>; + + push @sql, $sql; + push @params, @bind; + } + + my $query = q<(> . join(" $operation ", @sql). q<)>; + + my $attrs = $self->_resolved_attrs; + return $self->result_source->resultset->search(undef, { + alias => $self->current_source_alias, + from => [{ + $self->current_source_alias => \[ $query, @params ], + -alias => $self->current_source_alias, + -source_handle => $self->result_source->handle, + }], + columns => $attrs->{as}, + result_class => $self->result_class, + }); +} + +1; diff --git a/lib/App/Netdisco/JobQueue.pm b/lib/App/Netdisco/JobQueue.pm index 875ec468..733cf4bb 100644 --- a/lib/App/Netdisco/JobQueue.pm +++ b/lib/App/Netdisco/JobQueue.pm @@ -9,16 +9,15 @@ Module::Load::load use base 'Exporter'; our @EXPORT = (); our @EXPORT_OK = qw/ + jq_warm_thrusters jq_getsome - jq_getsomep jq_locked jq_queued - jq_warm_thrusters - jq_log - jq_userlog jq_lock jq_defer jq_complete + jq_log + jq_userlog jq_insert jq_delete /; @@ -43,10 +42,6 @@ Returns a list of randomly selected queued jobs. Default is to return one job, unless C<$num> is provided. Jobs are returned as objects which implement the Netdisco job instance interface (see below). -=head2 jq_getsomep( $num? ) - -Same as C but for high priority jobs. - =head2 jq_locked() Returns the list of jobs currently booked out to this processing node (denoted diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm index 859e9380..80405701 100644 --- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm +++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm @@ -15,7 +15,6 @@ our @EXPORT = (); our @EXPORT_OK = qw/ jq_warm_thrusters jq_getsome - jq_getsomep jq_locked jq_queued jq_lock @@ -69,23 +68,49 @@ sub jq_warm_thrusters { }); } -sub _getsome { - my ($num_slots, $where) = @_; - return () if ((!defined $num_slots) or ($num_slots < 1)); - return () if ((!defined $where) or (ref {} ne ref $where)); +sub jq_getsome { + my $num_slots = shift; + return () unless $num_slots and $num_slots > 0; my $jobs = schema('netdisco')->resultset('Admin'); - my $rs = $jobs->search({ + my @returned = (); + + my %jobsearch = ( status => 'queued', device => { '-not_in' => $jobs->skipped(setting('workers')->{'BACKEND'}, setting('workers')->{'max_deferrals'}, setting('workers')->{'retry_after'}) ->columns('device')->as_query }, - %$where, - }, { order_by => 'random()', rows => $num_slots }); + ); + my %randoms = (order_by => 'random()', rows => $num_slots ); + + my $hiprio = $jobs->search({ + %jobsearch, + -or => [{ + username => { '!=' => undef }, + action => { -in => setting('job_prio')->{'normal'} }, + },{ + action => { -in => setting('job_prio')->{'high'} }, + }], + }, { + %randoms, + '+select' => [\'100 as job_priority'], '+as' => ['me.job_priority'], + }); + + my $loprio = $jobs->search({ + %jobsearch, + action => { -in => setting('job_prio')->{'normal'} }, + }, { + %randoms, + '+select' => [\'0 as job_priority'], '+as' => ['me.job_priority'], + }); + + my $rs = $hiprio->union($loprio)->search(undef, { + order_by => { '-desc' => 'job_priority' }, + rows => $num_slots, + }); - my @returned = (); while (my $job = $rs->next) { if ($job->device) { # need to handle device discovered since backend daemon started @@ -140,23 +165,6 @@ sub _getsome { return @returned; } -sub jq_getsome { - return _getsome(shift, - { action => { -in => setting('job_prio')->{'normal'} } } - ); -} - -sub jq_getsomep { - return _getsome(shift, { - -or => [{ - username => { '!=' => undef }, - action => { -in => setting('job_prio')->{'normal'} }, - },{ - action => { -in => setting('job_prio')->{'high'} }, - }], - }); -} - sub jq_locked { my @returned = (); my $rs = schema('netdisco')->resultset('Admin')