package com.hpe.caf.worker.jobtracking;

import com.hpe.caf.api.Codec;
import com.hpe.caf.api.CodecException;
import com.hpe.caf.api.ConfigurationException;
import com.hpe.caf.api.ConfigurationSource;
import com.hpe.caf.api.DecodeMethod;
import com.hpe.caf.api.HealthResult;
import com.hpe.caf.api.HealthStatus;
import com.hpe.caf.api.worker.BulkWorker;
import com.hpe.caf.api.worker.BulkWorkerRuntime;
import com.hpe.caf.api.worker.DataStore;
import com.hpe.caf.api.worker.InvalidTaskException;
import com.hpe.caf.api.worker.TaskFailedException;
import com.hpe.caf.api.worker.TaskInformation;
import com.hpe.caf.api.worker.TaskMessage;
import com.hpe.caf.api.worker.TaskMessageForwardingEvaluator;
import com.hpe.caf.api.worker.TaskRejectedException;
import com.hpe.caf.api.worker.TaskSourceInfo;
import com.hpe.caf.api.worker.TaskStatus;
import com.hpe.caf.api.worker.TrackingInfo;
import com.hpe.caf.api.worker.Worker;
import com.hpe.caf.api.worker.WorkerCallback;
import com.hpe.caf.api.worker.WorkerException;
import com.hpe.caf.api.worker.WorkerFactory;
import com.hpe.caf.api.worker.WorkerResponse;
import com.hpe.caf.api.worker.WorkerTask;
import com.hpe.caf.api.worker.WorkerTaskData;
import com.hpe.caf.services.job.util.JobTaskId;
import com.hpe.caf.worker.tracking.report.TrackingReportTask;
import jakarta.validation.Validation;
import jakarta.validation.Validator;
import jakarta.validation.constraints.NotNull;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hpe/caf/worker/jobtracking/JobTrackingWorkerFactory.class */
public class JobTrackingWorkerFactory implements WorkerFactory, TaskMessageForwardingEvaluator, BulkWorker {
    private static final Logger LOG;
    private final DataStore dataStore;
    private final JobTrackingWorkerConfiguration configuration;
    private final Codec codec;
    private final Class<JobTrackingWorkerTask> taskClass = (Class) Objects.requireNonNull(JobTrackingWorkerTask.class);

    @NotNull
    private JobTrackingReporter reporter;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hpe/caf/worker/jobtracking/JobTrackingWorkerFactory$TaskValidator.class */
    public static class TaskValidator {
        private static final Validator validator = Validation.buildDefaultValidatorFactory().getValidator();

        private TaskValidator() {
        }

        public static <T> T deserialiseAndValidateTask(Codec codec, Class<T> cls, byte[] bArr) throws InvalidTaskException {
            try {
                T t = (T) codec.deserialise(bArr, cls, DecodeMethod.STRICT);
                if (t == null) {
                    throw new InvalidTaskException("Invalid input message: no result from deserialisation");
                }
                Set validate = validator.validate(t, new Class[0]);
                if (validate.size() <= 0) {
                    return t;
                }
                JobTrackingWorkerFactory.LOG.error("Task of type {} failed validation due to: {}", cls.getSimpleName(), validate);
                throw new InvalidTaskException("Task failed validation");
            } catch (CodecException e) {
                throw new InvalidTaskException("Invalid input message", e);
            }
        }
    }

    public JobTrackingWorkerFactory(ConfigurationSource configurationSource, DataStore dataStore, Codec codec) throws WorkerException {
        this.codec = (Codec) Objects.requireNonNull(codec);
        this.dataStore = (DataStore) Objects.requireNonNull(dataStore);
        try {
            this.configuration = (JobTrackingWorkerConfiguration) configurationSource.getConfiguration(JobTrackingWorkerConfiguration.class);
            this.reporter = createReporter();
        } catch (ConfigurationException e) {
            throw new WorkerException("Failed to create worker factory", e);
        }
    }

    public JobTrackingWorkerFactory(ConfigurationSource configurationSource, DataStore dataStore, Codec codec, JobTrackingReporter jobTrackingReporter) throws WorkerException {
        this.codec = (Codec) Objects.requireNonNull(codec);
        this.dataStore = (DataStore) Objects.requireNonNull(dataStore);
        try {
            this.configuration = (JobTrackingWorkerConfiguration) configurationSource.getConfiguration(JobTrackingWorkerConfiguration.class);
            this.reporter = jobTrackingReporter;
        } catch (ConfigurationException e) {
            throw new WorkerException("Failed to create worker factory", e);
        }
    }

    public final Worker getWorker(WorkerTaskData workerTaskData) throws TaskRejectedException, InvalidTaskException {
        LOG.debug("Starting a single job...");
        String classifier = workerTaskData.getClassifier();
        boolean z = -1;
        switch (classifier.hashCode()) {
            case -937029102:
                if (classifier.equals("JobTrackingWorker")) {
                    z = true;
                    break;
                }
                break;
            case -10730736:
                if (classifier.equals("TrackingReportTask")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return createWorker(getTrackingReportTask(workerTaskData), workerTaskData);
            case true:
                return createWorker(getJobTrackingWorkerTask(workerTaskData), workerTaskData);
            default:
                throw new InvalidTaskException("Task of type " + classifier + " found on queue for JobTrackingWorker");
        }
    }

    private TrackingReportTask getTrackingReportTask(WorkerTaskData workerTaskData) throws TaskRejectedException, InvalidTaskException {
        return (TrackingReportTask) TaskValidator.deserialiseAndValidateTask(this.codec, TrackingReportTask.class, validateVersionAndData(workerTaskData, 1));
    }

    private JobTrackingWorkerTask getJobTrackingWorkerTask(WorkerTaskData workerTaskData) throws TaskRejectedException, InvalidTaskException {
        return (JobTrackingWorkerTask) TaskValidator.deserialiseAndValidateTask(this.codec, JobTrackingWorkerTask.class, validateVersionAndData(workerTaskData, 1));
    }

    private static byte[] validateVersionAndData(WorkerTaskData workerTaskData, int i) throws InvalidTaskException, TaskRejectedException {
        int version = workerTaskData.getVersion();
        if (i < version) {
            throw new TaskRejectedException("Found task version " + version + ", which is newer than " + i);
        }
        byte[] data = workerTaskData.getData();
        if (data == null) {
            throw new InvalidTaskException("Invalid input message: task not specified");
        }
        return data;
    }

    protected String getWorkerName() {
        return "JobTrackingWorker";
    }

    protected int getWorkerApiVersion() {
        return 1;
    }

    private Worker createWorker(JobTrackingWorkerTask jobTrackingWorkerTask, WorkerTaskData workerTaskData) throws TaskRejectedException, InvalidTaskException {
        return new JobTrackingWorker(jobTrackingWorkerTask, this.configuration.getOutputQueue(), this.codec, this.reporter, workerTaskData);
    }

    private Worker createWorker(TrackingReportTask trackingReportTask, WorkerTaskData workerTaskData) throws TaskRejectedException, InvalidTaskException {
        return new JobTrackingReportUpdateWorker(trackingReportTask, workerTaskData, this.configuration.getOutputQueue(), this.codec, this.reporter);
    }

    public String getInvalidTaskQueue() {
        return this.configuration.getOutputQueue();
    }

    public int getWorkerThreads() {
        return this.configuration.getThreads();
    }

    public HealthResult healthCheck() {
        try {
            return new JobTrackingWorkerHealthCheck(this.reporter).healthCheck();
        } catch (Exception e) {
            return new HealthResult(HealthStatus.UNHEALTHY, "Failed to perform Job Tracking Worker health check. " + e.getMessage());
        }
    }

    public void determineForwardingAction(TaskMessage taskMessage, TaskInformation taskInformation, Map<String, Object> map, WorkerCallback workerCallback) {
        List<JobTrackingWorkerDependency> reportProxiedTask = reportProxiedTask(taskMessage, map);
        if (!reportProxiedTask.isEmpty()) {
            try {
                forwardAvailableJobs(reportProxiedTask, workerCallback, taskMessage.getTracking().getTrackingPipe(), taskMessage.getCorrelationId(), taskInformation);
            } catch (Exception e) {
                LOG.error("Failed to create dependent jobs.");
                throw new RuntimeException("Failed to create dependent jobs.", e);
            }
        }
        LOG.debug("Forwarding task {}", taskMessage.getTaskId());
        workerCallback.forward(taskInformation, taskMessage.getTo(), taskMessage, map);
    }

    private List<JobTrackingWorkerDependency> reportProxiedTask(TaskMessage taskMessage, Map<String, Object> map) {
        TrackingInfo tracking;
        try {
            tracking = taskMessage.getTracking();
        } catch (JobReportingException e) {
            LOG.warn("Error reporting task {} progress to the Job Database: ", taskMessage.getTaskId(), e);
        }
        if (tracking == null) {
            LOG.warn("Cannot report job task progress for task {} - the task message has no tracking info", taskMessage.getTaskId());
            return Collections.emptyList();
        }
        String jobTaskId = tracking.getJobTaskId();
        if (jobTaskId == null) {
            LOG.warn("Cannot report job task progress for task {} - the tracking info has no jobTaskId", taskMessage.getTaskId());
            return Collections.emptyList();
        }
        TaskStatus taskStatus = taskMessage.getTaskStatus();
        if (taskStatus == TaskStatus.NEW_TASK || taskStatus == TaskStatus.RESULT_SUCCESS || taskStatus == TaskStatus.RESULT_FAILURE) {
            String trackTo = tracking.getTrackTo();
            String to = taskMessage.getTo();
            if ((to == null && trackTo == null) || (trackTo != null && trackTo.equalsIgnoreCase(to))) {
                return this.reporter.reportJobTaskComplete(jobTaskId);
            }
            this.reporter.reportJobTaskProgress(jobTaskId, 0);
            return Collections.emptyList();
        }
        if (taskStatus == TaskStatus.RESULT_EXCEPTION || taskStatus == TaskStatus.INVALID_TASK) {
            JobTrackingWorkerFailure jobTrackingWorkerFailure = new JobTrackingWorkerFailure();
            jobTrackingWorkerFailure.setFailureId(taskStatus.toString());
            jobTrackingWorkerFailure.setFailureTime(new Date());
            jobTrackingWorkerFailure.setFailureSource(getWorkerName(taskMessage));
            byte[] taskData = taskMessage.getTaskData();
            if (taskData != null) {
                jobTrackingWorkerFailure.setFailureMessage(new String(taskData, StandardCharsets.UTF_8));
            }
            this.reporter.reportJobTaskRejected(jobTaskId, jobTrackingWorkerFailure);
            return Collections.emptyList();
        }
        boolean z = map.getOrDefault("x-caf-worker-rejected", null) != null;
        int parseInt = Integer.parseInt(String.valueOf(map.getOrDefault("x-caf-worker-retry", "0")));
        if (z) {
            String format = MessageFormat.format("{0}. Execution of this job task was retried {1} times.", String.valueOf(map.get("x-caf-worker-rejected")), Integer.valueOf(parseInt));
            JobTrackingWorkerFailure jobTrackingWorkerFailure2 = new JobTrackingWorkerFailure();
            jobTrackingWorkerFailure2.setFailureId("x-caf-worker-rejected");
            jobTrackingWorkerFailure2.setFailureTime(new Date());
            jobTrackingWorkerFailure2.setFailureSource(getWorkerName(taskMessage));
            jobTrackingWorkerFailure2.setFailureMessage(format);
            this.reporter.reportJobTaskRejected(jobTaskId, jobTrackingWorkerFailure2);
        } else {
            this.reporter.reportJobTaskRetry(jobTaskId, MessageFormat.format("This job task encountered a problem and will be retried. This will be retry attempt number {0} for this job task.", Integer.valueOf(parseInt)));
        }
        return Collections.emptyList();
    }

    private JobTrackingReporter createReporter() throws TaskRejectedException {
        try {
            return new JobTrackingWorkerReporter();
        } catch (JobReportingException e) {
            throw new TaskRejectedException("Failed to create Job Database reporter for Job Tracking Worker. ", e);
        }
    }

    private static String getWorkerName(TaskMessage taskMessage) {
        TaskSourceInfo sourceInfo = taskMessage.getSourceInfo();
        if (sourceInfo == null) {
            return "Unknown - no source info";
        }
        String name = sourceInfo.getName();
        return name == null ? "Unknown - worker name not set" : name;
    }

    private void forwardAvailableJobs(List<JobTrackingWorkerDependency> list, WorkerCallback workerCallback, String str, String str2, TaskInformation taskInformation) {
        try {
            Iterator<JobTrackingWorkerDependency> it = list.iterator();
            while (it.hasNext()) {
                workerCallback.send(taskInformation, JobTrackingWorkerUtil.createDependentJobTaskMessage(it.next(), str, str2));
            }
        } catch (Exception e) {
            LOG.error("Error retrieving Dependent Job Info from the Job Service Database", e);
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:5:0x004e. Please report as an issue. */
    public void processTasks(BulkWorkerRuntime bulkWorkerRuntime) throws InterruptedException {
        LOG.debug("Starting a bulk job...");
        long currentTimeMillis = System.currentTimeMillis() + JobTrackingWorkerUtil.getMaxBatchTime();
        TreeMap<FullyQualifiedJobId, List<CompletedWorkerTaskEntity>> treeMap = new TreeMap<>();
        while (true) {
            WorkerTask nextWorkerTask = bulkWorkerRuntime.getNextWorkerTask(currentTimeMillis - System.currentTimeMillis());
            if (nextWorkerTask != null) {
                String classifier = nextWorkerTask.getClassifier();
                boolean z = -1;
                switch (classifier.hashCode()) {
                    case -937029102:
                        if (classifier.equals("JobTrackingWorker")) {
                            z = true;
                            break;
                        }
                        break;
                    case -10730736:
                        if (classifier.equals("TrackingReportTask")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        processTrackingReportTask(nextWorkerTask, treeMap);
                        break;
                    case true:
                        processJobTrackingWorkerTask(nextWorkerTask);
                        break;
                    default:
                        nextWorkerTask.setResponse(new InvalidTaskException("Task of type " + classifier + " found on queue for Job Tracking Worker"));
                        break;
                }
            } else {
                LOG.info("Size of bulkItemList: {}", Integer.valueOf(treeMap.size()));
                processCompletedTrackingReports(treeMap);
                return;
            }
        }
    }

    private void processTrackingReportTask(WorkerTask workerTask, TreeMap<FullyQualifiedJobId, List<CompletedWorkerTaskEntity>> treeMap) throws InterruptedException {
        try {
            processTrackingReportTaskImpl(workerTask, treeMap);
        } catch (TaskRejectedException e) {
            workerTask.setResponse(e);
        } catch (InvalidTaskException e2) {
            workerTask.setResponse(e2);
        }
    }

    private void processTrackingReportTaskImpl(WorkerTask workerTask, TreeMap<FullyQualifiedJobId, List<CompletedWorkerTaskEntity>> treeMap) throws InvalidTaskException, InterruptedException, TaskRejectedException {
        boolean hasNext;
        TrackingReportTask trackingReportTask = getTrackingReportTask(workerTask);
        ArrayList arrayList = new ArrayList();
        WorkerResponse doWork = new JobTrackingReportUpdateWorker(trackingReportTask, workerTask, this.configuration.getOutputQueue(), this.codec, new JobTrackingReporterPartialProxy(this.reporter, arrayList)).doWork();
        if (doWork.getTaskStatus() != TaskStatus.RESULT_SUCCESS || arrayList.isEmpty()) {
            workerTask.setResponse(doWork);
            return;
        }
        Iterator it = ((TreeMap) arrayList.stream().map(JobTaskId::fromMessageId).collect(Collectors.groupingBy(jobTaskId -> {
            return new FullyQualifiedJobId(jobTaskId.getPartitionId(), jobTaskId.getJobId());
        }, TreeMap::new, Collectors.toList()))).entrySet().iterator();
        do {
            Map.Entry entry = (Map.Entry) it.next();
            hasNext = it.hasNext();
            mergeIntoBulkItemList(treeMap, workerTask, (FullyQualifiedJobId) entry.getKey(), (List) entry.getValue(), !hasNext);
        } while (hasNext);
    }

    private static void mergeIntoBulkItemList(TreeMap<FullyQualifiedJobId, List<CompletedWorkerTaskEntity>> treeMap, WorkerTask workerTask, FullyQualifiedJobId fullyQualifiedJobId, List<JobTaskId> list, boolean z) {
        treeMap.merge(fullyQualifiedJobId, Collections.singletonList(new CompletedWorkerTaskEntity(workerTask, list, z)), (list2, list3) -> {
            if (!$assertionsDisabled && (list2 == null || list2.isEmpty())) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && (list3 == null || list3.size() != 1)) {
                throw new AssertionError();
            }
            if (list2 instanceof ArrayList) {
                list2.add((CompletedWorkerTaskEntity) list3.get(0));
                return list2;
            }
            if (!$assertionsDisabled && list2.size() != 1) {
                throw new AssertionError();
            }
            ArrayList arrayList = new ArrayList();
            arrayList.add((CompletedWorkerTaskEntity) list2.get(0));
            arrayList.add((CompletedWorkerTaskEntity) list3.get(0));
            return arrayList;
        });
    }

    private void processCompletedTrackingReports(TreeMap<FullyQualifiedJobId, List<CompletedWorkerTaskEntity>> treeMap) {
        Iterator<Map.Entry<FullyQualifiedJobId, List<CompletedWorkerTaskEntity>>> it = treeMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<FullyQualifiedJobId, List<CompletedWorkerTaskEntity>> next = it.next();
            FullyQualifiedJobId key = next.getKey();
            LOG.debug("partition: {}; job: {}", key.getPartitionId(), key.getJobId());
            List<CompletedWorkerTaskEntity> value = next.getValue();
            try {
                List<JobTrackingWorkerDependency> reportJobTasksComplete = this.reporter.reportJobTasksComplete(key.getPartitionId(), key.getJobId(), (List) value.stream().flatMap(completedWorkerTaskEntity -> {
                    return completedWorkerTaskEntity.getCompletedTaskIds().stream().map((v0) -> {
                        return v0.getId();
                    });
                }).collect(Collectors.toList()));
                if (reportJobTasksComplete != null && !reportJobTasksComplete.isEmpty()) {
                    WorkerTask workerTask = value.get(0).getWorkerTask();
                    Iterator<JobTrackingWorkerDependency> it2 = reportJobTasksComplete.iterator();
                    while (it2.hasNext()) {
                        workerTask.sendMessage(JobTrackingWorkerUtil.createDependentJobTaskMessage(it2.next(), workerTask.getTo(), workerTask.getCorrelationId()));
                    }
                }
                value.stream().filter((v0) -> {
                    return v0.isFinalJob();
                }).map((v0) -> {
                    return v0.getWorkerTask();
                }).forEach(JobTrackingWorkerFactory::setWorkerResultSuccess);
            } catch (JobReportingException e) {
                byte[] failureData = getFailureData(e);
                failRemainingCompletedTrackingReports(value, it, workerTask2 -> {
                    setWorkerResultFailure(workerTask2, failureData);
                });
                return;
            } catch (JobReportingTransientException e2) {
                String message = e2.getMessage();
                failRemainingCompletedTrackingReports(value, it, workerTask3 -> {
                    setWorkerResultTransientFailure(workerTask3, message);
                });
                return;
            }
        }
    }

    private static void failRemainingCompletedTrackingReports(List<CompletedWorkerTaskEntity> list, Iterator<Map.Entry<FullyQualifiedJobId, List<CompletedWorkerTaskEntity>>> it, Consumer<WorkerTask> consumer) {
        List<CompletedWorkerTaskEntity> list2 = list;
        while (true) {
            list2.stream().filter((v0) -> {
                return v0.isFinalJob();
            }).map((v0) -> {
                return v0.getWorkerTask();
            }).forEach(consumer);
            if (!it.hasNext()) {
                return;
            } else {
                list2 = it.next().getValue();
            }
        }
    }

    private static void setWorkerResultSuccess(WorkerTask workerTask) {
        workerTask.setResponse(new WorkerResponse((String) null, TaskStatus.RESULT_SUCCESS, new byte[0], "JobTrackingWorker", 1, (byte[]) null));
    }

    private byte[] getFailureData(JobReportingException jobReportingException) {
        try {
            return this.codec.serialise(JobTrackingWorkerUtil.createErrorResult(JobTrackingWorkerStatus.PROGRESS_UPDATE_FAILED, jobReportingException.getMessage()));
        } catch (CodecException e) {
            throw new TaskFailedException("Failed to serialise result", e);
        }
    }

    private void setWorkerResultFailure(WorkerTask workerTask, byte[] bArr) {
        workerTask.setResponse(new WorkerResponse(this.configuration.getOutputQueue(), TaskStatus.RESULT_FAILURE, bArr, "JobTrackingWorker", 1, (byte[]) null));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void setWorkerResultTransientFailure(WorkerTask workerTask, String str) {
        workerTask.setResponse(new TaskRejectedException(str));
    }

    private void processJobTrackingWorkerTask(WorkerTask workerTask) throws InterruptedException {
        try {
            workerTask.setResponse(new JobTrackingWorker(getJobTrackingWorkerTask(workerTask), this.configuration.getOutputQueue(), this.codec, this.reporter, workerTask).doWork());
        } catch (TaskRejectedException e) {
            LOG.warn("Invalid input message version", e);
            workerTask.setResponse(e);
        } catch (InvalidTaskException e2) {
            LOG.warn("Invalid input message data", e2);
            workerTask.setResponse(e2);
        }
    }

    static {
        $assertionsDisabled = !JobTrackingWorkerFactory.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(JobTrackingWorkerFactory.class);
    }
}
