/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.client.impl.worker;

import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.worker.BackoffSupplier;
import io.camunda.zeebe.client.api.worker.JobWorker;
import io.camunda.zeebe.client.api.worker.JobWorkerMetrics;
import io.camunda.zeebe.client.impl.Loggers;
import io.camunda.zeebe.client.impl.worker.JobPoller;
import io.camunda.zeebe.client.impl.worker.JobRunnableFactory;
import io.camunda.zeebe.client.impl.worker.JobStreamer;
import io.camunda.zeebe.client.impl.worker.JobWorkerBuilderImpl;
import java.io.Closeable;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

public final class JobWorkerImpl
implements JobWorker,
Closeable {
    public static final String ERROR_MSG = "Expected to handle received job with key {}, but the worker reached maximum capacity (maxJobsActive). The job activation timed out (controllable by timeout parameter). It will get reactivated shortly. If this issue persist, make sure to either scale your workers, threads, increase maxJobsActive or reduce the load you want to work on. ";
    private static final BackoffSupplier DEFAULT_BACKOFF_SUPPLIER = JobWorkerBuilderImpl.DEFAULT_BACKOFF_SUPPLIER;
    private static final Logger LOG = Loggers.JOB_WORKER_LOGGER;
    private static final String SUPPLY_RETRY_DELAY_FAILURE_MESSAGE = "Expected to supply retry delay, but an exception was thrown. Falling back to default backoff supplier";
    private final int maxJobsActive;
    private final int activationThreshold;
    private final AtomicInteger remainingJobs;
    private final Executor executor;
    private final JobRunnableFactory jobHandlerFactory;
    private final long initialPollInterval;
    private final JobStreamer jobStreamer;
    private final BackoffSupplier backoffSupplier;
    private final JobWorkerMetrics metrics;
    private final AtomicBoolean acquiringJobs = new AtomicBoolean(true);
    private final AtomicReference<JobPoller> claimableJobPoller;
    private final AtomicBoolean isPollScheduled = new AtomicBoolean(false);
    private volatile long pollInterval;
    private final ScheduledExecutorService scheduledExecutorService;

    public JobWorkerImpl(int maxJobsActive, ScheduledExecutorService executor, Duration pollInterval, JobRunnableFactory jobHandlerFactory, JobPoller jobPoller, JobStreamer jobStreamer, BackoffSupplier backoffSupplier, JobWorkerMetrics metrics, Executor jobExecutor) {
        this.maxJobsActive = maxJobsActive;
        this.activationThreshold = Math.round((float)maxJobsActive * 0.3f);
        this.remainingJobs = new AtomicInteger(0);
        this.executor = jobExecutor;
        this.scheduledExecutorService = executor;
        this.jobHandlerFactory = jobHandlerFactory;
        this.jobStreamer = jobStreamer;
        this.initialPollInterval = pollInterval.toMillis();
        this.backoffSupplier = backoffSupplier;
        this.metrics = metrics;
        this.claimableJobPoller = new AtomicReference<JobPoller>(jobPoller);
        this.pollInterval = this.initialPollInterval;
        this.openStream();
        this.schedulePoll();
    }

    private void openStream() {
        this.jobStreamer.openStreamer(this::handleStreamedJob);
    }

    @Override
    public boolean isOpen() {
        return this.acquiringJobs.get();
    }

    @Override
    public boolean isClosed() {
        return !this.isOpen() && this.claimableJobPoller.get() != null && this.remainingJobs.get() <= 0;
    }

    @Override
    public void close() {
        this.acquiringJobs.set(false);
        this.jobStreamer.close();
    }

    private void schedulePoll() {
        if (this.isPollScheduled.compareAndSet(false, true)) {
            this.scheduledExecutorService.schedule(this::onScheduledPoll, this.pollInterval, TimeUnit.MILLISECONDS);
        }
    }

    private void onScheduledPoll() {
        this.isPollScheduled.set(false);
        int actualRemainingJobs = this.remainingJobs.get();
        if (this.shouldPoll(actualRemainingJobs)) {
            this.tryPoll();
        }
    }

    private boolean shouldPoll(int remainingJobs) {
        return this.acquiringJobs.get() && remainingJobs <= this.activationThreshold;
    }

    private void tryPoll() {
        this.tryClaimJobPoller().ifPresent(poller -> {
            try {
                this.poll((JobPoller)poller);
            }
            catch (Exception error) {
                LOG.warn("Unexpected failure to activate jobs", (Throwable)error);
                this.onPollError((JobPoller)poller, error);
            }
        });
    }

    private Optional<JobPoller> tryClaimJobPoller() {
        return Optional.ofNullable(this.claimableJobPoller.getAndSet(null));
    }

    private void releaseJobPoller(JobPoller jobPoller) {
        this.claimableJobPoller.set(jobPoller);
    }

    private void poll(JobPoller jobPoller) {
        int actualRemainingJobs = this.remainingJobs.get();
        if (!this.shouldPoll(actualRemainingJobs)) {
            LOG.trace("Expected to activate for jobs, but still enough remain. Reschedule poll.");
            this.releaseJobPoller(jobPoller);
            this.schedulePoll();
            return;
        }
        int maxJobsToActivate = this.maxJobsActive - actualRemainingJobs;
        jobPoller.poll(maxJobsToActivate, this::handleJob, activatedJobs -> this.onPollSuccess(jobPoller, activatedJobs), error -> this.onPollError(jobPoller, (Throwable)error), this::isOpen);
    }

    private void onPollSuccess(JobPoller jobPoller, int activatedJobs) {
        this.releaseJobPoller(jobPoller);
        int actualRemainingJobs = this.remainingJobs.addAndGet(activatedJobs);
        if (this.jobStreamer.isOpen() && activatedJobs == 0) {
            this.backoff(jobPoller);
            LOG.trace("No jobs to activate via polling, will backoff and poll in {}", (Object)this.pollInterval);
        } else {
            this.pollInterval = this.initialPollInterval;
            if (actualRemainingJobs <= 0) {
                this.schedulePoll();
            }
        }
    }

    private void onPollError(JobPoller jobPoller, Throwable error) {
        this.backoff(jobPoller);
        LOG.debug("Failed to activate jobs due to {}, delay retry for {} ms", (Object)error.getMessage(), (Object)this.pollInterval);
    }

    private void backoff(JobPoller jobPoller) {
        this.getPollInterval();
        this.releaseJobPoller(jobPoller);
        this.schedulePoll();
    }

    private void getPollInterval() {
        long prevInterval = this.pollInterval;
        try {
            this.pollInterval = this.backoffSupplier.supplyRetryDelay(prevInterval);
        }
        catch (Exception e) {
            LOG.warn(SUPPLY_RETRY_DELAY_FAILURE_MESSAGE, (Throwable)e);
            this.pollInterval = DEFAULT_BACKOFF_SUPPLIER.supplyRetryDelay(prevInterval);
        }
    }

    private void handleJob(ActivatedJob job) {
        this.handleActivatedJob(job, this::handleJobFinished);
    }

    private void handleStreamedJob(ActivatedJob job) {
        this.handleActivatedJob(job, this::handleStreamJobFinished);
    }

    private void handleActivatedJob(ActivatedJob job, Runnable finalizer) {
        this.metrics.jobActivated(1);
        try {
            this.executor.execute(this.jobHandlerFactory.create(job, finalizer));
        }
        catch (RejectedExecutionException e) {
            if (this.isClosed()) {
                return;
            }
            if (this.scheduledExecutorService.isShutdown() || this.scheduledExecutorService.isTerminated()) {
                LOG.warn("Underlying executor was closed before the worker. Closing the worker now.", (Throwable)e);
                this.close();
                return;
            }
            LOG.warn(ERROR_MSG, (Object)job.getKey(), (Object)e);
        }
    }

    private void handleJobFinished() {
        int actualRemainingJobs = this.remainingJobs.decrementAndGet();
        if (!this.isPollScheduled.get() && this.shouldPoll(actualRemainingJobs)) {
            this.tryPoll();
        }
        this.metrics.jobHandled(1);
    }

    private void handleStreamJobFinished() {
        this.metrics.jobHandled(1);
    }
}

