package org.apache.hadoop.ozone.container.replication;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask;
import org.apache.hadoop.ozone.container.replication.ReplicationServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.class */
public final class ReplicationSupervisor {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationSupervisor.class);

    /**
     * Execution order for queued runners: task priority first, then the time the task was queued.
     * Method references pin the element type to TaskRunner; untyped lambdas at the head of a
     * comparing(...).thenComparing(...) chain are inferred as Object and do not compile.
     */
    private static final Comparator<TaskRunner> TASK_RUNNER_COMPARATOR =
            Comparator.comparing(TaskRunner::getTaskPriority).thenComparing(TaskRunner::getTaskQueueTime);

    /** Executor that runs the TaskRunner instances submitted by addTask. */
    private final ExecutorService executor;
    /** Datanode state context; may be null, in which case the in-service and SCM-term checks are skipped. */
    private final StateContext context;
    /** Time source used for the deadline check. */
    private final Clock clock;
    /** Number of runners that have started executing, including those that were then ignored. */
    private final AtomicLong requestCounter;
    /** Number of tasks that ended with status DONE. */
    private final AtomicLong successCounter;
    /** Number of tasks that ended with status FAILED or threw an exception. */
    private final AtomicLong failureCounter;
    /** Number of tasks dropped because their deadline had already passed when they were picked up. */
    private final AtomicLong timeoutCounter;
    /** Number of tasks that ended with status SKIPPED. */
    private final AtomicLong skippedCounter;
    /** Tasks accepted by addTask whose runner has not finished yet; also de-duplicates submissions. */
    private final Set<AbstractReplicationTask> inFlight;
    /** In-flight task count per task class; tasks with LOW priority are not counted here. */
    private final Map<Class<?>, AtomicInteger> taskCounter;
    /**
     * Upper bound on the size of inFlight, re-computed by nodeStateUpdated.
     * NOTE(review): written by nodeStateUpdated and read by addTask without synchronization;
     * confirm whether callers guarantee visibility or whether this should be volatile.
     */
    private int maxQueueSize;
    /** Last operational state passed to nodeStateUpdated; used to ignore repeated notifications. */
    private final AtomicReference<HddsProtos.NodeOperationalState> state;
    /** Callback that receives the new thread count whenever the node state changes. */
    private final IntConsumer executorThreadUpdater;
    private final ReplicationServer.ReplicationConfig replicationConfig;
    private final DatanodeConfiguration datanodeConfig;

    /**
     * Fluent builder for {@link ReplicationSupervisor}.
     *
     * <p>Every setting is optional; {@link #build()} fills in defaults for the configuration
     * objects, the clock and the executor when they were not supplied.
     */
    public static class Builder {
        private StateContext context;
        private ReplicationServer.ReplicationConfig replicationConfig;
        private DatanodeConfiguration datanodeConfig;
        private ExecutorService executor;
        private Clock clock;
        // No-op by default: a caller-supplied executor is not resized unless the caller says how.
        private IntConsumer executorThreadUpdater = threadCount -> {
        };
        private String threadNamePrefix;

        /** Sets the time source used for task deadline checks. */
        public Builder clock(Clock clock) {
            this.clock = clock;
            return this;
        }

        /** Sets the executor that runs replication tasks. */
        public Builder executor(ExecutorService executorService) {
            this.executor = executorService;
            return this;
        }

        /** Sets the replication configuration (stream limit and out-of-service scaling). */
        public Builder replicationConfig(ReplicationServer.ReplicationConfig replicationConfig) {
            this.replicationConfig = replicationConfig;
            return this;
        }

        /** Sets the datanode configuration (command queue limit). */
        public Builder datanodeConfig(DatanodeConfiguration datanodeConfiguration) {
            this.datanodeConfig = datanodeConfiguration;
            return this;
        }

        /** Sets the datanode state context; may be left unset. */
        public Builder stateContext(StateContext stateContext) {
            this.context = stateContext;
            return this;
        }

        /**
         * Sets the callback that receives the new thread count on node state changes.
         * It is only used when an executor is also supplied; see {@link #build()}.
         */
        public Builder executorThreadUpdater(IntConsumer intConsumer) {
            this.executorThreadUpdater = intConsumer;
            return this;
        }

        /** Sets the prefix prepended to the names of threads created by the default executor. */
        public Builder threadNamePrefix(String str) {
            this.threadNamePrefix = str;
            return this;
        }

        /**
         * Creates the supervisor, filling in defaults for anything not set.
         *
         * <p>Missing configuration objects are read from a fresh {@link OzoneConfiguration} and
         * a missing clock defaults to the system clock. When no executor was supplied, a
         * fixed-size daemon thread pool over a priority queue is created and the thread updater
         * is replaced by one that resizes that pool, overriding any updater set on this builder.
         *
         * @return a new supervisor
         */
        public ReplicationSupervisor build() {
            if (replicationConfig == null || datanodeConfig == null) {
                final OzoneConfiguration defaults = new OzoneConfiguration();
                if (replicationConfig == null) {
                    replicationConfig = defaults.getObject(ReplicationServer.ReplicationConfig.class);
                }
                if (datanodeConfig == null) {
                    datanodeConfig = defaults.getObject(DatanodeConfiguration.class);
                }
            }
            if (clock == null) {
                clock = Clock.system(ZoneId.systemDefault());
            }
            if (executor == null) {
                final int maxStreams = replicationConfig.getReplicationMaxStreams();
                LOG.info("Initializing replication supervisor with thread count = {}", maxStreams);
                // An unset prefix must not leak the literal "null" into thread names.
                final String prefix = threadNamePrefix == null ? "" : threadNamePrefix;
                final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                        maxStreams,
                        maxStreams,
                        60L,
                        TimeUnit.SECONDS,
                        new PriorityBlockingQueue<>(),
                        new ThreadFactoryBuilder()
                                .setDaemon(true)
                                .setNameFormat(prefix + "ContainerReplicationThread-%d")
                                .build());
                executor = pool;
                executorThreadUpdater = threadCount -> {
                    // Keep core <= max at every step: lower core first when shrinking,
                    // raise max first when growing.
                    if (threadCount < pool.getCorePoolSize()) {
                        pool.setCorePoolSize(threadCount);
                        pool.setMaximumPoolSize(threadCount);
                    } else {
                        pool.setMaximumPoolSize(threadCount);
                        pool.setCorePoolSize(threadCount);
                    }
                };
            }
            return new ReplicationSupervisor(context, executor, replicationConfig, datanodeConfig, clock, executorThreadUpdater);
        }
    }

    /**
     * Executor work item that wraps one {@link AbstractReplicationTask}.
     *
     * <p>Ordering (see {@link #compareTo}) uses task priority and queue time, while
     * {@link #equals} and {@link #hashCode} delegate to the wrapped task, so two runners may
     * compare as 0 without being equal.
     */
    public final class TaskRunner implements Comparable<TaskRunner>, Runnable {
        private final AbstractReplicationTask task;

        public TaskRunner(AbstractReplicationTask abstractReplicationTask) {
            this.task = abstractReplicationTask;
        }

        /**
         * Runs the wrapped task unless it is stale, and updates the supervisor's counters.
         *
         * <p>The task is ignored, without being run, when its deadline has passed, when the
         * datanode is not IN_SERVICE and the task only runs on in-service nodes, or when the
         * SCM leader term is newer than the task's term. On every exit path, including
         * exceptions, the task is removed from the in-flight set and its per-class counter is
         * decremented exactly once.
         */
        @Override
        public void run() {
            try {
                requestCounter.incrementAndGet();

                final long now = clock.millis();
                final long deadline = task.getDeadline();
                // A deadline of zero or less means the task has no deadline.
                if (deadline > 0 && now > deadline) {
                    LOG.info("Ignoring {} since the deadline has passed ({} < {})",
                            this, Instant.ofEpochMilli(deadline), Instant.ofEpochMilli(now));
                    timeoutCounter.incrementAndGet();
                    return;
                }

                if (context != null) {
                    final DatanodeDetails datanode = context.getParent().getDatanodeDetails();
                    if (datanode != null
                            && datanode.getPersistedOpState() != HddsProtos.NodeOperationalState.IN_SERVICE
                            && task.shouldOnlyRunOnInServiceDatanodes()) {
                        LOG.info("Ignoring {} since datanode is not in service ({})",
                                this, datanode.getPersistedOpState());
                        return;
                    }

                    final OptionalLong leaderTerm = context.getTermOfLeaderSCM();
                    final long taskTerm = task.getTerm();
                    if (leaderTerm.isPresent() && taskTerm < leaderTerm.getAsLong()) {
                        LOG.info("Ignoring {} since SCM leader has new term ({} < {})",
                                this, taskTerm, leaderTerm.getAsLong());
                        return;
                    }
                }

                task.setStatus(AbstractReplicationTask.Status.IN_PROGRESS);
                task.runTask();

                final AbstractReplicationTask.Status outcome = task.getStatus();
                if (outcome == AbstractReplicationTask.Status.FAILED) {
                    LOG.warn("Failed {}", this);
                    failureCounter.incrementAndGet();
                } else if (outcome == AbstractReplicationTask.Status.DONE) {
                    LOG.info("Successful {}", this);
                    successCounter.incrementAndGet();
                } else if (outcome == AbstractReplicationTask.Status.SKIPPED) {
                    LOG.info("Skipped {}", this);
                    skippedCounter.incrementAndGet();
                }
            } catch (Exception e) {
                task.setStatus(AbstractReplicationTask.Status.FAILED);
                LOG.warn("Failed {}", this, e);
                failureCounter.incrementAndGet();
            } finally {
                // Single release point: keeps the in-flight set and per-class counters
                // balanced however the method exits.
                inFlight.remove(task);
                decrementTaskCounter(task);
            }
        }

        @Override
        public String toString() {
            return task.toString();
        }

        /** Returns the priority of the wrapped task. */
        public StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority getTaskPriority() {
            return task.getPriority();
        }

        /** Returns the time the wrapped task was queued, in epoch milliseconds. */
        public long getTaskQueueTime() {
            return task.getQueued().toEpochMilli();
        }

        @Override
        public int compareTo(TaskRunner taskRunner) {
            return TASK_RUNNER_COMPARATOR.compare(this, taskRunner);
        }

        @Override
        public int hashCode() {
            return Objects.hash(task);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return task.equals(((TaskRunner) obj).task);
        }
    }

    /**
     * Entry point for constructing a supervisor.
     *
     * @return a fresh {@link Builder} with nothing configured yet
     */
    public static Builder newBuilder() {
        final Builder builder = new Builder();
        return builder;
    }

    private ReplicationSupervisor(StateContext stateContext, ExecutorService executorService, ReplicationServer.ReplicationConfig replicationConfig, DatanodeConfiguration datanodeConfiguration, Clock clock, IntConsumer intConsumer) {
        DatanodeDetails datanodeDetails;
        this.requestCounter = new AtomicLong();
        this.successCounter = new AtomicLong();
        this.failureCounter = new AtomicLong();
        this.timeoutCounter = new AtomicLong();
        this.skippedCounter = new AtomicLong();
        this.taskCounter = new ConcurrentHashMap();
        this.state = new AtomicReference<>();
        this.inFlight = ConcurrentHashMap.newKeySet();
        this.context = stateContext;
        this.executor = executorService;
        this.replicationConfig = replicationConfig;
        this.datanodeConfig = datanodeConfiguration;
        this.maxQueueSize = datanodeConfiguration.getCommandQueueLimit();
        this.clock = clock;
        this.executorThreadUpdater = intConsumer;
        if (stateContext == null || (datanodeDetails = stateContext.getParent().getDatanodeDetails()) == null) {
            return;
        }
        nodeStateUpdated(datanodeDetails.getPersistedOpState());
    }

    /**
     * Queues a replication task for execution.
     *
     * <p>The task is dropped with a warning when the number of in-flight tasks has reached
     * the current queue limit, and silently ignored when an equal task is already in flight.
     * Tasks whose priority is not LOW are also counted per task class.
     *
     * @param abstractReplicationTask the task to run
     * @throws RuntimeException whatever the executor throws when it refuses the task (for
     *     example after shutdown); the in-flight bookkeeping is rolled back first
     */
    public void addTask(AbstractReplicationTask abstractReplicationTask) {
        // Read the limit once so the check and the log message agree.
        final int queueLimit = this.maxQueueSize;
        if (getTotalInFlightReplications() >= queueLimit) {
            LOG.warn("Ignored {} command for container {} in Replication Supervisor as queue reached max size of {}.",
                    abstractReplicationTask.getClass(), abstractReplicationTask.getContainerId(), queueLimit);
            return;
        }
        if (!this.inFlight.add(abstractReplicationTask)) {
            // An equal task is already in flight.
            return;
        }
        if (abstractReplicationTask.getPriority() != StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW) {
            this.taskCounter.computeIfAbsent(abstractReplicationTask.getClass(), cls -> new AtomicInteger()).incrementAndGet();
        }
        try {
            this.executor.execute(new TaskRunner(abstractReplicationTask));
        } catch (RuntimeException e) {
            // A refused task never reaches TaskRunner.run(), which would normally release
            // these entries; undo them here so the task does not occupy a slot forever.
            this.inFlight.remove(abstractReplicationTask);
            decrementTaskCounter(abstractReplicationTask);
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Undoes the per-class count taken in {@link #addTask} for one task.
     *
     * <p>Does nothing for LOW priority tasks, which are never counted, or when no counter
     * exists for the task's class.
     *
     * @param abstractReplicationTask the task that has left the in-flight set
     */
    public void decrementTaskCounter(AbstractReplicationTask abstractReplicationTask) {
        if (abstractReplicationTask.getPriority() == StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW) {
            return;
        }
        final AtomicInteger perClassCount = taskCounter.get(abstractReplicationTask.getClass());
        if (perClassCount != null) {
            perClassCount.decrementAndGet();
        }
    }

    /**
     * Stops accepting new tasks and waits up to one day for the queued and running ones to
     * finish. The result of the wait is not inspected.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @VisibleForTesting
    public void shutdownAfterFinish() throws InterruptedException {
        final ExecutorService pool = executor;
        pool.shutdown();
        pool.awaitTermination(1L, TimeUnit.DAYS);
    }

    /**
     * Shuts the executor down, allowing running tasks three seconds to finish before they
     * are cancelled with {@code shutdownNow()}.
     *
     * <p>If the calling thread is interrupted while waiting, remaining tasks are cancelled
     * immediately and the interrupt flag is restored for the caller.
     */
    public void stop() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(3L, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Still stop the workers: without this an interrupt would leave tasks running.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the in-flight count recorded for one task class.
     *
     * @param cls the task class to look up
     * @return the current count, or 0 when no task of that class has been counted
     */
    public int getInFlightReplications(Class<? extends AbstractReplicationTask> cls) {
        final AtomicInteger perClassCount = taskCounter.get(cls);
        return perClassCount != null ? perClassCount.get() : 0;
    }

    /**
     * Takes a snapshot of the per-class in-flight counters.
     *
     * @return a new mutable map from task class simple name to its current count
     */
    public Map<String, Integer> getInFlightReplicationSummary() {
        final Map<String, Integer> summary = new HashMap<>();
        taskCounter.forEach((taskType, count) -> summary.put(taskType.getSimpleName(), count.get()));
        return summary;
    }

    /**
     * @return the number of tasks currently in the in-flight set, whatever their priority
     */
    public int getTotalInFlightReplications() {
        final int total = inFlight.size();
        return total;
    }

    /**
     * @return the current limit on in-flight tasks, as last set by the constructor or by
     *     {@link #nodeStateUpdated(HddsProtos.NodeOperationalState)}
     */
    public int getMaxQueueSize() {
        return maxQueueSize;
    }

    /**
     * Reacts to a change of the datanode's operational state.
     *
     * <p>A notification carrying the state already recorded is ignored. Otherwise the thread
     * count and queue limit are re-read from configuration, passed through
     * {@code scaleOutOfServiceLimit} when the node is in maintenance or decommission, and
     * then applied to the queue limit and the executor thread updater.
     *
     * @param nodeOperationalState the new operational state of this datanode
     */
    public void nodeStateUpdated(HddsProtos.NodeOperationalState nodeOperationalState) {
        final HddsProtos.NodeOperationalState previous = state.getAndSet(nodeOperationalState);
        if (previous == nodeOperationalState) {
            return;
        }

        int threadCount = replicationConfig.getReplicationMaxStreams();
        int queueLimit = datanodeConfig.getCommandQueueLimit();

        final boolean outOfService = DatanodeDetails.isMaintenance(nodeOperationalState)
                || DatanodeDetails.isDecommission(nodeOperationalState);
        if (outOfService) {
            threadCount = replicationConfig.scaleOutOfServiceLimit(threadCount);
            queueLimit = replicationConfig.scaleOutOfServiceLimit(queueLimit);
        }

        LOG.info("Node state updated to {}, scaling executor pool size to {}", nodeOperationalState, threadCount);
        maxQueueSize = queueLimit;
        executorThreadUpdater.accept(threadCount);
    }

    /**
     * @return how many task runners have started executing so far
     */
    public long getReplicationRequestCount() {
        return requestCounter.get();
    }

    /**
     * @return the number of tasks waiting in the executor's queue, or 0 when the executor is
     *     not a {@link ThreadPoolExecutor} and its queue cannot be inspected
     */
    public long getQueueSize() {
        if (!(executor instanceof ThreadPoolExecutor)) {
            return 0L;
        }
        final ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
        return pool.getQueue().size();
    }

    /**
     * @return the executor's maximum pool size, or 1 when the executor is not a
     *     {@link ThreadPoolExecutor}
     */
    public long getMaxReplicationStreams() {
        if (!(executor instanceof ThreadPoolExecutor)) {
            return 1L;
        }
        final ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
        return pool.getMaximumPoolSize();
    }

    /**
     * @return how many tasks have finished with status DONE
     */
    public long getReplicationSuccessCount() {
        return successCounter.get();
    }

    /**
     * @return how many tasks have finished with status FAILED or thrown an exception
     */
    public long getReplicationFailureCount() {
        return failureCounter.get();
    }

    /**
     * @return how many tasks were dropped because their deadline had passed before they ran
     */
    public long getReplicationTimeoutCount() {
        return timeoutCounter.get();
    }

    /**
     * @return how many tasks have finished with status SKIPPED
     */
    public long getReplicationSkippedCount() {
        return skippedCounter.get();
    }
}
