package org.apache.hadoop.hdds.scm.container.replication;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.function.Supplier;
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.class */
public abstract class UnhealthyReplicationProcessor<HealthResult extends ContainerHealthResult> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(UnhealthyReplicationProcessor.class);
    private final ReplicationManager replicationManager;
    private volatile boolean runImmediately = false;
    private final Supplier<Duration> interval;

    public UnhealthyReplicationProcessor(ReplicationManager replicationManager, Supplier<Duration> supplier) {
        this.replicationManager = replicationManager;
        this.interval = supplier;
    }

    protected abstract HealthResult dequeueHealthResultFromQueue(ReplicationQueue replicationQueue);

    protected abstract void requeueHealthResult(ReplicationQueue replicationQueue, HealthResult healthresult);

    protected abstract boolean inflightOperationLimitReached(ReplicationManager replicationManager, long j);

    public void processAll(ReplicationQueue replicationQueue) {
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        HashMap newHashMap = Maps.newHashMap();
        LinkedList linkedList = new LinkedList();
        long replicationInFlightLimit = this.replicationManager.getReplicationInFlightLimit();
        while (true) {
            if (!this.replicationManager.shouldRun()) {
                break;
            }
            if (replicationInFlightLimit > 0 && inflightOperationLimitReached(this.replicationManager, replicationInFlightLimit)) {
                LOG.info("The maximum number of pending replicas ({}) are scheduled. Ending the iteration.", Long.valueOf(replicationInFlightLimit));
                this.replicationManager.getMetrics().incrPendingReplicationLimitReachedTotal();
                break;
            }
            HealthResult dequeueHealthResultFromQueue = dequeueHealthResultFromQueue(replicationQueue);
            if (dequeueHealthResultFromQueue == null) {
                break;
            }
            try {
                processContainer(dequeueHealthResultFromQueue);
                i++;
                newHashMap.compute(dequeueHealthResultFromQueue.getHealthState(), (healthState, num) -> {
                    return Integer.valueOf(num == null ? 1 : num.intValue() + 1);
                });
            } catch (CommandTargetOverloadedException e) {
                LOG.debug("All targets overloaded when processing Health result of class: {} for container {}", dequeueHealthResultFromQueue.getClass(), dequeueHealthResultFromQueue.getContainerInfo());
                i3++;
                linkedList.add(dequeueHealthResultFromQueue);
            } catch (Exception e2) {
                LOG.error("Error processing Health result of class: {} for container {}", new Object[]{dequeueHealthResultFromQueue.getClass(), dequeueHealthResultFromQueue.getContainerInfo(), e2});
                i2++;
                linkedList.add(dequeueHealthResultFromQueue);
            }
        }
        linkedList.forEach(containerHealthResult -> {
            requeueHealthResult(replicationQueue, containerHealthResult);
        });
        if (i > 0 || i2 > 0 || i3 > 0) {
            LOG.info("Processed {} containers with health state counts {}, failed processing {}, deferred due to load {}", new Object[]{Integer.valueOf(i), newHashMap, Integer.valueOf(i2), Integer.valueOf(i3)});
        }
    }

    protected abstract int sendDatanodeCommands(ReplicationManager replicationManager, HealthResult healthresult) throws IOException;

    private void processContainer(HealthResult healthresult) throws IOException {
        synchronized (healthresult.getContainerInfo()) {
            sendDatanodeCommands(this.replicationManager, healthresult);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (this.replicationManager.shouldRun()) {
                    processAll(this.replicationManager.getQueue());
                }
                Duration duration = this.interval.get();
                if (!this.runImmediately && LOG.isDebugEnabled()) {
                    LOG.debug("May wait {} before next run", duration);
                }
                synchronized (this) {
                    if (!this.runImmediately) {
                        wait(duration.toMillis());
                    }
                    this.runImmediately = false;
                }
            } catch (InterruptedException e) {
                LOG.warn("{} interrupted. Exiting...", Thread.currentThread().getName());
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @VisibleForTesting
    synchronized void runImmediately() {
        this.runImmediately = true;
        notify();
    }
}
