package io.camunda.client.impl.worker;

import io.camunda.client.api.response.ActivatedJob;
import io.camunda.client.api.worker.BackoffSupplier;
import io.camunda.client.api.worker.JobWorker;
import io.camunda.client.api.worker.JobWorkerMetrics;
import io.camunda.client.impl.Loggers;
import java.io.Closeable;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/client/impl/worker/JobWorkerImpl.class */
public final class JobWorkerImpl implements JobWorker, Closeable {
    public static final String ERROR_MSG = "Expected to handle received job with key {}, but the worker reached maximum capacity (maxJobsActive). The job activation timed out (controllable by timeout parameter). It will get reactivated shortly. If this issue persist, make sure to either scale your workers, threads, increase maxJobsActive or reduce the load you want to work on. ";
    private static final BackoffSupplier DEFAULT_BACKOFF_SUPPLIER = JobWorkerBuilderImpl.DEFAULT_BACKOFF_SUPPLIER;
    private static final Logger LOG = Loggers.JOB_WORKER_LOGGER;
    private static final String SUPPLY_RETRY_DELAY_FAILURE_MESSAGE = "Expected to supply retry delay, but an exception was thrown. Falling back to default backoff supplier";
    private final int maxJobsActive;
    private final int activationThreshold;
    private final Executor executor;
    private final JobRunnableFactory jobHandlerFactory;
    private final long initialPollInterval;
    private final JobStreamer jobStreamer;
    private final BackoffSupplier backoffSupplier;
    private final JobWorkerMetrics metrics;
    private final AtomicReference<JobPoller> claimableJobPoller;
    private volatile long pollInterval;
    private final ScheduledExecutorService scheduledExecutorService;
    private final AtomicBoolean acquiringJobs = new AtomicBoolean(true);
    private final AtomicBoolean isPollScheduled = new AtomicBoolean(false);
    private final AtomicInteger remainingJobs = new AtomicInteger(0);

    public JobWorkerImpl(int i, ScheduledExecutorService scheduledExecutorService, Duration duration, JobRunnableFactory jobRunnableFactory, JobPoller jobPoller, JobStreamer jobStreamer, BackoffSupplier backoffSupplier, JobWorkerMetrics jobWorkerMetrics, Executor executor) {
        this.maxJobsActive = i;
        this.activationThreshold = Math.round(i * 0.3f);
        this.executor = executor;
        this.scheduledExecutorService = scheduledExecutorService;
        this.jobHandlerFactory = jobRunnableFactory;
        this.jobStreamer = jobStreamer;
        this.initialPollInterval = duration.toMillis();
        this.backoffSupplier = backoffSupplier;
        this.metrics = jobWorkerMetrics;
        this.claimableJobPoller = new AtomicReference<>(jobPoller);
        this.pollInterval = this.initialPollInterval;
        openStream();
        schedulePoll();
    }

    private void openStream() {
        this.jobStreamer.openStreamer(this::handleStreamedJob);
    }

    @Override // io.camunda.client.api.worker.JobWorker
    public boolean isOpen() {
        return this.acquiringJobs.get();
    }

    @Override // io.camunda.client.api.worker.JobWorker
    public boolean isClosed() {
        return (isOpen() || this.claimableJobPoller.get() == null || this.remainingJobs.get() > 0) ? false : true;
    }

    @Override // io.camunda.client.api.worker.JobWorker, java.lang.AutoCloseable
    public void close() {
        this.acquiringJobs.set(false);
        this.jobStreamer.close();
    }

    private void schedulePoll() {
        if (this.isPollScheduled.compareAndSet(false, true)) {
            this.scheduledExecutorService.schedule(this::onScheduledPoll, this.pollInterval, TimeUnit.MILLISECONDS);
        }
    }

    private void onScheduledPoll() {
        this.isPollScheduled.set(false);
        if (shouldPoll(this.remainingJobs.get())) {
            tryPoll();
        }
    }

    private boolean shouldPoll(int i) {
        return this.acquiringJobs.get() && i <= this.activationThreshold;
    }

    private void tryPoll() {
        tryClaimJobPoller().ifPresent(jobPoller -> {
            try {
                poll(jobPoller);
            } catch (Exception e) {
                LOG.warn("Unexpected failure to activate jobs", e);
                onPollError(jobPoller, e);
            }
        });
    }

    private Optional<JobPoller> tryClaimJobPoller() {
        return Optional.ofNullable(this.claimableJobPoller.getAndSet(null));
    }

    private void releaseJobPoller(JobPoller jobPoller) {
        this.claimableJobPoller.set(jobPoller);
    }

    private void poll(JobPoller jobPoller) {
        int i = this.remainingJobs.get();
        if (shouldPoll(i)) {
            jobPoller.poll(this.maxJobsActive - i, this::handleJob, i2 -> {
                onPollSuccess(jobPoller, i2);
            }, th -> {
                onPollError(jobPoller, th);
            }, this::isOpen);
            return;
        }
        LOG.trace("Expected to activate for jobs, but still enough remain. Reschedule poll.");
        releaseJobPoller(jobPoller);
        schedulePoll();
    }

    private void onPollSuccess(JobPoller jobPoller, int i) {
        releaseJobPoller(jobPoller);
        int addAndGet = this.remainingJobs.addAndGet(i);
        if (this.jobStreamer.isOpen() && i == 0) {
            backoff(jobPoller);
            LOG.trace("No jobs to activate via polling, will backoff and poll in {}", Long.valueOf(this.pollInterval));
        } else {
            this.pollInterval = this.initialPollInterval;
            if (addAndGet <= 0) {
                schedulePoll();
            }
        }
    }

    private void onPollError(JobPoller jobPoller, Throwable th) {
        backoff(jobPoller);
        LOG.debug("Failed to activate jobs due to {}, delay retry for {} ms", th.getMessage(), Long.valueOf(this.pollInterval));
    }

    private void backoff(JobPoller jobPoller) {
        getPollInterval();
        releaseJobPoller(jobPoller);
        schedulePoll();
    }

    private void getPollInterval() {
        long j = this.pollInterval;
        try {
            this.pollInterval = this.backoffSupplier.supplyRetryDelay(j);
        } catch (Exception e) {
            LOG.warn(SUPPLY_RETRY_DELAY_FAILURE_MESSAGE, e);
            this.pollInterval = DEFAULT_BACKOFF_SUPPLIER.supplyRetryDelay(j);
        }
    }

    private void handleJob(ActivatedJob activatedJob) {
        handleActivatedJob(activatedJob, this::handleJobFinished);
    }

    private void handleStreamedJob(ActivatedJob activatedJob) {
        handleActivatedJob(activatedJob, this::handleStreamJobFinished);
    }

    private void handleActivatedJob(ActivatedJob activatedJob, Runnable runnable) {
        this.metrics.jobActivated(1);
        try {
            this.executor.execute(this.jobHandlerFactory.create(activatedJob, runnable));
        } catch (RejectedExecutionException e) {
            if (isClosed()) {
                return;
            }
            if (!this.scheduledExecutorService.isShutdown() && !this.scheduledExecutorService.isTerminated()) {
                LOG.warn("Expected to handle received job with key {}, but the worker reached maximum capacity (maxJobsActive). The job activation timed out (controllable by timeout parameter). It will get reactivated shortly. If this issue persist, make sure to either scale your workers, threads, increase maxJobsActive or reduce the load you want to work on. ", Long.valueOf(activatedJob.getKey()), e);
            } else {
                LOG.warn("Underlying executor was closed before the worker. Closing the worker now.", e);
                close();
            }
        }
    }

    private void handleJobFinished() {
        int decrementAndGet = this.remainingJobs.decrementAndGet();
        if (!this.isPollScheduled.get() && shouldPoll(decrementAndGet)) {
            tryPoll();
        }
        this.metrics.jobHandled(1);
    }

    private void handleStreamJobFinished() {
        this.metrics.jobHandled(1);
    }
}
