/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import org.apache.distributedlog.BKAsyncLogWriter;
import org.apache.distributedlog.BKDistributedLogManager;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestSequenceID
extends TestDistributedLogBase {
    private static final Logger logger = LoggerFactory.getLogger(TestSequenceID.class);

    @Test(timeout=60000L)
    public void testCompleteV4LogSegmentAsV4() throws Exception {
        this.completeSingleInprogressSegment(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value, LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value);
    }

    @Test(timeout=60000L)
    public void testCompleteV4LogSegmentAsV5() throws Exception {
        this.completeSingleInprogressSegment(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value, LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value);
    }

    @Test(timeout=60000L)
    public void testCompleteV5LogSegmentAsV4() throws Exception {
        this.completeSingleInprogressSegment(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value, LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value);
    }

    @Test(timeout=60000L)
    public void testCompleteV5LogSegmentAsV5() throws Exception {
        this.completeSingleInprogressSegment(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value, LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value);
    }

    private void completeSingleInprogressSegment(int writeVersion, int completeVersion) throws Exception {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setOutputBufferSize(0);
        confLocal.setDLLedgerMetadataLayoutVersion(writeVersion);
        String name = "distrlog-complete-single-inprogress-segment-versions-write-" + writeVersion + "-complete-" + completeVersion;
        BKDistributedLogManager dlm = this.createNewDLM(confLocal, name);
        BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(0L)));
        dlm.close();
        DistributedLogConfiguration confLocal2 = new DistributedLogConfiguration();
        confLocal2.addConfiguration(confLocal);
        confLocal2.setDLLedgerMetadataLayoutVersion(completeVersion);
        BKDistributedLogManager dlm2 = this.createNewDLM(confLocal2, name);
        dlm2.startAsyncLogSegmentNonPartitioned();
        List<LogSegmentMetadata> segments = dlm2.getLogSegments();
        Assert.assertEquals((long)1L, (long)segments.size());
        if (LogSegmentMetadata.supportsSequenceId(writeVersion)) {
            if (LogSegmentMetadata.supportsSequenceId(completeVersion)) {
                Assert.assertEquals((long)0L, (long)segments.get(0).getStartSequenceId());
            } else {
                Assert.assertTrue((segments.get(0).getStartSequenceId() < 0L ? 1 : 0) != 0);
            }
        } else {
            Assert.assertTrue((segments.get(0).getStartSequenceId() < 0L ? 1 : 0) != 0);
        }
        dlm2.close();
    }

    @Test(timeout=60000L)
    public void testSequenceID() throws Exception {
        DistributedLogConfiguration confLocalv4 = new DistributedLogConfiguration();
        confLocalv4.addConfiguration(conf);
        confLocalv4.setImmediateFlushEnabled(true);
        confLocalv4.setOutputBufferSize(0);
        confLocalv4.setDLLedgerMetadataLayoutVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value);
        String name = "distrlog-sequence-id";
        BKDistributedLogManager readDLM = this.createNewDLM(conf, name);
        AsyncLogReader reader = null;
        final LinkedBlockingQueue readRecords = new LinkedBlockingQueue();
        BKDistributedLogManager dlm = this.createNewDLM(confLocalv4, name);
        long txId = 0L;
        for (int i = 0; i < 3; ++i) {
            BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
            for (int j = 0; j < 2; ++j) {
                Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
                if (null != reader) continue;
                final AsyncLogReader r = reader = readDLM.getAsyncLogReader(DLSN.InitialDLSN);
                reader.readNext().whenComplete(new FutureEventListener<LogRecordWithDLSN>(){

                    @Override
                    public void onSuccess(LogRecordWithDLSN record) {
                        readRecords.add(record);
                        r.readNext().whenComplete((BiConsumer)this);
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        logger.error("Encountered exception on reading next : ", cause);
                    }
                });
            }
            writer.closeAndComplete();
        }
        BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        Assert.assertEquals((long)4L, (long)segments.size());
        for (int i = 0; i < 3; ++i) {
            Assert.assertFalse((boolean)segments.get(i).isInProgress());
            Assert.assertTrue((segments.get(i).getStartSequenceId() < 0L ? 1 : 0) != 0);
        }
        Assert.assertTrue((boolean)segments.get(3).isInProgress());
        Assert.assertTrue((segments.get(3).getStartSequenceId() < 0L ? 1 : 0) != 0);
        dlm.close();
        DistributedLogConfiguration confLocalv5 = new DistributedLogConfiguration();
        confLocalv5.addConfiguration(conf);
        confLocalv5.setImmediateFlushEnabled(true);
        confLocalv5.setOutputBufferSize(0);
        confLocalv5.setDLLedgerMetadataLayoutVersion(LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V5_SEQUENCE_ID.value);
        BKDistributedLogManager dlmv5 = this.createNewDLM(confLocalv5, name);
        for (int i = 0; i < 3; ++i) {
            BKAsyncLogWriter writerv5 = dlmv5.startAsyncLogSegmentNonPartitioned();
            for (int j = 0; j < 2; ++j) {
                Utils.ioResult(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++)));
            }
            writerv5.closeAndComplete();
        }
        BKAsyncLogWriter writerv5 = dlmv5.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++)));
        List<LogSegmentMetadata> segmentsv5 = dlmv5.getLogSegments();
        Assert.assertEquals((long)8L, (long)segmentsv5.size());
        Assert.assertFalse((boolean)segmentsv5.get(3).isInProgress());
        Assert.assertTrue((segmentsv5.get(3).getStartSequenceId() < 0L ? 1 : 0) != 0);
        long startSequenceId = 0L;
        for (int i = 4; i < 7; ++i) {
            Assert.assertFalse((boolean)segmentsv5.get(i).isInProgress());
            Assert.assertEquals((long)startSequenceId, (long)segmentsv5.get(i).getStartSequenceId());
            startSequenceId += 2L;
        }
        Assert.assertTrue((boolean)segmentsv5.get(7).isInProgress());
        Assert.assertEquals((long)startSequenceId, (long)segmentsv5.get(7).getStartSequenceId());
        dlmv5.close();
        BKDistributedLogManager dlmv4 = this.createNewDLM(confLocalv4, name);
        for (int i = 0; i < 3; ++i) {
            BKAsyncLogWriter writerv4 = dlmv4.startAsyncLogSegmentNonPartitioned();
            for (int j = 0; j < 2; ++j) {
                Utils.ioResult(writerv4.write(DLMTestUtil.getLogRecordInstance(txId++)));
            }
            writerv4.closeAndComplete();
        }
        List<LogSegmentMetadata> segmentsv4 = dlmv4.getLogSegments();
        Assert.assertEquals((long)11L, (long)segmentsv4.size());
        for (int i = 7; i < 11; ++i) {
            Assert.assertFalse((boolean)segmentsv4.get(i).isInProgress());
            Assert.assertTrue((segmentsv4.get(i).getStartSequenceId() < 0L ? 1 : 0) != 0);
        }
        dlmv4.close();
        while ((long)readRecords.size() < txId) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((long)txId, (long)readRecords.size());
        long sequenceId = Long.MIN_VALUE;
        for (LogRecordWithDLSN record : readRecords) {
            if (record.getDlsn().getLogSegmentSequenceNo() <= 4L) {
                Assert.assertTrue((record.getSequenceId() < 0L ? 1 : 0) != 0);
                Assert.assertTrue((record.getSequenceId() > sequenceId ? 1 : 0) != 0);
                sequenceId = record.getSequenceId();
                continue;
            }
            if (record.getDlsn().getLogSegmentSequenceNo() <= 7L) {
                if (sequenceId < 0L) {
                    sequenceId = 0L;
                }
                Assert.assertEquals((long)sequenceId, (long)record.getSequenceId());
                ++sequenceId;
                continue;
            }
            if (record.getDlsn().getLogSegmentSequenceNo() < 9L) continue;
            if (sequenceId > 0L) {
                sequenceId = Long.MIN_VALUE;
            }
            Assert.assertTrue((record.getSequenceId() < 0L ? 1 : 0) != 0);
            Assert.assertTrue((record.getSequenceId() > sequenceId ? 1 : 0) != 0);
            sequenceId = record.getSequenceId();
        }
        readDLM.close();
    }
}

