package org.apache.hadoop.hdds.scm.container.replication;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.class */
public class OverReplicatedProcessor implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(OverReplicatedProcessor.class);
    private final ReplicationManager replicationManager;
    private final ContainerReplicaPendingOps pendingOps;
    private final EventPublisher eventPublisher;
    private volatile boolean runImmediately = false;
    private final long intervalInMillis;

    public OverReplicatedProcessor(ReplicationManager replicationManager, ContainerReplicaPendingOps containerReplicaPendingOps, EventPublisher eventPublisher, long j) {
        this.replicationManager = replicationManager;
        this.pendingOps = containerReplicaPendingOps;
        this.eventPublisher = eventPublisher;
        this.intervalInMillis = j;
    }

    public void processAll() {
        ContainerHealthResult.OverReplicatedHealthResult dequeueOverReplicatedContainer;
        int i = 0;
        int i2 = 0;
        while (this.replicationManager.shouldRun() && (dequeueOverReplicatedContainer = this.replicationManager.dequeueOverReplicatedContainer()) != null) {
            try {
                processContainer(dequeueOverReplicatedContainer);
                i++;
            } catch (Exception e) {
                LOG.error("Error processing over replicated container {}", dequeueOverReplicatedContainer.getContainerInfo(), e);
                i2++;
                this.replicationManager.requeueOverReplicatedContainer(dequeueOverReplicatedContainer);
            }
        }
        LOG.info("Processed {} over replicated containers, failed processing {}", Integer.valueOf(i), Integer.valueOf(i2));
    }

    protected void processContainer(ContainerHealthResult.OverReplicatedHealthResult overReplicatedHealthResult) throws IOException {
        for (Map.Entry<DatanodeDetails, SCMCommand<?>> entry : this.replicationManager.processOverReplicatedContainer(overReplicatedHealthResult).entrySet()) {
            SCMCommand<?> value = entry.getValue();
            value.setTerm(this.replicationManager.getScmTerm());
            this.eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, new CommandForDatanode(entry.getKey().getUuid(), value));
            adjustPendingOps(overReplicatedHealthResult.getContainerInfo().containerID(), value, entry.getKey());
        }
    }

    private void adjustPendingOps(ContainerID containerID, SCMCommand<?> sCMCommand, DatanodeDetails datanodeDetails) throws IOException {
        if (sCMCommand.getType() != StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand) {
            throw new IOException("Unexpected command type " + sCMCommand.getType());
        }
        this.pendingOps.scheduleDeleteReplica(containerID, datanodeDetails, ((DeleteContainerCommand) sCMCommand).getReplicaIndex());
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (this.replicationManager.shouldRun()) {
                    processAll();
                }
                synchronized (this) {
                    if (!this.runImmediately) {
                        wait(this.intervalInMillis);
                    }
                    this.runImmediately = false;
                }
            } catch (InterruptedException e) {
                LOG.warn("{} interrupted. Exiting...", Thread.currentThread().getName());
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @VisibleForTesting
    synchronized void runImmediately() {
        this.runImmediately = true;
        notify();
    }
}
