/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import org.apache.distributedlog.BKAsyncLogReader;
import org.apache.distributedlog.BKAsyncLogWriter;
import org.apache.distributedlog.BKDistributedLogManager;
import org.apache.distributedlog.BKLogSegmentWriter;
import org.apache.distributedlog.BKLogWriteHandler;
import org.apache.distributedlog.BKSyncLogWriter;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.EntryPosition;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.feature.CoreFeatureKeys;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryReader;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.pulsar.shade.org.apache.bookkeeper.feature.SettableFeature;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestRollLogSegments
extends TestDistributedLogBase {
    private static final Logger logger = LoggerFactory.getLogger(TestRollLogSegments.class);

    /**
     * Asserts that exactly one of the supplied log segments reports itself as in progress.
     *
     * @param segments the log segment metadata to inspect
     * @throws Exception declared for call-site compatibility; an assertion failure is raised
     *         when the number of in-progress segments is not one
     */
    private static void ensureOnlyOneInprogressLogSegments(List<LogSegmentMetadata> segments) throws Exception {
        long inprogressCount = segments.stream()
                .filter(LogSegmentMetadata::isInProgress)
                .count();
        Assert.assertEquals(1L, inprogressCount);
    }

    /**
     * Writes 100 records with a 40-byte maximum segment size while the
     * {@code DISABLE_LOGSEGMENT_ROLLING} feature is switched on, and asserts that the log
     * still consists of exactly one segment afterwards.
     *
     * @throws Exception if setting up the log, writing, or reading segment metadata fails
     */
    @Test(timeout=60000L)
    public void testDisableRollingLogSegments() throws Exception {
        String name = "distrlog-disable-rolling-log-segments";
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setOutputBufferSize(0);
        confLocal.setLogSegmentRollingIntervalMinutes(0);
        confLocal.setMaxLogSegmentBytes(40L);
        int numEntries = 100;
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, name);
        BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
        SettableFeature disableLogSegmentRolling = (SettableFeature)dlm.getFeatureProvider().getFeature(CoreFeatureKeys.DISABLE_LOGSEGMENT_ROLLING.name().toLowerCase());
        disableLogSegmentRolling.set(true);
        final CountDownLatch latch = new CountDownLatch(numEntries);
        for (int i = 1; i <= numEntries; ++i) {
            final int entryId = i;
            writer.write(DLMTestUtil.getLogRecordInstance(entryId)).whenComplete(new FutureEventListener<DLSN>() {

                @Override
                public void onSuccess(DLSN value) {
                    logger.info("Completed entry {} : {}.", entryId, value);
                    latch.countDown();
                }

                @Override
                public void onFailure(Throwable cause) {
                    // The latch is only released by successful writes, so a failed write ends
                    // in a test timeout; log the cause so that timeout can be diagnosed.
                    logger.error("Failed to write entry {}", entryId, cause);
                }
            });
        }
        // Every write must succeed before the segment list is inspected.
        latch.await();
        writer.closeAndComplete();
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        Assert.assertEquals(1, segments.size());
        dlm.close();
    }

    /**
     * Writes 100 records with a 40-byte maximum segment size, tracks the highest DLSN
     * acknowledged per log segment sequence number, and asserts that the last DLSN recorded
     * in each segment's metadata matches the highest DLSN observed by the write callbacks.
     *
     * @throws Exception if setting up the log, writing, or reading segment metadata fails
     */
    @Test(timeout=600000L)
    public void testLastDLSNInRollingLogSegments() throws Exception {
        // Highest acknowledged DLSN keyed by log segment sequence number. It is updated from
        // write callbacks, hence the explicit synchronization below.
        final Map<Long, DLSN> lastDLSNs = new HashMap<>();
        String name = "distrlog-lastdlsn-in-rolling-log-segments";
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setOutputBufferSize(0);
        confLocal.setLogSegmentRollingIntervalMinutes(0);
        confLocal.setMaxLogSegmentBytes(40L);
        int numEntries = 100;
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)dlm.startAsyncLogSegmentNonPartitioned();
        final CountDownLatch latch = new CountDownLatch(numEntries);
        for (int i = 1; i <= numEntries; ++i) {
            final int entryId = i;
            CompletionStage<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId)).whenComplete(new FutureEventListener<DLSN>() {

                @Override
                public void onSuccess(DLSN value) {
                    logger.info("Completed entry {} : {}.", entryId, value);
                    synchronized (lastDLSNs) {
                        DLSN lastDLSN = lastDLSNs.get(value.getLogSegmentSequenceNo());
                        if (null == lastDLSN || lastDLSN.compareTo(value) < 0) {
                            lastDLSNs.put(value.getLogSegmentSequenceNo(), value);
                        }
                    }
                    latch.countDown();
                }

                @Override
                public void onFailure(Throwable cause) {
                    // The latch is only released by successful writes, so a failed write ends
                    // in a test timeout; log the cause so that timeout can be diagnosed.
                    logger.error("Failed to write entry {}", entryId, cause);
                }
            });
            if (i == 1) {
                // Block on the first write before issuing the remaining ones asynchronously
                // (presumably so the first log segment exists before the burst of writes).
                Utils.ioResult(writeFuture.toCompletableFuture());
            }
        }
        latch.await();
        writer.closeAndComplete();
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        logger.info("lastDLSNs after writes {} {}", lastDLSNs.size(), lastDLSNs);
        logger.info("segments after writes {} {}", segments.size(), segments);
        Assert.assertTrue(segments.size() >= 2);
        Assert.assertTrue(lastDLSNs.size() >= 2);
        Assert.assertEquals(lastDLSNs.size(), segments.size());
        for (LogSegmentMetadata segment : segments) {
            DLSN dlsnInMetadata = segment.getLastDLSN();
            DLSN dlsnSeen = lastDLSNs.get(segment.getLogSegmentSequenceNumber());
            Assert.assertNotNull(dlsnInMetadata);
            Assert.assertNotNull(dlsnSeen);
            if (dlsnInMetadata.compareTo(dlsnSeen) != 0) {
                logger.error("Last dlsn recorded in log segment {} is different from the one already seen {}.", dlsnInMetadata, dlsnSeen);
            }
            Assert.assertEquals(0, dlsnInMetadata.compareTo(dlsnSeen));
        }
        dlm.close();
    }

    /**
     * Writes one record, installs the {@code FP_StartLogSegmentBeforeLedgerCreate} failpoint
     * with the throw action, then writes ten more records. The test asserts that all records
     * were acknowledged, that the log consists of a single segment, and that the records can
     * be read back in order with consecutive transaction and sequence ids.
     *
     * <p>The failpoint is always removed in a {@code finally} block so it cannot leak into
     * other tests.
     *
     * @throws Exception if setting up the log, writing, or reading fails
     */
    @Test(timeout=60000L)
    public void testUnableToRollLogSegments() throws Exception {
        String name = "distrlog-unable-to-roll-log-segments";
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setOutputBufferSize(0);
        confLocal.setLogSegmentRollingIntervalMinutes(0);
        confLocal.setMaxLogSegmentBytes(1L);
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)dlm.startAsyncLogSegmentNonPartitioned();
        long txId = 1L;
        // The first record is written before the failpoint is armed.
        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
        FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate, FailpointUtils.FailPointActions.FailPointAction_Throw);
        try {
            final int numRecords = 10;
            final CountDownLatch latch = new CountDownLatch(numRecords);
            for (int i = 0; i < numRecords; ++i) {
                writer.write(DLMTestUtil.getLogRecordInstance(++txId)).whenComplete(new FutureEventListener<DLSN>() {

                    @Override
                    public void onSuccess(DLSN value) {
                        logger.info("Completed entry : {}.", value);
                        latch.countDown();
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        logger.error("Failed to write entries : ", cause);
                    }
                });
            }
            latch.await();
            writer.close();
            List<LogSegmentMetadata> segments = dlm.getLogSegments();
            logger.info("LogSegments: {}", segments);
            Assert.assertEquals(1, segments.size());
            long expectedTxID = 1L;
            LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
            LogRecordWithDLSN record = reader.readNext(false);
            while (null != record) {
                DLMTestUtil.verifyLogRecord(record);
                Assert.assertEquals(expectedTxID++, record.getTransactionId());
                Assert.assertEquals(record.getTransactionId() - 1L, record.getSequenceId());
                record = reader.readNext(false);
            }
            // One initial record plus numRecords were written with tx ids starting at 1, so
            // the next expected tx id after reading everything is numRecords + 2.
            Assert.assertEquals(numRecords + 2, expectedTxID);
            reader.close();
            dlm.close();
        }
        finally {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate);
        }
    }

    /**
     * Issues ten asynchronous writes followed by ten blocking writes against a log
     * configured with a 1-byte maximum segment size and unbounded rolling concurrency.
     * After each phase the test checks the number of log segments and that exactly one
     * of them is still in progress.
     *
     * @throws Exception if setting up the log, writing, or reading segment metadata fails
     */
    @Test(timeout=60000L)
    public void testRollingLogSegments() throws Exception {
        logger.info("start testRollingLogSegments");
        String name = "distrlog-rolling-logsegments-hightraffic";
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setOutputBufferSize(0);
        confLocal.setLogSegmentRollingIntervalMinutes(0);
        confLocal.setMaxLogSegmentBytes(1L);
        confLocal.setLogSegmentRollingConcurrency(Integer.MAX_VALUE);
        int numLogSegments = 10;
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter)dlm.startAsyncLogSegmentNonPartitioned();
        final CountDownLatch latch = new CountDownLatch(numLogSegments);
        long startTime = System.currentTimeMillis();
        for (int i = 1; i <= numLogSegments; ++i) {
            final int entryId = i;
            CompletionStage<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId)).whenComplete(new FutureEventListener<DLSN>() {

                @Override
                public void onSuccess(DLSN value) {
                    logger.info("Completed entry {} : {}.", entryId, value);
                    latch.countDown();
                }

                @Override
                public void onFailure(Throwable cause) {
                    logger.error("Failed to write entries", cause);
                }
            });
            if (i == 1) {
                // Block on the first write before issuing the remaining ones asynchronously
                // (presumably so the first log segment exists before the burst of writes).
                Utils.ioResult(writeFuture.toCompletableFuture());
            }
        }
        latch.await();
        logger.info("Took {} ms to completed all requests.", System.currentTimeMillis() - startTime);
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        logger.info("LogSegments : {}", segments);
        Assert.assertTrue(segments.size() >= 2);
        TestRollLogSegments.ensureOnlyOneInprogressLogSegments(segments);
        int numSegmentsAfterAsyncWrites = segments.size();
        // Second phase: each write is awaited before the next one is issued.
        for (int i = 1; i <= numLogSegments; ++i) {
            DLSN newDLSN = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(numLogSegments + i)));
            logger.info("Completed entry {} : {}", numLogSegments + i, newDLSN);
        }
        segments = dlm.getLogSegments();
        logger.info("LogSegments : {}", segments);
        // The blocking phase is expected to add numLogSegments / 2 further segments.
        Assert.assertEquals(numSegmentsAfterAsyncWrites + numLogSegments / 2, segments.size());
        TestRollLogSegments.ensureOnlyOneInprogressLogSegments(segments);
        writer.close();
        dlm.close();
    }

    /**
     * Polls once per second until the writer's ledger reports a last-add-confirmed of at
     * least {@code expectedWriterPosition} and the reader's read-ahead has advanced to at
     * least {@code expectedReaderPosition}, asserting the exact expected values along the way.
     *
     * @param writer                 segment writer whose ledger handle is polled
     * @param expectedWriterPosition last-add-confirmed the writer's ledger must reach
     * @param reader                 async reader whose read-ahead position is polled
     * @param expectedReaderPosition entry id the read-ahead's next position must reach
     * @param inspector              separate ledger handle used to read the last confirmed entry
     * @param expectedLac            value {@code inspector.readLastConfirmed()} must return
     * @throws Exception if polling is interrupted or a ledger operation fails
     */
    private void checkAndWaitWriterReaderPosition(BKLogSegmentWriter writer, long expectedWriterPosition, BKAsyncLogReader reader, long expectedReaderPosition, LedgerHandle inspector, long expectedLac) throws Exception {
        // Wait for the writer side to catch up.
        for (;;) {
            long confirmed = this.getLedgerHandle(writer).getLastAddConfirmed();
            if (confirmed >= expectedWriterPosition) {
                break;
            }
            Thread.sleep(1000L);
        }
        long writerLac = this.getLedgerHandle(writer).getLastAddConfirmed();
        Assert.assertEquals(expectedWriterPosition, writerLac);

        long inspectorLac = inspector.readLastConfirmed();
        Assert.assertEquals(expectedLac, inspectorLac);

        // Wait for the read-ahead to catch up, logging every observed position.
        EntryPosition currentPosition;
        for (;;) {
            currentPosition = reader.getReadAheadReader().getNextEntryPosition();
            logger.info("ReadAhead moved read position {} : ", currentPosition);
            if (currentPosition.getEntryId() >= expectedReaderPosition) {
                break;
            }
            Thread.sleep(1000L);
        }
        Assert.assertEquals(expectedReaderPosition, currentPosition.getEntryId());
    }

    /**
     * Exercises a reader that has caught up with the writer while the in-progress log
     * segment is completed underneath it.
     *
     * <p>The test writes and reads five records, writes two more records that are flushed,
     * waits until the reader has an outstanding long poll, completes the in-progress
     * segment through the write handler, writes an eighth record through a new writer, and
     * finally asserts that the reader delivers records 6 through 8.
     *
     * @throws Exception if setting up the log, writing, or reading fails
     */
    @Test(timeout=60000L)
    public void testCaughtUpReaderOnLogSegmentRolling() throws Exception {
        String name = "distrlog-caughtup-reader-on-logsegment-rolling";
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(4 * 1024 * 1024);
        confLocal.setTraceReadAheadMetadataChanges(true);
        confLocal.setEnsembleSize(1);
        confLocal.setWriteQuorumSize(1);
        confLocal.setAckQuorumSize(1);
        confLocal.setReadLACLongPollTimeout(99999999);
        confLocal.setReaderIdleWarnThresholdMillis(199999999);
        confLocal.setBKClientReadTimeout(100000000);
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, name);
        BKSyncLogWriter writer = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned();
        final int numEntries = 5;
        for (int i = 1; i <= numEntries; ++i) {
            writer.write(DLMTestUtil.getLogRecordInstance(i));
            writer.flush();
            writer.commit();
        }
        BKDistributedLogManager readDLM = this.createNewDLM(confLocal, name);
        BKAsyncLogReader reader = (BKAsyncLogReader)readDLM.getAsyncLogReader(DLSN.InitialDLSN);
        for (long i = 1L; i <= numEntries; ++i) {
            LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
            DLMTestUtil.verifyLogRecord(record);
            Assert.assertEquals(i, record.getTransactionId());
            Assert.assertEquals(record.getTransactionId() - 1L, record.getSequenceId());
        }
        BKLogSegmentWriter perStreamWriter = writer.segmentWriter;
        BookKeeperClient bkc = DLMTestUtil.getBookKeeperClient(readDLM);
        // Independent, non-recovering handle on the writer's ledger, used to observe its LAC.
        LedgerHandle readLh = bkc.get().openLedgerNoRecovery(this.getLedgerHandle(perStreamWriter).getId(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8));
        this.checkAndWaitWriterReaderPosition(perStreamWriter, 9L, reader, 9L, readLh, 8L);
        // Records 6 and 7 are flushed but not committed.
        writer.write(DLMTestUtil.getLogRecordInstance(6L));
        writer.flush();
        this.checkAndWaitWriterReaderPosition(perStreamWriter, 10L, reader, 10L, readLh, 9L);
        writer.write(DLMTestUtil.getLogRecordInstance(7L));
        writer.flush();
        this.checkAndWaitWriterReaderPosition(perStreamWriter, 11L, reader, 11L, readLh, 10L);
        // Poll until the reader's current segment has an entry reader with an outstanding long poll.
        while (true) {
            BKLogSegmentEntryReader entryReader = (BKLogSegmentEntryReader)reader.getReadAheadReader().getCurrentSegmentReader().getEntryReader();
            if (null != entryReader && null != entryReader.getOutstandingLongPoll()) {
                break;
            }
            Thread.sleep(1000L);
        }
        logger.info("Waiting for long poll getting interrupted with metadata changed");
        // Complete the in-progress segment directly through the write handler.
        BKLogWriteHandler writeHandler = writer.getCachedWriteHandler();
        writeHandler.completeAndCloseLogSegment(writeHandler.inprogressZNodeName(perStreamWriter.getLogSegmentId(), perStreamWriter.getStartTxId(), perStreamWriter.getLogSegmentSequenceNumber()), perStreamWriter.getLogSegmentSequenceNumber(), perStreamWriter.getLogSegmentId(), perStreamWriter.getStartTxId(), perStreamWriter.getLastTxId(), perStreamWriter.getPositionWithinLogSegment() - 1, 9L, 0L);
        BKSyncLogWriter anotherWriter = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned();
        anotherWriter.write(DLMTestUtil.getLogRecordInstance(8L));
        anotherWriter.flush();
        anotherWriter.commit();
        anotherWriter.closeAndComplete();
        for (long i = 6L; i <= 8L; ++i) {
            LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
            DLMTestUtil.verifyLogRecord(record);
            Assert.assertEquals(i, record.getTransactionId());
        }
        Utils.close(reader);
        readDLM.close();
        // Release the writer-side manager as well; the other tests in this class close it too.
        dlm.close();
    }
}

