package org.apache.omid.tso;

import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.metrics.Meter;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.proto.TSOProto;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/omid/tso/ReplyProcessorImpl.class */
class ReplyProcessorImpl implements EventHandler<ReplyBatchEvent>, ReplyProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(ReplyProcessorImpl.class);
    private final Disruptor<ReplyBatchEvent> disruptor;
    private final RingBuffer<ReplyBatchEvent> replyRing;
    private final ObjectPool<Batch> batchPool;

    @VisibleForTesting
    PriorityQueue<ReplyBatchEvent> futureEvents;
    private final Meter abortMeter;
    private final Meter commitMeter;
    private final Meter timestampMeter;
    private final Meter fenceMeter;

    @VisibleForTesting
    AtomicLong nextIDToHandle = new AtomicLong();
    private final ExecutorService disruptorExec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("reply-%d").build());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/omid/tso/ReplyProcessorImpl$ReplyBatchEvent.class */
    public static final class ReplyBatchEvent {
        private Batch batch;
        private long batchSequence;
        static final EventFactory<ReplyBatchEvent> EVENT_FACTORY = new EventFactory<ReplyBatchEvent>() { // from class: org.apache.omid.tso.ReplyProcessorImpl.ReplyBatchEvent.1
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public ReplyBatchEvent m27newInstance() {
                return new ReplyBatchEvent();
            }
        };

        ReplyBatchEvent() {
        }

        static void makeReplyBatch(ReplyBatchEvent replyBatchEvent, Batch batch, long j) {
            replyBatchEvent.batch = batch;
            replyBatchEvent.batchSequence = j;
        }

        Batch getBatch() {
            return this.batch;
        }

        long getBatchSequence() {
            return this.batchSequence;
        }
    }

    @Inject
    ReplyProcessorImpl(@Named("ReplyStrategy") WaitStrategy waitStrategy, MetricsRegistry metricsRegistry, Panicker panicker, ObjectPool<Batch> objectPool) {
        this.disruptor = new Disruptor<>(ReplyBatchEvent.EVENT_FACTORY, 4096, this.disruptorExec, ProducerType.MULTI, waitStrategy);
        this.disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
        this.disruptor.handleEventsWith(new EventHandler[]{this});
        this.replyRing = this.disruptor.start();
        this.batchPool = objectPool;
        this.nextIDToHandle.set(0L);
        this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() { // from class: org.apache.omid.tso.ReplyProcessorImpl.1
            @Override // java.util.Comparator
            public int compare(ReplyBatchEvent replyBatchEvent, ReplyBatchEvent replyBatchEvent2) {
                return Long.compare(replyBatchEvent.getBatchSequence(), replyBatchEvent2.getBatchSequence());
            }
        });
        this.abortMeter = metricsRegistry.meter(MetricRegistry.name("tso", new String[]{"aborts"}));
        this.commitMeter = metricsRegistry.meter(MetricRegistry.name("tso", new String[]{"commits"}));
        this.timestampMeter = metricsRegistry.meter(MetricRegistry.name("tso", new String[]{"timestampAllocation"}));
        this.fenceMeter = metricsRegistry.meter(MetricRegistry.name("tso", new String[]{"fences"}));
        LOG.info("ReplyProcessor initialized");
    }

    @VisibleForTesting
    void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
        Batch batch = replyBatchEvent.getBatch();
        for (int i = 0; i < batch.getNumEvents(); i++) {
            PersistEvent persistEvent = batch.get(i);
            switch (persistEvent.getType()) {
                case COMMIT:
                    sendCommitResponse(persistEvent.getStartTimestamp(), persistEvent.getCommitTimestamp(), persistEvent.getChannel(), persistEvent.getMonCtx());
                    break;
                case ABORT:
                    sendAbortResponse(persistEvent.getStartTimestamp(), persistEvent.getChannel(), persistEvent.getMonCtx());
                    break;
                case TIMESTAMP:
                    sendTimestampResponse(persistEvent.getStartTimestamp(), persistEvent.getChannel(), persistEvent.getMonCtx());
                    break;
                case FENCE:
                    sendFenceResponse(persistEvent.getStartTimestamp(), persistEvent.getCommitTimestamp(), persistEvent.getChannel(), persistEvent.getMonCtx());
                    break;
                case COMMIT_RETRY:
                    throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + persistEvent);
                default:
                    throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + persistEvent);
            }
            persistEvent.getMonCtx().publish();
        }
        this.batchPool.returnObject(batch);
    }

    private void processWaitingEvents() throws Exception {
        while (!this.futureEvents.isEmpty() && this.futureEvents.peek().getBatchSequence() == this.nextIDToHandle.get()) {
            handleReplyBatchEvent(this.futureEvents.poll());
            this.nextIDToHandle.incrementAndGet();
        }
    }

    public void onEvent(ReplyBatchEvent replyBatchEvent, long j, boolean z) throws Exception {
        if (replyBatchEvent.getBatchSequence() > this.nextIDToHandle.get()) {
            this.futureEvents.add(replyBatchEvent);
            return;
        }
        handleReplyBatchEvent(replyBatchEvent);
        this.nextIDToHandle.incrementAndGet();
        processWaitingEvents();
    }

    @Override // org.apache.omid.tso.ReplyProcessor
    public void manageResponsesBatch(long j, Batch batch) {
        long next = this.replyRing.next();
        ReplyBatchEvent.makeReplyBatch((ReplyBatchEvent) this.replyRing.get(next), batch, j);
        this.replyRing.publish(next);
    }

    @Override // org.apache.omid.tso.ReplyProcessor
    public void sendCommitResponse(long j, long j2, Channel channel, MonitoringContext monitoringContext) {
        TSOProto.Response.Builder newBuilder = TSOProto.Response.newBuilder();
        TSOProto.CommitResponse.Builder newBuilder2 = TSOProto.CommitResponse.newBuilder();
        newBuilder2.setAborted(false).setStartTimestamp(j).setCommitTimestamp(j2);
        newBuilder.setCommitResponse(newBuilder2.build());
        channel.write(newBuilder.build());
        this.commitMeter.mark();
        monitoringContext.timerStop("reply.processor.commit.latency");
    }

    @Override // org.apache.omid.tso.ReplyProcessor
    public void sendAbortResponse(long j, Channel channel, MonitoringContext monitoringContext) {
        TSOProto.Response.Builder newBuilder = TSOProto.Response.newBuilder();
        TSOProto.CommitResponse.Builder newBuilder2 = TSOProto.CommitResponse.newBuilder();
        newBuilder2.setAborted(true);
        newBuilder2.setStartTimestamp(j);
        newBuilder.setCommitResponse(newBuilder2.build());
        channel.write(newBuilder.build());
        this.abortMeter.mark();
        monitoringContext.timerStop("reply.processor.abort.latency");
    }

    @Override // org.apache.omid.tso.ReplyProcessor
    public void sendTimestampResponse(long j, Channel channel, MonitoringContext monitoringContext) {
        TSOProto.Response.Builder newBuilder = TSOProto.Response.newBuilder();
        TSOProto.TimestampResponse.Builder newBuilder2 = TSOProto.TimestampResponse.newBuilder();
        newBuilder2.setStartTimestamp(j);
        newBuilder.setTimestampResponse(newBuilder2.build());
        channel.write(newBuilder.build());
        this.timestampMeter.mark();
        monitoringContext.timerStop("reply.processor.timestamp.latency");
    }

    @Override // org.apache.omid.tso.ReplyProcessor
    public void sendFenceResponse(long j, long j2, Channel channel, MonitoringContext monitoringContext) {
        TSOProto.Response.Builder newBuilder = TSOProto.Response.newBuilder();
        TSOProto.FenceResponse.Builder newBuilder2 = TSOProto.FenceResponse.newBuilder();
        newBuilder2.setTableId(j);
        newBuilder2.setFenceId(j2);
        newBuilder.setFenceResponse(newBuilder2.build());
        channel.write(newBuilder.build());
        monitoringContext.timerStop("reply.processor.fence.latency");
        this.fenceMeter.mark();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        LOG.info("Terminating Reply Processor...");
        this.disruptor.halt();
        this.disruptor.shutdown();
        LOG.info("\tReply Processor Disruptor shutdown");
        this.disruptorExec.shutdownNow();
        try {
            this.disruptorExec.awaitTermination(3L, TimeUnit.SECONDS);
            LOG.info("\tReply Processor Disruptor executor shutdown");
        } catch (InterruptedException e) {
            LOG.error("Interrupted whilst finishing Reply Processor Disruptor executor");
            Thread.currentThread().interrupt();
        }
        LOG.info("Reply Processor terminated");
    }
}
