package com.twitter.distributedlog;

import com.twitter.distributedlog.LogSegmentMetadata;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/distributedlog/TestReadAhead.class */
public class TestReadAhead extends TestDistributedLogBase {
    static final Logger logger = LoggerFactory.getLogger(TestReadAhead.class);

    @Test(timeout = 60000)
    public void testNoSuchLedgerExceptionOnReadLAC() throws Exception {
        LedgerDescriptor currentLedgerDescriptor;
        LedgerDescriptor currentLedgerDescriptor2;
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(conf);
        distributedLogConfiguration.setReadAheadWaitTime(500);
        distributedLogConfiguration.setReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis(2000L);
        distributedLogConfiguration.setDLLedgerMetadataLayoutVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value);
        distributedLogConfiguration.setLogSegmentSequenceNumberValidationEnabled(false);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-nosuchledger-exception-on-readlac");
        DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(createNewDLM, distributedLogConfiguration, 1L, 1L, false, 0L, false);
        DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(createNewDLM, distributedLogConfiguration, 2L, 11L, true, 10L, true);
        BKDistributedLogManager createNewDLM2 = createNewDLM(distributedLogConfiguration, "distrlog-nosuchledger-exception-on-readlac");
        BKAsyncLogReaderDLSN asyncLogReader = createNewDLM2.getAsyncLogReader(DLSN.InitialDLSN);
        Future readNext = asyncLogReader.readNext();
        try {
            Await.result(readNext, Duration.fromMilliseconds(2000L));
            Assert.fail("Should not read any data beyond an empty inprogress log segment");
        } catch (TimeoutException e) {
        }
        while (true) {
            currentLedgerDescriptor = asyncLogReader.bkLedgerManager.readAheadWorker.getCurrentLedgerDescriptor();
            if (null != currentLedgerDescriptor) {
                break;
            } else {
                Thread.sleep(100L);
            }
        }
        TimeUnit.MILLISECONDS.sleep(4000L);
        while (true) {
            currentLedgerDescriptor2 = asyncLogReader.bkLedgerManager.readAheadWorker.getCurrentLedgerDescriptor();
            if (null != currentLedgerDescriptor2) {
                break;
            } else {
                Thread.sleep(100L);
            }
        }
        Assert.assertTrue("ledger handle should be reinitialized, after reaching error threshold.", currentLedgerDescriptor != currentLedgerDescriptor2);
        createNewDLM.close();
        createNewDLM(distributedLogConfiguration, "distrlog-nosuchledger-exception-on-readlac").recover();
        LogRecord logRecord = (LogRecord) Await.result(readNext);
        Assert.assertNotNull(logRecord);
        DLMTestUtil.verifyLogRecord(logRecord);
        Assert.assertEquals(11L, logRecord.getTransactionId());
        long j = 11 + 1;
        for (int i = 1; i < 10; i++) {
            LogRecord logRecord2 = (LogRecord) Await.result(asyncLogReader.readNext());
            Assert.assertNotNull(logRecord2);
            DLMTestUtil.verifyLogRecord(logRecord2);
            Assert.assertEquals(j, logRecord2.getTransactionId());
            j++;
        }
        Utils.close(asyncLogReader);
        createNewDLM2.close();
    }

    @Test(timeout = 60000)
    public void testReadAheadWaitOnEndOfStream() throws Exception {
        AsyncNotification metadataNotification;
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.loadConf(conf);
        distributedLogConfiguration.setZKNumRetries(0);
        distributedLogConfiguration.setReadAheadWaitTime(500);
        distributedLogConfiguration.setReadAheadWaitTimeOnEndOfStream(Integer.MAX_VALUE);
        BKDistributedLogManager createNewDLM = createNewDLM(distributedLogConfiguration, "distrlog-readahead-wait-on-end-of-stream");
        DLMTestUtil.generateCompletedLogSegments(createNewDLM, distributedLogConfiguration, 3L, 10L);
        BKDistributedLogManager createNewDLM2 = createNewDLM(distributedLogConfiguration, "distrlog-readahead-wait-on-end-of-stream");
        BKAsyncLogReaderDLSN asyncLogReader = createNewDLM2.getAsyncLogReader(DLSN.InitialDLSN);
        int i = 0;
        long j = 1;
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= 3) {
                break;
            }
            long j4 = 1;
            while (true) {
                long j5 = j4;
                if (j5 <= 10) {
                    LogRecordWithDLSN logRecordWithDLSN = (LogRecordWithDLSN) Await.result(asyncLogReader.readNext());
                    long j6 = j;
                    j = j6 + 1;
                    Assert.assertEquals(j6, logRecordWithDLSN.getTransactionId());
                    DLMTestUtil.verifyLogRecord(logRecordWithDLSN);
                    i++;
                    j4 = j5 + 1;
                }
            }
            j2 = j3 + 1;
        }
        Assert.assertEquals(30L, i);
        Future readNext = asyncLogReader.readNext();
        while (true) {
            metadataNotification = asyncLogReader.bkLedgerManager.readAheadWorker.getMetadataNotification();
            if (null != metadataNotification) {
                break;
            } else {
                Thread.sleep(200L);
            }
        }
        Thread.sleep(1000L);
        ZooKeeperClientUtils.expireSession(asyncLogReader.bkLedgerManager.zooKeeperClient, zkServers, 1000);
        while (true) {
            Thread.sleep(200L);
            AsyncNotification metadataNotification2 = asyncLogReader.bkLedgerManager.readAheadWorker.getMetadataNotification();
            if (null != metadataNotification2 && metadataNotification != metadataNotification2) {
                BKSyncLogWriter startLogSegmentNonPartitioned = createNewDLM.startLogSegmentNonPartitioned();
                startLogSegmentNonPartitioned.write(DLMTestUtil.getLogRecordInstance(31L));
                startLogSegmentNonPartitioned.closeAndComplete();
                LogRecordWithDLSN logRecordWithDLSN2 = (LogRecordWithDLSN) Await.result(readNext);
                Assert.assertEquals(31L, logRecordWithDLSN2.getTransactionId());
                DLMTestUtil.verifyLogRecord(logRecordWithDLSN2);
                Utils.close(asyncLogReader);
                createNewDLM2.close();
                createNewDLM.close();
                return;
            }
        }
    }
}
