package io.zeebe.broker.system.log;

import io.zeebe.broker.clustering.management.PartitionManager;
import io.zeebe.broker.logstreams.LogStreamServiceNames;
import io.zeebe.broker.logstreams.processor.StreamProcessorIds;
import io.zeebe.broker.logstreams.processor.StreamProcessorService;
import io.zeebe.broker.logstreams.processor.TypedEventStreamProcessorBuilder;
import io.zeebe.broker.logstreams.processor.TypedStreamEnvironment;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessor;
import io.zeebe.broker.system.SystemConfiguration;
import io.zeebe.broker.system.SystemServiceNames;
import io.zeebe.broker.system.deployment.processor.PartitionCollector;
import io.zeebe.broker.system.executor.ScheduledCommand;
import io.zeebe.broker.system.executor.ScheduledExecutor;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceGroupReference;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.transport.ServerOutput;
import io.zeebe.transport.ServerTransport;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/zeebe/broker/system/log/SystemPartitionManager.class */
public class SystemPartitionManager implements Service<SystemPartitionManager> {
    public static final String CREATE_TOPICS_PROCESSOR = "create-topics";
    public static final String COLLECT_PARTITIONS_PROCESSOR = "collect-partitions";
    private ServiceStartContext serviceContext;
    private final SystemConfiguration systemConfiguration;
    private PartitionManager partitionManager;
    private ServerTransport clientApiTransport;
    private ScheduledExecutor executor;
    private ScheduledCommand command;
    private final Injector<ServerTransport> clientApiTransportInjector = new Injector<>();
    private final Injector<ScheduledExecutor> executorInjector = new Injector<>();
    private final Injector<PartitionManager> partitionManagerInjector = new Injector<>();
    private final ServiceGroupReference<LogStream> logStreamsGroupReference = ServiceGroupReference.create().onAdd((serviceName, logStream) -> {
        addSystemPartition(logStream, serviceName);
    }).onRemove((serviceName2, logStream2) -> {
        removeSystemPartition(logStream2, serviceName2);
    }).build();
    private AtomicReference<PartitionResponder> partitionResponderRef = new AtomicReference<>();

    public SystemPartitionManager(SystemConfiguration systemConfiguration) {
        this.systemConfiguration = systemConfiguration;
    }

    public void addSystemPartition(LogStream logStream, ServiceName<LogStream> serviceName) {
        ServerOutput output = this.clientApiTransport.getOutput();
        TypedStreamEnvironment typedStreamEnvironment = new TypedStreamEnvironment(logStream, output);
        installCreateTopicProcessor(serviceName, typedStreamEnvironment);
        installPartitionCollectorProcessor(serviceName, output, typedStreamEnvironment);
    }

    private void installPartitionCollectorProcessor(ServiceName<LogStream> serviceName, ServerOutput serverOutput, TypedStreamEnvironment typedStreamEnvironment) {
        PartitionResponder partitionResponder = new PartitionResponder(serverOutput);
        TypedStreamProcessor buildPartitionResponseProcessor = buildPartitionResponseProcessor(typedStreamEnvironment, partitionResponder);
        StreamProcessorService eventFilter = new StreamProcessorService(COLLECT_PARTITIONS_PROCESSOR, StreamProcessorIds.SYSTEM_COLLECT_PARTITION_PROCESSOR_ID, buildPartitionResponseProcessor).eventFilter(buildPartitionResponseProcessor.buildTypeFilter());
        this.serviceContext.createService(SystemServiceNames.systemProcessorName(eventFilter.getName()), eventFilter).dependency(serviceName, eventFilter.getSourceStreamInjector()).dependency(serviceName, eventFilter.getTargetStreamInjector()).dependency(LogStreamServiceNames.SNAPSHOT_STORAGE_SERVICE, eventFilter.getSnapshotStorageInjector()).dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, eventFilter.getActorSchedulerInjector()).install();
        this.partitionResponderRef.set(partitionResponder);
    }

    private void installCreateTopicProcessor(ServiceName<LogStream> serviceName, TypedStreamEnvironment typedStreamEnvironment) {
        PendingPartitionsIndex pendingPartitionsIndex = new PendingPartitionsIndex();
        TopicsIndex topicsIndex = new TopicsIndex();
        ResolvePendingPartitionsCommand resolvePendingPartitionsCommand = new ResolvePendingPartitionsCommand(pendingPartitionsIndex, this.partitionManager, typedStreamEnvironment.buildStreamReader(), typedStreamEnvironment.buildStreamWriter());
        TypedStreamProcessor buildTopicCreationProcessor = buildTopicCreationProcessor(typedStreamEnvironment, this.partitionManager, topicsIndex, pendingPartitionsIndex, Duration.ofSeconds(this.systemConfiguration.getPartitionCreationTimeoutSeconds()));
        this.command = this.executor.scheduleAtFixedRate(() -> {
            buildTopicCreationProcessor.runAsync(resolvePendingPartitionsCommand);
        }, Duration.ofMillis(100L));
        StreamProcessorService eventFilter = new StreamProcessorService(CREATE_TOPICS_PROCESSOR, 1000, buildTopicCreationProcessor).eventFilter(buildTopicCreationProcessor.buildTypeFilter());
        this.serviceContext.createService(SystemServiceNames.systemProcessorName(eventFilter.getName()), eventFilter).dependency(serviceName, eventFilter.getSourceStreamInjector()).dependency(serviceName, eventFilter.getTargetStreamInjector()).dependency(LogStreamServiceNames.SNAPSHOT_STORAGE_SERVICE, eventFilter.getSnapshotStorageInjector()).dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, eventFilter.getActorSchedulerInjector()).install();
    }

    private void removeSystemPartition(LogStream logStream, ServiceName<LogStream> serviceName) {
        this.partitionResponderRef.set(null);
    }

    public static TypedStreamProcessor buildTopicCreationProcessor(TypedStreamEnvironment typedStreamEnvironment, PartitionManager partitionManager, TopicsIndex topicsIndex, PendingPartitionsIndex pendingPartitionsIndex, Duration duration) {
        PartitionIdGenerator partitionIdGenerator = new PartitionIdGenerator();
        RoundRobinSelectionStrategy roundRobinSelectionStrategy = new RoundRobinSelectionStrategy(partitionManager);
        return typedStreamEnvironment.newStreamProcessor().onEvent(EventType.TOPIC_EVENT, TopicState.CREATE, new CreateTopicProcessor(topicsIndex, partitionIdGenerator, roundRobinSelectionStrategy)).onEvent(EventType.PARTITION_EVENT, PartitionState.CREATE, new CreatePartitionProcessor(partitionManager, pendingPartitionsIndex, duration)).onEvent(EventType.PARTITION_EVENT, PartitionState.CREATE_COMPLETE, new CompletePartitionProcessor(pendingPartitionsIndex)).onEvent(EventType.PARTITION_EVENT, PartitionState.CREATED, new PartitionCreatedProcessor(topicsIndex, typedStreamEnvironment.buildStreamReader())).onEvent(EventType.PARTITION_EVENT, PartitionState.CREATE_EXPIRE, new ExpirePartitionCreationProcessor(pendingPartitionsIndex, partitionIdGenerator, roundRobinSelectionStrategy)).withStateResource(topicsIndex.getRawMap()).withStateResource(pendingPartitionsIndex.getRawMap()).withStateResource(partitionIdGenerator).build();
    }

    public static TypedStreamProcessor buildPartitionResponseProcessor(TypedStreamEnvironment typedStreamEnvironment, PartitionResponder partitionResponder) {
        PartitionCollector partitionCollector = new PartitionCollector(partitionResponder);
        TypedEventStreamProcessorBuilder newStreamProcessor = typedStreamEnvironment.newStreamProcessor();
        partitionCollector.registerWith(newStreamProcessor);
        partitionResponder.registerWith(newStreamProcessor);
        return newStreamProcessor.build();
    }

    @Override // io.zeebe.servicecontainer.Service
    public void start(ServiceStartContext serviceStartContext) {
        this.serviceContext = serviceStartContext;
        this.clientApiTransport = this.clientApiTransportInjector.getValue();
        this.executor = this.executorInjector.getValue();
        this.partitionManager = getPartitionManagerInjector().getValue();
    }

    @Override // io.zeebe.servicecontainer.Service
    public void stop(ServiceStopContext serviceStopContext) {
        if (this.command != null) {
            this.command.cancel();
        }
        this.partitionResponderRef.set(null);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // io.zeebe.servicecontainer.Service
    public SystemPartitionManager get() {
        return this;
    }

    public CompletableFuture<Void> sendPartitions(int i, long j) {
        PartitionResponder partitionResponder = this.partitionResponderRef.get();
        if (partitionResponder != null) {
            return partitionResponder.sendPartitions(i, j);
        }
        return null;
    }

    public ServiceGroupReference<LogStream> getLogStreamsGroupReference() {
        return this.logStreamsGroupReference;
    }

    public Injector<ServerTransport> getClientApiTransportInjector() {
        return this.clientApiTransportInjector;
    }

    public Injector<ScheduledExecutor> getExecutorInjector() {
        return this.executorInjector;
    }

    public Injector<PartitionManager> getPartitionManagerInjector() {
        return this.partitionManagerInjector;
    }
}
