package org.apache.omid.tso;

import java.io.IOException;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
import org.jboss.netty.channel.Channel;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/omid/tso/TestPersistenceProcessor.class */
public class TestPersistenceProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessor.class);
    private static final long ANY_LWM = 1234;
    private static final int ANY_ST = 0;
    private static final int ANY_CT = 1;

    @Mock
    private CommitTable.Writer mockWriter;

    @Mock
    private CommitTable.Client mockClient;

    @Mock
    private RetryProcessor retryProcessor;

    @Mock
    private Panicker panicker;
    private MetricsRegistry metrics;
    private CommitTable commitTable;

    @BeforeMethod(alwaysRun = true, timeOut = 30000)
    public void initMocksAndComponents() throws Exception {
        MockitoAnnotations.initMocks(this);
        this.metrics = new NullMetricsProvider();
        this.commitTable = new CommitTable() { // from class: org.apache.omid.tso.TestPersistenceProcessor.1
            public CommitTable.Writer getWriter() {
                return TestPersistenceProcessor.this.mockWriter;
            }

            public CommitTable.Client getClient() {
                return TestPersistenceProcessor.this.mockClient;
            }
        };
    }

    @AfterMethod
    void afterMethod() {
        Mockito.reset(new CommitTable.Writer[]{this.mockWriter});
    }

    @Test(timeOut = 30000)
    public void testLowWatermarkIsPersisted() throws Exception {
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        PersistenceProcessorHandler[] persistenceProcessorHandlerArr = new PersistenceProcessorHandler[tSOServerConfig.getNumConcurrentCTWriters()];
        for (int i = ANY_ST; i < tSOServerConfig.getNumConcurrentCTWriters(); i += ANY_CT) {
            persistenceProcessorHandlerArr[i] = new PersistenceProcessorHandler(this.metrics, "localhost:1234", (LeaseManagement) Mockito.mock(LeaseManager.class), this.commitTable, (ReplyProcessor) Mockito.mock(ReplyProcessor.class), this.retryProcessor, this.panicker);
        }
        new PersistenceProcessorImpl(tSOServerConfig, this.commitTable, (ObjectPool) Mockito.mock(ObjectPool.class), this.panicker, persistenceProcessorHandlerArr, this.metrics).persistLowWatermark(ANY_LWM).get();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.class);
        ((CommitTable.Writer) Mockito.verify(this.commitTable.getWriter(), Mockito.timeout(100).times(ANY_CT))).updateLowWatermark(((Long) forClass.capture()).longValue());
        Assert.assertEquals(((Long) forClass.getValue()).longValue(), ANY_LWM);
    }

    @Test(timeOut = 30000)
    public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception {
        VoidLeaseManager voidLeaseManager = (VoidLeaseManager) Mockito.spy(new VoidLeaseManager((TSOChannelHandler) Mockito.mock(TSOChannelHandler.class), (TSOStateManager) Mockito.mock(TSOStateManager.class)));
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        tSOServerConfig.setBatchSizePerCTWriter(2);
        tSOServerConfig.setNumConcurrentCTWriters(ANY_CT);
        ObjectPool objectPool = (ObjectPool) Mockito.spy(new BatchPoolModule(tSOServerConfig).getBatchPool());
        ReplyProcessorImpl replyProcessorImpl = new ReplyProcessorImpl(this.metrics, this.panicker, objectPool);
        PersistenceProcessorHandler[] persistenceProcessorHandlerArr = new PersistenceProcessorHandler[tSOServerConfig.getNumConcurrentCTWriters()];
        for (int i = ANY_ST; i < tSOServerConfig.getNumConcurrentCTWriters(); i += ANY_CT) {
            persistenceProcessorHandlerArr[i] = new PersistenceProcessorHandler(this.metrics, "localhost:1234", voidLeaseManager, this.commitTable, replyProcessorImpl, this.retryProcessor, this.panicker);
        }
        PersistenceProcessorImpl persistenceProcessorImpl = new PersistenceProcessorImpl(tSOServerConfig, this.commitTable, objectPool, this.panicker, persistenceProcessorHandlerArr, this.metrics);
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(ANY_CT))).borrowObject();
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(3))).borrowObject();
    }

    @Test(timeOut = 30000)
    public void testCommitPersistenceWithMultipleCommitTableWriters() throws Exception {
        VoidLeaseManager voidLeaseManager = (VoidLeaseManager) Mockito.spy(new VoidLeaseManager((TSOChannelHandler) Mockito.mock(TSOChannelHandler.class), (TSOStateManager) Mockito.mock(TSOStateManager.class)));
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        tSOServerConfig.setBatchSizePerCTWriter(2);
        tSOServerConfig.setNumConcurrentCTWriters(2);
        ObjectPool objectPool = (ObjectPool) Mockito.spy(new BatchPoolModule(tSOServerConfig).getBatchPool());
        ReplyProcessorImpl replyProcessorImpl = new ReplyProcessorImpl(this.metrics, this.panicker, objectPool);
        PersistenceProcessorHandler[] persistenceProcessorHandlerArr = new PersistenceProcessorHandler[tSOServerConfig.getNumConcurrentCTWriters()];
        for (int i = ANY_ST; i < tSOServerConfig.getNumConcurrentCTWriters(); i += ANY_CT) {
            persistenceProcessorHandlerArr[i] = new PersistenceProcessorHandler(this.metrics, "localhost:1234", voidLeaseManager, this.commitTable, replyProcessorImpl, this.retryProcessor, this.panicker);
        }
        PersistenceProcessorImpl persistenceProcessorImpl = new PersistenceProcessorImpl(tSOServerConfig, this.commitTable, objectPool, this.panicker, persistenceProcessorHandlerArr, this.metrics);
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(ANY_CT))).borrowObject();
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(2))).borrowObject();
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(3))).borrowObject();
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(3))).borrowObject();
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(5))).borrowObject();
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(5))).borrowObject();
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(6))).borrowObject();
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(6))).borrowObject();
    }

    @Test(timeOut = 30000)
    public void testCommitPersistenceWithNonHALeaseManager() throws Exception {
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        tSOServerConfig.setBatchSizePerCTWriter(ANY_CT);
        tSOServerConfig.setNumConcurrentCTWriters(ANY_CT);
        tSOServerConfig.setBatchPersistTimeoutInMs(100);
        ObjectPool objectPool = (ObjectPool) Mockito.spy(new BatchPoolModule(tSOServerConfig).getBatchPool());
        ReplyProcessorImpl replyProcessorImpl = new ReplyProcessorImpl(this.metrics, this.panicker, objectPool);
        VoidLeaseManager voidLeaseManager = (VoidLeaseManager) Mockito.spy(new VoidLeaseManager((TSOChannelHandler) Mockito.mock(TSOChannelHandler.class), (TSOStateManager) Mockito.mock(TSOStateManager.class)));
        PersistenceProcessorHandler[] persistenceProcessorHandlerArr = new PersistenceProcessorHandler[tSOServerConfig.getNumConcurrentCTWriters()];
        for (int i = ANY_ST; i < tSOServerConfig.getNumConcurrentCTWriters(); i += ANY_CT) {
            persistenceProcessorHandlerArr[i] = new PersistenceProcessorHandler(this.metrics, "localhost:1234", voidLeaseManager, this.commitTable, replyProcessorImpl, this.retryProcessor, this.panicker);
        }
        PersistenceProcessorImpl persistenceProcessorImpl = new PersistenceProcessorImpl(tSOServerConfig, this.commitTable, objectPool, this.panicker, persistenceProcessorHandlerArr, this.metrics);
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        ((VoidLeaseManager) Mockito.verify(voidLeaseManager, Mockito.timeout(1000).times(2))).stillInLeasePeriod();
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(2))).borrowObject();
    }

    @Test(timeOut = 30000)
    public void testCommitPersistenceWithHALeaseManagerAndMinimumCommitTableWriters() throws Exception {
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        tSOServerConfig.setNumConcurrentCTWriters(2);
        testPersistenceWithHALeaseManagerPreservingLease(tSOServerConfig);
        testPersistenceWithHALeaseManagerFailingToPreserveLease1(tSOServerConfig);
        testPersistenceWithHALeaseManagerFailingToPreserveLease2(tSOServerConfig);
        testPersistenceWithHALeaseManagerFailingToPreserveLease3(tSOServerConfig);
    }

    @Test(timeOut = 30000)
    public void testCommitPersistenceWithHALeaseManagerAndMultipleCommitTableWriters() throws Exception {
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        tSOServerConfig.setNumConcurrentCTWriters(4);
        tSOServerConfig.setBatchSizePerCTWriter(4);
        tSOServerConfig.setBatchPersistTimeoutInMs(100);
        testPersistenceWithHALeaseManagerPreservingLease(tSOServerConfig);
        testPersistenceWithHALeaseManagerFailingToPreserveLease1(tSOServerConfig);
        testPersistenceWithHALeaseManagerFailingToPreserveLease2(tSOServerConfig);
        testPersistenceWithHALeaseManagerFailingToPreserveLease3(tSOServerConfig);
    }

    private void testPersistenceWithHALeaseManagerPreservingLease(TSOServerConfig tSOServerConfig) throws Exception {
        LeaseManager leaseManager = (LeaseManager) Mockito.mock(LeaseManager.class);
        ObjectPool<Batch> objectPool = (ObjectPool) Mockito.spy(new BatchPoolModule(tSOServerConfig).getBatchPool());
        PersistenceProcessorImpl persistenceProcessorImpl = new PersistenceProcessorImpl(tSOServerConfig, this.commitTable, objectPool, this.panicker, configureHandlers(tSOServerConfig, leaseManager, objectPool), this.metrics);
        ((LeaseManager) Mockito.doReturn(true).when(leaseManager)).stillInLeasePeriod();
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        ((LeaseManager) Mockito.verify(leaseManager, Mockito.timeout(1000).times(2))).stillInLeasePeriod();
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(2))).borrowObject();
    }

    private void testPersistenceWithHALeaseManagerFailingToPreserveLease1(TSOServerConfig tSOServerConfig) throws Exception {
        LeaseManager leaseManager = (LeaseManager) Mockito.mock(LeaseManager.class);
        ObjectPool<Batch> objectPool = (ObjectPool) Mockito.spy(new BatchPoolModule(tSOServerConfig).getBatchPool());
        PersistenceProcessorImpl persistenceProcessorImpl = new PersistenceProcessorImpl(tSOServerConfig, this.commitTable, objectPool, this.panicker, configureHandlers(tSOServerConfig, leaseManager, objectPool), this.metrics);
        ((LeaseManager) Mockito.doReturn(true).doReturn(false).when(leaseManager)).stillInLeasePeriod();
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        ((LeaseManager) Mockito.verify(leaseManager, Mockito.timeout(1000).times(2))).stillInLeasePeriod();
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(2))).borrowObject();
    }

    private void testPersistenceWithHALeaseManagerFailingToPreserveLease2(TSOServerConfig tSOServerConfig) throws Exception {
        LeaseManager leaseManager = (LeaseManager) Mockito.mock(LeaseManager.class);
        ObjectPool<Batch> objectPool = (ObjectPool) Mockito.spy(new BatchPoolModule(tSOServerConfig).getBatchPool());
        PersistenceProcessorImpl persistenceProcessorImpl = new PersistenceProcessorImpl(tSOServerConfig, this.commitTable, objectPool, this.panicker, configureHandlers(tSOServerConfig, leaseManager, objectPool), this.metrics);
        ((LeaseManager) Mockito.doReturn(false).when(leaseManager)).stillInLeasePeriod();
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        ((LeaseManager) Mockito.verify(leaseManager, Mockito.timeout(1000).times(ANY_CT))).stillInLeasePeriod();
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(2))).borrowObject();
    }

    private void testPersistenceWithHALeaseManagerFailingToPreserveLease3(TSOServerConfig tSOServerConfig) throws Exception {
        LeaseManager leaseManager = (LeaseManager) Mockito.mock(LeaseManager.class);
        ObjectPool<Batch> objectPool = (ObjectPool) Mockito.spy(new BatchPoolModule(tSOServerConfig).getBatchPool());
        PersistenceProcessorImpl persistenceProcessorImpl = new PersistenceProcessorImpl(tSOServerConfig, this.commitTable, objectPool, this.panicker, configureHandlers(tSOServerConfig, leaseManager, objectPool), this.metrics);
        ((CommitTable.Writer) Mockito.doThrow(new IOException("Unable to write")).when(this.mockWriter)).flush();
        ((LeaseManager) Mockito.doReturn(true).doReturn(false).when(leaseManager)).stillInLeasePeriod();
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), (MonitoringContext) Mockito.mock(MonitoringContext.class));
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        ((LeaseManager) Mockito.verify(leaseManager, Mockito.timeout(1000).times(ANY_CT))).stillInLeasePeriod();
        ((ObjectPool) Mockito.verify(objectPool, Mockito.times(2))).borrowObject();
    }

    private PersistenceProcessorHandler[] configureHandlers(TSOServerConfig tSOServerConfig, LeaseManager leaseManager, ObjectPool<Batch> objectPool) throws Exception {
        PersistenceProcessorHandler[] persistenceProcessorHandlerArr = new PersistenceProcessorHandler[tSOServerConfig.getNumConcurrentCTWriters()];
        for (int i = ANY_ST; i < tSOServerConfig.getNumConcurrentCTWriters(); i += ANY_CT) {
            persistenceProcessorHandlerArr[i] = new PersistenceProcessorHandler(this.metrics, "localhost:1234", leaseManager, this.commitTable, new ReplyProcessorImpl(this.metrics, this.panicker, objectPool), this.retryProcessor, new RuntimeExceptionPanicker());
        }
        return persistenceProcessorHandlerArr;
    }

    @Test(timeOut = 30000)
    public void testCommitTableExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
        LeaseManagement leaseManagement = (LeaseManagement) Mockito.mock(LeaseManagement.class);
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        ObjectPool objectPool = (ObjectPool) Mockito.spy(new BatchPoolModule(tSOServerConfig).getBatchPool());
        ReplyProcessorImpl replyProcessorImpl = new ReplyProcessorImpl(this.metrics, this.panicker, objectPool);
        PersistenceProcessorHandler[] persistenceProcessorHandlerArr = new PersistenceProcessorHandler[tSOServerConfig.getNumConcurrentCTWriters()];
        for (int i = ANY_ST; i < tSOServerConfig.getNumConcurrentCTWriters(); i += ANY_CT) {
            persistenceProcessorHandlerArr[i] = new PersistenceProcessorHandler(this.metrics, "localhost:1234", leaseManagement, this.commitTable, replyProcessorImpl, (RetryProcessor) Mockito.mock(RetryProcessor.class), this.panicker);
        }
        PersistenceProcessorImpl persistenceProcessorImpl = new PersistenceProcessorImpl(tSOServerConfig, this.commitTable, objectPool, this.panicker, persistenceProcessorHandlerArr, this.metrics);
        MonitoringContext monitoringContext = new MonitoringContext(this.metrics);
        ((LeaseManagement) Mockito.doReturn(true).when(leaseManagement)).stillInLeasePeriod();
        ((CommitTable.Writer) Mockito.doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(this.mockWriter)).flush();
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), monitoringContext);
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        ((Panicker) Mockito.verify(this.panicker, Mockito.timeout(1000).atLeastOnce())).panic(Matchers.anyString(), (Throwable) Matchers.any(Throwable.class));
    }

    @Test(timeOut = 30000)
    public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        ObjectPool batchPool = new BatchPoolModule(tSOServerConfig).getBatchPool();
        ReplyProcessorImpl replyProcessorImpl = new ReplyProcessorImpl(this.metrics, this.panicker, batchPool);
        PersistenceProcessorHandler[] persistenceProcessorHandlerArr = new PersistenceProcessorHandler[tSOServerConfig.getNumConcurrentCTWriters()];
        for (int i = ANY_ST; i < tSOServerConfig.getNumConcurrentCTWriters(); i += ANY_CT) {
            persistenceProcessorHandlerArr[i] = new PersistenceProcessorHandler(this.metrics, "localhost:1234", (LeaseManagement) Mockito.mock(LeaseManager.class), this.commitTable, replyProcessorImpl, this.retryProcessor, this.panicker);
        }
        PersistenceProcessorImpl persistenceProcessorImpl = new PersistenceProcessorImpl(tSOServerConfig, this.commitTable, batchPool, this.panicker, persistenceProcessorHandlerArr, this.metrics);
        ((CommitTable.Writer) Mockito.doThrow(new RuntimeException("Kaboom!")).when(this.mockWriter)).addCommittedTransaction(Matchers.anyLong(), Matchers.anyLong());
        persistenceProcessorImpl.addCommitToBatch(0L, 1L, (Channel) Mockito.mock(Channel.class), new MonitoringContext(this.metrics));
        persistenceProcessorImpl.triggerCurrentBatchFlush();
        ((Panicker) Mockito.verify(this.panicker, Mockito.timeout(1000).atLeastOnce())).panic(Matchers.anyString(), (Throwable) Matchers.any(Throwable.class));
    }
}
