package io.camunda.zeebe.broker.client.impl;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.camunda.zeebe.broker.client.api.BrokerClusterState;
import io.camunda.zeebe.broker.client.api.BrokerTopologyListener;
import io.camunda.zeebe.broker.client.api.BrokerTopologyManager;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationUpdateNotifier;
import io.camunda.zeebe.dynamic.config.state.ClusterConfiguration;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.scheduler.Actor;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/broker/client/impl/BrokerTopologyManagerImpl.class */
/**
 * Actor-backed {@link BrokerTopologyManager} that maintains a snapshot of the broker cluster
 * topology from cluster membership events and cluster configuration updates.
 *
 * <p>Every topology change is scheduled through {@code actor.run(...)}: the current snapshot is
 * copied, the copy is mutated, and the {@code volatile} {@link #topology} field is then replaced
 * with the copy. Readers calling {@link #getTopology()} therefore always observe a snapshot that
 * is no longer being modified by this class.
 *
 * <p>Registered {@link BrokerTopologyListener}s are notified from within the actor jobs whenever
 * a broker is added to or removed from the snapshot.
 */
public final class BrokerTopologyManagerImpl extends Actor implements BrokerTopologyManager, ClusterMembershipEventListener, ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerTopologyManagerImpl.class);

    /**
     * Leader id for which no leader metric is published.
     * NOTE(review): presumably the "no leader known" sentinel of BrokerClusterState - confirm.
     */
    private static final int NO_LEADER_NODE_ID = -2;

    private final Supplier<Set<Member>> membersSupplier;
    private volatile BrokerClusterStateImpl topology = new BrokerClusterStateImpl();
    private volatile ClusterConfiguration clusterConfiguration = ClusterConfiguration.uninitialized();
    private final BrokerClientTopologyMetrics topologyMetrics = new BrokerClientTopologyMetrics();
    // Only read and modified from within actor jobs (see add/removeTopologyListener, addBroker, removeBroker).
    private final Set<BrokerTopologyListener> topologyListeners = new HashSet<>();

    /**
     * Creates a topology manager.
     *
     * @param supplier provides the currently known cluster members; it is queried once when the
     *     actor starts so that members whose events were missed are still picked up
     */
    public BrokerTopologyManagerImpl(Supplier<Set<Member>> supplier) {
        this.membersSupplier = supplier;
    }

    /**
     * Returns the most recently published topology snapshot.
     *
     * @return the current topology snapshot
     */
    @Override
    public BrokerClusterState getTopology() {
        return this.topology;
    }

    /**
     * Returns the last cluster configuration accepted by
     * {@link #onClusterConfigurationUpdated(ClusterConfiguration)}, or the uninitialized
     * configuration if none has been accepted yet.
     *
     * @return the current cluster configuration
     */
    @Override
    public ClusterConfiguration getClusterConfiguration() {
        return this.clusterConfiguration;
    }

    /**
     * Registers a listener and replays a {@code brokerAdded} notification for every broker in the
     * current topology snapshot. The work is scheduled on the actor.
     *
     * @param brokerTopologyListener the listener to register; a {@code null} listener makes the
     *     scheduled job fail with a {@link NullPointerException} without being registered
     */
    @Override
    public void addTopologyListener(BrokerTopologyListener brokerTopologyListener) {
        this.actor.run(() -> {
            // Validate before registering: a null entry in the set would break every later
            // brokerAdded/brokerRemoved notification.
            Objects.requireNonNull(brokerTopologyListener);
            this.topologyListeners.add(brokerTopologyListener);
            this.topology.getBrokers().stream()
                .map(nodeId -> MemberId.from(String.valueOf(nodeId)))
                .forEach(brokerTopologyListener::brokerAdded);
        });
    }

    /**
     * Unregisters a listener. The removal is scheduled on the actor.
     *
     * @param brokerTopologyListener the listener to remove
     */
    @Override
    public void removeTopologyListener(BrokerTopologyListener brokerTopologyListener) {
        this.actor.run(() -> {
            this.topologyListeners.remove(brokerTopologyListener);
        });
    }

    /**
     * Schedules a copy-then-publish update of the topology: the current snapshot is copied, the
     * given mutation is applied to the copy, the copy is published and the metrics are refreshed.
     *
     * @param consumer mutation to apply to the fresh copy
     */
    private void updateTopology(Consumer<BrokerClusterStateImpl> consumer) {
        this.actor.run(() -> {
            BrokerClusterStateImpl updatedTopology = new BrokerClusterStateImpl(this.topology);
            consumer.accept(updatedTopology);
            this.topology = updatedTopology;
            updateMetrics(updatedTopology);
        });
    }

    /**
     * Queries the members supplier and adds every member that carries broker properties to the
     * topology. Does nothing when the supplier yields {@code null} or an empty set.
     */
    private void checkForMissingEvents() {
        Set<Member> members = this.membersSupplier.get();
        if (members == null || members.isEmpty()) {
            return;
        }
        updateTopology(topologyToUpdate -> {
            for (Member member : members) {
                BrokerInfo brokerInfo = BrokerInfo.fromProperties(member.properties());
                // Members without broker properties are skipped.
                if (brokerInfo != null) {
                    addBroker(topologyToUpdate, member, brokerInfo);
                }
            }
        });
    }

    /**
     * Adds the broker to the given topology if absent (notifying listeners only in that case) and
     * then applies the broker's advertised properties to the topology.
     */
    private void addBroker(BrokerClusterStateImpl topologyToUpdate, Member member, BrokerInfo brokerInfo) {
        if (topologyToUpdate.addBrokerIfAbsent(brokerInfo.getNodeId())) {
            this.topologyListeners.forEach(listener -> listener.brokerAdded(member.id()));
        }
        processProperties(topologyToUpdate, brokerInfo);
    }

    /** Removes the broker from the given topology and notifies all listeners. */
    private void removeBroker(BrokerClusterStateImpl topologyToUpdate, Member member, BrokerInfo brokerInfo) {
        topologyToUpdate.removeBroker(brokerInfo.getNodeId());
        this.topologyListeners.forEach(listener -> listener.brokerRemoved(member.id()));
    }

    /**
     * Returns the fixed name of this actor.
     *
     * @return {@code "GatewayTopologyManager"}
     */
    public String getName() {
        return "GatewayTopologyManager";
    }

    /**
     * Picks up members that are already known when the actor starts.
     * NOTE(review): presumably a lifecycle hook invoked by the Actor base class - confirm.
     */
    protected void onActorStarted() {
        checkForMissingEvents();
    }

    /**
     * Handles a cluster membership event. Events for members without broker properties are
     * ignored. Added members and metadata changes both (re-)apply the broker to the topology;
     * removed members are dropped from it; any other event type is only logged.
     *
     * @param clusterMembershipEvent the membership event to process
     */
    public void event(ClusterMembershipEvent clusterMembershipEvent) {
        Member member = (Member) clusterMembershipEvent.subject();
        ClusterMembershipEvent.Type type = clusterMembershipEvent.type();
        BrokerInfo brokerInfo = BrokerInfo.fromProperties(member.properties());
        if (brokerInfo == null) {
            return;
        }
        switch (type) {
            case MEMBER_ADDED:
                LOG.debug("Received new broker {}.", brokerInfo);
                updateTopology(topologyToUpdate -> addBroker(topologyToUpdate, member, brokerInfo));
                break;
            case METADATA_CHANGED:
                LOG.debug(
                    "Received metadata change from Broker {}, partitions {}, terms {} and health {}.",
                    brokerInfo.getNodeId(),
                    brokerInfo.getPartitionRoles(),
                    brokerInfo.getPartitionLeaderTerms(),
                    brokerInfo.getPartitionHealthStatuses());
                updateTopology(topologyToUpdate -> addBroker(topologyToUpdate, member, brokerInfo));
                break;
            case MEMBER_REMOVED:
                LOG.debug("Received broker was removed {}.", brokerInfo);
                updateTopology(topologyToUpdate -> removeBroker(topologyToUpdate, member, brokerInfo));
                break;
            default:
                LOG.debug("Received {} for broker {}, do nothing.", type, brokerInfo.getNodeId());
                break;
        }
    }

    /**
     * Applies the properties advertised by a broker to the given topology: cluster dimensions (only
     * while the topology is not yet initialized), partition membership and roles, partition
     * health, command API address and version.
     */
    private void processProperties(BrokerClusterStateImpl topologyToUpdate, BrokerInfo brokerInfo) {
        if (!topologyToUpdate.isInitialized()) {
            topologyToUpdate.setClusterSize(brokerInfo.getClusterSize());
            topologyToUpdate.setPartitionsCount(brokerInfo.getPartitionsCount());
            topologyToUpdate.setReplicationFactor(brokerInfo.getReplicationFactor());
        }
        int nodeId = brokerInfo.getNodeId();
        topologyToUpdate.syncPartitions(nodeId, brokerInfo.getPartitionRoles().keySet());
        // Callbacks in order: partition seen, leader (second argument presumably the leader
        // term - confirm), follower, inactive.
        brokerInfo.consumePartitions(
            topologyToUpdate::addPartitionIfAbsent,
            (partitionId, term) -> topologyToUpdate.setPartitionLeader(partitionId.intValue(), nodeId, term),
            partitionId -> topologyToUpdate.addPartitionFollower(partitionId, nodeId),
            partitionId -> topologyToUpdate.addPartitionInactive(partitionId, nodeId));
        brokerInfo.consumePartitionsHealth((partitionId, healthStatus) ->
            topologyToUpdate.setPartitionHealthStatus(nodeId, partitionId.intValue(), healthStatus));
        String commandApiAddress = brokerInfo.getCommandApiAddress();
        if (commandApiAddress != null) {
            topologyToUpdate.setBrokerAddressIfPresent(nodeId, commandApiAddress);
        }
        topologyToUpdate.setBrokerVersionIfPresent(nodeId, brokerInfo.getVersion());
    }

    /**
     * Publishes leader and follower metrics for every partition of the given topology. No leader
     * metric is published for a partition whose leader equals {@link #NO_LEADER_NODE_ID}, and no
     * follower metrics are published when the follower set is {@code null}.
     */
    private void updateMetrics(BrokerClusterStateImpl publishedTopology) {
        publishedTopology.getPartitions().forEach(partitionId -> {
            int leaderId = publishedTopology.getLeaderForPartition(partitionId.intValue());
            if (leaderId != NO_LEADER_NODE_ID) {
                this.topologyMetrics.setLeaderForPartition(partitionId.intValue(), leaderId);
            }
            Set<Integer> followerIds = publishedTopology.getFollowersForPartition(partitionId.intValue());
            if (followerIds != null) {
                // First argument is the partition, second the follower broker.
                followerIds.forEach(followerId ->
                    this.topologyMetrics.setFollower(partitionId.intValue(), followerId.intValue()));
            }
        });
    }

    /**
     * Stores the new cluster configuration and, if its cluster size, partition count or minimum
     * replication factor differ from the current topology, schedules an update of those values.
     * Uninitialized configurations are ignored.
     *
     * @param clusterConfiguration the updated cluster configuration
     */
    public void onClusterConfigurationUpdated(ClusterConfiguration clusterConfiguration) {
        if (clusterConfiguration.isUninitialized()) {
            return;
        }
        this.clusterConfiguration = clusterConfiguration;
        updateTopology(topologyToUpdate -> {
            int clusterSize = clusterConfiguration.clusterSize();
            int partitionCount = clusterConfiguration.partitionCount();
            Integer minReplicationFactor = clusterConfiguration.minReplicationFactor();
            boolean unchanged = clusterSize == topologyToUpdate.getClusterSize()
                && partitionCount == topologyToUpdate.getPartitionsCount()
                && minReplicationFactor.intValue() == topologyToUpdate.getReplicationFactor();
            if (unchanged) {
                return;
            }
            LOG.debug(
                "Updating topology with clusterSize {}, partitionsCount {} and replicationFactor {}",
                clusterSize,
                partitionCount,
                minReplicationFactor);
            topologyToUpdate.setClusterSize(clusterSize);
            topologyToUpdate.setPartitionsCount(partitionCount);
            topologyToUpdate.setReplicationFactor(minReplicationFactor.intValue());
        });
    }
}
