package yerbie.client;

import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import yerbie.autogenerated.YerbieAPI;
import yerbie.autogenerated.models.JobRequest;
import yerbie.exception.JobNotFoundException;
import yerbie.exception.SerializationException;
import yerbie.job.Job;
import yerbie.serde.DataTransformer;
import yerbie.serde.JobData;
import yerbie.serde.JobSpec;
import yerbie.serde.JobSpecTransformer;

/* loaded from: input_file:yerbie/client/YerbieConsumer.class */
/**
 * Polls a Yerbie queue for jobs and runs them on a caller-supplied executor.
 *
 * <p>{@link #start()} schedules a polling task on an internal single-threaded scheduler. Each
 * poll keeps reserving jobs from the configured queue, one at a time, until an empty reservation
 * is returned or {@link #shutdown()} has been called. Every reserved job is deserialized, matched
 * to a {@link Job} through the {@link JobRepository}, and submitted to the job executor. Failures
 * while resolving or running a job are handed to the {@link RetryHandler}.
 *
 * <p>The {@code processing} flag is written by the thread calling {@code start()}/{@code
 * shutdown()} and read by the polling thread, so it is declared {@code volatile}.
 */
public class YerbieConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(YerbieConsumer.class);

    /** Delay, in seconds, between the end of one polling pass and the start of the next. */
    private static final long POLL_DELAY_SECONDS = 1L;

    /** How long, in seconds, {@link #shutdown()} waits for each executor before forcing it. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    private final ExecutorService jobExecutorService;
    private final YerbieAPI yerbieAPI;
    private final String queue;
    private final JobSpecTransformer jobSpecTransformer;
    private final DataTransformer dataTransformer;
    private final JobRepository jobRepository;
    private final RetryHandler retryHandler;

    // volatile: shutdown() flips this from another thread than the one running executeJobsLoop().
    private volatile boolean processing = false;

    private final ScheduledExecutorService pollingExecutorService = Executors.newSingleThreadScheduledExecutor();

    /**
     * Creates a consumer for a single queue.
     *
     * @param executorService executor on which reserved jobs are run; it is shut down by
     *     {@link #shutdown()}
     * @param yerbieAPI client used to reserve jobs and mark them finished
     * @param str name of the queue to reserve jobs from
     * @param jobSpecTransformer deserializes the raw job payload into a {@link JobSpec}
     * @param dataTransformer supplies the job-data transformer for a spec's serialization format
     * @param jobRepository resolves the {@link Job} that handles a given job-data object
     * @param retryHandler invoked when a job cannot be resolved or throws while running
     */
    public YerbieConsumer(ExecutorService executorService, YerbieAPI yerbieAPI, String str, JobSpecTransformer jobSpecTransformer, DataTransformer dataTransformer, JobRepository jobRepository, RetryHandler retryHandler) {
        this.jobExecutorService = executorService;
        this.yerbieAPI = yerbieAPI;
        this.queue = str;
        this.jobSpecTransformer = jobSpecTransformer;
        this.dataTransformer = dataTransformer;
        this.jobRepository = jobRepository;
        this.retryHandler = retryHandler;
    }

    /**
     * Starts polling for jobs immediately and then again after a fixed delay following each
     * polling pass.
     */
    public void start() {
        this.processing = true;
        this.pollingExecutorService.scheduleWithFixedDelay(() -> {
            try {
                executeJobsLoop();
            } catch (Throwable th) {
                // An exception escaping a scheduled task suppresses all later runs, so log and
                // swallow to keep the poller alive.
                LOGGER.error("Error executing jobs.", th);
            }
        }, 0L, POLL_DELAY_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Stops polling and shuts down both executors, waiting a bounded time for each to terminate
     * before forcing it. If the calling thread is interrupted while waiting, the executor being
     * awaited is forced down and the thread's interrupt status is restored.
     */
    public void shutdown() {
        this.processing = false;
        // NOTE(review): the job executor stops accepting work before the poller is stopped, so a
        // poll that is mid-flight may have its submit() rejected (depends on the supplied
        // executor); such a failure surfaces in the polling task's catch in start().
        this.jobExecutorService.shutdown();
        this.pollingExecutorService.shutdown();
        awaitTerminationOrForce(this.jobExecutorService);
        awaitTerminationOrForce(this.pollingExecutorService);
    }

    /**
     * Waits for an executor that has already been asked to shut down, and forces it with
     * {@code shutdownNow()} if it does not terminate in time or the wait is interrupted.
     */
    private static void awaitTerminationOrForce(ExecutorService executorService) {
        try {
            if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            // Preserve the interrupt so the caller can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Runs the job with its data and then reports it as finished. Exceptions thrown by the job
     * itself propagate to the caller; a failure while reporting completion is only logged.
     */
    private void runJob(Job<Object> job, JobData<?> jobData, JobRequest jobRequest) {
        job.run(jobData.getJobData());
        try {
            this.yerbieAPI.finishedJobAsync(jobRequest.getJobToken()).block();
        } catch (Exception e) {
            LOGGER.error("Error marking job as finished after it ran.", e);
        }
    }

    /**
     * Reserves one job from the queue and, if one was returned, submits it for execution.
     *
     * @return {@code false} when the reservation came back empty (no request or no job token);
     *     {@code true} when a job was reserved, whether it was submitted to the executor or
     *     handed to the retry handler because it could not be resolved
     */
    @VisibleForTesting
    protected boolean fetchAndSubmitOneJob() {
        JobRequest jobRequest = (JobRequest) this.yerbieAPI.reserveJobAsync(this.queue).block();
        if (jobRequest == null || jobRequest.getJobToken() == null) {
            LOGGER.debug("Received empty jobRequest from Yerbie.");
            return false;
        }
        JobSpec jobSpec = this.jobSpecTransformer.deserializeJobSpec(jobRequest.getJobData());
        try {
            JobData jobData = this.dataTransformer.getJobDataTransformer(jobSpec.getSerializationFormat()).deserializeJobData(jobSpec.getSerializedJobData(), Class.forName(jobSpec.getJobClass()));
            Job<Object> job = this.jobRepository.findJobForJobData(jobData.getJobData());
            LOGGER.info("Executing job:{} {} with jobData {}.", jobRequest.getJobToken(), job.getClass().getName(), jobData.getJobData());
            this.jobExecutorService.submit(() -> {
                try {
                    runJob(job, jobData, jobRequest);
                } catch (Exception e) {
                    // The job itself failed; let the retry policy decide what happens next.
                    this.retryHandler.handleRetry(jobSpec, jobRequest, jobSpec.getCurrentRuns(), e);
                }
            });
            return true;
        } catch (ClassNotFoundException e) {
            LOGGER.error("Could not find class for job data {}. Retrying job according to retry policy.", jobRequest.getJobData(), e);
            this.retryHandler.handleRetry(jobSpec, jobRequest, jobSpec.getCurrentRuns(), e);
            return true;
        } catch (JobNotFoundException e) {
            LOGGER.error("Could not find job for given job data. Retrying job according to retry policy.", e);
            this.retryHandler.handleRetry(jobSpec, jobRequest, jobSpec.getCurrentRuns(), e);
            return true;
        } catch (SerializationException e) {
            LOGGER.error("Error deserializing job data {}. Retrying job according to retry policy.", jobRequest.getJobData(), e);
            this.retryHandler.handleRetry(jobSpec, jobRequest, jobSpec.getCurrentRuns(), e);
            return true;
        }
    }

    /**
     * One polling pass: keeps reserving and submitting jobs until the queue returns an empty
     * reservation or the consumer has been asked to stop.
     */
    private void executeJobsLoop() {
        LOGGER.info("Polling for jobs to execute.");
        while (this.processing && fetchAndSubmitOneJob()) {
            // Intentionally empty: the loop condition does the work.
        }
    }
}
