package org.apache.omid.tso;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.phoenix.shaded.com.google.inject.name.Named;
import org.apache.phoenix.shaded.com.lmax.disruptor.EventFactory;
import org.apache.phoenix.shaded.com.lmax.disruptor.RingBuffer;
import org.apache.phoenix.shaded.com.lmax.disruptor.WaitStrategy;
import org.apache.phoenix.shaded.com.lmax.disruptor.dsl.Disruptor;
import org.apache.phoenix.shaded.com.lmax.disruptor.dsl.ProducerType;
import org.apache.phoenix.shaded.javax.inject.Inject;
import org.apache.phoenix.shaded.org.apache.commons.pool2.ObjectPool;
import org.apache.phoenix.shaded.org.jboss.netty.channel.Channel;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/omid/tso/PersistenceProcessorImpl.class */
/**
 * {@link PersistenceProcessor} implementation that accumulates requests in a {@link Batch}
 * borrowed from an object pool and publishes each batch to a Disruptor ring buffer that is
 * consumed by a worker pool of {@link PersistenceProcessorHandler}s.
 *
 * <p>The ring buffer is created with {@link ProducerType#SINGLE}; presumably all batching methods
 * are invoked from one thread only (confirm against the callers), since neither the current batch
 * nor the batch sequence counter is updated atomically.
 */
class PersistenceProcessorImpl implements PersistenceProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorImpl.class);

    /** Number of slots in the persistence ring buffer (2^20 = 1048576). */
    private static final int RING_BUFFER_SIZE = 1 << 20;

    /** Seconds to wait for the disruptor executor to terminate in {@link #close()}. */
    private static final long EXECUTOR_TERMINATION_TIMEOUT_SECS = 3L;

    private final ExecutorService disruptorExec;
    private final Disruptor<PersistBatchEvent> disruptor;
    private final RingBuffer<PersistBatchEvent> persistRing;
    private final ObjectPool<Batch> batchPool;

    /** Batch currently being filled; replaced by a freshly borrowed one after every flush. */
    @VisibleForTesting
    Batch currentBatch;
    /** Sequence number handed to the next published batch; starts at 0. */
    private volatile long batchSequence;
    // Kept from construction; not read anywhere in this class.
    private final MetricsRegistry metrics;

    /**
     * Ring buffer event carrying one batch together with the sequence number assigned to it
     * at publication time. Instances are mutable because ring buffer slots are pre-allocated
     * through {@link #EVENT_FACTORY} and refilled via {@link #makePersistBatch}.
     */
    public static final class PersistBatchEvent {
        private long batchSequence;
        private Batch batch;

        /** Factory the Disruptor uses to create the (initially empty) events. */
        static final EventFactory<PersistBatchEvent> EVENT_FACTORY = new EventFactory<PersistBatchEvent>() {
            @Override
            public PersistBatchEvent newInstance() {
                return new PersistBatchEvent();
            }
        };

        /**
         * Fills the given event with a batch and its sequence number.
         *
         * @param persistBatchEvent event to overwrite
         * @param j sequence number to store in the event
         * @param batch batch to store in the event
         */
        public static void makePersistBatch(PersistBatchEvent persistBatchEvent, long j, Batch batch) {
            persistBatchEvent.batch = batch;
            persistBatchEvent.batchSequence = j;
        }

        /** @return the batch stored in this event */
        public Batch getBatch() {
            return this.batch;
        }

        /** @return the sequence number stored in this event */
        public long getBatchSequence() {
            return this.batchSequence;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("batchSequence", this.batchSequence)
                    .add("batch", this.batch)
                    .toString();
        }
    }

    /**
     * Creates the executor and Disruptor, registers the handlers as a worker pool, starts the
     * Disruptor and borrows the first batch from the pool.
     *
     * @param tSOServerConfig supplies the number of concurrent writers, used as the executor's thread count
     * @param waitStrategy wait strategy handed to the Disruptor
     * @param commitTable not referenced by this constructor
     * @param objectPool pool from which batches are borrowed
     * @param panicker wrapped in the {@link FatalExceptionHandler} installed on the Disruptor
     * @param persistenceProcessorHandlerArr handlers consuming published batches as a worker pool
     * @param metricsRegistry stored in this instance
     * @throws Exception if the initial batch cannot be borrowed from the pool; in that case the
     *         already started Disruptor and its executor are stopped before rethrowing
     */
    @Inject
    public PersistenceProcessorImpl(TSOServerConfig tSOServerConfig, @Named("PersistenceStrategy") WaitStrategy waitStrategy, CommitTable commitTable, ObjectPool<Batch> objectPool, Panicker panicker, PersistenceProcessorHandler[] persistenceProcessorHandlerArr, MetricsRegistry metricsRegistry) throws Exception {
        this.disruptorExec = Executors.newFixedThreadPool(
                tSOServerConfig.getNumConcurrentCTWriters(),
                new ThreadFactoryBuilder().setNameFormat("persist-%d").build());
        this.disruptor = new Disruptor<>(
                PersistBatchEvent.EVENT_FACTORY, RING_BUFFER_SIZE, this.disruptorExec, ProducerType.SINGLE, waitStrategy);
        this.disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
        this.disruptor.handleEventsWithWorkerPool(persistenceProcessorHandlerArr);
        this.persistRing = this.disruptor.start();
        this.metrics = metricsRegistry;
        this.batchSequence = 0L;
        this.batchPool = objectPool;
        try {
            this.currentBatch = objectPool.borrowObject();
        } catch (Exception e) {
            // The Disruptor is already running on the executor at this point; stop both so a
            // failed construction does not leave the "persist-%d" worker threads behind.
            this.disruptor.halt();
            this.disruptorExec.shutdownNow();
            throw e;
        }
        LOG.info("PersistentProcessor initialized");
    }

    /**
     * Publishes the current batch to the ring buffer and replaces it with a batch borrowed
     * from the pool. Does nothing when the current batch is empty.
     *
     * @throws Exception if a new batch cannot be borrowed from the pool
     */
    @Override
    public void triggerCurrentBatchFlush() throws Exception {
        if (this.currentBatch.isEmpty()) {
            return;
        }
        long slot = this.persistRing.next();
        PersistBatchEvent event = this.persistRing.get(slot);
        // Read-then-increment on a volatile field is not atomic; presumably safe because only
        // the single producer thread calls this method (confirm against the callers).
        long sequence = this.batchSequence;
        this.batchSequence = sequence + 1;
        PersistBatchEvent.makePersistBatch(event, sequence, this.currentBatch);
        this.persistRing.publish(slot);
        this.currentBatch = this.batchPool.borrowObject();
    }

    /**
     * Adds a commit to the current batch, flushing the batch if it became full.
     * All arguments are forwarded unchanged to {@code Batch.addCommit}.
     *
     * @throws Exception if the triggered flush fails
     */
    @Override
    public void addCommitToBatch(long j, long j2, Channel channel, MonitoringContext monitoringContext, Optional<Long> optional) throws Exception {
        this.currentBatch.addCommit(j, j2, channel, monitoringContext, optional);
        flushIfFull();
    }

    /**
     * Adds a commit retry to the current batch, flushing the batch if it became full.
     * All arguments are forwarded unchanged to {@code Batch.addCommitRetry}.
     *
     * @throws Exception if the triggered flush fails
     */
    @Override
    public void addCommitRetryToBatch(long j, Channel channel, MonitoringContext monitoringContext) throws Exception {
        this.currentBatch.addCommitRetry(j, channel, monitoringContext);
        flushIfFull();
    }

    /**
     * Adds an abort to the current batch, flushing the batch if it became full.
     * All arguments are forwarded unchanged to {@code Batch.addAbort}.
     *
     * @throws Exception if the triggered flush fails
     */
    @Override
    public void addAbortToBatch(long j, Channel channel, MonitoringContext monitoringContext) throws Exception {
        this.currentBatch.addAbort(j, channel, monitoringContext);
        flushIfFull();
    }

    /**
     * Adds a timestamp to the current batch, flushing the batch if it became full.
     * All arguments are forwarded unchanged to {@code Batch.addTimestamp}.
     *
     * @throws Exception if the triggered flush fails
     */
    @Override
    public void addTimestampToBatch(long j, Channel channel, MonitoringContext monitoringContext) throws Exception {
        this.currentBatch.addTimestamp(j, channel, monitoringContext);
        flushIfFull();
    }

    /**
     * Adds a fence to the current batch, flushing the batch if it became full.
     * All arguments are forwarded unchanged to {@code Batch.addFence}.
     *
     * @throws Exception if the triggered flush fails
     */
    @Override
    public void addFenceToBatch(long j, long j2, Channel channel, MonitoringContext monitoringContext) throws Exception {
        this.currentBatch.addFence(j, j2, channel, monitoringContext);
        flushIfFull();
    }

    /** Flushes the current batch when it reports itself as full. */
    private void flushIfFull() throws Exception {
        if (this.currentBatch.isFull()) {
            triggerCurrentBatchFlush();
        }
    }

    /**
     * Halts and shuts down the Disruptor, then stops its executor and waits up to
     * {@value #EXECUTOR_TERMINATION_TIMEOUT_SECS} seconds for the executor threads to finish.
     * If the calling thread is interrupted while waiting, its interrupt flag is restored.
     */
    @Override
    public void close() throws IOException {
        LOG.info("Terminating Persistence Processor...");
        // NOTE(review): halt() stops the handlers before shutdown() is invoked; confirm that
        // shutdown() cannot end up waiting on a backlog the halted handlers will never drain.
        this.disruptor.halt();
        this.disruptor.shutdown();
        LOG.info("\tPersistence Processor Disruptor shutdown");
        this.disruptorExec.shutdownNow();
        try {
            // awaitTermination returns false on timeout; only report success when it really finished.
            if (this.disruptorExec.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT_SECS, TimeUnit.SECONDS)) {
                LOG.info("\tPersistence Processor Disruptor executor shutdown");
            } else {
                LOG.warn("\tPersistence Processor Disruptor executor did not terminate within {} seconds",
                        EXECUTOR_TERMINATION_TIMEOUT_SECS);
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted whilst finishing Persistence Processor Disruptor executor", e);
            Thread.currentThread().interrupt();
        }
        LOG.info("Persistence Processor terminated");
    }
}
