package io.camunda.zeebe.dynamic.config;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationManager;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeAppliers;
import io.camunda.zeebe.dynamic.config.metrics.TopologyMetrics;
import io.camunda.zeebe.dynamic.config.state.ClusterConfiguration;
import io.camunda.zeebe.dynamic.config.state.ClusterConfigurationChangeOperation;
import io.camunda.zeebe.dynamic.config.state.MemberState;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.ExponentialBackoffRetryDelay;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/dynamic/config/ClusterConfigurationManagerImpl.class */
/**
 * {@link ClusterConfigurationManager} implementation that keeps the locally persisted cluster
 * configuration up to date, hands every change to the configured gossiper, and applies pending
 * configuration change operations that target the local member.
 *
 * <p>Most state changes are submitted to the {@link ConcurrencyControl} given to the constructor
 * via {@code executor.run(...)}. {@link #setConfigurationGossiper(Consumer)} is the exception: it
 * assigns its field directly, and the gossiper is dereferenced without a null check, so it has to
 * be set before {@link #start(ClusterConfigurationInitializer)}, gossip handling, or any update is
 * executed.
 */
public final class ClusterConfigurationManagerImpl implements ClusterConfigurationManager {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterConfigurationManagerImpl.class);
    private static final Duration MIN_RETRY_DELAY = Duration.ofSeconds(10);
    private static final Duration MAX_RETRY_DELAY = Duration.ofMinutes(1);

    private final ConcurrencyControl executor;
    private final PersistedClusterConfiguration persistedClusterConfiguration;
    private final ActorFuture<Void> startFuture;
    private final MemberId localMemberId;
    private final ExponentialBackoffRetryDelay backoffRetry;

    private Consumer<ClusterConfiguration> configurationGossiper;
    private ConfigurationChangeAppliers changeAppliers;
    private ClusterConfigurationManager.InconsistentConfigurationListener onInconsistentConfigurationDetected;

    /** True from the moment an operation is picked up until its apply future completes. */
    private boolean onGoingConfigurationChangeOperation;

    /** Set when an operation failed and a retry was scheduled; cleared when an operation starts. */
    private boolean shouldRetry;

    /** Set once start-up completed successfully; gossip is not merged before that. */
    private boolean initialized;

    /**
     * Creates a manager that retries failed change operations with delays between
     * {@link #MIN_RETRY_DELAY} and {@link #MAX_RETRY_DELAY}.
     *
     * @param concurrencyControl executor used to create futures and to run and schedule work
     * @param memberId id of the local member
     * @param persistedClusterConfiguration store holding the local copy of the configuration
     */
    public ClusterConfigurationManagerImpl(ConcurrencyControl concurrencyControl, MemberId memberId, PersistedClusterConfiguration persistedClusterConfiguration) {
        this(concurrencyControl, memberId, persistedClusterConfiguration, MIN_RETRY_DELAY, MAX_RETRY_DELAY);
    }

    /**
     * Creates a manager with explicit retry delays.
     *
     * @param concurrencyControl executor used to create futures and to run and schedule work
     * @param memberId id of the local member
     * @param persistedClusterConfiguration store holding the local copy of the configuration
     * @param minRetryDelay lower bound handed to the retry backoff
     * @param maxRetryDelay upper bound handed to the retry backoff
     */
    ClusterConfigurationManagerImpl(ConcurrencyControl concurrencyControl, MemberId memberId, PersistedClusterConfiguration persistedClusterConfiguration, Duration minRetryDelay, Duration maxRetryDelay) {
        this.executor = concurrencyControl;
        this.persistedClusterConfiguration = persistedClusterConfiguration;
        this.startFuture = concurrencyControl.createFuture();
        this.localMemberId = memberId;
        // NOTE(review): the backoff is built with the maximum first and the minimum second;
        // presumably that is the constructor's (max, min) parameter order - confirm.
        this.backoffRetry = new ExponentialBackoffRetryDelay(maxRetryDelay, minRetryDelay);
    }

    /**
     * Reads the persisted configuration on the executor.
     *
     * @return a future completed with the configuration held by the persisted store
     */
    @Override
    public ActorFuture<ClusterConfiguration> getClusterConfiguration() {
        final ActorFuture<ClusterConfiguration> future = this.executor.createFuture();
        this.executor.run(() -> future.complete(this.persistedClusterConfiguration.getConfiguration()));
        return future;
    }

    /**
     * Applies {@code unaryOperator} to the persisted configuration on the executor, persists and
     * gossips the result, and then tries to apply a pending change operation for the local member.
     *
     * @param unaryOperator transformation from the current configuration to the new one
     * @return a future completed with the updated configuration, or completed exceptionally when
     *     the transformation throws or the update cannot be persisted
     */
    @Override
    public ActorFuture<ClusterConfiguration> updateClusterConfiguration(UnaryOperator<ClusterConfiguration> unaryOperator) {
        final ActorFuture<ClusterConfiguration> future = this.executor.createFuture();
        this.executor.run(() -> {
            try {
                final ClusterConfiguration updatedConfiguration = unaryOperator.apply(this.persistedClusterConfiguration.getConfiguration());
                updateLocalConfiguration(updatedConfiguration).ifRightOrLeft(
                    updated -> {
                        future.complete(updated);
                        applyConfigurationChangeOperation(updatedConfiguration);
                    },
                    future::completeExceptionally);
            } catch (Exception e) {
                LOG.error("Failed to update cluster configuration", e);
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    /**
     * Triggers initialization on the executor unless the start future is already done.
     *
     * @param clusterConfigurationInitializer source of the initial configuration
     * @return the start future; the same instance is returned on every call
     */
    public ActorFuture<Void> start(ClusterConfigurationInitializer clusterConfigurationInitializer) {
        this.executor.run(() -> {
            if (!this.startFuture.isDone()) {
                initialize(clusterConfigurationInitializer);
            }
        });
        return this.startFuture;
    }

    /**
     * Sets the callback that receives every configuration this manager wants to share.
     *
     * @param consumer the gossiper callback
     */
    public void setConfigurationGossiper(Consumer<ClusterConfiguration> consumer) {
        this.configurationGossiper = consumer;
    }

    /**
     * Runs the initializer and, on success, merges its result with the persisted configuration,
     * persists and gossips the merged configuration, and completes the start future. Any failure
     * handled here completes the start future exceptionally.
     */
    private void initialize(ClusterConfigurationInitializer clusterConfigurationInitializer) {
        clusterConfigurationInitializer.initialize().onComplete((initialConfiguration, error) -> {
            if (error != null) {
                LOG.error("Failed to initialize configuration", error);
                this.startFuture.completeExceptionally(error);
                return;
            }
            if (initialConfiguration.isUninitialized()) {
                final String message = "Expected to initialize configuration, but got uninitialized configuration";
                LOG.error(message);
                this.startFuture.completeExceptionally(new IllegalStateException(message));
                return;
            }
            try {
                this.persistedClusterConfiguration.update(initialConfiguration.merge(this.persistedClusterConfiguration.getConfiguration()));
                LOG.debug("Initialized cluster configuration '{}'", this.persistedClusterConfiguration.getConfiguration());
                this.configurationGossiper.accept(this.persistedClusterConfiguration.getConfiguration());
                setStarted();
            } catch (IOException e) {
                this.startFuture.completeExceptionally("Failed to start update cluster configuration", e);
            }
        });
    }

    /** Marks the manager as initialized and completes the start future, unless it is already done. */
    private void setStarted() {
        if (this.startFuture.isDone()) {
            return;
        }
        this.initialized = true;
        this.startFuture.complete(null);
    }

    /**
     * Handles a configuration received via gossip. Before initialization the received value is
     * only passed on to the gossiper. Afterwards a non-null value is merged into the persisted
     * configuration; when the merge changes anything, the result is persisted, the inconsistency
     * listener is notified if the local member's entry conflicts, the result is gossiped, and a
     * pending change operation for the local member is started.
     *
     * @param clusterConfiguration the received configuration; {@code null} is ignored once
     *     initialized
     */
    public void onGossipReceived(ClusterConfiguration clusterConfiguration) {
        this.executor.run(() -> {
            if (!this.initialized) {
                LOG.trace("Received configuration {} before ClusterConfigurationManager is initialized.", clusterConfiguration);
                // The persisted configuration is left untouched until start-up has completed.
                this.configurationGossiper.accept(clusterConfiguration);
                return;
            }
            if (clusterConfiguration == null) {
                return;
            }
            try {
                final ClusterConfiguration current = this.persistedClusterConfiguration.getConfiguration();
                final ClusterConfiguration merged = current.merge(clusterConfiguration);
                if (merged.equals(current)) {
                    return;
                }
                LOG.debug("Received new configuration {}. Updating local configuration to {}", clusterConfiguration, merged);
                // Evaluate the conflict before persisting, while the old configuration is at hand.
                final boolean conflicting = isConflictingConfiguration(merged, current);
                this.persistedClusterConfiguration.update(merged);
                if (conflicting && this.onInconsistentConfigurationDetected != null) {
                    this.onInconsistentConfigurationDetected.onInconsistentConfiguration(merged, current);
                }
                this.configurationGossiper.accept(merged);
                applyConfigurationChangeOperation(merged);
            } catch (IOException e) {
                LOG.warn("Failed to process cluster configuration received via gossip. '{}'", clusterConfiguration, e);
            }
        });
    }

    /**
     * Tells whether the local member's entry differs between the merged and the old configuration.
     * The case where the old configuration has the local member in state {@code LEFT} and the
     * merged one no longer contains it is not treated as a conflict.
     */
    private boolean isConflictingConfiguration(ClusterConfiguration mergedConfiguration, ClusterConfiguration oldConfiguration) {
        final boolean localMemberRemovedAfterLeaving =
            !mergedConfiguration.hasMember(this.localMemberId)
                && oldConfiguration.hasMember(this.localMemberId)
                && oldConfiguration.getMember(this.localMemberId).state() == MemberState.State.LEFT;
        if (localMemberRemovedAfterLeaving) {
            return false;
        }
        return !Objects.equals(mergedConfiguration.getMember(this.localMemberId), oldConfiguration.getMember(this.localMemberId));
    }

    /**
     * An operation may start when none is in flight (or a retry was requested), the configuration
     * has a pending change for the local member, and appliers are registered.
     */
    private boolean shouldApplyConfigurationChangeOperation(ClusterConfiguration clusterConfiguration) {
        final boolean mayStart = !this.onGoingConfigurationChangeOperation || this.shouldRetry;
        return mayStart
            && clusterConfiguration.pendingChangesFor(this.localMemberId).isPresent()
            && this.changeAppliers != null;
    }

    /**
     * Starts the pending change operation for the local member, if one may be started. The applier
     * is initialized first and the configuration it produces is persisted; only when both succeed
     * is the operation applied, with the outcome handled by {@link #onOperationApplied}.
     */
    private void applyConfigurationChangeOperation(ClusterConfiguration clusterConfiguration) {
        if (!shouldApplyConfigurationChangeOperation(clusterConfiguration)) {
            return;
        }
        this.onGoingConfigurationChangeOperation = true;
        this.shouldRetry = false;
        final ClusterConfigurationChangeOperation operation = clusterConfiguration.pendingChangesFor(this.localMemberId).orElseThrow();
        final TopologyMetrics.OperationObserver observer = TopologyMetrics.observeOperation(operation);
        LOG.info("Applying configuration change operation {}", operation);
        final ConfigurationChangeAppliers.ClusterOperationApplier applier = this.changeAppliers.getApplier(operation);
        final Either<Exception, ClusterConfiguration> initResult =
            applier.init(clusterConfiguration)
                .map(initTransformer -> initTransformer.apply(clusterConfiguration))
                .flatMap(this::updateLocalConfiguration);
        if (initResult.isLeft()) {
            observer.failed();
            this.onGoingConfigurationChangeOperation = false;
            LOG.error("Failed to initialize configuration change operation {}", operation, initResult.getLeft());
            return;
        }
        final ClusterConfiguration initializedConfiguration = initResult.get();
        applier.apply().onComplete((appliedTransformer, error) ->
            onOperationApplied(initializedConfiguration, operation, appliedTransformer, error, observer));
    }

    /**
     * Flags the failed operation for retry, logs the failure, and schedules another attempt after
     * the next backoff delay.
     */
    private void logAndScheduleRetry(ClusterConfigurationChangeOperation clusterConfigurationChangeOperation, Throwable th) {
        this.shouldRetry = true;
        final Duration delay = this.backoffRetry.nextDelay();
        // The trailing Throwable has no placeholder, so SLF4J logs it as the exception.
        LOG.warn("Failed to apply configuration change operation {}. Will be retried in {}.", clusterConfigurationChangeOperation, delay, th);
        this.executor.schedule(delay, () -> {
            LOG.debug("Retrying last applied operation");
            applyConfigurationChangeOperation(this.persistedClusterConfiguration.getConfiguration());
        });
    }

    /**
     * Completion handler of an applied operation. On failure a retry is scheduled. On success the
     * backoff is reset and, provided the persisted configuration still has the version the
     * operation was started on, the configuration change is advanced and the next pending
     * operation is attempted.
     *
     * @param clusterConfiguration configuration the operation was started on
     * @param clusterConfigurationChangeOperation the operation that completed
     * @param unaryOperator transformation produced by the applier; used to advance the change
     * @param th failure of the apply step, or {@code null} on success
     * @param operationObserver metrics observer of the operation
     */
    private void onOperationApplied(ClusterConfiguration clusterConfiguration, ClusterConfigurationChangeOperation clusterConfigurationChangeOperation, UnaryOperator<ClusterConfiguration> unaryOperator, Throwable th, TopologyMetrics.OperationObserver operationObserver) {
        this.onGoingConfigurationChangeOperation = false;
        if (th != null) {
            operationObserver.failed();
            logAndScheduleRetry(clusterConfigurationChangeOperation, th);
            return;
        }
        operationObserver.applied();
        this.backoffRetry.reset();
        final ClusterConfiguration current = this.persistedClusterConfiguration.getConfiguration();
        if (current.version() != clusterConfiguration.version()) {
            LOG.debug("Configuration changed while applying operation {}. Expected configuration is {}. Current configuration is {}. Most likely the change operation was cancelled.", clusterConfigurationChangeOperation, clusterConfiguration, current);
            return;
        }
        final Either<Exception, ClusterConfiguration> updateResult = updateLocalConfiguration(current.advanceConfigurationChange(unaryOperator));
        if (updateResult.isLeft()) {
            // Do not claim success when the advanced configuration could not be persisted.
            LOG.error("Operation {} applied, but failed to update local configuration", clusterConfigurationChangeOperation, updateResult.getLeft());
        } else {
            LOG.info("Operation {} applied. Updated local configuration to {}", clusterConfigurationChangeOperation, this.persistedClusterConfiguration.getConfiguration());
        }
        // Continue with the next pending operation, if it targets the local member.
        this.executor.run(() -> applyConfigurationChangeOperation(this.persistedClusterConfiguration.getConfiguration()));
    }

    /**
     * Persists and gossips {@code clusterConfiguration} unless it equals the persisted one.
     *
     * @return the configuration on the right when nothing changed or the update succeeded; the
     *     caught exception on the left otherwise
     */
    private Either<Exception, ClusterConfiguration> updateLocalConfiguration(ClusterConfiguration clusterConfiguration) {
        if (clusterConfiguration.equals(this.persistedClusterConfiguration.getConfiguration())) {
            return Either.right(clusterConfiguration);
        }
        try {
            this.persistedClusterConfiguration.update(clusterConfiguration);
            this.configurationGossiper.accept(clusterConfiguration);
            return Either.right(clusterConfiguration);
        } catch (Exception e) {
            return Either.left(e);
        }
    }

    /**
     * Registers the appliers on the executor and immediately tries to apply a pending change
     * operation from the persisted configuration.
     *
     * @param configurationChangeAppliers appliers used to execute change operations
     */
    public void registerTopologyChangeAppliers(ConfigurationChangeAppliers configurationChangeAppliers) {
        this.executor.run(() -> {
            this.changeAppliers = configurationChangeAppliers;
            applyConfigurationChangeOperation(this.persistedClusterConfiguration.getConfiguration());
        });
    }

    /** Clears the registered appliers on the executor; no new operation starts until re-registered. */
    public void removeTopologyChangeAppliers() {
        this.executor.run(() -> this.changeAppliers = null);
    }

    /**
     * Registers, on the executor, the listener notified when gossip produces a conflicting entry
     * for the local member.
     *
     * @param inconsistentConfigurationListener the listener to notify
     */
    public void registerTopologyChangedListener(ClusterConfigurationManager.InconsistentConfigurationListener inconsistentConfigurationListener) {
        this.executor.run(() -> this.onInconsistentConfigurationDetected = inconsistentConfigurationListener);
    }

    /** Clears the inconsistency listener on the executor. */
    public void removeTopologyChangedListener() {
        this.executor.run(() -> this.onInconsistentConfigurationDetected = null);
    }
}
