package io.camunda.zeebe.topology;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.topology.changes.TopologyChangeAppliers;
import io.camunda.zeebe.topology.metrics.TopologyMetrics;
import io.camunda.zeebe.topology.state.ClusterTopology;
import io.camunda.zeebe.topology.state.MemberState;
import io.camunda.zeebe.topology.state.TopologyChangeOperation;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.ExponentialBackoffRetryDelay;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/topology/ClusterTopologyManagerImpl.class */
/**
 * Keeps the locally persisted {@link ClusterTopology} up to date.
 *
 * <p>The manager initializes the topology on {@link #start(TopologyInitializer)}, merges
 * topologies received via gossip, persists every change through the {@link
 * PersistedClusterTopology}, hands updated topologies to the configured gossiper, and applies the
 * topology change operations that are pending for the local member.
 *
 * <p>Apart from {@link #setTopologyGossiper(Consumer)}, every state access in this class is
 * submitted to the {@link ConcurrencyControl} passed to the constructor.
 */
public final class ClusterTopologyManagerImpl implements ClusterTopologyManager {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterTopologyManagerImpl.class);
    /** Default lower bound for the delay between retries of a failed operation. */
    private static final Duration MIN_RETRY_DELAY = Duration.ofSeconds(10);
    /** Default upper bound for the delay between retries of a failed operation. */
    private static final Duration MAX_RETRY_DELAY = Duration.ofMinutes(1);
    private final ConcurrencyControl executor;
    private final PersistedClusterTopology persistedClusterTopology;
    /** Receives every topology that should be shared with other members; set via the setter. */
    private Consumer<ClusterTopology> topologyGossiper;
    /** Completed once initialization succeeded, or failed when initialization failed. */
    private final ActorFuture<Void> startFuture;
    /** Null until appliers are registered; operations are only applied while this is non-null. */
    private TopologyChangeAppliers changeAppliers;
    private final MemberId localMemberId;
    /** True while an operation for the local member is being initialized or applied. */
    private boolean onGoingTopologyChangeOperation;
    /** True while a retry of a failed operation is scheduled but has not started yet. */
    private boolean shouldRetry;
    private final ExponentialBackoffRetryDelay backoffRetry;
    /** True once the topology was initialized successfully; gossip is not merged before that. */
    private boolean initialized;

    /**
     * Creates a manager that uses the default retry delays ({@code MIN_RETRY_DELAY} and
     * {@code MAX_RETRY_DELAY}).
     *
     * @param concurrencyControl executor on which all state changes are run
     * @param memberId id of the local member
     * @param persistedClusterTopology storage of the local topology
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ClusterTopologyManagerImpl(ConcurrencyControl concurrencyControl, MemberId memberId, PersistedClusterTopology persistedClusterTopology) {
        this(concurrencyControl, memberId, persistedClusterTopology, MIN_RETRY_DELAY, MAX_RETRY_DELAY);
    }

    /**
     * Creates a manager with explicit retry delays.
     *
     * @param concurrencyControl executor on which all state changes are run
     * @param memberId id of the local member
     * @param persistedClusterTopology storage of the local topology
     * @param duration minimum retry delay (the public constructor passes {@code MIN_RETRY_DELAY})
     * @param duration2 maximum retry delay (the public constructor passes {@code MAX_RETRY_DELAY})
     */
    ClusterTopologyManagerImpl(ConcurrencyControl concurrencyControl, MemberId memberId, PersistedClusterTopology persistedClusterTopology, Duration duration, Duration duration2) {
        this.onGoingTopologyChangeOperation = false;
        this.shouldRetry = false;
        this.initialized = false;
        this.executor = concurrencyControl;
        this.persistedClusterTopology = persistedClusterTopology;
        this.startFuture = concurrencyControl.createFuture();
        this.localMemberId = memberId;
        // NOTE(review): the maximum delay is deliberately passed first; this assumes the
        // ExponentialBackoffRetryDelay constructor takes (maxDelay, minDelay) - confirm.
        this.backoffRetry = new ExponentialBackoffRetryDelay(duration2, duration);
    }

    /**
     * Reads the current local topology on the executor.
     *
     * @return a future completed with the topology held by the persisted store
     */
    @Override // io.camunda.zeebe.topology.ClusterTopologyManager
    public ActorFuture<ClusterTopology> getClusterTopology() {
        ActorFuture<ClusterTopology> future = executor.createFuture();
        executor.run(() -> future.complete(persistedClusterTopology.getTopology()));
        return future;
    }

    /**
     * Applies {@code unaryOperator} to the current local topology, persists and gossips the
     * result, and then starts applying any operation pending for the local member.
     *
     * @param unaryOperator transformation from the current topology to the updated topology
     * @return a future completed with the updated topology, or completed exceptionally when the
     *     transformation throws or the updated topology cannot be persisted
     */
    @Override // io.camunda.zeebe.topology.ClusterTopologyManager
    public ActorFuture<ClusterTopology> updateClusterTopology(UnaryOperator<ClusterTopology> unaryOperator) {
        ActorFuture<ClusterTopology> future = executor.createFuture();
        executor.run(() -> {
            try {
                ClusterTopology updatedTopology = unaryOperator.apply(persistedClusterTopology.getTopology());
                updateLocalTopology(updatedTopology).ifRightOrLeft(
                    persistedTopology -> {
                        future.complete(persistedTopology);
                        applyTopologyChangeOperation(updatedTopology);
                    },
                    future::completeExceptionally);
            } catch (Exception e) {
                LOG.error("Failed to update cluster topology", e);
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    /**
     * Starts the manager by initializing the topology, unless the start future is already done.
     *
     * @param topologyInitializer source of the initial topology
     * @return the start future; it is shared by all calls to this method
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ActorFuture<Void> start(TopologyInitializer topologyInitializer) {
        executor.run(() -> {
            if (!startFuture.isDone()) {
                initialize(topologyInitializer);
            }
        });
        return startFuture;
    }

    /**
     * Sets the callback that receives every topology to be gossiped. The field is assigned
     * directly, not via the executor.
     *
     * @param consumer callback invoked with topologies that should be gossiped
     */
    public void setTopologyGossiper(Consumer<ClusterTopology> consumer) {
        this.topologyGossiper = consumer;
    }

    /**
     * Runs the initializer and, on success, merges its result with the persisted topology,
     * persists and gossips the merged topology, and marks the manager as started. Any failure
     * completes the start future exceptionally.
     */
    private void initialize(TopologyInitializer topologyInitializer) {
        topologyInitializer.initialize().onComplete((initialTopology, error) -> {
            if (error != null) {
                LOG.error("Failed to initialize topology", error);
                startFuture.completeExceptionally(error);
                return;
            }
            if (initialTopology.isUninitialized()) {
                LOG.error("Expected to initialize topology, but got uninitialized topology");
                startFuture.completeExceptionally(new IllegalStateException("Expected to initialize topology, but got uninitialized topology"));
                return;
            }
            try {
                // Merge with what is already persisted instead of overwriting it.
                persistedClusterTopology.update(initialTopology.merge(persistedClusterTopology.getTopology()));
                LOG.debug("Initialized topology '{}'", persistedClusterTopology.getTopology());
                topologyGossiper.accept(persistedClusterTopology.getTopology());
                setStarted();
            } catch (IOException e) {
                startFuture.completeExceptionally("Failed to start update cluster topology", e);
            }
        });
    }

    /** Marks the manager as initialized and completes the start future, at most once. */
    private void setStarted() {
        if (startFuture.isDone()) {
            return;
        }
        initialized = true;
        // The future carries no value; null is the only valid Void result.
        startFuture.complete(null);
    }

    /**
     * Handles a topology received via gossip.
     *
     * <p>Before initialization the received topology is only forwarded to the gossiper and the
     * local topology is left untouched. Afterwards a non-null topology is merged into the local
     * one; if the merge changes the local topology, the result is persisted, gossiped, and
     * pending operations for the local member are applied.
     *
     * @param clusterTopology the received topology; {@code null} is ignored once initialized
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void onGossipReceived(ClusterTopology clusterTopology) {
        executor.run(() -> {
            if (!initialized) {
                LOG.trace("Received topology {} before ClusterTopologyManager is initialized.", clusterTopology);
                topologyGossiper.accept(clusterTopology);
                return;
            }
            if (clusterTopology == null) {
                return;
            }
            try {
                ClusterTopology mergedTopology = persistedClusterTopology.getTopology().merge(clusterTopology);
                if (!mergedTopology.equals(persistedClusterTopology.getTopology())) {
                    LOG.debug("Received new topology {}. Updating local topology to {}", clusterTopology, mergedTopology);
                    persistedClusterTopology.update(mergedTopology);
                    topologyGossiper.accept(mergedTopology);
                    applyTopologyChangeOperation(mergedTopology);
                }
            } catch (IOException e) {
                LOG.warn("Failed to process cluster topology received via gossip. '{}'", clusterTopology, e);
            }
        });
    }

    /**
     * Returns true when no operation is in flight (or a retry is due), the topology has a pending
     * operation for the local member, and change appliers are registered.
     */
    private boolean shouldApplyTopologyChangeOperation(ClusterTopology clusterTopology) {
        boolean idleOrRetrying = !onGoingTopologyChangeOperation || shouldRetry;
        return idleOrRetrying && clusterTopology.pendingChangesFor(localMemberId).isPresent() && changeAppliers != null;
    }

    /**
     * Starts applying the operation pending for the local member in {@code clusterTopology}, if
     * {@link #shouldApplyTopologyChangeOperation(ClusterTopology)} allows it. The operation is
     * first initialized and the resulting member update persisted; only then is the operation
     * itself applied asynchronously.
     */
    private void applyTopologyChangeOperation(ClusterTopology clusterTopology) {
        if (!shouldApplyTopologyChangeOperation(clusterTopology)) {
            return;
        }
        onGoingTopologyChangeOperation = true;
        shouldRetry = false;
        TopologyChangeOperation operation = clusterTopology.pendingChangesFor(localMemberId).orElseThrow();
        TopologyMetrics.OperationObserver observer = TopologyMetrics.observeOperation(operation);
        LOG.info("Applying topology change operation {}", operation);
        TopologyChangeAppliers.OperationApplier applier = changeAppliers.getApplier(operation);
        Either<Exception, ClusterTopology> initResult = applier.init(clusterTopology)
            .map(transformer -> clusterTopology.updateMember(localMemberId, transformer))
            .flatMap(this::updateLocalTopology);
        if (initResult.isLeft()) {
            observer.failed();
            // Release the in-flight flag so a later trigger can attempt the operation again.
            onGoingTopologyChangeOperation = false;
            LOG.error("Failed to initialize topology change operation {}", operation, initResult.getLeft());
            return;
        }
        ClusterTopology initializedTopology = initResult.get();
        applier.apply().onComplete((transformer, error) ->
            onOperationApplied(initializedTopology, operation, transformer, error, observer));
    }

    /**
     * Flags the failed operation for retry and schedules the retry after the next backoff delay.
     */
    private void logAndScheduleRetry(TopologyChangeOperation topologyChangeOperation, Throwable th) {
        shouldRetry = true;
        Duration retryDelay = backoffRetry.nextDelay();
        // The trailing throwable is logged as the exception, not as a format argument.
        LOG.warn("Failed to apply topology change operation {}. Will be retried in {}.", topologyChangeOperation, retryDelay, th);
        executor.schedule(retryDelay, () -> {
            LOG.debug("Retrying last applied operation");
            applyTopologyChangeOperation(persistedClusterTopology.getTopology());
        });
    }

    /**
     * Completion handler of an applied operation.
     *
     * <p>On failure a retry is scheduled. On success the backoff is reset and, provided the local
     * topology still has the version the operation was started on, the topology change is
     * advanced and persisted, after which the next pending operation (if any) is triggered.
     *
     * @param clusterTopology the topology on which the operation was started
     * @param topologyChangeOperation the operation that was applied
     * @param unaryOperator member state transformation produced by the applier
     * @param th failure of the applier, or {@code null} on success
     * @param operationObserver metrics observer of this operation
     */
    private void onOperationApplied(ClusterTopology clusterTopology, TopologyChangeOperation topologyChangeOperation, UnaryOperator<MemberState> unaryOperator, Throwable th, TopologyMetrics.OperationObserver operationObserver) {
        onGoingTopologyChangeOperation = false;
        if (th != null) {
            operationObserver.failed();
            logAndScheduleRetry(topologyChangeOperation, th);
            return;
        }
        operationObserver.applied();
        backoffRetry.reset();
        if (persistedClusterTopology.getTopology().version() != clusterTopology.version()) {
            LOG.debug("Topology changed while applying operation {}. Expected topology is {}. Current topology is {}. Most likely the change operation was cancelled.", topologyChangeOperation, clusterTopology, persistedClusterTopology.getTopology());
            return;
        }
        Either<Exception, ClusterTopology> updateResult = updateLocalTopology(
            persistedClusterTopology.getTopology().advanceTopologyChange(localMemberId, unaryOperator));
        if (updateResult.isLeft()) {
            // Do not claim the topology was updated when persisting it failed.
            LOG.error("Operation {} applied, but failed to update local topology", topologyChangeOperation, updateResult.getLeft());
        } else {
            LOG.info("Operation {} applied. Updated local topology to {}", topologyChangeOperation, persistedClusterTopology.getTopology());
        }
        executor.run(() -> applyTopologyChangeOperation(persistedClusterTopology.getTopology()));
    }

    /**
     * Persists and gossips {@code clusterTopology} unless it equals the current local topology.
     *
     * @return the given topology on the right, or the exception thrown while persisting or
     *     gossiping on the left
     */
    private Either<Exception, ClusterTopology> updateLocalTopology(ClusterTopology clusterTopology) {
        if (clusterTopology.equals(persistedClusterTopology.getTopology())) {
            return Either.right(clusterTopology);
        }
        try {
            persistedClusterTopology.update(clusterTopology);
            topologyGossiper.accept(clusterTopology);
            return Either.right(clusterTopology);
        } catch (Exception e) {
            return Either.left(e);
        }
    }

    /**
     * Registers the appliers and immediately tries to apply an operation pending in the current
     * local topology.
     *
     * @param topologyChangeAppliers appliers used for subsequent topology change operations
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerTopologyChangeAppliers(TopologyChangeAppliers topologyChangeAppliers) {
        executor.run(() -> {
            changeAppliers = topologyChangeAppliers;
            applyTopologyChangeOperation(persistedClusterTopology.getTopology());
        });
    }

    /** Unregisters the appliers; no new operation is started until appliers are registered again. */
    public void removeTopologyChangeAppliers() {
        executor.run(() -> changeAppliers = null);
    }
}
