package io.camunda.zeebe.topology.changes;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.topology.ClusterTopologyManager;
import io.camunda.zeebe.topology.api.TopologyRequestFailedException;
import io.camunda.zeebe.topology.changes.TopologyChangeAppliers;
import io.camunda.zeebe.topology.changes.TopologyChangeCoordinator;
import io.camunda.zeebe.topology.state.ClusterTopology;
import io.camunda.zeebe.topology.state.TopologyChangeOperation;
import io.camunda.zeebe.util.Either;
import java.util.List;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TopologyChangeCoordinator} implementation that validates requested topology changes by
 * simulating them with no-op executors and, unless it is a dry run, starts the change through the
 * {@link ClusterTopologyManager}.
 *
 * <p>Work is scheduled via the supplied {@link ConcurrencyControl}; every request is answered
 * through an {@link ActorFuture} created by it. Failures that are not already a {@link
 * TopologyRequestFailedException} are wrapped in {@link
 * TopologyRequestFailedException.InternalError} before being reported to the caller.
 */
public class TopologyChangeCoordinatorImpl implements TopologyChangeCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(TopologyChangeCoordinatorImpl.class);
    private final ClusterTopologyManager clusterTopologyManager;
    private final ConcurrencyControl executor;
    private final MemberId localMemberId;

    /**
     * @param clusterTopologyManager used to read and update the cluster topology
     * @param memberId id of the local member, used to decide whether this member is the coordinator
     * @param concurrencyControl used to create futures and to schedule work
     */
    public TopologyChangeCoordinatorImpl(ClusterTopologyManager clusterTopologyManager, MemberId memberId, ConcurrencyControl concurrencyControl) {
        this.clusterTopologyManager = clusterTopologyManager;
        this.executor = concurrencyControl;
        this.localMemberId = memberId;
    }

    /** Returns the topology as currently reported by the {@link ClusterTopologyManager}. */
    @Override
    public ActorFuture<ClusterTopology> getTopology() {
        return this.clusterTopologyManager.getClusterTopology();
    }

    /** Validates the requested operations and, if valid, starts the topology change. */
    @Override
    public ActorFuture<TopologyChangeCoordinator.TopologyChangeResult> applyOperations(TopologyChangeCoordinator.TopologyChangeRequest topologyChangeRequest) {
        return applyOrDryRun(false, topologyChangeRequest);
    }

    /** Validates the requested operations without updating the topology (dry run). */
    @Override
    public ActorFuture<TopologyChangeCoordinator.TopologyChangeResult> simulateOperations(TopologyChangeCoordinator.TopologyChangeRequest topologyChangeRequest) {
        return applyOrDryRun(true, topologyChangeRequest);
    }

    /**
     * Cancels the pending topology change with the given id.
     *
     * <p>The returned future fails with {@link TopologyRequestFailedException.InvalidRequest} if
     * the topology is uninitialized, no change is in progress, or the pending change has a
     * different id. If the topology update itself fails before the future was completed, that
     * failure is reported through the returned future as well.
     *
     * @param changeId id of the change to cancel
     * @return a future completed with the topology after cancelling the pending changes
     */
    @Override
    public ActorFuture<ClusterTopology> cancelChange(long changeId) {
        ActorFuture<ClusterTopology> future = this.executor.createFuture();
        this.executor.run(() -> {
            this.clusterTopologyManager.updateClusterTopology(currentTopology -> {
                if (!validateCancel(changeId, currentTopology, future)) {
                    // validateCancel already failed the future; leave the topology untouched.
                    return currentTopology;
                }
                LOG.warn("Cancelling topology change '{}'. Following operations have been already applied: {}. Following pending operations won't be applied: {}",
                    changeId,
                    currentTopology.pendingChanges().map(plan -> plan.completedOperations()).orElse(List.of()),
                    currentTopology.pendingChanges().map(plan -> plan.pendingOperations()).orElse(List.of()));
                ClusterTopology cancelledTopology = currentTopology.cancelPendingChanges();
                future.complete(cancelledTopology);
                return cancelledTopology;
            }).onComplete((ignored, error) -> {
                // Without this, an update that fails before the updater completed the future
                // would leave the caller waiting forever.
                if (error != null && !future.isDone()) {
                    failFuture(future, error);
                }
            });
        });
        return future;
    }

    /**
     * Resolves the operations of the request against the current topology and hands them to
     * {@link #applyOrDryRunOnTopology}.
     *
     * <p>Unless the request is forced, it is rejected when the local member is not the coordinator.
     *
     * @param dryRun {@code true} to only simulate the change
     * @param topologyChangeRequest the request describing the operations
     */
    private ActorFuture<TopologyChangeCoordinator.TopologyChangeResult> applyOrDryRun(boolean dryRun, TopologyChangeCoordinator.TopologyChangeRequest topologyChangeRequest) {
        ActorFuture<TopologyChangeCoordinator.TopologyChangeResult> future = this.executor.createFuture();
        this.executor.run(() -> {
            this.clusterTopologyManager.getClusterTopology().onComplete((currentTopology, topologyError) -> {
                if (topologyError != null) {
                    failFuture(future, topologyError);
                    return;
                }
                if (!topologyChangeRequest.isForced() && !isCoordinator(currentTopology)) {
                    // NOTE(review): with the imports of this file, the unqualified InternalError
                    // appears to resolve to java.lang.InternalError; failFuture wraps it in
                    // TopologyRequestFailedException.InternalError. Kept as-is to preserve the
                    // reported exception chain.
                    failFuture(future, new InternalError(String.format("Cannot process request to change the topology. The broker '%s' is not the coordinator.", this.localMemberId)));
                    return;
                }
                Either<Exception, List<TopologyChangeOperation>> operations = topologyChangeRequest.operations(currentTopology);
                if (operations.isLeft()) {
                    failFuture(future, operations.getLeft());
                    return;
                }
                applyOrDryRunOnTopology(dryRun, currentTopology, operations.get(), future);
            }, this.executor);
        });
        return future;
    }

    /**
     * Validates the operations against the given topology and then either starts the change or,
     * for a dry run, only computes the topology with the change started.
     *
     * <p>With no operations, the future completes immediately with the current topology as both
     * the current and the final topology, and the id of the last change (or 0 if there is none).
     *
     * @param dryRun {@code true} to skip updating the topology
     * @param currentTopology the topology the operations were generated for
     * @param operations the operations to apply
     * @param future completed with the result, or failed if validation or applying fails
     */
    private void applyOrDryRunOnTopology(boolean dryRun, ClusterTopology currentTopology, List<TopologyChangeOperation> operations, ActorFuture<TopologyChangeCoordinator.TopologyChangeResult> future) {
        if (operations.isEmpty()) {
            long lastChangeId = currentTopology.lastChange().map(change -> change.id()).orElse(0L);
            future.complete(new TopologyChangeCoordinator.TopologyChangeResult(currentTopology, currentTopology, lastChangeId, operations));
            return;
        }

        validateTopologyChangeRequest(currentTopology, operations).onComplete((simulatedFinalTopology, validationError) -> {
            if (validationError != null) {
                failFuture(future, validationError);
                return;
            }

            ActorFuture<ClusterTopology> applyFuture = this.executor.createFuture();
            if (dryRun) {
                applyFuture.complete(currentTopology.startTopologyChange(operations));
            } else {
                applyTopologyChange(operations, currentTopology, simulatedFinalTopology, applyFuture);
            }

            // The parameters of this callback must not reuse the names of the enclosing lambda's
            // parameters: Java rejects that, and it would hide the simulated final topology.
            applyFuture.onComplete((topologyWithPendingChanges, applyError) -> {
                if (applyError != null) {
                    failFuture(future, applyError);
                    return;
                }
                // The change id comes from the topology in which the change was started; the
                // reported final topology is the outcome of the simulation.
                long changeId = topologyWithPendingChanges.pendingChanges().map(plan -> plan.id()).orElse(0L);
                future.complete(new TopologyChangeCoordinator.TopologyChangeResult(currentTopology, simulatedFinalTopology, changeId, operations));
            });
        });
    }

    /**
     * Checks that the operations can be applied to the given topology by simulating them with
     * no-op executors.
     *
     * @return a future completed with the topology produced by the simulation, or failed if the
     *     topology is uninitialized, already has pending changes, or the simulation fails
     */
    private ActorFuture<ClusterTopology> validateTopologyChangeRequest(ClusterTopology clusterTopology, List<TopologyChangeOperation> operations) {
        ActorFuture<ClusterTopology> validationFuture = this.executor.createFuture();
        if (clusterTopology.isUninitialized()) {
            failFuture(validationFuture, new TopologyRequestFailedException.OperationNotAllowed("Cannot apply topology change. The topology is not initialized."));
            return validationFuture;
        }
        if (clusterTopology.hasPendingChanges()) {
            failFuture(validationFuture, new TopologyRequestFailedException.ConcurrentModificationException(String.format("Cannot apply topology change. Another topology change [%s] is in progress.", clusterTopology)));
            return validationFuture;
        }
        TopologyChangeAppliersImpl noopAppliers = new TopologyChangeAppliersImpl(new NoopPartitionChangeExecutor(), new NoopTopologyMembershipChangeExecutor());
        simulateTopologyChange(clusterTopology.startTopologyChange(operations), noopAppliers, validationFuture);
        return validationFuture;
    }

    /**
     * Starts the topology change through the {@link ClusterTopologyManager}.
     *
     * <p>The update is rejected with a {@link
     * TopologyRequestFailedException.ConcurrentModificationException} if the topology seen by the
     * updater no longer equals {@code expectedTopology}.
     *
     * @param operations the operations of the change
     * @param expectedTopology the topology the operations were validated against
     * @param simulatedFinalTopology result of the simulation; only used for logging here
     * @param future completed with the updated topology, or failed if the update fails
     */
    private void applyTopologyChange(List<TopologyChangeOperation> operations, ClusterTopology expectedTopology, ClusterTopology simulatedFinalTopology, ActorFuture<ClusterTopology> future) {
        this.executor.run(() -> {
            this.clusterTopologyManager.updateClusterTopology(latestTopology -> {
                if (!latestTopology.equals(expectedTopology)) {
                    throw new TopologyRequestFailedException.ConcurrentModificationException("Topology changed while applying the change. Please retry.");
                }
                return latestTopology.startTopologyChange(operations);
            }).onComplete((updatedTopology, updateError) -> {
                if (updateError != null) {
                    failFuture(future, updateError);
                    return;
                }
                LOG.debug("Applying the topology change has started. The resulting topology will be {}", simulatedFinalTopology);
                future.complete(updatedTopology);
            });
        });
    }

    /**
     * Applies the pending operations of the given topology one after another using the given
     * appliers, until no pending change is left.
     *
     * @param clusterTopology topology whose next pending operation is simulated
     * @param topologyChangeAppliersImpl provides the applier for each operation
     * @param future completed with the topology once no pending changes remain, or failed with
     *     {@link TopologyRequestFailedException.InvalidRequest} if an operation cannot be
     *     initialized or applied
     */
    private void simulateTopologyChange(ClusterTopology clusterTopology, TopologyChangeAppliersImpl topologyChangeAppliersImpl, ActorFuture<ClusterTopology> future) {
        if (!clusterTopology.hasPendingChanges()) {
            future.complete(clusterTopology);
            return;
        }

        TopologyChangeAppliers.ClusterOperationApplier applier = topologyChangeAppliersImpl.getApplier(clusterTopology.nextPendingOperation());
        Either<Exception, UnaryOperator<ClusterTopology>> initResult = applier.init(clusterTopology);
        if (initResult.isLeft()) {
            failFuture(future, new TopologyRequestFailedException.InvalidRequest(initResult.getLeft()));
            return;
        }

        ClusterTopology initializedTopology = initResult.get().apply(clusterTopology);
        applier.apply().onComplete((topologyUpdater, applyError) -> {
            if (applyError != null) {
                failFuture(future, new TopologyRequestFailedException.InvalidRequest(applyError));
                return;
            }
            // Continue with the next pending operation on the advanced topology.
            simulateTopologyChange(initializedTopology.advanceTopologyChange(topologyUpdater), topologyChangeAppliersImpl, future);
        });
    }

    /**
     * Logs the error and fails the future with it, wrapping anything that is not already a
     * {@link TopologyRequestFailedException} in {@link
     * TopologyRequestFailedException.InternalError}.
     */
    private void failFuture(ActorFuture<?> future, Throwable error) {
        LOG.warn("Failed to handle topology request", error);
        if (error instanceof TopologyRequestFailedException) {
            future.completeExceptionally(error);
        } else {
            future.completeExceptionally(new TopologyRequestFailedException.InternalError(error));
        }
    }

    /**
     * Checks whether the change with the given id can be cancelled in the given topology.
     *
     * @return {@code true} if the change is the currently pending one; otherwise {@code false},
     *     in which case the future has already been failed with the reason
     */
    private boolean validateCancel(long changeId, ClusterTopology clusterTopology, ActorFuture<ClusterTopology> future) {
        if (clusterTopology.isUninitialized()) {
            failFuture(future, new TopologyRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because the topology is not initialized"));
            return false;
        }
        if (!clusterTopology.hasPendingChanges()) {
            failFuture(future, new TopologyRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because no change is in progress"));
            return false;
        }
        if (clusterTopology.pendingChanges().orElseThrow().id() != changeId) {
            failFuture(future, new TopologyRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because it is not the current change"));
            return false;
        }
        return true;
    }

    /**
     * Returns whether the local member is the coordinator, i.e. the member with the smallest id
     * among the members of the given topology. Returns {@code false} if the topology has no
     * members.
     */
    private boolean isCoordinator(ClusterTopology clusterTopology) {
        return this.localMemberId.equals(clusterTopology.members().keySet().stream().min(MemberId::compareTo).orElse(null));
    }
}
