package io.camunda.zeebe.broker;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.core.Atomix;
import io.atomix.raft.partition.RaftPartition;
import io.atomix.raft.partition.RaftPartitionGroup;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.broker.bootstrap.CloseProcess;
import io.camunda.zeebe.broker.bootstrap.StartProcess;
import io.camunda.zeebe.broker.clustering.atomix.AtomixFactory;
import io.camunda.zeebe.broker.clustering.topology.TopologyManagerImpl;
import io.camunda.zeebe.broker.clustering.topology.TopologyPartitionListenerImpl;
import io.camunda.zeebe.broker.engine.impl.DeploymentDistributorImpl;
import io.camunda.zeebe.broker.engine.impl.LongPollingJobNotification;
import io.camunda.zeebe.broker.engine.impl.PartitionCommandSenderImpl;
import io.camunda.zeebe.broker.engine.impl.SubscriptionApiCommandMessageHandlerService;
import io.camunda.zeebe.broker.exporter.jar.ExporterJarLoadException;
import io.camunda.zeebe.broker.exporter.repo.ExporterLoadException;
import io.camunda.zeebe.broker.exporter.repo.ExporterRepository;
import io.camunda.zeebe.broker.system.EmbeddedGatewayService;
import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.configuration.ClusterCfg;
import io.camunda.zeebe.broker.system.configuration.DataCfg;
import io.camunda.zeebe.broker.system.configuration.ExporterCfg;
import io.camunda.zeebe.broker.system.configuration.NetworkCfg;
import io.camunda.zeebe.broker.system.configuration.SocketBindingCfg;
import io.camunda.zeebe.broker.system.configuration.backpressure.BackpressureCfg;
import io.camunda.zeebe.broker.system.management.BrokerAdminService;
import io.camunda.zeebe.broker.system.management.BrokerAdminServiceImpl;
import io.camunda.zeebe.broker.system.management.LeaderManagementRequestHandler;
import io.camunda.zeebe.broker.system.management.deployment.PushDeploymentRequestHandler;
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageMonitor;
import io.camunda.zeebe.broker.system.partitions.PartitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionHealthBroadcaster;
import io.camunda.zeebe.broker.system.partitions.PartitionStep;
import io.camunda.zeebe.broker.system.partitions.TypedRecordProcessorsFactory;
import io.camunda.zeebe.broker.system.partitions.ZeebePartition;
import io.camunda.zeebe.broker.system.partitions.impl.AtomixPartitionMessagingService;
import io.camunda.zeebe.broker.system.partitions.impl.PartitionProcessingState;
import io.camunda.zeebe.broker.system.partitions.impl.PartitionTransitionImpl;
import io.camunda.zeebe.broker.system.partitions.impl.steps.AtomixLogStoragePartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.ExporterDirectorPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.FollowerPostStoragePartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.LeaderPostStoragePartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.LogDeletionPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.LogStreamPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.RaftLogReaderPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.RocksDbMetricExporterPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.SnapshotDirectorPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.SnapshotReplicationPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.StateControllerPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.StreamProcessorPartitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.ZeebeDbPartitionStep;
import io.camunda.zeebe.broker.transport.backpressure.PartitionAwareRequestLimiter;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiService;
import io.camunda.zeebe.engine.processing.EngineProcessors;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.snapshots.SnapshotStoreSupplier;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStoreFactory;
import io.camunda.zeebe.transport.ServerTransport;
import io.camunda.zeebe.transport.TransportFactory;
import io.camunda.zeebe.util.LogUtil;
import io.camunda.zeebe.util.VersionUtil;
import io.camunda.zeebe.util.exception.UncheckedExecutionException;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import io.netty.util.NetUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/Broker.class */
public final class Broker implements AutoCloseable {
    public static final Logger LOG = Loggers.SYSTEM_LOGGER;
    private static final List<PartitionStep> LEADER_STEPS = List.of((Object[]) new PartitionStep[]{new AtomixLogStoragePartitionStep(), new LogStreamPartitionStep(), new RaftLogReaderPartitionStep(), new SnapshotReplicationPartitionStep(), new StateControllerPartitionStep(), new LogDeletionPartitionStep(), new LeaderPostStoragePartitionStep(), new ZeebeDbPartitionStep(), new StreamProcessorPartitionStep(), new SnapshotDirectorPartitionStep(), new RocksDbMetricExporterPartitionStep(), new ExporterDirectorPartitionStep()});
    private static final List<PartitionStep> FOLLOWER_STEPS = List.of(new RaftLogReaderPartitionStep(), new SnapshotReplicationPartitionStep(), new StateControllerPartitionStep(), new LogDeletionPartitionStep(), new FollowerPostStoragePartitionStep());
    private final SystemContext brokerContext;
    private final List<PartitionListener> partitionListeners;
    private boolean isClosed;
    private Atomix atomix;
    private CompletableFuture<Broker> startFuture;
    private TopologyManagerImpl topologyManager;
    private LeaderManagementRequestHandler managementRequestHandler;
    private CommandApiService commandHandler;
    private ActorScheduler scheduler;
    private CloseProcess closeProcess;
    private EmbeddedGatewayService embeddedGatewayService;
    private ServerTransport serverTransport;
    private BrokerHealthCheckService healthCheckService;
    private final List<DiskSpaceUsageListener> diskSpaceUsageListeners;
    private final SpringBrokerBridge springBrokerBridge;
    private DiskSpaceUsageMonitor diskSpaceUsageMonitor;
    private SnapshotStoreSupplier snapshotStoreSupplier;
    private final List<ZeebePartition> partitions;
    private BrokerAdminService brokerAdminService;

    public Broker(SystemContext systemContext, SpringBrokerBridge springBrokerBridge) {
        this.isClosed = false;
        this.diskSpaceUsageListeners = new ArrayList();
        this.partitions = new ArrayList();
        this.brokerContext = systemContext;
        this.partitionListeners = new ArrayList();
        this.springBrokerBridge = springBrokerBridge;
    }

    public Broker(BrokerCfg brokerCfg, String str, ActorClock actorClock, SpringBrokerBridge springBrokerBridge) {
        this(new SystemContext(brokerCfg, str, actorClock), springBrokerBridge);
    }

    public void addPartitionListener(PartitionListener partitionListener) {
        this.partitionListeners.add(partitionListener);
    }

    public synchronized CompletableFuture<Broker> start() {
        if (this.startFuture == null) {
            logBrokerStart();
            this.startFuture = new CompletableFuture<>();
            LogUtil.doWithMDC(this.brokerContext.getDiagnosticContext(), this::internalStart);
        }
        return this.startFuture;
    }

    private void logBrokerStart() {
        if (LOG.isInfoEnabled()) {
            BrokerCfg config = getConfig();
            LOG.info("Version: {}", VersionUtil.getVersion());
            LOG.info("Starting broker {} with configuration {}", Integer.valueOf(config.getCluster().getNodeId()), config.toJson());
        }
    }

    private void internalStart() {
        try {
            this.closeProcess = initStart().start();
            this.startFuture.complete(this);
            this.healthCheckService.setBrokerStarted();
        } catch (Exception e) {
            LOG.error("Failed to start broker {}!", Integer.valueOf(getConfig().getCluster().getNodeId()), e);
            UncheckedExecutionException uncheckedExecutionException = new UncheckedExecutionException("Failed to start broker", e);
            this.startFuture.completeExceptionally(uncheckedExecutionException);
            throw uncheckedExecutionException;
        }
    }

    private StartProcess initStart() {
        BrokerCfg config = getConfig();
        NetworkCfg network = config.getNetwork();
        ClusterCfg cluster = config.getCluster();
        BrokerInfo brokerInfo = new BrokerInfo(cluster.getNodeId(), NetUtil.toSocketAddressString(network.getCommandApi().getAdvertisedAddress()));
        StartProcess startProcess = new StartProcess("Broker-" + brokerInfo.getNodeId());
        startProcess.addStep("actor scheduler", this::actorSchedulerStep);
        startProcess.addStep("membership and replication protocol", () -> {
            return atomixCreateStep(config, brokerInfo);
        });
        startProcess.addStep("command api transport", () -> {
            return commandApiTransportStep(cluster, config.getNetwork().getCommandApi(), brokerInfo);
        });
        startProcess.addStep("command api handler", () -> {
            return commandApiHandlerStep(config, brokerInfo);
        });
        startProcess.addStep("subscription api", () -> {
            return subscriptionAPIStep(brokerInfo);
        });
        startProcess.addStep("cluster services", () -> {
            this.atomix.start().join();
        });
        startProcess.addStep("topology manager", () -> {
            return topologyManagerStep(cluster, brokerInfo);
        });
        if (config.getGateway().isEnable()) {
            startProcess.addStep("embedded gateway", () -> {
                this.embeddedGatewayService = new EmbeddedGatewayService(config, this.scheduler, this.atomix);
                return this.embeddedGatewayService;
            });
        }
        startProcess.addStep("monitoring services", () -> {
            return monitoringServerStep(brokerInfo);
        });
        startProcess.addStep("disk space monitor", () -> {
            return diskSpaceMonitorStep(config.getData());
        });
        startProcess.addStep("leader management request handler", () -> {
            return managementRequestStep(brokerInfo);
        });
        startProcess.addStep("zeebe partitions", () -> {
            return partitionsStep(config, cluster, brokerInfo);
        });
        startProcess.addStep("register diskspace usage listeners", this::addDiskSpaceUsageListeners);
        startProcess.addStep("upgrade manager", this::addBrokerAdminService);
        return startProcess;
    }

    private AutoCloseable addBrokerAdminService() {
        Actor brokerAdminServiceImpl = new BrokerAdminServiceImpl(this.partitions);
        scheduleActor(brokerAdminServiceImpl);
        this.brokerAdminService = brokerAdminServiceImpl;
        this.springBrokerBridge.registerBrokerAdminServiceSupplier(() -> {
            return this.brokerAdminService;
        });
        return brokerAdminServiceImpl;
    }

    private AutoCloseable actorSchedulerStep() {
        this.scheduler = this.brokerContext.getScheduler();
        this.scheduler.start();
        return () -> {
            this.scheduler.stop().get(this.brokerContext.getStepTimeout().toMillis(), TimeUnit.MILLISECONDS);
        };
    }

    private AutoCloseable atomixCreateStep(BrokerCfg brokerCfg, BrokerInfo brokerInfo) {
        FileBasedSnapshotStoreFactory fileBasedSnapshotStoreFactory = new FileBasedSnapshotStoreFactory(this.scheduler, brokerInfo.getNodeId());
        this.snapshotStoreSupplier = fileBasedSnapshotStoreFactory;
        this.atomix = AtomixFactory.fromConfiguration(brokerCfg, fileBasedSnapshotStoreFactory);
        return () -> {
            this.atomix.stop().get(this.brokerContext.getStepTimeout().toMillis(), TimeUnit.MILLISECONDS);
        };
    }

    private AutoCloseable commandApiTransportStep(ClusterCfg clusterCfg, SocketBindingCfg socketBindingCfg, BrokerInfo brokerInfo) {
        ManagedMessagingService createMessagingService = createMessagingService(clusterCfg, socketBindingCfg);
        createMessagingService.start().join();
        LOG.debug("Bound command API to {}, using advertised address {} ", createMessagingService.bindingAddresses(), createMessagingService.address());
        this.serverTransport = new TransportFactory(this.scheduler).createServerTransport(brokerInfo.getNodeId(), createMessagingService);
        return () -> {
            this.serverTransport.close();
            createMessagingService.stop().join();
        };
    }

    private ManagedMessagingService createMessagingService(ClusterCfg clusterCfg, SocketBindingCfg socketBindingCfg) {
        MessagingConfig messagingConfig = new MessagingConfig();
        messagingConfig.setInterfaces(List.of(socketBindingCfg.getHost()));
        messagingConfig.setPort(Integer.valueOf(socketBindingCfg.getPort()));
        return new NettyMessagingService(clusterCfg.getClusterName(), Address.from(socketBindingCfg.getAdvertisedHost(), socketBindingCfg.getAdvertisedPort()), messagingConfig);
    }

    private AutoCloseable commandApiHandlerStep(BrokerCfg brokerCfg, BrokerInfo brokerInfo) {
        BackpressureCfg backpressure = brokerCfg.getBackpressure();
        PartitionAwareRequestLimiter newNoopLimiter = PartitionAwareRequestLimiter.newNoopLimiter();
        if (backpressure.isEnabled()) {
            newNoopLimiter = PartitionAwareRequestLimiter.newLimiter(backpressure);
        }
        this.commandHandler = new CommandApiService(this.serverTransport, brokerInfo, newNoopLimiter);
        this.partitionListeners.add(this.commandHandler);
        scheduleActor(this.commandHandler);
        this.diskSpaceUsageListeners.add(this.commandHandler);
        return this.commandHandler;
    }

    private AutoCloseable subscriptionAPIStep(BrokerInfo brokerInfo) {
        Actor subscriptionApiCommandMessageHandlerService = new SubscriptionApiCommandMessageHandlerService(brokerInfo, this.atomix);
        this.partitionListeners.add(subscriptionApiCommandMessageHandlerService);
        scheduleActor(subscriptionApiCommandMessageHandlerService);
        this.diskSpaceUsageListeners.add(subscriptionApiCommandMessageHandlerService);
        return subscriptionApiCommandMessageHandlerService;
    }

    private void addDiskSpaceUsageListeners() {
        List<DiskSpaceUsageListener> list = this.diskSpaceUsageListeners;
        DiskSpaceUsageMonitor diskSpaceUsageMonitor = this.diskSpaceUsageMonitor;
        Objects.requireNonNull(diskSpaceUsageMonitor);
        list.forEach(diskSpaceUsageMonitor::addDiskUsageListener);
    }

    private void scheduleActor(Actor actor) {
        this.brokerContext.getScheduler().submitActor(actor).join(this.brokerContext.getStepTimeout().toSeconds(), TimeUnit.SECONDS);
    }

    private AutoCloseable topologyManagerStep(ClusterCfg clusterCfg, BrokerInfo brokerInfo) {
        this.topologyManager = new TopologyManagerImpl(this.atomix, brokerInfo, clusterCfg);
        this.partitionListeners.add(this.topologyManager);
        scheduleActor(this.topologyManager);
        return this.topologyManager;
    }

    private AutoCloseable monitoringServerStep(BrokerInfo brokerInfo) {
        this.healthCheckService = new BrokerHealthCheckService(brokerInfo, this.atomix);
        this.springBrokerBridge.registerBrokerHealthCheckServiceSupplier(() -> {
            return this.healthCheckService;
        });
        this.partitionListeners.add(this.healthCheckService);
        scheduleActor(this.healthCheckService);
        return () -> {
            this.healthCheckService.close();
        };
    }

    private AutoCloseable diskSpaceMonitorStep(DataCfg dataCfg) {
        this.diskSpaceUsageMonitor = new DiskSpaceUsageMonitor(dataCfg);
        if (!dataCfg.isDiskUsageMonitoringEnabled()) {
            LOG.info("Skipping start of disk space usage monitor, as it is disabled by configuration");
            return () -> {
            };
        }
        scheduleActor(this.diskSpaceUsageMonitor);
        this.diskSpaceUsageListeners.forEach(diskSpaceUsageListener -> {
            this.diskSpaceUsageMonitor.addDiskUsageListener(diskSpaceUsageListener);
        });
        return () -> {
            this.diskSpaceUsageMonitor.close();
        };
    }

    private AutoCloseable managementRequestStep(BrokerInfo brokerInfo) {
        this.managementRequestHandler = new LeaderManagementRequestHandler(brokerInfo, this.atomix);
        scheduleActor(this.managementRequestHandler);
        this.partitionListeners.add(this.managementRequestHandler);
        this.diskSpaceUsageListeners.add(this.managementRequestHandler);
        return this.managementRequestHandler;
    }

    private AutoCloseable partitionsStep(BrokerCfg brokerCfg, ClusterCfg clusterCfg, BrokerInfo brokerInfo) throws Exception {
        RaftPartitionGroup partitionGroup = this.atomix.getPartitionService().getPartitionGroup(AtomixFactory.GROUP_NAME);
        MemberId id = this.atomix.getMembershipService().getLocalMember().id();
        Stream filter = partitionGroup.getPartitions().stream().filter(partition -> {
            return partition.members().contains(id);
        });
        Class<RaftPartition> cls = RaftPartition.class;
        Objects.requireNonNull(RaftPartition.class);
        List<RaftPartition> list = (List) filter.map((v1) -> {
            return r1.cast(v1);
        }).collect(Collectors.toList());
        StartProcess startProcess = new StartProcess("Broker-" + id + " partitions");
        for (RaftPartition raftPartition : list) {
            Integer num = (Integer) raftPartition.id().id();
            startProcess.addStep("partition " + num, () -> {
                PartitionContext partitionContext = new PartitionContext(brokerInfo.getNodeId(), raftPartition, this.partitionListeners, new AtomixPartitionMessagingService(this.atomix.getCommunicationService(), this.atomix.getMembershipService(), raftPartition.members()), this.scheduler, brokerCfg, this.commandHandler, this.snapshotStoreSupplier, createFactory(this.topologyManager, clusterCfg, this.atomix, this.managementRequestHandler), buildExporterRepository(brokerCfg), new PartitionProcessingState(raftPartition));
                ?? zeebePartition = new ZeebePartition(partitionContext, new PartitionTransitionImpl(partitionContext, LEADER_STEPS, FOLLOWER_STEPS));
                scheduleActor(zeebePartition);
                TopologyManagerImpl topologyManagerImpl = this.topologyManager;
                Objects.requireNonNull(topologyManagerImpl);
                zeebePartition.addFailureListener(new PartitionHealthBroadcaster(num, topologyManagerImpl::onHealthChanged));
                this.healthCheckService.registerMonitoredPartition(((Integer) raftPartition.id().id()).intValue(), zeebePartition);
                this.diskSpaceUsageListeners.add(zeebePartition);
                this.partitions.add(zeebePartition);
                return zeebePartition;
            });
        }
        return startProcess.start();
    }

    private ExporterRepository buildExporterRepository(BrokerCfg brokerCfg) {
        ExporterRepository exporterRepository = new ExporterRepository();
        for (Map.Entry<String, ExporterCfg> entry : brokerCfg.getExporters().entrySet()) {
            String key = entry.getKey();
            ExporterCfg value = entry.getValue();
            try {
                exporterRepository.load(key, value);
            } catch (ExporterJarLoadException | ExporterLoadException e) {
                throw new IllegalStateException("Failed to load exporter with configuration: " + value, e);
            }
        }
        return exporterRepository;
    }

    private TypedRecordProcessorsFactory createFactory(TopologyManagerImpl topologyManagerImpl, ClusterCfg clusterCfg, Atomix atomix, LeaderManagementRequestHandler leaderManagementRequestHandler) {
        return (actorControl, mutableZeebeState, processingContext) -> {
            LogStream logStream = processingContext.getLogStream();
            TopologyPartitionListenerImpl topologyPartitionListenerImpl = new TopologyPartitionListenerImpl(actorControl);
            topologyManagerImpl.addTopologyPartitionListener(topologyPartitionListenerImpl);
            DeploymentDistributorImpl deploymentDistributorImpl = new DeploymentDistributorImpl(atomix, topologyPartitionListenerImpl, mutableZeebeState.getDeploymentState(), actorControl);
            SubscriptionCommandSender subscriptionCommandSender = new SubscriptionCommandSender(logStream.getPartitionId(), new PartitionCommandSenderImpl(atomix, topologyManagerImpl, actorControl));
            PushDeploymentRequestHandler pushDeploymentRequestHandler = leaderManagementRequestHandler.getPushDeploymentRequestHandler();
            LongPollingJobNotification longPollingJobNotification = new LongPollingJobNotification(atomix.getEventService());
            int partitionsCount = clusterCfg.getPartitionsCount();
            Objects.requireNonNull(longPollingJobNotification);
            return EngineProcessors.createEngineProcessors(processingContext, partitionsCount, subscriptionCommandSender, deploymentDistributorImpl, pushDeploymentRequestHandler, longPollingJobNotification::onJobsAvailable);
        };
    }

    public BrokerCfg getConfig() {
        return this.brokerContext.getBrokerConfiguration();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        LogUtil.doWithMDC(this.brokerContext.getDiagnosticContext(), () -> {
            if (this.isClosed || this.startFuture == null) {
                return;
            }
            this.startFuture.thenAccept(broker -> {
                this.closeProcess.closeReverse();
                this.isClosed = true;
                LOG.info("Broker shut down.");
            }).join();
        });
    }

    public EmbeddedGatewayService getEmbeddedGatewayService() {
        return this.embeddedGatewayService;
    }

    public Atomix getAtomix() {
        return this.atomix;
    }

    public DiskSpaceUsageMonitor getDiskSpaceUsageMonitor() {
        return this.diskSpaceUsageMonitor;
    }

    public BrokerAdminService getBrokerAdminService() {
        return this.brokerAdminService;
    }

    public SystemContext getBrokerContext() {
        return this.brokerContext;
    }
}
