package io.camunda.zeebe.dynamic.config;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationInitializer;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationManager;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationUpdateNotifier;
import io.camunda.zeebe.dynamic.config.api.ClusterConfigurationManagementRequestsHandler;
import io.camunda.zeebe.dynamic.config.api.ClusterConfigurationRequestServer;
import io.camunda.zeebe.dynamic.config.changes.ClusterChangeExecutor;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeAppliersImpl;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeCoordinator;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeCoordinatorImpl;
import io.camunda.zeebe.dynamic.config.changes.NoopClusterMembershipChangeExecutor;
import io.camunda.zeebe.dynamic.config.changes.PartitionChangeExecutor;
import io.camunda.zeebe.dynamic.config.changes.PartitionScalingChangeExecutor;
import io.camunda.zeebe.dynamic.config.gossip.ClusterConfigurationGossiper;
import io.camunda.zeebe.dynamic.config.gossip.ClusterConfigurationGossiperConfig;
import io.camunda.zeebe.dynamic.config.metrics.TopologyManagerMetrics;
import io.camunda.zeebe.dynamic.config.metrics.TopologyMetrics;
import io.camunda.zeebe.dynamic.config.serializer.ProtoBufSerializer;
import io.camunda.zeebe.dynamic.config.state.ClusterConfiguration;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.AsyncClosable;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.FileUtil;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/* loaded from: input_file:io/camunda/zeebe/dynamic/config/ClusterConfigurationManagerService.class */
public final class ClusterConfigurationManagerService implements ClusterConfigurationUpdateNotifier, AsyncClosable {
    private static final String COORDINATOR_ID = "0";
    private static final String TOPOLOGY_FILE_NAME = ".topology.meta";
    private final ClusterConfigurationManagerImpl clusterConfigurationManager;
    private final ClusterConfigurationGossiper clusterConfigurationGossiper;
    private final ClusterMembershipService memberShipService;
    private final boolean isCoordinator;
    private final PersistedClusterConfiguration persistedClusterConfiguration;
    private final Path configurationFile;
    private final ConfigurationChangeCoordinator configurationChangeCoordinator;
    private final ClusterConfigurationRequestServer configurationRequestServer;
    private final Actor gossipActor;
    private final Actor managerActor;
    private final ClusterChangeExecutor clusterChangeExecutor;
    private final TopologyMetrics topologyMetrics;
    private final TopologyManagerMetrics topologyManagerMetrics;

    public ClusterConfigurationManagerService(Path path, ClusterCommunicationService clusterCommunicationService, ClusterMembershipService clusterMembershipService, ClusterConfigurationGossiperConfig clusterConfigurationGossiperConfig, boolean z, ClusterChangeExecutor clusterChangeExecutor, MeterRegistry meterRegistry) {
        this.clusterChangeExecutor = clusterChangeExecutor;
        this.memberShipService = clusterMembershipService;
        this.topologyMetrics = new TopologyMetrics(meterRegistry);
        this.topologyManagerMetrics = new TopologyManagerMetrics(meterRegistry);
        try {
            FileUtil.ensureDirectoryExists(path);
            MemberId id = clusterMembershipService.getLocalMember().id();
            this.configurationFile = path.resolve(TOPOLOGY_FILE_NAME);
            this.persistedClusterConfiguration = PersistedClusterConfiguration.ofFile(this.configurationFile, new ProtoBufSerializer());
            this.gossipActor = new Actor(this) { // from class: io.camunda.zeebe.dynamic.config.ClusterConfigurationManagerService.1
            };
            this.managerActor = new Actor(this) { // from class: io.camunda.zeebe.dynamic.config.ClusterConfigurationManagerService.2
            };
            this.clusterConfigurationManager = new ClusterConfigurationManagerImpl(this.managerActor, id, this.persistedClusterConfiguration, this.topologyManagerMetrics);
            Actor actor = this.gossipActor;
            ProtoBufSerializer protoBufSerializer = new ProtoBufSerializer();
            ClusterConfigurationManagerImpl clusterConfigurationManagerImpl = this.clusterConfigurationManager;
            Objects.requireNonNull(clusterConfigurationManagerImpl);
            this.clusterConfigurationGossiper = new ClusterConfigurationGossiper(actor, clusterCommunicationService, clusterMembershipService, protoBufSerializer, clusterConfigurationGossiperConfig, clusterConfigurationManagerImpl::onGossipReceived, this.topologyMetrics);
            this.isCoordinator = ((String) id.id()).equals(COORDINATOR_ID);
            this.configurationChangeCoordinator = new ConfigurationChangeCoordinatorImpl(this.clusterConfigurationManager, id, this.managerActor);
            this.configurationRequestServer = new ClusterConfigurationRequestServer(clusterCommunicationService, new ProtoBufSerializer(), new ClusterConfigurationManagementRequestsHandler(this.configurationChangeCoordinator, id, this.managerActor, z));
            ClusterConfigurationManagerImpl clusterConfigurationManagerImpl2 = this.clusterConfigurationManager;
            ClusterConfigurationGossiper clusterConfigurationGossiper = this.clusterConfigurationGossiper;
            Objects.requireNonNull(clusterConfigurationGossiper);
            clusterConfigurationManagerImpl2.setConfigurationGossiper(clusterConfigurationGossiper::updateClusterConfiguration);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to create data directory", e);
        }
    }

    private ClusterConfigurationInitializer getNonCoordinatorInitializer(ClusterMembershipService clusterMembershipService, StaticConfiguration staticConfiguration) {
        List<MemberId> list = staticConfiguration.clusterMembers().stream().filter(memberId -> {
            return !memberId.equals(staticConfiguration.localMemberId());
        }).toList();
        ClusterConfigurationInitializer.FileInitializer fileInitializer = new ClusterConfigurationInitializer.FileInitializer(this.configurationFile, new ProtoBufSerializer());
        ClusterConfigurationGossiper clusterConfigurationGossiper = this.clusterConfigurationGossiper;
        Actor actor = this.managerActor;
        ClusterConfigurationGossiper clusterConfigurationGossiper2 = this.clusterConfigurationGossiper;
        Objects.requireNonNull(clusterConfigurationGossiper2);
        ClusterConfigurationInitializer recover = fileInitializer.recover(ClusterConfigurationInitializer.InitializerError.PersistedConfigurationIsBroken.class, new ClusterConfigurationInitializer.SyncInitializer(clusterConfigurationGossiper, list, actor, clusterConfigurationGossiper2::queryClusterConfiguration));
        ClusterConfigurationGossiper clusterConfigurationGossiper3 = this.clusterConfigurationGossiper;
        PersistedClusterConfiguration persistedClusterConfiguration = this.persistedClusterConfiguration;
        ClusterConfigurationGossiper clusterConfigurationGossiper4 = this.clusterConfigurationGossiper;
        Objects.requireNonNull(clusterConfigurationGossiper4);
        return recover.orThen(new ClusterConfigurationInitializer.GossipInitializer(clusterConfigurationGossiper3, persistedClusterConfiguration, clusterConfigurationGossiper4::updateClusterConfiguration, this.managerActor)).andThen(new ExporterStateInitializer(staticConfiguration.partitionConfig().exporting().exporters().keySet(), staticConfiguration.localMemberId(), this.managerActor)).andThen(new RoutingStateInitializer(staticConfiguration.enablePartitionScaling(), staticConfiguration.partitionCount()));
    }

    private ClusterConfigurationInitializer getCoordinatorInitializer(StaticConfiguration staticConfiguration) {
        List<MemberId> list = staticConfiguration.clusterMembers().stream().filter(memberId -> {
            return !memberId.equals(staticConfiguration.localMemberId());
        }).toList();
        ClusterConfigurationInitializer.FileInitializer fileInitializer = new ClusterConfigurationInitializer.FileInitializer(this.configurationFile, new ProtoBufSerializer());
        ClusterConfigurationGossiper clusterConfigurationGossiper = this.clusterConfigurationGossiper;
        Actor actor = this.managerActor;
        ClusterConfigurationGossiper clusterConfigurationGossiper2 = this.clusterConfigurationGossiper;
        Objects.requireNonNull(clusterConfigurationGossiper2);
        return fileInitializer.orThen(new ClusterConfigurationInitializer.SyncInitializer(clusterConfigurationGossiper, list, actor, clusterConfigurationGossiper2::queryClusterConfiguration)).orThen(new ClusterConfigurationInitializer.StaticInitializer(staticConfiguration)).andThen(new ExporterStateInitializer(staticConfiguration.partitionConfig().exporting().exporters().keySet(), staticConfiguration.localMemberId(), this.managerActor)).andThen(new RoutingStateInitializer(staticConfiguration.enablePartitionScaling(), staticConfiguration.partitionCount()));
    }

    public ActorFuture<Void> start(ActorSchedulingService actorSchedulingService, StaticConfiguration staticConfiguration) {
        return startGossiper(actorSchedulingService).andThen(() -> {
            return startClusterTopologyServices(actorSchedulingService, staticConfiguration);
        }, (v0) -> {
            v0.run();
        });
    }

    private ActorFuture<Void> startGossiper(ActorSchedulingService actorSchedulingService) {
        ActorFuture submitActor = actorSchedulingService.submitActor(this.gossipActor);
        ClusterConfigurationGossiper clusterConfigurationGossiper = this.clusterConfigurationGossiper;
        Objects.requireNonNull(clusterConfigurationGossiper);
        return submitActor.andThen(clusterConfigurationGossiper::start, (v0) -> {
            v0.run();
        });
    }

    private CompletableActorFuture<Void> startClusterTopologyServices(ActorSchedulingService actorSchedulingService, StaticConfiguration staticConfiguration) {
        CompletableActorFuture<Void> completableActorFuture = new CompletableActorFuture<>();
        ClusterConfigurationInitializer coordinatorInitializer = this.isCoordinator ? getCoordinatorInitializer(staticConfiguration) : getNonCoordinatorInitializer(this.memberShipService, staticConfiguration);
        this.configurationRequestServer.start();
        actorSchedulingService.submitActor(this.managerActor).onComplete((r6, th) -> {
            if (th != null) {
                completableActorFuture.completeExceptionally(th);
            } else {
                this.clusterConfigurationManager.start(coordinatorInitializer).onComplete(completableActorFuture);
            }
        });
        return completableActorFuture;
    }

    public ActorFuture<ClusterConfiguration> getClusterTopology() {
        return this.clusterConfigurationManager.getClusterConfiguration();
    }

    public Optional<ConfigurationChangeCoordinator> getTopologyChangeCoordinator() {
        return Optional.ofNullable(this.configurationChangeCoordinator);
    }

    public ActorFuture<Void> closeAsync() {
        if (this.configurationRequestServer != null) {
            this.configurationRequestServer.close();
        }
        this.clusterConfigurationGossiper.close();
        ActorFuture closeAsync = this.managerActor.closeAsync();
        Actor actor = this.gossipActor;
        Objects.requireNonNull(actor);
        return closeAsync.andThen(actor::closeAsync, (v0) -> {
            v0.run();
        });
    }

    public void registerPartitionChangeExecutors(PartitionChangeExecutor partitionChangeExecutor, PartitionScalingChangeExecutor partitionScalingChangeExecutor) {
        this.clusterConfigurationManager.registerTopologyChangeAppliers(new ConfigurationChangeAppliersImpl(partitionChangeExecutor, new NoopClusterMembershipChangeExecutor(), partitionScalingChangeExecutor, this.clusterChangeExecutor));
    }

    public void removePartitionChangeExecutor() {
        this.clusterConfigurationManager.removeTopologyChangeAppliers();
    }

    public void registerTopologyChangedListener(ClusterConfigurationManager.InconsistentConfigurationListener inconsistentConfigurationListener) {
        this.clusterConfigurationManager.registerTopologyChangedListener(inconsistentConfigurationListener);
    }

    public void removeTopologyChangedListener() {
        this.clusterConfigurationManager.removeTopologyChangedListener();
    }

    @Override // io.camunda.zeebe.dynamic.config.ClusterConfigurationUpdateNotifier
    public void addUpdateListener(ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener clusterConfigurationUpdateListener) {
        this.clusterConfigurationGossiper.addUpdateListener(clusterConfigurationUpdateListener);
    }

    @Override // io.camunda.zeebe.dynamic.config.ClusterConfigurationUpdateNotifier
    public void removeUpdateListener(ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener clusterConfigurationUpdateListener) {
        this.clusterConfigurationGossiper.removeUpdateListener(clusterConfigurationUpdateListener);
    }
}
