I7Q3M7V32AUNVHKPCRAMSOIZXSTTOOPZ2PR3A2UF4J7KRDGIWWOQC The dispatcher's behavior is slightly different based onif the Task has an associated record:=over 1=item *If a task succeeds and there is no record, the Dispatcherassumes there is no further accounting of the task to bedone.=item *If a task succeeds and there is a record, the Dispatchercalls C<delete> on the record.=item *If a task fails and there is no record, the Dispatchercalls C<$store_task> with the Task as its only argument.It is the C<$store_task>'s responsibility to store thetask in some way for retrying.=item *If a task fails and there is a record, the Dispatchercalls C<requeue> on the record.=back
$prometheus->declare("notify_plugin_retry_success",type => "counter",help => "Number of successful executions of retried tasks.");$prometheus->declare("notify_plugin_drop",type => "counter",help => "Number of tasks that have been dropped after too many retries.");$prometheus->declare("notify_plugin_requeue",type => "counter",help => "Number of tasks that have been requeued after a failure.");
}}=head2 successMark a task's execution as successful.If the task has an associated record, the record is deleted.Arguments:=over 1=item C<$task>L<Hydra::Task> the task to mark as successful.=back=cutsub success {my ($self, $task) = @_;my $event_labels = $self->prom_labels_for_task($task);if (defined($task->{"record"})) {$self->{"prometheus"}->inc("notify_plugin_retry_sucess", $event_labels);$task->{"record"}->delete();
my $event_labels = $self->prom_labels_for_task($task);if (defined($task->{"record"})) {if ($task->{"record"}->{"attempts"} > 100) {$self->{"prometheus"}->inc("notify_plugin_drop", $event_labels);$task->{"record"}->delete();} else {$self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);$task->{"record"}->requeue();}} else {$self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);$self->{"store_task"}($task);}}=head2 prom_labels_for_taskGiven a specific task, return a hash of standard labels to record withPrometheus.Arguments:=over 1=item C<$task>L<Hydra::Task> the task to return labels for.=back=cutsub prom_labels_for_task {my ($self, $task) = @_;my $channel_name = $task->{"event"}->{'channel_name'};my $plugin_name = $task->{"plugin_name"};return {channel => $channel_name,plugin => $plugin_name,};}
}sub make_fake_record {my %attrs = @_;my $record = {"attempts" => $attrs{"attempts"} || 0,"requeued" => 0,"deleted" => 0};my $mock_record = mock_obj $record => (add => ["delete" => sub {my ($self, $db, $plugin) = @_;$self->{"deleted"} = 1;},"requeue" => sub {my ($self, $db, $plugin) = @_;$self->{"requeued"} = 1;}]);return $mock_record;
};subtest "a failed run without a record saves the task for later" => sub {my $db = "bogus db";my $record = make_fake_record();my $bogus_plugin = make_noop_plugin("bogus-1");my $task = {"event" => make_failing_event("fail-event"),"plugin_name" => ref $bogus_plugin,"record" => undef,};my $save_hook_called = 0;my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin],sub {$save_hook_called = 1;});$dispatcher->dispatch_task($task);is($save_hook_called, 1, "The record was requeued with the store hook.");};subtest "a successful run from a record deletes the record" => sub {my $db = "bogus db";my $record = make_fake_record();my $bogus_plugin = make_noop_plugin("bogus-1");my $task = {"event" => make_fake_event("success-event"),"plugin_name" => ref $bogus_plugin,"record" => $record,};my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);$dispatcher->dispatch_task($task);is($record->{"deleted"}, 1, "The record was deleted.");};subtest "a failed run from a record re-queues the task" => sub {my $db = "bogus db";my $record = make_fake_record();my $bogus_plugin = make_noop_plugin("bogus-1");my $task = {"event" => make_failing_event("fail-event"),"plugin_name" => ref $bogus_plugin,"record" => $record,};my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);$dispatcher->dispatch_task($task);is($record->{"requeued"}, 1, "The record was requeued.");
subtest "a failed run from a record with a lot of attempts deletes the task" => sub {my $db = "bogus db";my $record = make_fake_record(attempts => 101);my $bogus_plugin = make_noop_plugin("bogus-1");my $task = {"event" => make_failing_event("fail-event"),"plugin_name" => ref $bogus_plugin,"record" => $record,};my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);$dispatcher->dispatch_task($task);is($record->{"deleted"}, 1, "The record was deleted.");};