package io.camunda.zeebe.client.impl.worker;

import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.worker.BackoffSupplier;
import io.camunda.zeebe.client.api.worker.JobWorker;
import io.camunda.zeebe.client.impl.Loggers;
import java.io.Closeable;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/zeebe-client-java-1.2.4.jar:io/camunda/zeebe/client/impl/worker/JobWorkerImpl.class */
public final class JobWorkerImpl implements JobWorker, Closeable {
    private static final BackoffSupplier DEFAULT_BACKOFF_SUPPLIER = JobWorkerBuilderImpl.DEFAULT_BACKOFF_SUPPLIER;
    private static final Logger LOG = Loggers.JOB_WORKER_LOGGER;
    private static final String SUPPLY_RETRY_DELAY_FAILURE_MESSAGE = "Expected to supply retry delay, but an exception was thrown. Falling back to default backoff supplier";
    private final int maxJobsActive;
    private final int activationThreshold;
    private final ScheduledExecutorService executor;
    private final JobRunnableFactory jobHandlerFactory;
    private final long initialPollInterval;
    private final BackoffSupplier backoffSupplier;
    private final AtomicReference<JobPoller> claimableJobPoller;
    private volatile long pollInterval;
    private final AtomicBoolean acquiringJobs = new AtomicBoolean(true);
    private final AtomicBoolean isPollScheduled = new AtomicBoolean(false);
    private final AtomicInteger remainingJobs = new AtomicInteger(0);

    public JobWorkerImpl(int i, ScheduledExecutorService scheduledExecutorService, Duration duration, JobRunnableFactory jobRunnableFactory, JobPoller jobPoller, BackoffSupplier backoffSupplier) {
        this.maxJobsActive = i;
        this.activationThreshold = Math.round(i * 0.3f);
        this.executor = scheduledExecutorService;
        this.jobHandlerFactory = jobRunnableFactory;
        this.initialPollInterval = duration.toMillis();
        this.backoffSupplier = backoffSupplier;
        this.claimableJobPoller = new AtomicReference<>(jobPoller);
        this.pollInterval = this.initialPollInterval;
        schedulePoll();
    }

    @Override // io.camunda.zeebe.client.api.worker.JobWorker
    public boolean isOpen() {
        return this.acquiringJobs.get();
    }

    @Override // io.camunda.zeebe.client.api.worker.JobWorker
    public boolean isClosed() {
        return (isOpen() || this.claimableJobPoller.get() == null || this.remainingJobs.get() > 0) ? false : true;
    }

    @Override // io.camunda.zeebe.client.api.worker.JobWorker, java.lang.AutoCloseable
    public void close() {
        this.acquiringJobs.set(false);
    }

    private void schedulePoll() {
        if (this.isPollScheduled.compareAndSet(false, true)) {
            this.executor.schedule(this::onScheduledPoll, this.pollInterval, TimeUnit.MILLISECONDS);
        }
    }

    private void onScheduledPoll() {
        this.isPollScheduled.set(false);
        if (shouldPoll(this.remainingJobs.get())) {
            tryPoll();
        }
    }

    private boolean shouldPoll(int i) {
        return this.acquiringJobs.get() && i <= this.activationThreshold;
    }

    private void tryPoll() {
        tryClaimJobPoller().ifPresent(jobPoller -> {
            try {
                poll(jobPoller);
            } catch (Exception e) {
                LOG.warn("Unexpected failure to activate jobs", (Throwable) e);
                backoff(jobPoller, e);
            }
        });
    }

    private Optional<JobPoller> tryClaimJobPoller() {
        return Optional.ofNullable(this.claimableJobPoller.getAndSet(null));
    }

    private void releaseJobPoller(JobPoller jobPoller) {
        this.claimableJobPoller.set(jobPoller);
    }

    private void poll(JobPoller jobPoller) {
        int i = this.remainingJobs.get();
        if (shouldPoll(i)) {
            jobPoller.poll(this.maxJobsActive - i, this::handleJob, i2 -> {
                onPollSuccess(jobPoller, i2);
            }, th -> {
                onPollError(jobPoller, th);
            }, this::isOpen);
            return;
        }
        LOG.trace("Expected to activate for jobs, but still enough remain. Reschedule poll.");
        releaseJobPoller(jobPoller);
        schedulePoll();
    }

    private void onPollSuccess(JobPoller jobPoller, int i) {
        releaseJobPoller(jobPoller);
        int addAndGet = this.remainingJobs.addAndGet(i);
        this.pollInterval = this.initialPollInterval;
        if (addAndGet <= 0) {
            schedulePoll();
        }
    }

    private void onPollError(JobPoller jobPoller, Throwable th) {
        backoff(jobPoller, th);
    }

    private void backoff(JobPoller jobPoller, Throwable th) {
        long j = this.pollInterval;
        try {
            this.pollInterval = this.backoffSupplier.supplyRetryDelay(j);
        } catch (Exception e) {
            LOG.warn(SUPPLY_RETRY_DELAY_FAILURE_MESSAGE, (Throwable) e);
            this.pollInterval = DEFAULT_BACKOFF_SUPPLIER.supplyRetryDelay(j);
        }
        LOG.debug("Failed to activate jobs due to {}, delay retry for {} ms", th.getMessage(), Long.valueOf(this.pollInterval));
        releaseJobPoller(jobPoller);
        schedulePoll();
    }

    private void handleJob(ActivatedJob activatedJob) {
        this.executor.execute(this.jobHandlerFactory.create(activatedJob, this::handleJobFinished));
    }

    private void handleJobFinished() {
        int decrementAndGet = this.remainingJobs.decrementAndGet();
        if (this.isPollScheduled.get() || !shouldPoll(decrementAndGet)) {
            return;
        }
        tryPoll();
    }
}
