package io.camunda.zeebe.gateway.impl.job;

import io.camunda.zeebe.broker.client.api.BrokerClient;
import io.camunda.zeebe.broker.client.api.BrokerClusterState;
import io.camunda.zeebe.broker.client.api.BrokerErrorException;
import io.camunda.zeebe.broker.client.api.BrokerRejectionException;
import io.camunda.zeebe.broker.client.api.BrokerTopologyManager;
import io.camunda.zeebe.broker.client.api.dto.BrokerResponse;
import io.camunda.zeebe.broker.client.impl.PartitionIdIterator;
import io.camunda.zeebe.broker.client.impl.RoundRobinDispatchStrategy;
import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerActivateJobsRequest;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerFailJobRequest;
import io.camunda.zeebe.gateway.impl.job.JobActivationResult;
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.util.Either;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:io/camunda/zeebe/gateway/impl/job/RoundRobinActivateJobsHandler.class */
public final class RoundRobinActivateJobsHandler<T> implements ActivateJobsHandler<T> {
    /** Reason used when a response could not be delivered and no exception is available. */
    private static final String ACTIVATE_JOB_NOT_SENT_MSG = "Failed to send activated jobs to client";
    /** Reason template used when delivery failed with an exception; {@code %s} is the exception message. */
    private static final String ACTIVATE_JOB_NOT_SENT_MSG_WITH_REASON = "Failed to send activated jobs to client, failed with: %s";
    /** Reason template used for jobs deferred by the result mapper; {@code %d} is {@link #maxMessageSize}. */
    private static final String MAX_MESSAGE_SIZE_EXCEEDED_MSG = "the response is bigger than the maximum allowed message size %d";
    /** One dispatch strategy per job type, created lazily on first use of that type. */
    private final Map<String, RoundRobinDispatchStrategy> jobTypeToNextPartitionId = new ConcurrentHashMap<>();
    /** Client used to send activation and fail-job requests to the brokers. */
    private final BrokerClient brokerClient;
    /** Topology manager obtained from {@link #brokerClient} at construction time. */
    private final BrokerTopologyManager topologyManager;
    /** Size limit handed to every {@code JobActivationResponse} and reported in the deferral reason. */
    private final long maxMessageSize;
    /** Converts a broker job batch into the client-facing result. */
    private final Function<JobActivationResponse, JobActivationResult<T>> activationResultMapper;
    /** Set through {@code accept(ActorControl)}; every asynchronous step is scheduled on it. */
    private ActorControl actor;
    /** Builds the throwable reported to the caller when a request is canceled; the input is the reason. */
    private final Function<String, Throwable> requestCanceledExceptionProvider;

    /**
     * Bundles the two callbacks that terminate an activation request: one for failure and one for
     * completion.
     *
     * <p>Declared as a record, so {@code equals}, {@code hashCode}, {@code toString} and the two
     * accessors are generated by the compiler from the components.
     *
     * @param onErrorDelegate callback invoked by {@link #onError(Throwable)}
     * @param onCompletedDelegate callback invoked by {@link #onCompleted(int, boolean)}; receives the
     *     remaining amount and the resource-exhausted flag
     */
    public record ResponseObserverDelegate(
            Consumer<Throwable> onErrorDelegate, BiConsumer<Integer, Boolean> onCompletedDelegate) {

        /**
         * Forwards a failure to the error callback.
         *
         * @param th the failure to report
         */
        public void onError(Throwable th) {
            onErrorDelegate.accept(th);
        }

        /**
         * Forwards completion to the completion callback.
         *
         * @param i the value passed as first argument to the completion callback
         * @param z the value passed as second argument to the completion callback
         */
        public void onCompleted(int i, boolean z) {
            onCompletedDelegate.accept(i, z);
        }
    }

    /**
     * Creates a handler that sends activation requests through the given broker client.
     *
     * @param brokerClient client used for all broker requests; its topology manager is captured here
     * @param j maximum message size, passed to every {@code JobActivationResponse}
     * @param function maps a {@code JobActivationResponse} to the client-facing activation result
     * @param function2 builds the throwable reported when a request is canceled, given the reason
     */
    public RoundRobinActivateJobsHandler(BrokerClient brokerClient, long j, Function<JobActivationResponse, JobActivationResult<T>> function, Function<String, Throwable> function2) {
        maxMessageSize = j;
        activationResultMapper = function;
        requestCanceledExceptionProvider = function2;
        this.brokerClient = brokerClient;
        topologyManager = brokerClient.getTopologyManager();
    }

    /**
     * Stores the actor control on which this handler schedules its work.
     *
     * @param actorControl the actor control used by later {@code actor.run(...)} calls
     */
    @Override // java.util.function.Consumer
    public void accept(ActorControl actorControl) {
        actor = actorControl;
    }

    /**
     * Starts a round-robin activation for the given request, provided a topology is available.
     *
     * @param brokerActivateJobsRequest the broker request to send to the partitions
     * @param responseObserver receives the activated jobs, the error, or the completion signal
     * @param consumer not used by this implementation
     * @param j passed as last argument to the {@code InflightActivateJobsRequest} constructor
     */
    @Override // io.camunda.zeebe.gateway.impl.job.ActivateJobsHandler
    public void activateJobs(BrokerActivateJobsRequest brokerActivateJobsRequest, ResponseObserver<T> responseObserver, Consumer<Runnable> consumer, long j) {
        final BrokerClusterState clusterState = this.topologyManager.getTopology();
        if (clusterState == null) {
            // NOTE(review): without a topology the observer is never completed nor failed here;
            // confirm that callers handle this case themselves.
            return;
        }

        final InflightActivateJobsRequest<T> inflight = new InflightActivateJobsRequest<>(ACTIVATE_JOBS_REQUEST_ID_GENERATOR.getAndIncrement(), brokerActivateJobsRequest, responseObserver, j);
        // Evaluating the method reference null-checks responseObserver before the call is made.
        activateJobs(
                clusterState.getPartitionsCount(),
                inflight,
                responseObserver::onError,
                (remaining, resourceExhausted) -> responseObserver.onCompleted());
    }

    /**
     * Starts an activation using explicit error and completion callbacks.
     *
     * @param i partition count handed to the partition iterator
     * @param inflightActivateJobsRequest the request being served
     * @param consumer invoked when the activation fails
     * @param biConsumer invoked on completion with the remaining amount and the resource-exhausted flag
     */
    public void activateJobs(int i, InflightActivateJobsRequest<T> inflightActivateJobsRequest, Consumer<Throwable> consumer, BiConsumer<Integer, Boolean> biConsumer) {
        final String jobType = inflightActivateJobsRequest.getType();
        final PartitionIdIterator partitions = partitionIdIteratorForType(jobType, i);
        final InflightActivateJobsRequestState state = new InflightActivateJobsRequestState(partitions, inflightActivateJobsRequest.getMaxJobsToActivate());
        final ResponseObserverDelegate delegate = new ResponseObserverDelegate(consumer, biConsumer);
        activateJobs(inflightActivateJobsRequest, state, delegate);
    }

    /**
     * Schedules one activation step on the actor: either sends the request to the next partition or
     * reports completion.
     *
     * <p>Nothing happens if the request is no longer open when the step runs.
     */
    private void activateJobs(InflightActivateJobsRequest<T> inflightActivateJobsRequest, InflightActivateJobsRequestState inflightActivateJobsRequestState, ResponseObserverDelegate responseObserverDelegate) {
        this.actor.run(() -> {
            if (!inflightActivateJobsRequest.isOpen()) {
                return;
            }

            if (inflightActivateJobsRequestState.shouldActivateJobs()) {
                final BrokerActivateJobsRequest brokerRequest = inflightActivateJobsRequest.getRequest();
                final int partitionId = inflightActivateJobsRequestState.getNextPartition();
                final int jobsLeft = inflightActivateJobsRequestState.getRemainingAmount();
                // Only ask the next partition for what is still missing.
                brokerRequest.setPartitionId(partitionId);
                brokerRequest.setMaxJobsToActivate(jobsLeft);
                // NOTE(review): the raw cast is kept from the original; it is probably redundant,
                // but the sendRequest signature is not visible here.
                this.brokerClient.sendRequest(brokerRequest).whenComplete((BiConsumer) handleBrokerResponse(inflightActivateJobsRequest, inflightActivateJobsRequestState, responseObserverDelegate));
            } else {
                final int jobsLeft = inflightActivateJobsRequestState.getRemainingAmount();
                final boolean resourceExhausted = inflightActivateJobsRequestState.wasResourceExhaustedPresent();
                responseObserverDelegate.onCompleted(jobsLeft, resourceExhausted);
            }
        });
    }

    /**
     * Builds the completion callback for a broker request: a non-null throwable goes to the error
     * path, everything else to the success path.
     *
     * @return callback taking the broker response and the failure, of which the latter may be null
     */
    private BiConsumer<BrokerResponse<JobBatchRecord>, Throwable> handleBrokerResponse(InflightActivateJobsRequest<T> inflightActivateJobsRequest, InflightActivateJobsRequestState inflightActivateJobsRequestState, ResponseObserverDelegate responseObserverDelegate) {
        return (response, failure) -> {
            if (failure != null) {
                handleResponseError(inflightActivateJobsRequest, inflightActivateJobsRequestState, responseObserverDelegate, failure);
                return;
            }
            handleResponseSuccess(inflightActivateJobsRequest, inflightActivateJobsRequestState, responseObserverDelegate, response);
        };
    }

    /**
     * Processes a successful broker response on the actor.
     *
     * <p>Jobs the mapper marks as deferred are failed back to the broker with the size-limit reason.
     * The remaining jobs are sent to the client. If that send does not succeed, those jobs are failed
     * back as well and the request is canceled. Otherwise the remaining amount is reduced and the
     * next activation step is scheduled.
     */
    private void handleResponseSuccess(InflightActivateJobsRequest<T> inflightActivateJobsRequest, InflightActivateJobsRequestState inflightActivateJobsRequestState, ResponseObserverDelegate responseObserverDelegate, BrokerResponse<JobBatchRecord> brokerResponse) {
        this.actor.run(() -> {
            final JobBatchRecord batch = (JobBatchRecord) brokerResponse.getResponse();
            final JobActivationResponse activation = new JobActivationResponse(brokerResponse.getKey(), batch, this.maxMessageSize);
            final JobActivationResult<T> result = this.activationResultMapper.apply(activation);

            final List<JobActivationResult.ActivatedJob> deferredJobs = result.getJobsToDefer();
            if (!deferredJobs.isEmpty()) {
                final List<Long> deferredKeys = deferredJobs.stream().map(JobActivationResult.ActivatedJob::key).toList();
                final String jobType = inflightActivateJobsRequest.getType();
                final String reason = String.format(MAX_MESSAGE_SIZE_EXCEEDED_MSG, this.maxMessageSize);
                logResponseNotSent(jobType, deferredKeys, reason);
                reactivateJobs(deferredJobs, reason);
            }

            final T clientResponse = result.getActivateJobsResponse();
            final int activatedCount = result.getJobsCount();
            if (activatedCount > 0) {
                final Either<Exception, Boolean> sendOutcome = inflightActivateJobsRequest.tryToSendActivatedJobs(clientResponse);
                final boolean sent = (Boolean) sendOutcome.getOrElse(false);
                if (!sent) {
                    final List<JobActivationResult.ActivatedJob> unsentJobs = result.getJobs();
                    final List<Long> batchKeys = batch.getJobKeys();
                    final String jobType = inflightActivateJobsRequest.getType();
                    final String reason = createReasonMessage(sendOutcome);
                    logResponseNotSent(jobType, batchKeys, reason);
                    reactivateJobs(unsentJobs, reason);
                    cancelActivateJobsRequest(reason, responseObserverDelegate);
                    return;
                }
            }

            final int stillWanted = inflightActivateJobsRequestState.getRemainingAmount() - activatedCount;
            final boolean wasTruncated = batch.getTruncated();
            inflightActivateJobsRequestState.setRemainingAmount(stillWanted);
            // The truncated flag of the batch decides whether the previous partition is polled again.
            inflightActivateJobsRequestState.setPollPrevPartition(wasTruncated);
            activateJobs(inflightActivateJobsRequest, inflightActivateJobsRequestState, responseObserverDelegate);
        });
    }

    /**
     * Produces the reason text for a failed send.
     *
     * @param either outcome of the send attempt
     * @return the generic message, or the message enriched with the exception text when the outcome
     *     is a left value
     */
    private String createReasonMessage(Either<Exception, Boolean> either) {
        if (!either.isLeft()) {
            return ACTIVATE_JOB_NOT_SENT_MSG;
        }
        final Exception cause = (Exception) either.getLeft();
        return String.format(ACTIVATE_JOB_NOT_SENT_MSG_WITH_REASON, cause.getMessage());
    }

    /**
     * Sends a fail-job request for every job in the list; a null list is ignored.
     *
     * @param list jobs to hand back to the broker, may be null
     * @param str reason set as error message on each fail-job request
     */
    private void reactivateJobs(List<JobActivationResult.ActivatedJob> list, String str) {
        if (list == null) {
            return;
        }
        for (final JobActivationResult.ActivatedJob job : list) {
            tryToReactivateJob(job, str);
        }
    }

    /**
     * Sends a fail-job request (with retry) for one job and logs at info level if that request fails.
     *
     * @param activatedJob the job to hand back to the broker
     * @param str reason set as error message on the fail-job request
     */
    private void tryToReactivateJob(JobActivationResult.ActivatedJob activatedJob, String str) {
        final BrokerFailJobRequest failRequest = toFailJobRequest(activatedJob, str);
        this.brokerClient.sendRequestWithRetry(failRequest).whenComplete((BiConsumer) (ignoredResponse, failure) -> {
            if (failure == null) {
                return;
            }
            Loggers.GATEWAY_LOGGER.info("Failed to reactivate job {} due to {}", Long.valueOf(activatedJob.key()), failure.getMessage());
        });
    }

    /**
     * Builds the fail-job request for a job, keeping its current retries and attaching the reason.
     *
     * @param activatedJob job whose key and retries are copied into the request
     * @param str error message for the request
     * @return the value returned by {@code setErrorMessage} on the new request
     */
    private BrokerFailJobRequest toFailJobRequest(JobActivationResult.ActivatedJob activatedJob, String str) {
        // NOTE(review): the third constructor argument is fixed at 0L; presumably a back-off — confirm.
        final BrokerFailJobRequest failRequest = new BrokerFailJobRequest(activatedJob.key(), activatedJob.retries(), 0L);
        return failRequest.setErrorMessage(str);
    }

    /**
     * Reports a cancellation to the delegate's error callback.
     *
     * @param str reason handed to the cancellation exception provider
     * @param responseObserverDelegate delegate whose error callback receives the created throwable
     */
    private void cancelActivateJobsRequest(String str, ResponseObserverDelegate responseObserverDelegate) {
        final Throwable cancellation = this.requestCanceledExceptionProvider.apply(str);
        responseObserverDelegate.onError(cancellation);
    }

    /**
     * Processes a failed broker request on the actor.
     *
     * <p>A rejection ends the activation with that error. Any other failure is recorded on the
     * request state, logged unless it was a resource-exhausted error, and followed by the next
     * activation step.
     */
    private void handleResponseError(InflightActivateJobsRequest<T> inflightActivateJobsRequest, InflightActivateJobsRequestState inflightActivateJobsRequestState, ResponseObserverDelegate responseObserverDelegate, Throwable th) {
        this.actor.run(() -> {
            final boolean resourceExhausted = wasResourceExhausted(th);
            if (isRejection(th)) {
                responseObserverDelegate.onError(th);
            } else {
                if (!resourceExhausted) {
                    logErrorResponse(inflightActivateJobsRequestState.getCurrentPartition(), inflightActivateJobsRequest.getType(), th);
                }
                inflightActivateJobsRequestState.setResourceExhaustedWasPresent(resourceExhausted);
                inflightActivateJobsRequestState.setPollPrevPartition(false);
                activateJobs(inflightActivateJobsRequest, inflightActivateJobsRequestState, responseObserverDelegate);
            }
        });
    }

    /**
     * Tells whether the failure is a broker rejection.
     *
     * @param th the failure to inspect, may be null
     * @return true if {@code th} is a {@code BrokerRejectionException} or a subclass of it
     */
    private boolean isRejection(Throwable th) {
        // instanceof is false for null, so no separate null check is required.
        return th instanceof BrokerRejectionException;
    }

    /**
     * Tells whether the failure is a broker error with code {@code RESOURCE_EXHAUSTED}.
     *
     * @param th the failure to inspect, may be null
     * @return true only for a {@code BrokerErrorException} carrying that error code
     */
    private boolean wasResourceExhausted(Throwable th) {
        if (th instanceof BrokerErrorException brokerError) {
            return brokerError.getError().getCode() == ErrorCode.RESOURCE_EXHAUSTED;
        }
        return false;
    }

    /**
     * Logs a failed activation attempt at warn level.
     *
     * @param i partition the failed request was sent to
     * @param str job type of the request
     * @param th the failure, passed as last logging argument
     */
    private void logErrorResponse(int i, String str, Throwable th) {
        Loggers.GATEWAY_LOGGER.warn("Failed to activate jobs for type {} from partition {}", str, i, th);
    }

    /**
     * Logs at debug level that activated jobs could not be delivered to the client.
     *
     * @param str job type of the request
     * @param list keys of the jobs that were not delivered
     * @param str2 reason the jobs were not delivered
     */
    private void logResponseNotSent(String str, List<Long> list, String str2) {
        final int jobCount = list.size();
        Loggers.GATEWAY_LOGGER.debug("Failed to send {} activated jobs for type {} (with job keys: {}) to client, because: {}", jobCount, str, list, str2);
    }

    /**
     * Creates the partition iterator for a job type, starting at the partition chosen by that type's
     * dispatch strategy. The strategy is created on first use of the type and reused afterwards.
     *
     * @param str the job type
     * @param i partition count passed to the iterator
     * @return a new iterator over partition ids
     */
    private PartitionIdIterator partitionIdIteratorForType(String str, int i) {
        final RoundRobinDispatchStrategy strategy = this.jobTypeToNextPartitionId.computeIfAbsent(str, ignored -> new RoundRobinDispatchStrategy());
        return new PartitionIdIterator(strategy.determinePartition(this.topologyManager), i, this.topologyManager);
    }
}
