package io.camunda.zeebe.dynamic.config.gossip;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationUpdateNotifier;
import io.camunda.zeebe.dynamic.config.metrics.TopologyMetrics;
import io.camunda.zeebe.dynamic.config.serializer.ClusterConfigurationSerializer;
import io.camunda.zeebe.dynamic.config.state.ClusterConfiguration;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/dynamic/config/gossip/ClusterConfigurationGossiper.class */
public final class ClusterConfigurationGossiper implements ClusterConfigurationUpdateNotifier, ClusterMembershipEventListener, AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigurationGossiper.class);
    private static final String SYNC_REQUEST_TOPIC = "cluster-topology-sync";
    private static final String GOSSIP_REQUEST_TOPIC = "cluster-topology-gossip";
    private final ConcurrencyControl executor;
    private final ClusterCommunicationService communicationService;
    private final ClusterMembershipService membershipService;
    private final ClusterConfigurationGossiperConfig config;
    private final ClusterConfigurationSerializer serializer;
    private final Consumer<ClusterConfiguration> clusterConfigurationUpdateHandler;
    private final ClusterConfigurationGossipState gossipState = new ClusterConfigurationGossipState();
    private final Set<ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener> configurationUpdateListeners = new HashSet();
    private List<MemberId> membersToSync = new LinkedList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.camunda.zeebe.dynamic.config.gossip.ClusterConfigurationGossiper$1, reason: invalid class name */
    /* loaded from: input_file:io/camunda/zeebe/dynamic/config/gossip/ClusterConfigurationGossiper$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$atomix$cluster$ClusterMembershipEvent$Type = new int[ClusterMembershipEvent.Type.values().length];

        static {
            try {
                $SwitchMap$io$atomix$cluster$ClusterMembershipEvent$Type[ClusterMembershipEvent.Type.MEMBER_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$atomix$cluster$ClusterMembershipEvent$Type[ClusterMembershipEvent.Type.MEMBER_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public ClusterConfigurationGossiper(ConcurrencyControl concurrencyControl, ClusterCommunicationService clusterCommunicationService, ClusterMembershipService clusterMembershipService, ClusterConfigurationSerializer clusterConfigurationSerializer, ClusterConfigurationGossiperConfig clusterConfigurationGossiperConfig, Consumer<ClusterConfiguration> consumer) {
        this.executor = concurrencyControl;
        this.communicationService = clusterCommunicationService;
        this.membershipService = clusterMembershipService;
        this.config = clusterConfigurationGossiperConfig;
        this.serializer = clusterConfigurationSerializer;
        this.clusterConfigurationUpdateHandler = consumer;
    }

    public CompletableActorFuture<Void> start() {
        CompletableActorFuture<Void> completableActorFuture = new CompletableActorFuture<>();
        this.executor.run(() -> {
            internalStart();
            completableActorFuture.complete((Object) null);
        });
        return completableActorFuture;
    }

    private void internalStart() {
        scheduleSync();
        registerSyncHandler();
        registerGossipHandler();
        registerMemberAddedListener();
    }

    private void registerMemberAddedListener() {
        this.membershipService.addListener(this);
    }

    private void unregisterMemberListener() {
        this.membershipService.removeListener(this);
    }

    private void registerSyncHandler() {
        ClusterCommunicationService clusterCommunicationService = this.communicationService;
        ClusterConfigurationSerializer clusterConfigurationSerializer = this.serializer;
        Objects.requireNonNull(clusterConfigurationSerializer);
        Function function = clusterConfigurationSerializer::decode;
        BiFunction biFunction = this::handleSyncRequest;
        ClusterConfigurationSerializer clusterConfigurationSerializer2 = this.serializer;
        Objects.requireNonNull(clusterConfigurationSerializer2);
        Function function2 = clusterConfigurationSerializer2::encode;
        ConcurrencyControl concurrencyControl = this.executor;
        Objects.requireNonNull(concurrencyControl);
        clusterCommunicationService.replyTo(SYNC_REQUEST_TOPIC, function, biFunction, function2, concurrencyControl::run);
    }

    private void unregisterSyncHandler() {
        this.communicationService.unsubscribe(SYNC_REQUEST_TOPIC);
    }

    private void registerGossipHandler() {
        ClusterCommunicationService clusterCommunicationService = this.communicationService;
        ClusterConfigurationSerializer clusterConfigurationSerializer = this.serializer;
        Objects.requireNonNull(clusterConfigurationSerializer);
        Function function = clusterConfigurationSerializer::decode;
        BiConsumer biConsumer = this::handleGossip;
        ConcurrencyControl concurrencyControl = this.executor;
        Objects.requireNonNull(concurrencyControl);
        clusterCommunicationService.consume(GOSSIP_REQUEST_TOPIC, function, biConsumer, concurrencyControl::run);
    }

    private void unregisterGossipHandler() {
        this.communicationService.unsubscribe(GOSSIP_REQUEST_TOPIC);
    }

    private void scheduleSync() {
        if (this.config.enableSync()) {
            this.executor.schedule(this.config.syncDelay(), this::sync);
        }
    }

    private void sync() {
        refreshMembersToSync();
        if (this.membersToSync.isEmpty()) {
            return;
        }
        sync(this.membersToSync.remove(0));
    }

    private void sync(MemberId memberId) {
        LOGGER.trace("Sending sync request to {}", memberId);
        CompletableFuture<ClusterConfigurationGossipState> sendSyncRequest = sendSyncRequest(memberId);
        BiConsumer<? super ClusterConfigurationGossipState, ? super Throwable> biConsumer = (clusterConfigurationGossipState, th) -> {
            handleSyncResponse(clusterConfigurationGossipState, th, memberId);
        };
        ConcurrencyControl concurrencyControl = this.executor;
        Objects.requireNonNull(concurrencyControl);
        sendSyncRequest.whenCompleteAsync(biConsumer, concurrencyControl::run);
    }

    private void refreshMembersToSync() {
        if (this.membersToSync.isEmpty()) {
            this.membersToSync = (List) this.membershipService.getMembers().stream().map((v0) -> {
                return v0.id();
            }).filter(memberId -> {
                return !memberId.equals(this.membershipService.getLocalMember().id());
            }).collect(Collectors.toCollection(LinkedList::new));
            Collections.shuffle(this.membersToSync);
        }
    }

    private void handleSyncResponse(ClusterConfigurationGossipState clusterConfigurationGossipState, Throwable th, MemberId memberId) {
        if (th == null) {
            update(clusterConfigurationGossipState);
        } else {
            LOGGER.warn("Failed to sync with {}", memberId, th);
        }
        scheduleSync();
    }

    private void update(ClusterConfigurationGossipState clusterConfigurationGossipState) {
        ClusterConfiguration clusterConfiguration;
        if (clusterConfigurationGossipState.equals(this.gossipState) || (clusterConfiguration = clusterConfigurationGossipState.getClusterConfiguration()) == null) {
            return;
        }
        this.clusterConfigurationUpdateHandler.accept(clusterConfiguration);
    }

    private void onConfigurationUpdated(ClusterConfiguration clusterConfiguration) {
        this.gossipState.setClusterConfiguration(clusterConfiguration);
        LOGGER.trace("Updated local gossipState to {}", clusterConfiguration);
        gossip();
        notifyListeners(clusterConfiguration);
        TopologyMetrics.updateFromTopology(clusterConfiguration);
    }

    private void notifyListeners(ClusterConfiguration clusterConfiguration) {
        this.configurationUpdateListeners.forEach(clusterConfigurationUpdateListener -> {
            clusterConfigurationUpdateListener.onClusterConfigurationUpdated(clusterConfiguration);
        });
    }

    private ClusterConfigurationGossipState handleSyncRequest(MemberId memberId, ClusterConfigurationGossipState clusterConfigurationGossipState) {
        LOGGER.trace("Received configuration sync request from {} with state {}", memberId, clusterConfigurationGossipState);
        update(clusterConfigurationGossipState);
        return this.gossipState;
    }

    public void updateClusterConfiguration(ClusterConfiguration clusterConfiguration) {
        if (clusterConfiguration == null) {
            return;
        }
        this.executor.run(() -> {
            if (clusterConfiguration.equals(this.gossipState.getClusterConfiguration())) {
                return;
            }
            onConfigurationUpdated(clusterConfiguration);
        });
    }

    public ActorFuture<ClusterConfiguration> queryClusterConfiguration(MemberId memberId) {
        ActorFuture<ClusterConfiguration> createFuture = this.executor.createFuture();
        CompletableFuture<ClusterConfigurationGossipState> sendSyncRequest = sendSyncRequest(memberId);
        BiConsumer<? super ClusterConfigurationGossipState, ? super Throwable> biConsumer = (clusterConfigurationGossipState, th) -> {
            if (th == null) {
                createFuture.complete(clusterConfigurationGossipState.getClusterConfiguration());
            } else {
                createFuture.completeExceptionally(th);
            }
        };
        ConcurrencyControl concurrencyControl = this.executor;
        Objects.requireNonNull(concurrencyControl);
        sendSyncRequest.whenCompleteAsync(biConsumer, concurrencyControl::run);
        return createFuture;
    }

    private CompletableFuture<ClusterConfigurationGossipState> sendSyncRequest(MemberId memberId) {
        ClusterCommunicationService clusterCommunicationService = this.communicationService;
        ClusterConfigurationGossipState clusterConfigurationGossipState = this.gossipState;
        ClusterConfigurationSerializer clusterConfigurationSerializer = this.serializer;
        Objects.requireNonNull(clusterConfigurationSerializer);
        Function function = clusterConfigurationSerializer::encode;
        ClusterConfigurationSerializer clusterConfigurationSerializer2 = this.serializer;
        Objects.requireNonNull(clusterConfigurationSerializer2);
        return clusterCommunicationService.send(SYNC_REQUEST_TOPIC, clusterConfigurationGossipState, function, clusterConfigurationSerializer2::decode, memberId, this.config.syncRequestTimeout());
    }

    private void gossip() {
        refreshMembersToSync();
        if (this.membersToSync.isEmpty()) {
            return;
        }
        List<MemberId> subList = this.membersToSync.subList(0, Math.min(this.config.gossipFanout(), this.membersToSync.size()));
        LOGGER.trace("Gossiping {} to {}", this.gossipState, subList);
        subList.forEach(memberId -> {
            ClusterCommunicationService clusterCommunicationService = this.communicationService;
            ClusterConfigurationGossipState clusterConfigurationGossipState = this.gossipState;
            ClusterConfigurationSerializer clusterConfigurationSerializer = this.serializer;
            Objects.requireNonNull(clusterConfigurationSerializer);
            clusterCommunicationService.unicast(GOSSIP_REQUEST_TOPIC, clusterConfigurationGossipState, clusterConfigurationSerializer::encode, memberId, true);
        });
        subList.clear();
    }

    private void handleGossip(MemberId memberId, ClusterConfigurationGossipState clusterConfigurationGossipState) {
        LOGGER.trace("Received {} from {}", this.gossipState, memberId);
        update(clusterConfigurationGossipState);
    }

    @Override // io.camunda.zeebe.dynamic.config.ClusterConfigurationUpdateNotifier
    public void addUpdateListener(ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener clusterConfigurationUpdateListener) {
        this.executor.run(() -> {
            this.configurationUpdateListeners.add(clusterConfigurationUpdateListener);
            if (this.gossipState.getClusterConfiguration() != null) {
                clusterConfigurationUpdateListener.onClusterConfigurationUpdated(this.gossipState.getClusterConfiguration());
            }
        });
    }

    @Override // io.camunda.zeebe.dynamic.config.ClusterConfigurationUpdateNotifier
    public void removeUpdateListener(ClusterConfigurationUpdateNotifier.ClusterConfigurationUpdateListener clusterConfigurationUpdateListener) {
        this.executor.run(() -> {
            this.configurationUpdateListeners.remove(clusterConfigurationUpdateListener);
        });
    }

    public boolean isRelevant(ClusterMembershipEvent clusterMembershipEvent) {
        return clusterMembershipEvent.type() == ClusterMembershipEvent.Type.MEMBER_ADDED || clusterMembershipEvent.type() == ClusterMembershipEvent.Type.MEMBER_REMOVED;
    }

    public void event(ClusterMembershipEvent clusterMembershipEvent) {
        switch (AnonymousClass1.$SwitchMap$io$atomix$cluster$ClusterMembershipEvent$Type[clusterMembershipEvent.type().ordinal()]) {
            case 1:
                this.executor.run(() -> {
                    if (this.config.enableSync()) {
                        sync(((Member) clusterMembershipEvent.subject()).id());
                    }
                });
                return;
            case 2:
                this.executor.run(() -> {
                    this.membersToSync.remove(((Member) clusterMembershipEvent.subject()).id());
                });
                return;
            default:
                return;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        unregisterMemberListener();
        unregisterSyncHandler();
        unregisterGossipHandler();
    }
}
