package io.zeebe.broker.task;

import io.zeebe.broker.logstreams.LogStreamServiceNames;
import io.zeebe.broker.logstreams.processor.StreamProcessorService;
import io.zeebe.broker.system.SystemServiceNames;
import io.zeebe.broker.task.processor.LockTaskStreamProcessor;
import io.zeebe.broker.task.processor.TaskSubscription;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.processor.StreamProcessorController;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.TransportListener;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.allocation.HeapBufferAllocator;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.collection.CompactList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;

/* loaded from: input_file:io/zeebe/broker/task/TaskSubscriptionManager.class */
public class TaskSubscriptionManager implements Actor, TransportListener {
    /** Actor name returned by {@link #name()}. */
    protected static final String NAME = "taskqueue.subscription.manager";
    /** Size handed to both the deferred command context and the credits request buffer in the constructor. */
    public static final int NUM_CONCURRENT_REQUESTS = 1024;
    /** Service context used to install and remove the lock-task stream processor services. */
    protected final ServiceStartContext serviceContext;
    /** Factory that builds a stream processor for a (cloned) task type buffer. */
    protected final Function<DirectBuffer, LockTaskStreamProcessor> streamProcessorSupplier;
    /** Known log streams, keyed first by topic name and then by partition id. */
    protected final Map<DirectBuffer, Int2ObjectHashMap<LogStreamBucket>> logStreamBuckets;
    /** Stream processor looked up by subscription id (the subscriber key of a credits request). */
    protected final Long2ObjectHashMap<LockTaskStreamProcessor> streamProcessorBySubscriptionId;
    /** Context the operations of this manager are submitted to via runAsync; polled in {@link #doWork()}. */
    protected final DeferredCommandContext asyncContext;
    /** Buffer that credits requests are offered to; its requests are handled in {@link #doWork()}. */
    protected final CreditsRequestBuffer creditRequestBuffer;
    /** Credits requests whose dispatch returned false; they are retried in {@link #doWork()}. */
    protected final CompactList backPressuredCreditsRequests;
    /** Reusable request instance wrapped around elements of {@link #backPressuredCreditsRequests}. */
    protected final CreditsRequest creditsRequest;
    /**
     * Initialised to 0 in the constructor. NOTE(review): presumably the id handed to the next
     * subscription in addSubscription, whose body is not available in this file - confirm.
     */
    protected long nextSubscriptionId;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/broker/task/TaskSubscriptionManager$LogStreamBucket.class */
    /**
     * Bundles a log stream with its service name and the lock-task stream processors
     * that have been added for it.
     */
    public static class LogStreamBucket {
        protected final LogStream logStream;
        protected final ServiceName<LogStream> logStreamServiceName;
        // Parameterized (was a raw ArrayList) so the assignment is type-checked.
        protected List<LockTaskStreamProcessor> streamProcessors = new ArrayList<>();

        /**
         * @param logStream the log stream this bucket belongs to
         * @param serviceName the service name of that log stream
         */
        LogStreamBucket(LogStream logStream, ServiceName<LogStream> serviceName) {
            this.logStream = logStream;
            this.logStreamServiceName = serviceName;
        }

        /** @return the log stream of this bucket */
        public LogStream getLogStream() {
            return this.logStream;
        }

        /** @return the service name of the log stream of this bucket */
        public ServiceName<LogStream> getLogServiceName() {
            return this.logStreamServiceName;
        }

        /**
         * Finds the first added stream processor whose subscribed task type equals the given one.
         *
         * @param directBuffer the task type to look for
         * @return the matching stream processor, or {@code null} if there is none
         */
        public LockTaskStreamProcessor getStreamProcessorByTaskType(DirectBuffer directBuffer) {
            final int count = this.streamProcessors.size();
            for (int index = 0; index < count; index++) {
                final LockTaskStreamProcessor candidate = this.streamProcessors.get(index);
                if (BufferUtil.equals(directBuffer, candidate.getSubscriptedTaskType())) {
                    return candidate;
                }
            }
            return null;
        }

        /**
         * Adds a stream processor to this bucket.
         *
         * @param lockTaskStreamProcessor the stream processor to add
         */
        public void addStreamProcessor(LockTaskStreamProcessor lockTaskStreamProcessor) {
            this.streamProcessors.add(lockTaskStreamProcessor);
        }

        /**
         * Removes a stream processor from this bucket, if present.
         *
         * @param lockTaskStreamProcessor the stream processor to remove
         */
        public void removeStreamProcessor(LockTaskStreamProcessor lockTaskStreamProcessor) {
            this.streamProcessors.remove(lockTaskStreamProcessor);
        }
    }

    /**
     * Creates a manager whose stream processors are built by calling the
     * {@link LockTaskStreamProcessor} constructor with the task type buffer.
     *
     * @param serviceStartContext service context passed on to the two-argument constructor
     */
    public TaskSubscriptionManager(ServiceStartContext serviceStartContext) {
        this(serviceStartContext, taskType -> new LockTaskStreamProcessor(taskType));
    }

    /**
     * Creates a manager with a custom stream processor factory.
     *
     * @param serviceStartContext service context used to install and remove stream processor services
     * @param function factory that creates a stream processor for a task type buffer
     */
    public TaskSubscriptionManager(ServiceStartContext serviceStartContext, Function<DirectBuffer, LockTaskStreamProcessor> function) {
        this.serviceContext = serviceStartContext;
        this.streamProcessorSupplier = function;
        // Parameterized (was a raw HashMap) so the assignment is type-checked.
        this.logStreamBuckets = new HashMap<>();
        this.streamProcessorBySubscriptionId = new Long2ObjectHashMap<>();
        this.asyncContext = new DeferredCommandContext(NUM_CONCURRENT_REQUESTS);
        this.creditsRequest = new CreditsRequest();
        this.nextSubscriptionId = 0L;
        // A buffered request that cannot be dispatched is parked in the back-pressure list.
        this.creditRequestBuffer = new CreditsRequestBuffer(NUM_CONCURRENT_REQUESTS, request -> {
            if (!dispatchSubscriptionCredits(request)) {
                backpressureRequest(request);
            }
        });
        // Must come after creditRequestBuffer: the list size is taken from the buffer's upper bound.
        // NOTE(review): 12 is presumably the per-element length of a credits request - confirm.
        this.backPressuredCreditsRequests = new CompactList(12, this.creditRequestBuffer.getCapacityUpperBound(), new HeapBufferAllocator());
    }

    /**
     * Returns this actor's name.
     *
     * @return the constant {@link #NAME}
     */
    @Override // io.zeebe.util.actor.Actor
    public String name() {
        return NAME;
    }

    /**
     * Performs one round of work: polls the deferred command context, retries the
     * back-pressured credits requests and, only if none of those remain, handles the
     * newly buffered credits requests.
     *
     * @return the sum of the work counts reported by the individual steps
     * @throws Exception declared by the {@code Actor} contract
     */
    @Override // io.zeebe.util.actor.Actor
    public int doWork() throws Exception {
        int workCount = this.asyncContext.doWork();
        workCount += dispatchBackpressuredSubscriptionCredits();
        // New requests are only handled once the back-pressure list is empty.
        if (this.backPressuredCreditsRequests.size() == 0) {
            workCount += this.creditRequestBuffer.handleRequests();
        }
        return workCount;
    }

    public CompletableFuture<Void> addSubscription(TaskSubscription taskSubscription) {
        return this.asyncContext.runAsync(
        /*  JADX ERROR: Method code generation error
            jadx.core.utils.exceptions.CodegenException: Error generate insn: 0x000e: RETURN 
              (wrap:java.util.concurrent.CompletableFuture<java.lang.Void>:0x000b: INVOKE 
              (wrap:io.zeebe.util.DeferredCommandContext:0x0001: IGET (r4v0 'this' io.zeebe.broker.task.TaskSubscriptionManager A[IMMUTABLE_TYPE, THIS]) A[WRAPPED] io.zeebe.broker.task.TaskSubscriptionManager.asyncContext io.zeebe.util.DeferredCommandContext)
              (wrap:java.util.function.Consumer:0x0006: INVOKE_CUSTOM 
              (r4v0 'this' io.zeebe.broker.task.TaskSubscriptionManager A[DONT_INLINE, IMMUTABLE_TYPE, THIS])
              (r5v0 'taskSubscription' io.zeebe.broker.task.processor.TaskSubscription A[DONT_INLINE])
             A[MD:(io.zeebe.broker.task.TaskSubscriptionManager, io.zeebe.broker.task.processor.TaskSubscription):java.util.function.Consumer (s), WRAPPED]
             handle type: INVOKE_DIRECT
             lambda: java.util.function.Consumer.accept(java.lang.Object):void
             call insn: INVOKE 
              (r1 I:io.zeebe.broker.task.TaskSubscriptionManager)
              (r2 I:io.zeebe.broker.task.processor.TaskSubscription)
              (v2 java.util.concurrent.CompletableFuture)
             DIRECT call: io.zeebe.broker.task.TaskSubscriptionManager.lambda$addSubscription$5(io.zeebe.broker.task.processor.TaskSubscription, java.util.concurrent.CompletableFuture):void A[MD:(io.zeebe.broker.task.processor.TaskSubscription, java.util.concurrent.CompletableFuture):void (m)])
             VIRTUAL call: io.zeebe.util.DeferredCommandContext.runAsync(java.util.function.Consumer):java.util.concurrent.CompletableFuture A[MD:<T>:(java.util.function.Consumer<java.util.concurrent.CompletableFuture<T>>):java.util.concurrent.CompletableFuture<T> (m), WRAPPED])
             in method: io.zeebe.broker.task.TaskSubscriptionManager.addSubscription(io.zeebe.broker.task.processor.TaskSubscription):java.util.concurrent.CompletableFuture<java.lang.Void>, file: input_file:io/zeebe/broker/task/TaskSubscriptionManager.class
            	at jadx.core.codegen.InsnGen.makeInsn(InsnGen.java:310)
            	at jadx.core.codegen.InsnGen.makeInsn(InsnGen.java:273)
            	at jadx.core.codegen.RegionGen.makeSimpleBlock(RegionGen.java:94)
            	at jadx.core.dex.nodes.IBlock.generate(IBlock.java:15)
            	at jadx.core.codegen.RegionGen.makeRegion(RegionGen.java:66)
            	at jadx.core.dex.regions.Region.generate(Region.java:35)
            	at jadx.core.codegen.RegionGen.makeRegion(RegionGen.java:66)
            	at jadx.core.codegen.MethodGen.addRegionInsns(MethodGen.java:297)
            	at jadx.core.codegen.MethodGen.addInstructions(MethodGen.java:276)
            	at jadx.core.codegen.ClassGen.addMethodCode(ClassGen.java:406)
            	at jadx.core.codegen.ClassGen.addMethod(ClassGen.java:335)
            	at jadx.core.codegen.ClassGen.lambda$addInnerClsAndMethods$3(ClassGen.java:301)
            	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
            	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
            	at java.base/java.util.stream.SortedOps$RefSortingSink.end(SortedOps.java:395)
            	at java.base/java.util.stream.Sink$ChainedReference.end(Sink.java:261)
            Caused by: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.SSAVar.getCodeVar()" because the return value of "jadx.core.dex.instructions.args.RegisterArg.getSVar()" is null
            	at jadx.core.codegen.InsnGen.makeInlinedLambdaMethod(InsnGen.java:1025)
            	at jadx.core.codegen.InsnGen.makeInvokeLambda(InsnGen.java:936)
            	at jadx.core.codegen.InsnGen.makeInvoke(InsnGen.java:827)
            	at jadx.core.codegen.InsnGen.makeInsnBody(InsnGen.java:422)
            	at jadx.core.codegen.InsnGen.addWrappedArg(InsnGen.java:145)
            	at jadx.core.codegen.InsnGen.addArg(InsnGen.java:121)
            	at jadx.core.codegen.InsnGen.addArg(InsnGen.java:108)
            	at jadx.core.codegen.InsnGen.generateMethodArguments(InsnGen.java:1117)
            	at jadx.core.codegen.InsnGen.makeInvoke(InsnGen.java:884)
            	at jadx.core.codegen.InsnGen.makeInsnBody(InsnGen.java:422)
            	at jadx.core.codegen.InsnGen.addWrappedArg(InsnGen.java:145)
            	at jadx.core.codegen.InsnGen.addArg(InsnGen.java:121)
            	at jadx.core.codegen.InsnGen.addArg(InsnGen.java:108)
            	at jadx.core.codegen.InsnGen.makeInsnBody(InsnGen.java:368)
            	at jadx.core.codegen.InsnGen.makeInsn(InsnGen.java:303)
            	... 15 more
            */
        /*
            this = this;
            r0 = r4
            io.zeebe.util.DeferredCommandContext r0 = r0.asyncContext
            r1 = r4
            r2 = r5
            java.util.concurrent.CompletableFuture<java.lang.Void> r1 = (v2) -> { // java.util.function.Consumer.accept(java.lang.Object):void
                r1.lambda$addSubscription$5(r2, v2);
            }
            java.util.concurrent.CompletableFuture r0 = r0.runAsync(r1)
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: io.zeebe.broker.task.TaskSubscriptionManager.addSubscription(io.zeebe.broker.task.processor.TaskSubscription):java.util.concurrent.CompletableFuture");
    }

    /**
     * Creates a lock-task stream processor for the given task type and installs it as a
     * service that depends on the bucket's log stream (as source and target stream), the
     * snapshot storage service and the actor scheduler service.
     *
     * @param logStreamBucket bucket providing the log stream and its service name
     * @param directBuffer the task type; it is cloned before being handed to the processor
     * @return a future completed with the new stream processor once the service is installed,
     *     or completed exceptionally with the installation failure
     */
    protected CompletableFuture<LockTaskStreamProcessor> createStreamProcessorService(LogStreamBucket logStreamBucket, DirectBuffer directBuffer) {
        final CompletableFuture<LockTaskStreamProcessor> installed = new CompletableFuture<>();
        final ServiceName<LogStream> logStreamName = logStreamBucket.getLogServiceName();
        final ServiceName<StreamProcessorController> processorServiceName =
                TaskQueueServiceNames.taskQueueLockStreamProcessorServiceName(
                        logStreamBucket.getLogStream().getLogName(),
                        BufferUtil.bufferAsString(directBuffer));
        final String processorName = processorServiceName.getName();

        // The processor and its reprocessing filter work on a private copy of the task type.
        final DirectBuffer taskType = BufferUtil.cloneBuffer(directBuffer);
        final LockTaskStreamProcessor processor = this.streamProcessorSupplier.apply(taskType);

        final StreamProcessorService processorService = new StreamProcessorService(processorName, 20, processor)
                .eventFilter(LockTaskStreamProcessor.eventFilter())
                .reprocessingEventFilter(LockTaskStreamProcessor.reprocessingEventFilter(taskType));

        this.serviceContext.createService(processorServiceName, processorService)
                .dependency(logStreamName, processorService.getSourceStreamInjector())
                .dependency(logStreamName, processorService.getTargetStreamInjector())
                .dependency(LogStreamServiceNames.SNAPSHOT_STORAGE_SERVICE, processorService.getSnapshotStorageInjector())
                .dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, processorService.getActorSchedulerInjector())
                .install()
                .handle((ignored, failure) -> {
                    // Propagate the installation outcome to the returned future.
                    if (failure == null) {
                        return Boolean.valueOf(installed.complete(processor));
                    }
                    return Boolean.valueOf(installed.completeExceptionally(failure));
                });
        return installed;
    }

    /**
     * Removes the subscription with the given id. The work is submitted to the deferred
     * command context.
     *
     * <p>If no stream processor is registered for the id, the future completes immediately.
     * Otherwise the subscription is removed from its stream processor and, when that
     * removal yields {@code false}, the stream processor service is removed as well.
     * NOTE(review): {@code false} presumably means the processor has no subscriptions
     * left - confirm against {@code LockTaskStreamProcessor.removeSubscription}.
     *
     * @param j the id of the subscription to remove
     * @return a future completed with {@code null} on success, or exceptionally on failure
     */
    public CompletableFuture<Void> removeSubscription(long j) {
        return this.asyncContext.runAsync(future -> {
            final LockTaskStreamProcessor processor = this.streamProcessorBySubscriptionId.remove(j);
            if (processor == null) {
                future.complete(null);
                return;
            }
            // The decompiler had wrapped the handle() callback in a cast to
            // BiFunction<? super U, Throwable, ? extends U>; U is not declared anywhere,
            // so the cast did not compile and is omitted.
            processor.removeSubscription(j)
                    .thenCompose(hasSubscriptions -> !hasSubscriptions.booleanValue()
                            ? removeStreamProcessorService(processor)
                            : CompletableFuture.completedFuture(null))
                    .handle((ignored, failure) -> Boolean.valueOf(
                            failure == null ? future.complete(null) : future.completeExceptionally(failure)));
        });
    }

    /**
     * Detaches the stream processor from the bucket of its log stream and removes the
     * corresponding stream processor service from the service context.
     *
     * @param lockTaskStreamProcessor the stream processor whose service is removed
     * @return the stage returned by the service context for the removal
     */
    protected CompletionStage<Void> removeStreamProcessorService(LockTaskStreamProcessor lockTaskStreamProcessor) {
        final LogStreamBucket bucket = getLogStreamBucket(
                lockTaskStreamProcessor.getLogStreamTopicName(),
                lockTaskStreamProcessor.getLogStreamPartitionId());
        bucket.removeStreamProcessor(lockTaskStreamProcessor);

        // The service name is derived from the log name and the subscribed task type.
        return this.serviceContext.removeService(
                TaskQueueServiceNames.taskQueueLockStreamProcessorServiceName(
                        bucket.getLogStream().getLogName(),
                        BufferUtil.bufferAsString(lockTaskStreamProcessor.getSubscriptedTaskType())));
    }

    /**
     * Offers a credits request to the credits request buffer; buffered requests are
     * handled later in {@link #doWork()}.
     *
     * @param creditsRequest the request to offer
     * @return the result of offering the request to the buffer
     */
    public boolean increaseSubscriptionCreditsAsync(CreditsRequest creditsRequest) {
        return this.creditRequestBuffer.offerRequest(creditsRequest);
    }

    /**
     * Forwards a credits request to the stream processor registered for the request's
     * subscriber key.
     *
     * @param creditsRequest the request to dispatch
     * @return {@code true} if no stream processor is registered for the subscriber key,
     *     otherwise the result reported by the stream processor
     */
    protected boolean dispatchSubscriptionCredits(CreditsRequest creditsRequest) {
        final LockTaskStreamProcessor target = this.streamProcessorBySubscriptionId.get(creditsRequest.getSubscriberKey());
        if (target == null) {
            // Nothing to dispatch to; the request counts as handled.
            return true;
        }
        return target.increaseSubscriptionCreditsAsync(creditsRequest);
    }

    /**
     * Appends the request to the list of back-pressured credits requests, which is
     * retried by {@link #dispatchBackpressuredSubscriptionCredits()}.
     *
     * @param creditsRequest the request that could not be dispatched
     */
    protected void backpressureRequest(CreditsRequest creditsRequest) {
        creditsRequest.appendTo(this.backPressuredCreditsRequests);
    }

    /**
     * Retries the back-pressured credits requests, walking the list from its last element
     * towards the first. Each request that is dispatched successfully is removed from the
     * list; the walk stops at the first request that cannot be dispatched.
     *
     * @return the number of requests that were dispatched and removed
     */
    protected int dispatchBackpressuredSubscriptionCredits() {
        int dispatched = 0;
        int index = this.backPressuredCreditsRequests.size() - 1;
        while (index >= 0) {
            // Reuse the shared request instance as a view on the list element.
            this.creditsRequest.wrapListElement(this.backPressuredCreditsRequests, index);
            if (!dispatchSubscriptionCredits(this.creditsRequest)) {
                return dispatched;
            }
            this.backPressuredCreditsRequests.remove(index);
            dispatched++;
            index--;
        }
        return dispatched;
    }

    /**
     * Registers a log stream. The work is submitted to the deferred command context; a
     * bucket for the stream is stored under its topic name and partition id.
     *
     * <p>The future passed to the command is not completed here, and the future returned
     * by {@code runAsync} is discarded.
     *
     * @param logStream the log stream to register
     * @param serviceName the service name of the log stream
     */
    public void addStream(LogStream logStream, ServiceName<LogStream> serviceName) {
        this.asyncContext.runAsync(future -> {
            // The decompiler had put an (int) cast in front of the new bucket, which does
            // not compile; the bucket is passed as the map value as-is. The per-topic map
            // is created with the diamond operator instead of a raw type.
            this.logStreamBuckets
                    .computeIfAbsent(logStream.getTopicName(), topicName -> new Int2ObjectHashMap<>())
                    .put(logStream.getPartitionId(), new LogStreamBucket(logStream, serviceName));
        });
    }

    /**
     * Unregisters a log stream. The work is submitted to the deferred command context:
     * the stream's bucket is dropped (together with the topic entry once it has no
     * partitions left) and the subscriptions registered for the stream are removed.
     *
     * <p>The future passed to the command is not completed here, and the future returned
     * by {@code runAsync} is discarded.
     *
     * @param logStream the log stream to unregister
     */
    public void removeStream(LogStream logStream) {
        this.asyncContext.runAsync(future -> {
            final DirectBuffer topic = logStream.getTopicName();
            final int partition = logStream.getPartitionId();

            final Int2ObjectHashMap<LogStreamBucket> bucketsOfTopic = this.logStreamBuckets.get(topic);
            if (bucketsOfTopic != null) {
                bucketsOfTopic.remove(partition);
                // Drop the topic entry once its last partition is gone.
                if (bucketsOfTopic.isEmpty()) {
                    this.logStreamBuckets.remove(topic);
                }
            }

            removeSubscriptionsForLogStream(topic, partition);
        });
    }

    /**
     * Removes every subscription-id mapping whose stream processor belongs to the given
     * log stream.
     *
     * <p>The previous form called {@code entrySet.remove(entry)} while a for-each loop was
     * iterating over that same entry set, which is not a supported way to modify a
     * collection during iteration. {@code removeIf} removes the matching entries through
     * the set's own iterator instead.
     *
     * @param directBuffer topic name of the log stream
     * @param i partition id of the log stream
     */
    protected void removeSubscriptionsForLogStream(DirectBuffer directBuffer, int i) {
        final Set<Map.Entry<Long, LockTaskStreamProcessor>> entries = this.streamProcessorBySubscriptionId.entrySet();
        entries.removeIf(entry -> {
            final LockTaskStreamProcessor processor = entry.getValue();
            return directBuffer.equals(processor.getLogStreamTopicName())
                    && i == processor.getLogStreamPartitionId();
        });
    }

    /**
     * Notifies every registered stream processor that a client channel was closed. The
     * work is submitted to the deferred command context. When a processor's notification
     * yields {@code false}, its stream processor service is removed.
     * NOTE(review): {@code false} presumably means the processor has no subscriptions
     * left - confirm against {@code LockTaskStreamProcessor.onClientChannelCloseAsync}.
     *
     * @param i id of the closed channel
     */
    public void onClientChannelCloseAsync(int i) {
        this.asyncContext.runAsync(() -> {
            for (LockTaskStreamProcessor processor : this.streamProcessorBySubscriptionId.values()) {
                processor.onClientChannelCloseAsync(i).thenCompose(hasSubscriptions -> {
                    if (hasSubscriptions.booleanValue()) {
                        return CompletableFuture.<Void>completedFuture(null);
                    }
                    return removeStreamProcessorService(processor);
                });
            }
        });
    }

    /**
     * Returns the capacity upper bound reported by the credits request buffer.
     *
     * @return the buffer's capacity upper bound
     */
    public int getCreditRequestCapacityUpperBound() {
        return this.creditRequestBuffer.getCapacityUpperBound();
    }

    /**
     * Looks up the bucket of a log stream.
     *
     * @param directBuffer topic name of the log stream
     * @param i partition id of the log stream
     * @return the bucket, or {@code null} if the topic is unknown; for a known topic the
     *     result is whatever the per-topic map returns for the partition id
     */
    protected LogStreamBucket getLogStreamBucket(DirectBuffer directBuffer, int i) {
        final Int2ObjectHashMap<LogStreamBucket> bucketsOfTopic = this.logStreamBuckets.get(directBuffer);
        return bucketsOfTopic == null ? null : bucketsOfTopic.get(i);
    }

    /**
     * Does nothing; this manager takes no action when a connection is established.
     *
     * @param remoteAddress the address of the established connection (unused)
     */
    @Override // io.zeebe.transport.TransportListener
    public void onConnectionEstablished(RemoteAddress remoteAddress) {
    }

    /**
     * Forwards the stream id of the closed connection to
     * {@link #onClientChannelCloseAsync(int)}.
     *
     * @param remoteAddress the address of the closed connection
     */
    @Override // io.zeebe.transport.TransportListener
    public void onConnectionClosed(RemoteAddress remoteAddress) {
        onClientChannelCloseAsync(remoteAddress.getStreamId());
    }
}
