package org.apache.hadoop.hdds.scm.container.replication;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerUtil;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.class */
/**
 * {@link UnhealthyReplicationHandler} for under replicated Ratis containers.
 *
 * <p>Given the current replicas and pending operations of a container, this handler selects source
 * datanodes holding a usable replica, asks the {@link PlacementPolicy} for target datanodes and
 * sends replication commands through the {@link ReplicationManager}.
 */
public class RatisUnderReplicationHandler implements UnhealthyReplicationHandler {
    public static final Logger LOG = LoggerFactory.getLogger(RatisUnderReplicationHandler.class);
    private final PlacementPolicy placementPolicy;
    private final long currentContainerSize;
    private final ReplicationManager replicationManager;
    private final ReplicationManagerMetrics metrics;

    /**
     * Creates the handler.
     *
     * @param placementPolicy policy used to choose target datanodes for new replicas
     * @param configurationSource configuration from which {@code ozone.scm.container.size} is read
     *     (in bytes, defaulting to {@code 5GB})
     * @param replicationManager manager used to query node status and to send commands; its
     *     metrics object is also retained
     */
    public RatisUnderReplicationHandler(PlacementPolicy placementPolicy, ConfigurationSource configurationSource, ReplicationManager replicationManager) {
        this.placementPolicy = placementPolicy;
        this.currentContainerSize = (long) configurationSource.getStorageSize("ozone.scm.container.size", "5GB", StorageUnit.BYTES);
        this.replicationManager = replicationManager;
        this.metrics = replicationManager.getMetrics();
    }

    /**
     * Re-checks the replication state of the container and, if it still needs more replicas,
     * sends replication commands.
     *
     * @param set the replicas currently known for the container
     * @param list the operations pending for the container
     * @param containerHealthResult health result that identifies the container being handled
     * @param i forwarded unchanged to {@link RatisContainerReplicaCount}
     *     (NOTE(review): presumably the minimum healthy replica count for maintenance; confirm)
     * @return the number of replication commands sent; {@code 0} when nothing needs to be or can
     *     be done
     * @throws InsufficientDatanodesException if fewer targets were found than additional replicas
     *     are needed (commands are still sent to the targets that were found)
     * @throws SCMException if target selection fails; when the failure is
     *     {@code FAILED_TO_FIND_SUITABLE_NODE} an attempt is made to delete an unhealthy replica
     *     before the exception is rethrown
     * @throws IOException on any other failure while selecting targets or sending commands
     */
    @Override // org.apache.hadoop.hdds.scm.container.replication.UnhealthyReplicationHandler
    public int processAndSendCommands(Set<ContainerReplica> set, List<ContainerReplicaOp> list, ContainerHealthResult containerHealthResult, int i) throws IOException {
        ContainerInfo containerInfo = containerHealthResult.getContainerInfo();
        LOG.debug("Handling under replicated Ratis container {}", containerInfo);

        // NOTE(review): the last constructor argument presumably toggles whether UNHEALTHY
        // replicas are taken into account; confirm against RatisContainerReplicaCount.
        RatisContainerReplicaCount withUnhealthy = new RatisContainerReplicaCount(containerInfo, set, list, i, true);
        RatisContainerReplicaCount withoutUnhealthy = new RatisContainerReplicaCount(containerInfo, set, list, i, false);

        if (containerHealthResult instanceof ContainerHealthResult.UnderReplicatedHealthResult) {
            ContainerHealthResult.UnderReplicatedHealthResult underReplicated =
                    (ContainerHealthResult.UnderReplicatedHealthResult) containerHealthResult;
            if (underReplicated.hasVulnerableUnhealthy()) {
                return handleVulnerableUnhealthyReplicas(withUnhealthy, list);
            }
        }

        RatisContainerReplicaCount replicaCount = verifyUnderReplication(withUnhealthy, withoutUnhealthy);
        if (replicaCount == null) {
            return 0;
        }

        List<DatanodeDetails> sources = getSources(replicaCount, list);
        if (sources.isEmpty()) {
            LOG.warn("Cannot replicate container {} because no CLOSED, QUASI_CLOSED or UNHEALTHY replicas were found.", containerInfo);
            return 0;
        }

        try {
            List<DatanodeDetails> targets = getTargets(replicaCount, list);
            int commandsSent = sendReplicationCommands(containerInfo, sources, targets);
            int needed = replicaCount.additionalReplicaNeeded();
            if (targets.size() < needed) {
                // Commands for the targets we did find have already been sent; signal the
                // shortfall to the caller through the exception.
                LOG.debug("Placement policy failed to find enough targets to satisfy under replication for container {}. Targets found: {}, additional replicas needed: {}", containerInfo, targets.size(), needed);
                this.metrics.incrPartialReplicationTotal();
                throw new InsufficientDatanodesException(needed, targets.size());
            }
            return commandsSent;
        } catch (SCMException e) {
            if (e.getResult() == SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE) {
                LOG.warn("Cannot replicate container {} because no suitable targets were found.", containerInfo);
                removeUnhealthyReplicaIfPossible(containerInfo, set, list);
            }
            // The original failure is always propagated, whether or not a delete was issued.
            throw e;
        }
    }

    /**
     * Replicates the vulnerable UNHEALTHY replicas reported by the replica count, unless there
     * are already at least as many pending ADD operations as there are such replicas.
     *
     * @param replicaCount replica count used to look up the vulnerable replicas
     * @param pendingOps operations pending for the container
     * @return the number of replication commands sent
     */
    private int handleVulnerableUnhealthyReplicas(RatisContainerReplicaCount replicaCount, List<ContainerReplicaOp> pendingOps) throws NotLeaderException, CommandTargetOverloadedException, SCMException {
        ContainerInfo container = replicaCount.getContainer();
        List<ContainerReplica> vulnerable = replicaCount.getVulnerableUnhealthyReplicas(datanode -> {
            try {
                return this.replicationManager.getNodeStatus(datanode);
            } catch (NodeNotFoundException e) {
                LOG.warn("Exception for datanode {} while handling vulnerable replicas for container {}, with all replicas {}.", datanode, container, replicaCount.getReplicas(), e);
                return null;
            }
        });
        LOG.info("Handling vulnerable UNHEALTHY replicas {} for container {}.", vulnerable, container);

        int pendingAdds = countPendingOps(pendingOps, ContainerReplicaOp.PendingOpType.ADD);
        if (pendingAdds >= vulnerable.size()) {
            LOG.debug("There are {} pending adds for container {}, while the number of UNHEALTHY replicas is {}.", pendingAdds, container.containerID(), vulnerable.size());
            return 0;
        }

        // Randomise the order in which the vulnerable replicas are used as sources.
        Collections.shuffle(vulnerable);
        return replicateEachSource(replicaCount, vulnerable, pendingOps);
    }

    /**
     * Sends one replication command per source replica, each to a freshly chosen single target.
     *
     * <p>A {@link CommandTargetOverloadedException} for one source does not stop the loop: it is
     * logged, the first such exception is remembered, and it is rethrown after all sources have
     * been attempted.
     *
     * @param replicaCount replica count providing the container and all its replicas
     * @param sources replicas to copy from, one command each
     * @param pendingOps operations pending for the container
     * @return the number of commands sent (only reached when no source was overloaded)
     */
    private int replicateEachSource(RatisContainerReplicaCount replicaCount, List<ContainerReplica> sources, List<ContainerReplicaOp> pendingOps) throws NotLeaderException, SCMException, CommandTargetOverloadedException {
        List<ContainerReplica> allReplicas = replicaCount.getReplicas();
        ContainerInfo container = replicaCount.getContainer();
        ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes = ReplicationManagerUtil.getExcludedAndUsedNodes(container, allReplicas, Collections.emptySet(), pendingOps, this.replicationManager);

        CommandTargetOverloadedException firstOverload = null;
        int totalSent = 0;
        for (ContainerReplica source : sources) {
            List<DatanodeDetails> targets = ReplicationManagerUtil.getTargetDatanodes(this.placementPolicy, 1, excludedAndUsedNodes.getUsedNodes(), excludedAndUsedNodes.getExcludedNodes(), this.currentContainerSize, container);
            int sent = 0;
            try {
                sent = sendReplicationCommands(container, ImmutableList.of(source.getDatanodeDetails()), targets);
            } catch (CommandTargetOverloadedException e) {
                LOG.info("Exception while replicating {} to target {} for container {}.", source, targets, container, e);
                if (firstOverload == null) {
                    firstOverload = e;
                }
            }
            if (sent == 1) {
                // Mark the target as used so the next source is given a different datanode.
                excludedAndUsedNodes.getUsedNodes().add(targets.get(0));
            }
            totalSent += sent;
        }

        if (firstOverload != null) {
            throw firstOverload;
        }
        return totalSent;
    }

    /**
     * Tries to select an unhealthy replica and send a delete command for it; logs when no replica
     * could be selected.
     *
     * @param containerInfo the container being handled
     * @param replicas the replicas currently known for the container
     * @param pendingOps operations pending for the container; the number of pending DELETEs is
     *     passed on to the selection
     */
    private void removeUnhealthyReplicaIfPossible(ContainerInfo containerInfo, Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps) throws NotLeaderException {
        LOG.info("Finding an unhealthy replica to delete for container {} with replicas {} to unblock under replication handling.", containerInfo, replicas);
        int pendingDeletes = countPendingOps(pendingOps, ContainerReplicaOp.PendingOpType.DELETE);
        ContainerReplica toDelete = ReplicationManagerUtil.selectUnhealthyReplicaForDelete(containerInfo, replicas, pendingDeletes, datanode -> {
            try {
                return this.replicationManager.getNodeStatus(datanode);
            } catch (NodeNotFoundException e) {
                LOG.warn("Exception while finding an unhealthy replica to delete for container {} with replicas {}.", containerInfo, replicas, e);
                return null;
            }
        });
        if (toDelete == null) {
            LOG.info("Unable to find a replica to remove for container {} with replicas {}", containerInfo, replicas);
            return;
        }
        this.replicationManager.sendDeleteCommand(containerInfo, toDelete.getReplicaIndex(), toDelete.getDatanodeDetails(), true);
    }

    /**
     * Decides whether replication work is still required and which replica count to base it on.
     *
     * @param withUnhealthy replica count built with the constructor flag set to {@code true}
     * @param withoutUnhealthy replica count built with the constructor flag set to {@code false}
     * @return {@code withoutUnhealthy} if it reports at least one healthy replica, otherwise
     *     {@code withUnhealthy}; {@code null} when no replication should be attempted
     */
    private RatisContainerReplicaCount verifyUnderReplication(RatisContainerReplicaCount withUnhealthy, RatisContainerReplicaCount withoutUnhealthy) {
        if (withoutUnhealthy.isSufficientlyReplicated()) {
            LOG.info("The container {} state changed and it's not under replicated any more.", withoutUnhealthy.getContainer().containerID());
            return null;
        }
        if (withoutUnhealthy.isSufficientlyReplicated(true)) {
            LOG.info("Container {} with replicas {} will be sufficiently replicated after pending replicas are created.", withoutUnhealthy.getContainer().getContainerID(), withoutUnhealthy.getReplicas());
            return null;
        }
        if (withUnhealthy.getReplicas().isEmpty()) {
            LOG.warn("Container {} does not have any replicas and is unrecoverable.", withUnhealthy.getContainer());
            return null;
        }
        if (withUnhealthy.isSufficientlyReplicated(true) && withUnhealthy.getHealthyReplicaCount() == 0) {
            LOG.info("Container {} with only UNHEALTHY replicas [{}] will be sufficiently replicated after pending adds are created.", withUnhealthy.getContainer(), withUnhealthy.getReplicas());
            return null;
        }
        return withoutUnhealthy.getHealthyReplicaCount() > 0 ? withoutUnhealthy : withUnhealthy;
    }

    /**
     * Selects the datanodes whose replica may be used as a replication source.
     *
     * <p>CLOSED replicas are always eligible. QUASI_CLOSED replicas are eligible too when there is
     * no CLOSED replica or the container itself is QUASI_CLOSED. UNHEALTHY replicas are eligible
     * only when the replica count reports zero healthy replicas. Replicas on nodes that are not
     * healthy (or unknown) and replicas with a pending DELETE are dropped. If any remaining
     * replica has a sequence ID, only the replicas with the highest sequence ID are kept.
     *
     * @param replicaCount replica count providing the container and its replicas
     * @param pendingOps operations pending for the container
     * @return the source datanodes, possibly empty
     */
    private List<DatanodeDetails> getSources(RatisContainerReplicaCount replicaCount, List<ContainerReplicaOp> pendingOps) {
        Set<DatanodeDetails> pendingDeletion = new HashSet<>();
        for (ContainerReplicaOp op : pendingOps) {
            if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
                pendingDeletion.add(op.getTarget());
            }
        }

        Predicate<ContainerReplica> eligible = inState(StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED);
        boolean hasClosedReplica = replicaCount.getReplicas().stream()
                .anyMatch(inState(StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED));
        if (!hasClosedReplica || replicaCount.getContainer().getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
            eligible = eligible.or(inState(StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED));
        }
        if (replicaCount.getHealthyReplicaCount() == 0) {
            eligible = eligible.or(inState(StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY));
        }

        List<ContainerReplica> candidates = replicaCount.getReplicas().stream()
                .filter(eligible)
                .filter(this::isOnHealthyNode)
                .filter(replica -> !pendingDeletion.contains(replica.getDatanodeDetails()))
                .collect(Collectors.toList());

        OptionalLong maxSequenceId = candidates.stream()
                .filter(replica -> replica.getSequenceId() != null)
                .mapToLong(ContainerReplica::getSequenceId)
                .max();

        Stream<ContainerReplica> chosen = candidates.stream();
        if (maxSequenceId.isPresent()) {
            long newest = maxSequenceId.getAsLong();
            chosen = chosen
                    .filter(replica -> replica.getSequenceId() != null)
                    .filter(replica -> replica.getSequenceId().longValue() == newest);
        }
        return chosen.map(ContainerReplica::getDatanodeDetails).collect(Collectors.toList());
    }

    /**
     * Asks the placement policy for as many target datanodes as additional replicas are needed.
     *
     * @param replicaCount replica count providing the container, its replicas and the number of
     *     additional replicas needed
     * @param pendingOps operations pending for the container
     * @return the chosen target datanodes
     * @throws IOException if used/excluded nodes cannot be determined or target selection fails
     */
    private List<DatanodeDetails> getTargets(RatisContainerReplicaCount replicaCount, List<ContainerReplicaOp> pendingOps) throws IOException {
        LOG.debug("Need {} target datanodes for container {}. Current replicas: {}.", replicaCount.additionalReplicaNeeded(), replicaCount.getContainer().containerID(), replicaCount.getReplicas());
        ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes = ReplicationManagerUtil.getExcludedAndUsedNodes(replicaCount.getContainer(), replicaCount.getReplicas(), Collections.emptySet(), pendingOps, this.replicationManager);
        List<DatanodeDetails> excludedNodes = excludedAndUsedNodes.getExcludedNodes();
        List<DatanodeDetails> usedNodes = excludedAndUsedNodes.getUsedNodes();
        LOG.debug("UsedList: {}, size {}. ExcludeList: {}, size: {}. ", usedNodes, usedNodes.size(), excludedNodes, excludedNodes.size());
        return ReplicationManagerUtil.getTargetDatanodes(this.placementPolicy, replicaCount.additionalReplicaNeeded(), usedNodes, excludedNodes, this.currentContainerSize, replicaCount.getContainer());
    }

    /**
     * Sends one replication command per target datanode.
     *
     * <p>When the replication manager's configuration reports push mode, a throttled replication
     * command is sent via the manager; otherwise a {@link ReplicateContainerCommand} built from
     * the sources is sent to the target.
     *
     * @param containerInfo the container to replicate
     * @param sources datanodes that hold a replica to copy from
     * @param targets datanodes that should receive a new replica
     * @return the number of commands sent
     */
    private int sendReplicationCommands(ContainerInfo containerInfo, List<DatanodeDetails> sources, List<DatanodeDetails> targets) throws CommandTargetOverloadedException, NotLeaderException {
        boolean push = this.replicationManager.getConfig().isPush();
        int sent = 0;
        for (DatanodeDetails target : targets) {
            if (push) {
                this.replicationManager.sendThrottledReplicationCommand(containerInfo, sources, target, 0);
            } else {
                this.replicationManager.sendDatanodeCommand(ReplicateContainerCommand.fromSources(containerInfo.getContainerID(), sources), containerInfo, target);
            }
            sent++;
        }
        return sent;
    }

    /** Counts the pending operations of the given type. */
    private static int countPendingOps(List<ContainerReplicaOp> pendingOps, ContainerReplicaOp.PendingOpType type) {
        int count = 0;
        for (ContainerReplicaOp op : pendingOps) {
            if (op.getOpType() == type) {
                count++;
            }
        }
        return count;
    }

    /** Builds a predicate that matches replicas in exactly the given state. */
    private static Predicate<ContainerReplica> inState(StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State state) {
        return replica -> replica.getState() == state;
    }

    /** Returns whether the replica's datanode is known and reported healthy by the manager. */
    private boolean isOnHealthyNode(ContainerReplica replica) {
        try {
            return this.replicationManager.getNodeStatus(replica.getDatanodeDetails()).isHealthy();
        } catch (NodeNotFoundException e) {
            // A replica on an unknown node cannot serve as a source; skip it rather than fail.
            LOG.debug("Skipping replica {} as a replication source because its datanode was not found.", replica, e);
            return false;
        }
    }
}
