package io.camunda.zeebe.topology.gossip;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.topology.TopologyUpdateNotifier;
import io.camunda.zeebe.topology.metrics.TopologyMetrics;
import io.camunda.zeebe.topology.serializer.ClusterTopologySerializer;
import io.camunda.zeebe.topology.state.ClusterTopology;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/topology/gossip/ClusterTopologyGossiper.class */
public final class ClusterTopologyGossiper implements TopologyUpdateNotifier, ClusterMembershipEventListener, AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTopologyGossiper.class);
    private static final String SYNC_REQUEST_TOPIC = "cluster-topology-sync";
    private static final String GOSSIP_REQUEST_TOPIC = "cluster-topology-gossip";
    private final ConcurrencyControl executor;
    private final ClusterCommunicationService communicationService;
    private final ClusterMembershipService membershipService;
    private final ClusterTopologyGossiperConfig config;
    private final ClusterTopologySerializer serializer;
    private final Consumer<ClusterTopology> clusterTopologyUpdateHandler;
    private final ClusterTopologyGossipState gossipState = new ClusterTopologyGossipState();
    private final Set<TopologyUpdateNotifier.TopologyUpdateListener> topologyUpdateListeners = new HashSet();
    private List<MemberId> membersToSync = new LinkedList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.camunda.zeebe.topology.gossip.ClusterTopologyGossiper$1, reason: invalid class name */
    /* loaded from: input_file:io/camunda/zeebe/topology/gossip/ClusterTopologyGossiper$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$atomix$cluster$ClusterMembershipEvent$Type = new int[ClusterMembershipEvent.Type.values().length];

        static {
            try {
                $SwitchMap$io$atomix$cluster$ClusterMembershipEvent$Type[ClusterMembershipEvent.Type.MEMBER_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$atomix$cluster$ClusterMembershipEvent$Type[ClusterMembershipEvent.Type.MEMBER_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public ClusterTopologyGossiper(ConcurrencyControl concurrencyControl, ClusterCommunicationService clusterCommunicationService, ClusterMembershipService clusterMembershipService, ClusterTopologySerializer clusterTopologySerializer, ClusterTopologyGossiperConfig clusterTopologyGossiperConfig, Consumer<ClusterTopology> consumer) {
        this.executor = concurrencyControl;
        this.communicationService = clusterCommunicationService;
        this.membershipService = clusterMembershipService;
        this.config = clusterTopologyGossiperConfig;
        this.serializer = clusterTopologySerializer;
        this.clusterTopologyUpdateHandler = consumer;
    }

    public CompletableActorFuture<Void> start() {
        CompletableActorFuture<Void> completableActorFuture = new CompletableActorFuture<>();
        this.executor.run(() -> {
            internalStart();
            completableActorFuture.complete((Object) null);
        });
        return completableActorFuture;
    }

    private void internalStart() {
        scheduleSync();
        registerSyncHandler();
        registerGossipHandler();
        registerMemberAddedListener();
    }

    private void registerMemberAddedListener() {
        this.membershipService.addListener(this);
    }

    private void unregisterMemberListener() {
        this.membershipService.removeListener(this);
    }

    private void registerSyncHandler() {
        ClusterCommunicationService clusterCommunicationService = this.communicationService;
        ClusterTopologySerializer clusterTopologySerializer = this.serializer;
        Objects.requireNonNull(clusterTopologySerializer);
        Function function = clusterTopologySerializer::decode;
        BiFunction biFunction = this::handleSyncRequest;
        ClusterTopologySerializer clusterTopologySerializer2 = this.serializer;
        Objects.requireNonNull(clusterTopologySerializer2);
        Function function2 = clusterTopologySerializer2::encode;
        ConcurrencyControl concurrencyControl = this.executor;
        Objects.requireNonNull(concurrencyControl);
        clusterCommunicationService.replyTo(SYNC_REQUEST_TOPIC, function, biFunction, function2, concurrencyControl::run);
    }

    private void unregisterSyncHandler() {
        this.communicationService.unsubscribe(SYNC_REQUEST_TOPIC);
    }

    private void registerGossipHandler() {
        ClusterCommunicationService clusterCommunicationService = this.communicationService;
        ClusterTopologySerializer clusterTopologySerializer = this.serializer;
        Objects.requireNonNull(clusterTopologySerializer);
        Function function = clusterTopologySerializer::decode;
        BiConsumer biConsumer = this::handleGossip;
        ConcurrencyControl concurrencyControl = this.executor;
        Objects.requireNonNull(concurrencyControl);
        clusterCommunicationService.consume(GOSSIP_REQUEST_TOPIC, function, biConsumer, concurrencyControl::run);
    }

    private void unregisterGossipHandler() {
        this.communicationService.unsubscribe(GOSSIP_REQUEST_TOPIC);
    }

    private void scheduleSync() {
        if (this.config.enableSync()) {
            this.executor.schedule(this.config.syncDelay(), this::sync);
        }
    }

    private void sync() {
        refreshMembersToSync();
        if (this.membersToSync.isEmpty()) {
            return;
        }
        sync(this.membersToSync.remove(0));
    }

    private void sync(MemberId memberId) {
        LOGGER.trace("Sending sync request to {}", memberId);
        CompletableFuture<ClusterTopologyGossipState> sendSyncRequest = sendSyncRequest(memberId);
        BiConsumer<? super ClusterTopologyGossipState, ? super Throwable> biConsumer = (clusterTopologyGossipState, th) -> {
            handleSyncResponse(clusterTopologyGossipState, th, memberId);
        };
        ConcurrencyControl concurrencyControl = this.executor;
        Objects.requireNonNull(concurrencyControl);
        sendSyncRequest.whenCompleteAsync(biConsumer, concurrencyControl::run);
    }

    private void refreshMembersToSync() {
        if (this.membersToSync.isEmpty()) {
            this.membersToSync = (List) this.membershipService.getMembers().stream().map((v0) -> {
                return v0.id();
            }).filter(memberId -> {
                return !memberId.equals(this.membershipService.getLocalMember().id());
            }).collect(Collectors.toCollection(LinkedList::new));
            Collections.shuffle(this.membersToSync);
        }
    }

    private void handleSyncResponse(ClusterTopologyGossipState clusterTopologyGossipState, Throwable th, MemberId memberId) {
        if (th == null) {
            update(clusterTopologyGossipState);
        } else {
            LOGGER.warn("Failed to sync with {}", memberId, th);
        }
        scheduleSync();
    }

    private void update(ClusterTopologyGossipState clusterTopologyGossipState) {
        ClusterTopology clusterTopology;
        if (clusterTopologyGossipState.equals(this.gossipState) || (clusterTopology = clusterTopologyGossipState.getClusterTopology()) == null) {
            return;
        }
        this.clusterTopologyUpdateHandler.accept(clusterTopology);
    }

    private void onTopologyUpdated(ClusterTopology clusterTopology) {
        this.gossipState.setClusterTopology(clusterTopology);
        LOGGER.trace("Updated local gossipState to {}", clusterTopology);
        gossip();
        notifyListeners(clusterTopology);
        TopologyMetrics.updateFromTopology(clusterTopology);
    }

    private void notifyListeners(ClusterTopology clusterTopology) {
        this.topologyUpdateListeners.forEach(topologyUpdateListener -> {
            topologyUpdateListener.onTopologyUpdated(clusterTopology);
        });
    }

    private ClusterTopologyGossipState handleSyncRequest(MemberId memberId, ClusterTopologyGossipState clusterTopologyGossipState) {
        LOGGER.trace("Received topology sync request from {} with state {}", memberId, clusterTopologyGossipState);
        update(clusterTopologyGossipState);
        return this.gossipState;
    }

    public void updateClusterTopology(ClusterTopology clusterTopology) {
        if (clusterTopology == null) {
            return;
        }
        this.executor.run(() -> {
            if (clusterTopology.equals(this.gossipState.getClusterTopology())) {
                return;
            }
            onTopologyUpdated(clusterTopology);
        });
    }

    public ActorFuture<ClusterTopology> queryClusterTopology(MemberId memberId) {
        ActorFuture<ClusterTopology> createFuture = this.executor.createFuture();
        CompletableFuture<ClusterTopologyGossipState> sendSyncRequest = sendSyncRequest(memberId);
        BiConsumer<? super ClusterTopologyGossipState, ? super Throwable> biConsumer = (clusterTopologyGossipState, th) -> {
            if (th == null) {
                createFuture.complete(clusterTopologyGossipState.getClusterTopology());
            } else {
                createFuture.completeExceptionally(th);
            }
        };
        ConcurrencyControl concurrencyControl = this.executor;
        Objects.requireNonNull(concurrencyControl);
        sendSyncRequest.whenCompleteAsync(biConsumer, concurrencyControl::run);
        return createFuture;
    }

    private CompletableFuture<ClusterTopologyGossipState> sendSyncRequest(MemberId memberId) {
        ClusterCommunicationService clusterCommunicationService = this.communicationService;
        ClusterTopologyGossipState clusterTopologyGossipState = this.gossipState;
        ClusterTopologySerializer clusterTopologySerializer = this.serializer;
        Objects.requireNonNull(clusterTopologySerializer);
        Function function = clusterTopologySerializer::encode;
        ClusterTopologySerializer clusterTopologySerializer2 = this.serializer;
        Objects.requireNonNull(clusterTopologySerializer2);
        return clusterCommunicationService.send(SYNC_REQUEST_TOPIC, clusterTopologyGossipState, function, clusterTopologySerializer2::decode, memberId, this.config.syncRequestTimeout());
    }

    private void gossip() {
        refreshMembersToSync();
        if (this.membersToSync.isEmpty()) {
            return;
        }
        List<MemberId> subList = this.membersToSync.subList(0, Math.min(this.config.gossipFanout(), this.membersToSync.size()));
        LOGGER.trace("Gossiping {} to {}", this.gossipState, subList);
        subList.forEach(memberId -> {
            ClusterCommunicationService clusterCommunicationService = this.communicationService;
            ClusterTopologyGossipState clusterTopologyGossipState = this.gossipState;
            ClusterTopologySerializer clusterTopologySerializer = this.serializer;
            Objects.requireNonNull(clusterTopologySerializer);
            clusterCommunicationService.unicast(GOSSIP_REQUEST_TOPIC, clusterTopologyGossipState, clusterTopologySerializer::encode, memberId, true);
        });
        subList.clear();
    }

    private void handleGossip(MemberId memberId, ClusterTopologyGossipState clusterTopologyGossipState) {
        LOGGER.trace("Received {} from {}", this.gossipState, memberId);
        update(clusterTopologyGossipState);
    }

    @Override // io.camunda.zeebe.topology.TopologyUpdateNotifier
    public void addUpdateListener(TopologyUpdateNotifier.TopologyUpdateListener topologyUpdateListener) {
        this.executor.run(() -> {
            this.topologyUpdateListeners.add(topologyUpdateListener);
            if (this.gossipState.getClusterTopology() != null) {
                topologyUpdateListener.onTopologyUpdated(this.gossipState.getClusterTopology());
            }
        });
    }

    @Override // io.camunda.zeebe.topology.TopologyUpdateNotifier
    public void removeUpdateListener(TopologyUpdateNotifier.TopologyUpdateListener topologyUpdateListener) {
        this.executor.run(() -> {
            this.topologyUpdateListeners.remove(topologyUpdateListener);
        });
    }

    public boolean isRelevant(ClusterMembershipEvent clusterMembershipEvent) {
        return clusterMembershipEvent.type() == ClusterMembershipEvent.Type.MEMBER_ADDED || clusterMembershipEvent.type() == ClusterMembershipEvent.Type.MEMBER_REMOVED;
    }

    public void event(ClusterMembershipEvent clusterMembershipEvent) {
        switch (AnonymousClass1.$SwitchMap$io$atomix$cluster$ClusterMembershipEvent$Type[clusterMembershipEvent.type().ordinal()]) {
            case 1:
                this.executor.run(() -> {
                    if (this.config.enableSync()) {
                        sync(((Member) clusterMembershipEvent.subject()).id());
                    }
                });
                return;
            case 2:
                this.executor.run(() -> {
                    this.membersToSync.remove(((Member) clusterMembershipEvent.subject()).id());
                });
                return;
            default:
                return;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        unregisterMemberListener();
        unregisterSyncHandler();
        unregisterGossipHandler();
    }
}
