package io.camunda.zeebe.gateway.impl.job;

import com.google.rpc.Status;
import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.grpc.ServerStreamObserver;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.camunda.zeebe.gateway.metrics.LongPollingMetrics;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.grpc.protobuf.StatusProto;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.class */
public final class LongPollingActivateJobsHandler implements ActivateJobsHandler {
    private static final String JOBS_AVAILABLE_TOPIC = "jobsAvailable";
    private static final Logger LOG = Loggers.LONG_POLLING;
    private static final String ERROR_MSG_ACTIVATED_EXHAUSTED = "Expected to activate jobs of type '%s', but no jobs available and at least one broker returned 'RESOURCE_EXHAUSTED'. Please try again later.";
    private final RoundRobinActivateJobsHandler activateJobsHandler;
    private final BrokerClient brokerClient;
    private final Duration longPollingTimeout;
    private final long probeTimeoutMillis;
    private final int failedAttemptThreshold;
    private ActorControl actor;
    private final Map<String, InFlightLongPollingActivateJobsRequestsState> jobTypeState = new ConcurrentHashMap();
    private final LongPollingMetrics metrics = new LongPollingMetrics();

    /* loaded from: input_file:io/camunda/zeebe/gateway/impl/job/LongPollingActivateJobsHandler$Builder.class */
    public static class Builder {
        private static final long DEFAULT_LONG_POLLING_TIMEOUT = 10000;
        private static final long DEFAULT_PROBE_TIMEOUT = 10000;
        private static final int EMPTY_RESPONSE_THRESHOLD = 3;
        private BrokerClient brokerClient;
        private long longPollingTimeout = 10000;
        private long probeTimeoutMillis = 10000;
        private int minEmptyResponses = EMPTY_RESPONSE_THRESHOLD;

        public Builder setBrokerClient(BrokerClient brokerClient) {
            this.brokerClient = brokerClient;
            return this;
        }

        public Builder setLongPollingTimeout(long j) {
            this.longPollingTimeout = j;
            return this;
        }

        public Builder setProbeTimeoutMillis(long j) {
            this.probeTimeoutMillis = j;
            return this;
        }

        public Builder setMinEmptyResponses(int i) {
            this.minEmptyResponses = i;
            return this;
        }

        public LongPollingActivateJobsHandler build() {
            Objects.requireNonNull(this.brokerClient, "brokerClient");
            return new LongPollingActivateJobsHandler(this.brokerClient, this.longPollingTimeout, this.probeTimeoutMillis, this.minEmptyResponses);
        }
    }

    private LongPollingActivateJobsHandler(BrokerClient brokerClient, long j, long j2, int i) {
        this.brokerClient = brokerClient;
        this.activateJobsHandler = new RoundRobinActivateJobsHandler(brokerClient);
        this.longPollingTimeout = Duration.ofMillis(j);
        this.probeTimeoutMillis = j2;
        this.failedAttemptThreshold = i;
    }

    @Override // java.util.function.Consumer
    public void accept(ActorControl actorControl) {
        this.actor = actorControl;
        this.activateJobsHandler.accept(actorControl);
        onActorStarted();
    }

    protected void onActorStarted() {
        this.actor.run(() -> {
            this.brokerClient.subscribeJobAvailableNotification(JOBS_AVAILABLE_TOPIC, this::onNotification);
            this.actor.runAtFixedRate(Duration.ofMillis(this.probeTimeoutMillis), this::probe);
        });
    }

    @Override // io.camunda.zeebe.gateway.impl.job.ActivateJobsHandler
    public void activateJobs(GatewayOuterClass.ActivateJobsRequest activateJobsRequest, ServerStreamObserver<GatewayOuterClass.ActivateJobsResponse> serverStreamObserver) {
        activateJobs(ActivateJobsHandler.toInflightActivateJobsRequest(activateJobsRequest, serverStreamObserver));
    }

    private void completeOrResubmitRequest(InflightActivateJobsRequest inflightActivateJobsRequest, boolean z) {
        if (inflightActivateJobsRequest.isLongPollingDisabled()) {
            inflightActivateJobsRequest.complete();
            return;
        }
        if (inflightActivateJobsRequest.isTimedOut()) {
            return;
        }
        InFlightLongPollingActivateJobsRequestsState jobTypeState = getJobTypeState(inflightActivateJobsRequest.getType());
        if (!inflightActivateJobsRequest.hasScheduledTimer()) {
            addTimeOut(jobTypeState, inflightActivateJobsRequest);
        }
        if (z) {
            activateJobs(inflightActivateJobsRequest);
        } else {
            enqueueRequest(jobTypeState, inflightActivateJobsRequest);
        }
    }

    public void activateJobs(InflightActivateJobsRequest inflightActivateJobsRequest) {
        this.actor.run(() -> {
            InFlightLongPollingActivateJobsRequestsState jobTypeState = getJobTypeState(inflightActivateJobsRequest.getType());
            if (jobTypeState.shouldAttempt(this.failedAttemptThreshold)) {
                activateJobsUnchecked(jobTypeState, inflightActivateJobsRequest);
            } else {
                completeOrResubmitRequest(inflightActivateJobsRequest, false);
            }
        });
    }

    private InFlightLongPollingActivateJobsRequestsState getJobTypeState(String str) {
        return this.jobTypeState.computeIfAbsent(str, str2 -> {
            return new InFlightLongPollingActivateJobsRequestsState(str2, this.metrics);
        });
    }

    private void activateJobsUnchecked(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, InflightActivateJobsRequest inflightActivateJobsRequest) {
        BrokerClusterState topology = this.brokerClient.getTopologyManager().getTopology();
        if (topology != null) {
            inFlightLongPollingActivateJobsRequestsState.addActiveRequest(inflightActivateJobsRequest);
            this.activateJobsHandler.activateJobs(topology.getPartitionsCount(), inflightActivateJobsRequest, th -> {
                onError(inFlightLongPollingActivateJobsRequestsState, inflightActivateJobsRequest, th);
            }, (num, bool) -> {
                onCompleted(inFlightLongPollingActivateJobsRequestsState, inflightActivateJobsRequest, num.intValue(), bool.booleanValue());
            });
        }
    }

    private void onNotification(String str) {
        LOG.trace("Received jobs available notification for type {}.", str);
        InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState = this.jobTypeState.get(str);
        if (inFlightLongPollingActivateJobsRequestsState == null || !inFlightLongPollingActivateJobsRequestsState.shouldNotifyAndStartNotification()) {
            LOG.trace("Ignore jobs available notification for type {}.", str);
        } else {
            LOG.trace("Handle jobs available notification for type {}.", str);
            this.actor.run(() -> {
                resetFailedAttemptsAndHandlePendingRequests(str);
                inFlightLongPollingActivateJobsRequestsState.completeNotification();
            });
        }
    }

    private void onCompleted(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, InflightActivateJobsRequest inflightActivateJobsRequest, int i, boolean z) {
        if (i != inflightActivateJobsRequest.getMaxJobsToActivate()) {
            this.actor.submit(() -> {
                inflightActivateJobsRequest.complete();
                inFlightLongPollingActivateJobsRequestsState.removeActiveRequest(inflightActivateJobsRequest);
                resetFailedAttemptsAndHandlePendingRequests(inflightActivateJobsRequest.getType());
            });
        } else if (z) {
            this.actor.submit(() -> {
                inFlightLongPollingActivateJobsRequestsState.removeActiveRequest(inflightActivateJobsRequest);
                inflightActivateJobsRequest.onError(StatusProto.toStatusException(Status.newBuilder().setCode(8).setMessage(String.format(ERROR_MSG_ACTIVATED_EXHAUSTED, inflightActivateJobsRequest.getType())).build()));
            });
        } else {
            this.actor.submit(() -> {
                inFlightLongPollingActivateJobsRequestsState.incrementFailedAttempts(ActorClock.currentTimeMillis());
                boolean shouldBeRepeated = inFlightLongPollingActivateJobsRequestsState.shouldBeRepeated(inflightActivateJobsRequest);
                inFlightLongPollingActivateJobsRequestsState.removeActiveRequest(inflightActivateJobsRequest);
                completeOrResubmitRequest(inflightActivateJobsRequest, shouldBeRepeated);
            });
        }
    }

    private void onError(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, InflightActivateJobsRequest inflightActivateJobsRequest, Throwable th) {
        this.actor.submit(() -> {
            inflightActivateJobsRequest.onError(th);
            inFlightLongPollingActivateJobsRequestsState.removeActiveRequest(inflightActivateJobsRequest);
        });
    }

    private void resetFailedAttemptsAndHandlePendingRequests(String str) {
        InFlightLongPollingActivateJobsRequestsState jobTypeState = getJobTypeState(str);
        jobTypeState.resetFailedAttempts();
        Queue<InflightActivateJobsRequest> pendingRequests = jobTypeState.getPendingRequests();
        if (!pendingRequests.isEmpty()) {
            pendingRequests.forEach(inflightActivateJobsRequest -> {
                LOG.trace("Unblocking ActivateJobsRequest {}", inflightActivateJobsRequest.getRequest());
                activateJobs(inflightActivateJobsRequest);
            });
        } else {
            if (jobTypeState.hasActiveRequests()) {
                return;
            }
            this.jobTypeState.remove(str);
        }
    }

    private void enqueueRequest(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, InflightActivateJobsRequest inflightActivateJobsRequest) {
        LOG.trace("Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will be kept open until a new job of this type is created or until timeout of '{}'.", new Object[]{inflightActivateJobsRequest.getWorker(), Integer.valueOf(inflightActivateJobsRequest.getMaxJobsToActivate()), inflightActivateJobsRequest.getType(), inflightActivateJobsRequest.getLongPollingTimeout(this.longPollingTimeout)});
        inFlightLongPollingActivateJobsRequestsState.enqueueRequest(inflightActivateJobsRequest);
    }

    private void addTimeOut(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, InflightActivateJobsRequest inflightActivateJobsRequest) {
        inflightActivateJobsRequest.setScheduledTimer(this.actor.schedule(inflightActivateJobsRequest.getLongPollingTimeout(this.longPollingTimeout), () -> {
            inflightActivateJobsRequest.timeout();
            inFlightLongPollingActivateJobsRequestsState.removeRequest(inflightActivateJobsRequest);
        }));
    }

    private void probe() {
        long currentTimeMillis = ActorClock.currentTimeMillis();
        this.jobTypeState.forEach((str, inFlightLongPollingActivateJobsRequestsState) -> {
            if (inFlightLongPollingActivateJobsRequestsState.getLastUpdatedTime() < currentTimeMillis - this.probeTimeoutMillis) {
                InflightActivateJobsRequest nextPendingRequest = inFlightLongPollingActivateJobsRequestsState.getNextPendingRequest();
                if (nextPendingRequest != null) {
                    activateJobsUnchecked(inFlightLongPollingActivateJobsRequestsState, nextPendingRequest);
                } else if (inFlightLongPollingActivateJobsRequestsState.getFailedAttempts() >= this.failedAttemptThreshold) {
                    inFlightLongPollingActivateJobsRequestsState.setFailedAttempts(this.failedAttemptThreshold - 1);
                }
            }
        });
    }

    public static Builder newBuilder() {
        return new Builder();
    }
}
