package org.apache.hadoop.ozone.om.ratis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.S3SecretManager;
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
import org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerDoubleBufferMetrics;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.class */
public final class OzoneManagerDoubleBuffer {
    private static final Logger LOG = LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class);
    private Queue<DoubleBufferEntry<OMClientResponse>> currentBuffer;
    private Queue<DoubleBufferEntry<OMClientResponse>> readyBuffer;
    private volatile Queue<CompletableFuture<Void>> currentFutureQueue;
    private volatile Queue<CompletableFuture<Void>> readyFutureQueue;
    private final Daemon daemon;
    private final OMMetadataManager omMetadataManager;
    private final AtomicLong flushedTransactionCount;
    private final AtomicLong flushIterations;
    private final AtomicBoolean isRunning;
    private final OzoneManagerDoubleBufferMetrics ozoneManagerDoubleBufferMetrics;
    private long maxFlushedTransactionsInOneIteration;
    private final OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot;
    private final boolean isRatisEnabled;
    private final boolean isTracingEnabled;
    private final Semaphore unFlushedTransactions;
    private final FlushNotifier flushNotifier;
    private final String threadPrefix;
    private final S3SecretManager s3SecretManager;
    private final Function<Long, Long> indexToTerm;

    /* loaded from: input_file:org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer$Builder.class */
    public static class Builder {
        private OMMetadataManager mm;
        private OzoneManagerRatisSnapshot rs;
        private FlushNotifier flushNotifier;
        private S3SecretManager s3SecretManager;
        private boolean isRatisEnabled = false;
        private boolean isTracingEnabled = false;
        private Function<Long, Long> indexToTerm = null;
        private int maxUnFlushedTransactionCount = 0;
        private String threadPrefix = "";

        public Builder setOmMetadataManager(OMMetadataManager oMMetadataManager) {
            this.mm = oMMetadataManager;
            return this;
        }

        public Builder setOzoneManagerRatisSnapShot(OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot) {
            this.rs = ozoneManagerRatisSnapshot;
            return this;
        }

        public Builder enableRatis(boolean z) {
            this.isRatisEnabled = z;
            return this;
        }

        public Builder enableTracing(boolean z) {
            this.isTracingEnabled = z;
            return this;
        }

        public Builder setIndexToTerm(Function<Long, Long> function) {
            this.indexToTerm = function;
            return this;
        }

        public Builder setmaxUnFlushedTransactionCount(int i) {
            this.maxUnFlushedTransactionCount = i;
            return this;
        }

        public Builder setFlushNotifier(FlushNotifier flushNotifier) {
            this.flushNotifier = flushNotifier;
            return this;
        }

        public Builder setThreadPrefix(String str) {
            this.threadPrefix = str;
            return this;
        }

        public Builder setS3SecretManager(S3SecretManager s3SecretManager) {
            this.s3SecretManager = s3SecretManager;
            return this;
        }

        public OzoneManagerDoubleBuffer build() {
            if (this.isRatisEnabled) {
                Preconditions.checkNotNull(this.rs, "When ratis is enabled, OzoneManagerRatisSnapshot should not be null");
                Preconditions.checkNotNull(this.indexToTerm, "When ratis is enabled indexToTerm should not be null");
                Preconditions.checkState(((long) this.maxUnFlushedTransactionCount) > 0, "when ratis is enable, maxUnFlushedTransactions should be bigger than 0");
            }
            if (this.flushNotifier == null) {
                this.flushNotifier = new FlushNotifier();
            }
            return new OzoneManagerDoubleBuffer(this.mm, this.rs, this.isRatisEnabled, this.isTracingEnabled, this.indexToTerm, this.maxUnFlushedTransactionCount, this.flushNotifier, this.s3SecretManager, this.threadPrefix, null);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer$FlushNotifier.class */
    public static class FlushNotifier {
        private final Set<CountDownLatch> flushLatches = ConcurrentHashMap.newKeySet();

        FlushNotifier() {
        }

        void await() throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(2);
            this.flushLatches.add(countDownLatch);
            countDownLatch.await();
            this.flushLatches.remove(countDownLatch);
        }

        int notifyFlush() {
            int size = this.flushLatches.size();
            Iterator<CountDownLatch> it = this.flushLatches.iterator();
            while (it.hasNext()) {
                it.next().countDown();
            }
            return size;
        }
    }

    private OzoneManagerDoubleBuffer(OMMetadataManager oMMetadataManager, OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot, boolean z, boolean z2, Function<Long, Long> function, int i, FlushNotifier flushNotifier, S3SecretManager s3SecretManager, String str) {
        this.flushedTransactionCount = new AtomicLong(0L);
        this.flushIterations = new AtomicLong(0L);
        this.isRunning = new AtomicBoolean(false);
        this.currentBuffer = new ConcurrentLinkedQueue();
        this.readyBuffer = new ConcurrentLinkedQueue();
        this.isRatisEnabled = z;
        this.isTracingEnabled = z2;
        if (!z) {
            this.currentFutureQueue = new ConcurrentLinkedQueue();
            this.readyFutureQueue = new ConcurrentLinkedQueue();
        }
        this.unFlushedTransactions = new Semaphore(i);
        this.omMetadataManager = oMMetadataManager;
        this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapshot;
        this.ozoneManagerDoubleBufferMetrics = OzoneManagerDoubleBufferMetrics.create();
        this.indexToTerm = function;
        this.flushNotifier = flushNotifier;
        this.threadPrefix = str;
        this.isRunning.set(true);
        this.daemon = new Daemon(this::flushTransactions);
        this.daemon.setName(String.valueOf(str) + "OMDoubleBufferFlushThread");
        this.daemon.start();
        this.s3SecretManager = s3SecretManager;
    }

    public void acquireUnFlushedTransactions(int i) throws InterruptedException {
        this.unFlushedTransactions.acquire(i);
    }

    public void releaseUnFlushedTransactions(int i) {
        this.unFlushedTransactions.release(i);
    }

    private void addToBatchWithTrace(OzoneManagerProtocolProtos.OMResponse oMResponse, CheckedRunnable<IOException> checkedRunnable) throws IOException {
        if (this.isTracingEnabled) {
            TracingUtil.executeAsChildSpan("DB-addToWriteBatch-" + oMResponse.getCmdType(), oMResponse.getTraceID(), checkedRunnable);
        } else {
            checkedRunnable.run();
        }
    }

    private void flushBatchWithTrace(String str, int i, CheckedRunnable<IOException> checkedRunnable) throws IOException {
        if (this.isTracingEnabled) {
            TracingUtil.executeAsChildSpan("DB-commitWriteBatch-Size-" + i, str, checkedRunnable);
        } else {
            checkedRunnable.run();
        }
    }

    private void addToBatchTransactionInfoWithTrace(String str, long j, CheckedRunnable<IOException> checkedRunnable) throws IOException {
        if (this.isTracingEnabled) {
            TracingUtil.executeAsChildSpan("DB-addWriteBatch-transactioninfo-" + j, str, checkedRunnable);
        } else {
            checkedRunnable.run();
        }
    }

    @VisibleForTesting
    void flushTransactions() {
        while (this.isRunning.get() && canFlush()) {
            flushCurrentBuffer();
        }
    }

    @VisibleForTesting
    void flushCurrentBuffer() {
        try {
            swapCurrentAndReadyBuffer();
            Iterator<Queue<DoubleBufferEntry<OMClientResponse>>> it = splitReadyBufferAtCreateSnapshot().iterator();
            while (it.hasNext()) {
                flushBatch(it.next());
            }
            clearReadyBuffer();
            this.flushNotifier.notifyFlush();
        } catch (IOException e) {
            terminate(e, 1);
        } catch (Throwable th) {
            terminate(th, 2);
        }
    }

    private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> queue) throws IOException {
        HashMap hashMap = new HashMap();
        Throwable th = null;
        try {
            BatchOperation initBatchOperation = this.omMetadataManager.getStore().initBatchOperation();
            try {
                String addToBatch = addToBatch(queue, initBatchOperation);
                queue.iterator().forEachRemaining(doubleBufferEntry -> {
                    addCleanupEntry(doubleBufferEntry, hashMap);
                });
                List<Long> list = (List) queue.stream().map((v0) -> {
                    return v0.getTrxLogIndex();
                }).sorted().collect(Collectors.toList());
                long longValue = list.get(list.size() - 1).longValue();
                long longValue2 = this.isRatisEnabled ? this.indexToTerm.apply(Long.valueOf(longValue)).longValue() : -1L;
                addToBatchTransactionInfoWithTrace(addToBatch, longValue, () -> {
                    this.omMetadataManager.getTransactionInfoTable().putWithBatch(initBatchOperation, "#TRANSACTIONINFO", new TransactionInfo.Builder().setTransactionIndex(longValue).setCurrentTerm(longValue2).build());
                });
                long monotonicNow = Time.monotonicNow();
                flushBatchWithTrace(addToBatch, queue.size(), () -> {
                    this.omMetadataManager.getStore().commitBatchOperation(initBatchOperation);
                });
                this.ozoneManagerDoubleBufferMetrics.updateFlushTime(Time.monotonicNow() - monotonicNow);
                if (initBatchOperation != null) {
                    initBatchOperation.close();
                }
                if (!this.isRatisEnabled) {
                    clearReadyFutureQueue(queue.size());
                }
                int size = queue.size();
                this.flushedTransactionCount.addAndGet(size);
                this.flushIterations.incrementAndGet();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sync iteration {} flushed transactions in this iteration {}", Long.valueOf(this.flushIterations.get()), Integer.valueOf(size));
                }
                cleanupCache(hashMap);
                if (this.isRatisEnabled) {
                    releaseUnFlushedTransactions(size);
                }
                this.ozoneManagerRatisSnapShot.updateLastAppliedIndex(list);
                updateMetrics(size);
            } catch (Throwable th2) {
                if (initBatchOperation != null) {
                    initBatchOperation.close();
                }
                throw th2;
            }
        } catch (Throwable th3) {
            if (0 == 0) {
                th = th3;
            } else if (null != th3) {
                th.addSuppressed(th3);
            }
            throw th;
        }
    }

    private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> queue, BatchOperation batchOperation) {
        String str = null;
        Iterator<DoubleBufferEntry<OMClientResponse>> it = queue.iterator();
        while (it.hasNext()) {
            OMClientResponse response = it.next().getResponse();
            OzoneManagerProtocolProtos.OMResponse oMResponse = response.getOMResponse();
            str = oMResponse.getTraceID();
            try {
                addToBatchWithTrace(oMResponse, () -> {
                    response.checkAndUpdateDB(this.omMetadataManager, batchOperation);
                });
            } catch (IOException e) {
                terminate(e, 1, oMResponse);
            } catch (Throwable th) {
                terminate(th, 2, oMResponse);
            }
        }
        return str;
    }

    private List<Queue<DoubleBufferEntry<OMClientResponse>>> splitReadyBufferAtCreateSnapshot() {
        ArrayList arrayList = new ArrayList();
        Iterator<DoubleBufferEntry<OMClientResponse>> it = this.readyBuffer.iterator();
        OzoneManagerProtocolProtos.OMResponse oMResponse = null;
        while (true) {
            OzoneManagerProtocolProtos.OMResponse oMResponse2 = oMResponse;
            if (!it.hasNext()) {
                return arrayList;
            }
            DoubleBufferEntry<OMClientResponse> next = it.next();
            OzoneManagerProtocolProtos.OMResponse oMResponse3 = next.getResponse().getOMResponse();
            if (arrayList.isEmpty() || oMResponse3.hasCreateSnapshotResponse() || (oMResponse2 != null && oMResponse2.hasCreateSnapshotResponse())) {
                arrayList.add(new LinkedList());
            }
            ((Queue) arrayList.get(arrayList.size() - 1)).add(next);
            oMResponse = oMResponse3;
        }
    }

    private void addCleanupEntry(DoubleBufferEntry doubleBufferEntry, Map<String, List<Long>> map) {
        Class<?> cls = doubleBufferEntry.getResponse().getClass();
        CleanupTableInfo cleanupTableInfo = (CleanupTableInfo) cls.getAnnotation(CleanupTableInfo.class);
        if (cleanupTableInfo == null) {
            throw new RuntimeException("CleanupTableInfo Annotation is missing for" + cls);
        }
        Iterator it = (cleanupTableInfo.cleanupAll() ? (List) new OMDBDefinition().getColumnFamilies().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList()) : Arrays.asList(cleanupTableInfo.cleanupTables())).iterator();
        while (it.hasNext()) {
            map.computeIfAbsent((String) it.next(), str -> {
                return new ArrayList();
            }).add(Long.valueOf(doubleBufferEntry.getTrxLogIndex()));
        }
    }

    private void clearReadyFutureQueue(int i) {
        while (!this.readyFutureQueue.isEmpty() && i > 0) {
            this.readyFutureQueue.remove().complete(null);
            i--;
        }
    }

    private void cleanupCache(Map<String, List<Long>> map) {
        map.forEach((str, list) -> {
            Collections.sort(list);
            this.omMetadataManager.getTable(str).cleanupCache(list);
            if (str.equals(OMDBDefinition.S3_SECRET_TABLE.getName())) {
                this.s3SecretManager.clearS3Cache(list);
            }
        });
    }

    private synchronized void clearReadyBuffer() {
        this.readyBuffer.clear();
    }

    private void updateMetrics(int i) {
        this.ozoneManagerDoubleBufferMetrics.incrTotalNumOfFlushOperations();
        this.ozoneManagerDoubleBufferMetrics.incrTotalSizeOfFlushedTransactions(i);
        this.ozoneManagerDoubleBufferMetrics.setAvgFlushTransactionsInOneIteration(((float) this.ozoneManagerDoubleBufferMetrics.getTotalNumOfFlushedTransactions()) / ((float) this.ozoneManagerDoubleBufferMetrics.getTotalNumOfFlushOperations()));
        if (this.maxFlushedTransactionsInOneIteration < i) {
            this.maxFlushedTransactionsInOneIteration = i;
            this.ozoneManagerDoubleBufferMetrics.setMaxNumberOfTransactionsFlushedInOneIteration(i);
        }
        this.ozoneManagerDoubleBufferMetrics.updateQueueSize(i);
    }

    public void stop() {
        stopDaemon();
        this.ozoneManagerDoubleBufferMetrics.unRegister();
    }

    @VisibleForTesting
    public void stopDaemon() {
        if (!this.isRunning.compareAndSet(true, false)) {
            LOG.info("OMDoubleBuffer flush thread is not running.");
            return;
        }
        LOG.info("Stopping OMDoubleBuffer flush thread");
        this.daemon.interrupt();
        try {
            this.daemon.join();
        } catch (InterruptedException e) {
            LOG.debug("Interrupted while waiting for daemon to exit.", e);
        }
    }

    private void terminate(Throwable th, int i) {
        terminate(th, i, null);
    }

    private void terminate(Throwable th, int i, OzoneManagerProtocolProtos.OMResponse oMResponse) {
        StringBuilder sb = new StringBuilder("During flush to DB encountered error in OMDoubleBuffer flush thread " + Thread.currentThread().getName());
        if (oMResponse != null) {
            sb.append(" when handling OMRequest: ").append(oMResponse);
        }
        ExitUtils.terminate(i, sb.toString(), th, LOG);
    }

    public long getFlushedTransactionCount() {
        return this.flushedTransactionCount.get();
    }

    public long getFlushIterations() {
        return this.flushIterations.get();
    }

    public synchronized CompletableFuture<Void> add(OMClientResponse oMClientResponse, long j) {
        this.currentBuffer.add(new DoubleBufferEntry<>(j, oMClientResponse));
        notify();
        if (this.isRatisEnabled) {
            return null;
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.currentFutureQueue.add(completableFuture);
        return completableFuture;
    }

    private synchronized boolean canFlush() {
        while (this.currentBuffer.size() == 0) {
            try {
                this.flushNotifier.notifyFlush();
                this.flushNotifier.notifyFlush();
                wait(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (this.isRunning.get()) {
                    ExitUtils.terminate(1, "OMDoubleBuffer flush thread " + Thread.currentThread().getName() + " encountered Interrupted exception while running", e, LOG);
                }
                LOG.info("OMDoubleBuffer flush thread {} is interrupted and will exit.", Thread.currentThread().getName());
                return false;
            }
        }
        return true;
    }

    private synchronized void swapCurrentAndReadyBuffer() {
        Queue<DoubleBufferEntry<OMClientResponse>> queue = this.currentBuffer;
        this.currentBuffer = this.readyBuffer;
        this.readyBuffer = queue;
        if (this.isRatisEnabled) {
            return;
        }
        Queue<CompletableFuture<Void>> queue2 = this.currentFutureQueue;
        this.currentFutureQueue = this.readyFutureQueue;
        this.readyFutureQueue = queue2;
    }

    @VisibleForTesting
    public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
        return this.ozoneManagerDoubleBufferMetrics;
    }

    @VisibleForTesting
    int getCurrentBufferSize() {
        return this.currentBuffer.size();
    }

    @VisibleForTesting
    int getReadyBufferSize() {
        return this.readyBuffer.size();
    }

    @VisibleForTesting
    void resume() {
        this.isRunning.set(true);
    }

    public void awaitFlush() throws InterruptedException {
        this.flushNotifier.await();
    }

    /* synthetic */ OzoneManagerDoubleBuffer(OMMetadataManager oMMetadataManager, OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot, boolean z, boolean z2, Function function, int i, FlushNotifier flushNotifier, S3SecretManager s3SecretManager, String str, OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer) {
        this(oMMetadataManager, ozoneManagerRatisSnapshot, z, z2, function, i, flushNotifier, s3SecretManager, str);
    }
}
