/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.gateway.impl.job;

import io.camunda.zeebe.broker.client.api.BrokerClient;
import io.camunda.zeebe.broker.client.api.BrokerClusterState;
import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerActivateJobsRequest;
import io.camunda.zeebe.gateway.impl.job.ActivateJobsHandler;
import io.camunda.zeebe.gateway.impl.job.InFlightLongPollingActivateJobsRequestsState;
import io.camunda.zeebe.gateway.impl.job.InflightActivateJobsRequest;
import io.camunda.zeebe.gateway.impl.job.JobActivationResponse;
import io.camunda.zeebe.gateway.impl.job.JobActivationResult;
import io.camunda.zeebe.gateway.impl.job.ResponseObserver;
import io.camunda.zeebe.gateway.impl.job.RoundRobinActivateJobsHandler;
import io.camunda.zeebe.gateway.metrics.LongPollingMetrics;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.ScheduledTimer;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;

public final class LongPollingActivateJobsHandler<T>
implements ActivateJobsHandler<T> {
    /** Topic name used when subscribing to job-available notifications on the broker client. */
    private static final String JOBS_AVAILABLE_TOPIC = "jobsAvailable";
    /** Logger dedicated to long polling. */
    private static final Logger LOG = Loggers.LONG_POLLING;
    /** Message template (job type is the single argument) for the error reported when no jobs were activated and a resource-exhausted response was flagged. */
    private static final String ERROR_MSG_ACTIVATED_EXHAUSTED = "Expected to activate jobs of type '%s', but no jobs available and at least one broker returned 'RESOURCE_EXHAUSTED'. Please try again later.";
    /** Delegate that receives the partition count and the request and performs the actual activation attempt. */
    private final RoundRobinActivateJobsHandler<T> activateJobsHandler;
    /** Used to look up the current topology and to subscribe to job-available notifications. */
    private final BrokerClient brokerClient;
    /** Long-polling bookkeeping keyed by job type; entries are created lazily and removed once a type has neither pending nor active requests. */
    private final Map<String, InFlightLongPollingActivateJobsRequestsState<T>> jobTypeState = new ConcurrentHashMap<String, InFlightLongPollingActivateJobsRequestsState<T>>();
    /** Handler-level long polling timeout handed to each request when its effective timeout is resolved (presumably the fallback for requests without their own timeout; confirm against InflightActivateJobsRequest). */
    private final Duration longPollingTimeout;
    /** Period of the probe task in milliseconds; also the age after which a job type's state is considered stale by the probe. */
    private final long probeTimeoutMillis;
    /** Threshold passed to the per-type state to decide whether another activation attempt should be made. */
    private final int failedAttemptThreshold;
    /** Metrics sink handed to every newly created per-type state. */
    private final LongPollingMetrics metrics;
    /** Actor on which all work of this handler is scheduled; unset until {@code accept} is called. */
    private ActorControl actor;
    /** Turns an error message into the exception reported to a request that received no jobs. */
    private final Function<String, Exception> noJobsReceivedExceptionProvider;

    /**
     * Creates the handler. Private: instances are created by {@link Builder#build()}.
     *
     * @param brokerClient client used for topology lookups and notification subscription; also
     *     handed to the delegate activation handler
     * @param maxMessageSize forwarded unchanged to the delegate activation handler
     * @param longPollingTimeout handler-level long polling timeout, in milliseconds
     * @param probeTimeoutMillis probe period / staleness window, in milliseconds
     * @param failedAttemptThreshold threshold later passed to the per-type state when deciding
     *     whether to attempt an activation
     * @param activationResultMapper forwarded unchanged to the delegate activation handler
     * @param noJobsReceivedExceptionProvider builds the exception for the resource-exhausted case
     * @param requestCanceledExceptionProvider forwarded unchanged to the delegate activation handler
     * @param metrics metrics sink handed to each per-type state
     */
    private LongPollingActivateJobsHandler(
            final BrokerClient brokerClient,
            final long maxMessageSize,
            final long longPollingTimeout,
            final long probeTimeoutMillis,
            final int failedAttemptThreshold,
            final Function<JobActivationResponse, JobActivationResult<T>> activationResultMapper,
            final Function<String, Exception> noJobsReceivedExceptionProvider,
            final Function<String, Throwable> requestCanceledExceptionProvider,
            final LongPollingMetrics metrics) {
        this.brokerClient = brokerClient;
        activateJobsHandler =
            new RoundRobinActivateJobsHandler<T>(
                brokerClient, maxMessageSize, activationResultMapper, requestCanceledExceptionProvider);

        // Plain configuration values; none of these depend on each other.
        this.metrics = metrics;
        this.failedAttemptThreshold = failedAttemptThreshold;
        this.probeTimeoutMillis = probeTimeoutMillis;
        this.longPollingTimeout = Duration.ofMillis(longPollingTimeout);
        this.noJobsReceivedExceptionProvider = noJobsReceivedExceptionProvider;
    }

    /**
     * Binds this handler to an actor: remembers it, passes it on to the delegate activation
     * handler and then triggers the start-up work in {@code onActorStarted()}.
     *
     * @param actor the actor control on which this handler schedules its work
     */
    @Override
    public void accept(final ActorControl actor) {
        this.actor = actor;
        activateJobsHandler.accept(actor);
        onActorStarted();
    }

    /**
     * Schedules the start-up work on the actor: subscribing to job-available notifications and
     * registering the probe as a fixed-rate task.
     */
    void onActorStarted() {
        final Duration probePeriod = Duration.ofMillis(probeTimeoutMillis);
        actor.run(() -> {
            brokerClient.subscribeJobAvailableNotification(
                JOBS_AVAILABLE_TOPIC, this::onJobAvailableNotification);
            actor.runAtFixedRate(probePeriod, this::probe);
        });
    }

    /**
     * Entry point for a new activate-jobs request.
     *
     * <p>Wraps the broker request into an in-flight request, registers a cancel handler that
     * removes the request from the per-type state, and then hands the request to
     * {@link #internalActivateJobsRetry(InflightActivateJobsRequest)}.
     *
     * <p>Going through {@code internalActivateJobsRetry} (instead of calling the brokers
     * unconditionally) makes new requests honor the failed-attempt threshold exactly like
     * retried and unblocked requests do: once the threshold is reached, the request is completed
     * (long polling disabled) or parked as pending rather than sent to the brokers again.
     *
     * @param request the broker request describing which jobs to activate
     * @param responseObserver receives the outcome of the request
     * @param setCancelHandler callback used to install the handler to run when the request is
     *     cancelled
     * @param requestTimeout timeout passed on to the in-flight request
     */
    @Override
    public void activateJobs(BrokerActivateJobsRequest request, ResponseObserver<T> responseObserver, Consumer<Runnable> setCancelHandler, long requestTimeout) {
        final InflightActivateJobsRequest<T> longPollingRequest = new InflightActivateJobsRequest<T>(ACTIVATE_JOBS_REQUEST_ID_GENERATOR.getAndIncrement(), request, responseObserver, requestTimeout);
        final String jobType = longPollingRequest.getType();
        // Install the cancel handler before the first attempt so a cancellation can always
        // remove the request from the per-type state.
        setCancelHandler.accept(() -> onRequestCancel(jobType, longPollingRequest));
        // Shared attempt path: resolves the per-type state on the actor and applies the
        // failed-attempt gate before contacting any broker.
        internalActivateJobsRetry(longPollingRequest);
    }

    /**
     * Cancel hook of a request: on the actor, removes the request from the state of its job
     * type, if such a state currently exists.
     *
     * @param type job type of the cancelled request
     * @param longPollingRequest the cancelled request
     */
    private void onRequestCancel(final String type, final InflightActivateJobsRequest<T> longPollingRequest) {
        actor.run(() -> {
            // Plain lookup on purpose: a cancellation must not create a state entry.
            final InFlightLongPollingActivateJobsRequestsState<T> typeState = jobTypeState.get(type);
            if (typeState == null) {
                return;
            }
            typeState.removeRequest(longPollingRequest);
        });
    }

    /**
     * Marks the request as active for its job type and asks the delegate handler to activate
     * jobs, passing it the partition count of the current topology.
     *
     * <p>Outcomes are processed on the actor:
     * <ul>
     *   <li>error: the request is failed and removed from the active requests;
     *   <li>nothing activated (remaining amount equals the requested maximum): handled by
     *       {@code handleNoReceivedJobsFromAllPartitions};
     *   <li>otherwise: the request is completed, removed from the active requests, the failed
     *       attempt counter is reset and pending requests of the type are handled.
     * </ul>
     *
     * @param state long-polling state of the request's job type
     * @param request the request to attempt
     */
    private void tryToActivateJobsOnAllPartitions(final InFlightLongPollingActivateJobsRequestsState<T> state, final InflightActivateJobsRequest<T> request) {
        final BrokerClusterState clusterState = brokerClient.getTopologyManager().getTopology();
        if (clusterState == null) {
            // NOTE(review): without a topology the request is neither tracked nor answered
            // here; confirm that some other path (e.g. a scheduled timeout) resolves it.
            return;
        }

        state.addActiveRequest(request);
        activateJobsHandler.activateJobs(
            clusterState.getPartitionsCount(),
            request,
            (Throwable error) -> actor.submit(() -> {
                request.onError(error);
                state.removeActiveRequest(request);
            }),
            (remainingAmount, containedResourceExhaustedResponse) -> {
                // Nothing was activated when the whole requested amount is still remaining.
                if (remainingAmount.intValue() == request.getMaxJobsToActivate()) {
                    handleNoReceivedJobsFromAllPartitions(state, request, containedResourceExhaustedResponse);
                    return;
                }
                actor.submit(() -> {
                    request.complete();
                    state.removeActiveRequest(request);
                    state.resetFailedAttempts();
                    handlePendingRequests(state, request.getType());
                });
            });
    }

    /**
     * Handles an attempt that activated no jobs; the follow-up runs as one job on the actor.
     *
     * <p>If a resource-exhausted response was flagged, the request is removed from the active
     * requests and failed with an exception built from {@code ERROR_MSG_ACTIVATED_EXHAUSTED}.
     * Otherwise the failed attempt is recorded, the state is asked whether the request should be
     * repeated, the request is removed from the active requests and then completed or
     * resubmitted accordingly.
     *
     * @param state long-polling state of the request's job type
     * @param request the request that received no jobs
     * @param containedResourceExhaustedResponse whether a resource-exhausted response was flagged
     *     for this attempt
     */
    private void handleNoReceivedJobsFromAllPartitions(final InFlightLongPollingActivateJobsRequestsState<T> state, final InflightActivateJobsRequest<T> request, final Boolean containedResourceExhaustedResponse) {
        // Unboxed before scheduling, exactly where the original evaluated the flag.
        final boolean resourceExhausted = containedResourceExhaustedResponse.booleanValue();
        actor.submit(() -> {
            if (resourceExhausted) {
                state.removeActiveRequest(request);
                final String message = String.format(ERROR_MSG_ACTIVATED_EXHAUSTED, request.getType());
                request.onError(noJobsReceivedExceptionProvider.apply(message));
                return;
            }

            state.incrementFailedAttempts(ActorClock.currentTimeMillis());
            // Must be queried while the request is still registered as active.
            final boolean repeatImmediately = state.shouldBeRepeated(request);
            state.removeActiveRequest(request);
            completeOrResubmitRequest(request, repeatImmediately);
        });
    }

    /**
     * Decides what happens to a request that did not receive jobs.
     *
     * <p>A request with long polling disabled is completed right away; a request that already
     * timed out is left alone. Any other request gets a long polling timeout scheduled (unless
     * it already has a timer) and is then either retried or parked as pending.
     *
     * @param request the request to complete, retry or park
     * @param activateImmediately {@code true} to retry, {@code false} to mark the request as
     *     pending
     */
    private void completeOrResubmitRequest(final InflightActivateJobsRequest<T> request, final boolean activateImmediately) {
        if (request.isLongPollingDisabled()) {
            request.complete();
        } else if (!request.isTimedOut()) {
            final InFlightLongPollingActivateJobsRequestsState<T> typeState =
                jobTypeState.computeIfAbsent(
                    request.getType(),
                    jobType -> new InFlightLongPollingActivateJobsRequestsState<T>(jobType, metrics));

            if (!request.hasScheduledTimer()) {
                scheduleLongPollingTimeout(typeState, request);
            }

            if (activateImmediately) {
                internalActivateJobsRetry(request);
            } else {
                markRequestAsPending(typeState, request);
            }
        }
    }

    /**
     * Attempts the request on the actor, subject to the failed-attempt threshold.
     *
     * <p>Resolves (or creates) the state of the request's job type. If the state allows another
     * attempt for the configured threshold, the brokers are contacted; otherwise the request is
     * completed or parked as pending via {@code completeOrResubmitRequest(request, false)}.
     *
     * @param request the request to attempt
     */
    void internalActivateJobsRetry(final InflightActivateJobsRequest<T> request) {
        actor.run(() -> {
            final InFlightLongPollingActivateJobsRequestsState<T> typeState =
                jobTypeState.computeIfAbsent(
                    request.getType(),
                    jobType -> new InFlightLongPollingActivateJobsRequestsState<T>(jobType, metrics));

            if (!typeState.shouldAttempt(failedAttemptThreshold)) {
                completeOrResubmitRequest(request, false);
                return;
            }
            tryToActivateJobsOnAllPartitions(typeState, request);
        });
    }

    /**
     * Callback for job-available notifications.
     *
     * <p>The notification is ignored when no state exists for the job type or when the state
     * declines it via {@code shouldNotifyAndStartNotification()}. Otherwise, on the actor, the
     * failed attempt counter is reset, pending requests are handled and the notification is
     * marked as completed.
     *
     * @param jobType job type for which jobs became available
     */
    private void onJobAvailableNotification(final String jobType) {
        LOG.trace("Received jobs available notification for type {}.", jobType);

        // Plain lookup on purpose: a notification must not create a state entry.
        final InFlightLongPollingActivateJobsRequestsState<T> typeState = jobTypeState.get(jobType);
        if (typeState == null || !typeState.shouldNotifyAndStartNotification()) {
            LOG.trace("Ignore jobs available notification for type {}.", jobType);
            return;
        }

        LOG.trace("Handle jobs available notification for type {}.", jobType);
        actor.run(() -> {
            typeState.resetFailedAttempts();
            handlePendingRequests(typeState, jobType);
            typeState.completeNotification();
        });
    }

    /**
     * Retries every pending request of the given job type, or drops the type's state when
     * there is nothing left to track.
     *
     * <p>If pending requests exist, each one is handed to
     * {@link #internalActivateJobsRetry(InflightActivateJobsRequest)}. If none are pending and
     * the state has no active requests either, the state entry is removed from the map.
     *
     * @param state long-polling state of the job type
     * @param jobType key of {@code state} in the per-type map
     */
    private void handlePendingRequests(InFlightLongPollingActivateJobsRequestsState<T> state, String jobType) {
        // Element type is the request type itself, so no per-element cast is needed.
        final Queue<InflightActivateJobsRequest<T>> pendingRequests = state.getPendingRequests();
        if (!pendingRequests.isEmpty()) {
            pendingRequests.forEach(nextPendingRequest -> {
                LOG.trace("Unblocking ActivateJobsRequest {}", nextPendingRequest.getRequest());
                internalActivateJobsRetry(nextPendingRequest);
            });
        } else if (!state.hasActiveRequests()) {
            // Neither pending nor active requests remain for this type.
            jobTypeState.remove(jobType);
        }
    }

    /**
     * Logs that no jobs are available for the request and enqueues it in the state of its job
     * type.
     *
     * @param state long-polling state of the request's job type
     * @param request the request to park
     */
    private void markRequestAsPending(final InFlightLongPollingActivateJobsRequestsState<T> state, final InflightActivateJobsRequest<T> request) {
        LOG.trace(
            "Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will be kept open until a new job of this type is created or until timeout of '{}'.",
            request.getWorker(),
            request.getMaxJobsToActivate(),
            request.getType(),
            request.getLongPollingTimeout(longPollingTimeout));
        state.enqueueRequest(request);
    }

    /**
     * Schedules the long polling timeout of a request on the actor and stores the resulting
     * timer on the request. When the timer fires, the request is timed out and then removed
     * from the given state.
     *
     * @param state long-polling state the request is removed from on timeout
     * @param request the request to time out
     */
    private void scheduleLongPollingTimeout(final InFlightLongPollingActivateJobsRequestsState<T> state, final InflightActivateJobsRequest<T> request) {
        // The request resolves its effective timeout from the handler-level value.
        final Duration effectiveTimeout = request.getLongPollingTimeout(longPollingTimeout);
        final Runnable onTimeout = () -> {
            request.timeout();
            state.removeRequest(request);
        };
        request.setScheduledTimer(actor.schedule(effectiveTimeout, onTimeout));
    }

    /**
     * Periodic task that looks at every job type whose state was last updated more than
     * {@code probeTimeoutMillis} ago.
     *
     * <p>If such a state has a pending request, that request is sent to the brokers as a probe.
     * If it has none and its failed attempt counter has reached the threshold, the counter is
     * lowered to one below the threshold (presumably so that the next incoming request is
     * attempted again; confirm against {@code shouldAttempt}).
     */
    private void probe() {
        final long staleBefore = ActorClock.currentTimeMillis() - probeTimeoutMillis;
        for (final InFlightLongPollingActivateJobsRequestsState<T> state : jobTypeState.values()) {
            if (state.getLastUpdatedTime() >= staleBefore) {
                // Updated recently enough; nothing to probe for this type.
                continue;
            }

            final InflightActivateJobsRequest<T> probeRequest = state.getNextPendingRequest();
            if (probeRequest != null) {
                tryToActivateJobsOnAllPartitions(state, probeRequest);
            } else if (state.getFailedAttempts() >= failedAttemptThreshold) {
                state.setFailedAttempts(failedAttemptThreshold - 1);
            }
        }
    }

    /**
     * Creates a new, empty builder for a {@link LongPollingActivateJobsHandler}.
     *
     * @param <T> response type handled by the resulting handler
     * @return a builder with default settings
     */
    public static <T> Builder<T> newBuilder() {
        // Parameterized instantiation; the raw type would rely on an unchecked conversion.
        return new Builder<T>();
    }

    /**
     * Fluent builder for {@link LongPollingActivateJobsHandler}. Only the broker client is
     * validated in {@link #build()}; all other values are passed to the handler as they are.
     *
     * @param <T> response type handled by the resulting handler
     */
    public static class Builder<T> {
        /** Default long polling timeout in milliseconds. */
        private static final long DEFAULT_LONG_POLLING_TIMEOUT_MILLIS = 10000L;
        /** Default probe timeout in milliseconds. */
        private static final long DEFAULT_PROBE_TIMEOUT_MILLIS = 10000L;
        /** Default value handed to the handler as its failed attempt threshold. */
        private static final int DEFAULT_MIN_EMPTY_RESPONSES = 3;

        private BrokerClient brokerClient;
        private long maxMessageSize;
        private long longPollingTimeout = DEFAULT_LONG_POLLING_TIMEOUT_MILLIS;
        private long probeTimeoutMillis = DEFAULT_PROBE_TIMEOUT_MILLIS;
        private int minEmptyResponses = DEFAULT_MIN_EMPTY_RESPONSES;
        private Function<JobActivationResponse, JobActivationResult<T>> activationResultMapper;
        private Function<String, Exception> noJobsReceivedExceptionProvider;
        private Function<String, Throwable> requestCanceledExceptionProvider;
        private LongPollingMetrics metrics;

        /**
         * Sets the broker client; this is the only value {@link #build()} requires.
         *
         * @param brokerClient the broker client to use
         * @return this builder
         */
        public Builder<T> setBrokerClient(final BrokerClient brokerClient) {
            this.brokerClient = brokerClient;
            return this;
        }

        /**
         * Sets the maximum message size handed to the handler.
         *
         * @param maxMessageSize the maximum message size
         * @return this builder
         */
        public Builder<T> setMaxMessageSize(final long maxMessageSize) {
            this.maxMessageSize = maxMessageSize;
            return this;
        }

        /**
         * Sets the long polling timeout.
         *
         * @param longPollingTimeout timeout in milliseconds (default 10000)
         * @return this builder
         */
        public Builder<T> setLongPollingTimeout(final long longPollingTimeout) {
            this.longPollingTimeout = longPollingTimeout;
            return this;
        }

        /**
         * Sets the probe timeout.
         *
         * @param probeTimeoutMillis timeout in milliseconds (default 10000)
         * @return this builder
         */
        public Builder<T> setProbeTimeoutMillis(final long probeTimeoutMillis) {
            this.probeTimeoutMillis = probeTimeoutMillis;
            return this;
        }

        /**
         * Sets the value the handler uses as its failed attempt threshold.
         *
         * @param minEmptyResponses the threshold (default 3)
         * @return this builder
         */
        public Builder<T> setMinEmptyResponses(final int minEmptyResponses) {
            this.minEmptyResponses = minEmptyResponses;
            return this;
        }

        /**
         * Sets the function that maps a job activation response to a result.
         *
         * @param activationResultMapper the mapper handed to the handler
         * @return this builder
         */
        public Builder<T> setActivationResultMapper(final Function<JobActivationResponse, JobActivationResult<T>> activationResultMapper) {
            this.activationResultMapper = activationResultMapper;
            return this;
        }

        /**
         * Sets the factory for the exception reported when no jobs were received.
         *
         * @param noJobsReceivedExceptionProvider maps an error message to an exception
         * @return this builder
         */
        public Builder<T> setNoJobsReceivedExceptionProvider(final Function<String, Exception> noJobsReceivedExceptionProvider) {
            this.noJobsReceivedExceptionProvider = noJobsReceivedExceptionProvider;
            return this;
        }

        /**
         * Sets the factory for the throwable reported when a request is cancelled.
         *
         * @param requestCanceledExceptionProvider maps a message to a throwable
         * @return this builder
         */
        public Builder<T> setRequestCanceledExceptionProvider(final Function<String, Throwable> requestCanceledExceptionProvider) {
            this.requestCanceledExceptionProvider = requestCanceledExceptionProvider;
            return this;
        }

        /**
         * Sets the metrics sink handed to the handler.
         *
         * @param metrics the long polling metrics
         * @return this builder
         */
        public Builder<T> setMetrics(final LongPollingMetrics metrics) {
            this.metrics = metrics;
            return this;
        }

        /**
         * Builds the handler from the values collected so far.
         *
         * @return a new handler
         * @throws NullPointerException if no broker client was set
         */
        public LongPollingActivateJobsHandler<T> build() {
            Objects.requireNonNull(brokerClient, "brokerClient");
            return new LongPollingActivateJobsHandler<T>(
                brokerClient,
                maxMessageSize,
                longPollingTimeout,
                probeTimeoutMillis,
                minEmptyResponses,
                activationResultMapper,
                noJobsReceivedExceptionProvider,
                requestCanceledExceptionProvider,
                metrics);
        }
    }
}

