/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.BKAsyncLogWriter;
import org.apache.distributedlog.BKDistributedLogManager;
import org.apache.distributedlog.BKLogWriteHandler;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.NonBlockingReadsTestUtil;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.exceptions.IdleReaderException;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Ignore
public class TestNonBlockingReads
extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestNonBlockingReads.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=100000L)
    public void testNonBlockingRead() throws Exception {
        String name = "distrlog-non-blocking-reader";
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setReadAheadBatchSize(1);
        confLocal.setReadAheadMaxRecords(1);
        confLocal.setReaderIdleWarnThresholdMillis(100);
        confLocal.setReadLACLongPollTimeout(49);
        final BKDistributedLogManager dlm = this.createNewDLM(confLocal, name);
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        ScheduledFuture<?> writerClosedFuture = null;
        try {
            final Thread currentThread = Thread.currentThread();
            writerClosedFuture = executor.schedule(new Runnable(){

                @Override
                public void run() {
                    try {
                        NonBlockingReadsTestUtil.writeRecordsForNonBlockingReads(confLocal, dlm, false);
                    }
                    catch (Exception exc) {
                        currentThread.interrupt();
                    }
                }
            }, 100L, TimeUnit.MILLISECONDS);
            NonBlockingReadsTestUtil.readNonBlocking(dlm, false);
            Assert.assertFalse((boolean)currentThread.isInterrupted());
        }
        finally {
            if (writerClosedFuture != null) {
                writerClosedFuture.get();
            }
            executor.shutdown();
            dlm.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=100000L)
    public void testNonBlockingReadRecovery() throws Exception {
        String name = "distrlog-non-blocking-reader-recovery";
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setReadAheadBatchSize(10);
        confLocal.setReadAheadMaxRecords(10);
        final BKDistributedLogManager dlm = this.createNewDLM(confLocal, name);
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        ScheduledFuture<?> writerClosedFuture = null;
        try {
            final Thread currentThread = Thread.currentThread();
            writerClosedFuture = executor.schedule(new Runnable(){

                @Override
                public void run() {
                    try {
                        NonBlockingReadsTestUtil.writeRecordsForNonBlockingReads(confLocal, dlm, true);
                    }
                    catch (Exception exc) {
                        currentThread.interrupt();
                    }
                }
            }, 100L, TimeUnit.MILLISECONDS);
            NonBlockingReadsTestUtil.readNonBlocking(dlm, false);
            Assert.assertFalse((boolean)currentThread.isInterrupted());
        }
        finally {
            if (writerClosedFuture != null) {
                writerClosedFuture.get();
            }
            executor.shutdown();
            dlm.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=100000L)
    public void testNonBlockingReadIdleError() throws Exception {
        String name = "distrlog-non-blocking-reader-error";
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setReadAheadBatchSize(1);
        confLocal.setReadAheadMaxRecords(1);
        confLocal.setReadLACLongPollTimeout(24);
        confLocal.setReaderIdleWarnThresholdMillis(50);
        confLocal.setReaderIdleErrorThresholdMillis(100);
        final BKDistributedLogManager dlm = this.createNewDLM(confLocal, name);
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        ScheduledFuture<?> writerClosedFuture = null;
        try {
            final Thread currentThread = Thread.currentThread();
            writerClosedFuture = executor.schedule(new Runnable(){

                @Override
                public void run() {
                    try {
                        NonBlockingReadsTestUtil.writeRecordsForNonBlockingReads(confLocal, dlm, false);
                    }
                    catch (Exception exc) {
                        currentThread.interrupt();
                    }
                }
            }, 100L, TimeUnit.MILLISECONDS);
            boolean exceptionEncountered = false;
            try {
                NonBlockingReadsTestUtil.readNonBlocking(dlm, false, 1000L, true);
            }
            catch (IdleReaderException exc) {
                exceptionEncountered = true;
            }
            Assert.assertTrue((boolean)exceptionEncountered);
            Assert.assertFalse((boolean)currentThread.isInterrupted());
        }
        finally {
            if (writerClosedFuture != null) {
                writerClosedFuture.get();
            }
            executor.shutdown();
            dlm.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testNonBlockingReadAheadStall() throws Exception {
        String name = "distrlog-non-blocking-reader-stall";
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setReadAheadBatchSize(1);
        confLocal.setReadAheadMaxRecords(3);
        confLocal.setReadLACLongPollTimeout(249);
        confLocal.setReaderIdleWarnThresholdMillis(500);
        confLocal.setReaderIdleErrorThresholdMillis(30000);
        final BKDistributedLogManager dlm = this.createNewDLM(confLocal, name);
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        ScheduledFuture<?> writerClosedFuture = null;
        try {
            final Thread currentThread = Thread.currentThread();
            writerClosedFuture = executor.schedule(new Runnable(){

                @Override
                public void run() {
                    try {
                        NonBlockingReadsTestUtil.writeRecordsForNonBlockingReads(confLocal, dlm, false, 3L);
                    }
                    catch (Exception exc) {
                        currentThread.interrupt();
                    }
                }
            }, 10L, TimeUnit.MILLISECONDS);
            boolean exceptionEncountered = false;
            try {
                NonBlockingReadsTestUtil.readNonBlocking(dlm, false, 3L, false);
            }
            catch (IdleReaderException exc) {
                LOG.info("Exception encountered", (Throwable)exc);
                exceptionEncountered = true;
            }
            Assert.assertFalse((boolean)exceptionEncountered);
            Assert.assertFalse((boolean)currentThread.isInterrupted());
        }
        finally {
            if (writerClosedFuture != null) {
                writerClosedFuture.get();
            }
            executor.shutdown();
            dlm.close();
        }
    }

    private long createStreamWithInconsistentMetadata(String name) throws Exception {
        BKDistributedLogManager dlm = this.createNewDLM(conf, name);
        ZooKeeperClient zkClient = TestZooKeeperClientBuilder.newBuilder().uri(this.createDLMURI("/")).build();
        long txid = 1L;
        long numRecordsWritten = 0L;
        int segmentSize = 10;
        for (long i = 0L; i < 3L; ++i) {
            BKAsyncLogWriter out = (BKAsyncLogWriter)dlm.startAsyncLogSegmentNonPartitioned();
            for (long j = 1L; j <= (long)segmentSize; ++j) {
                LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
                Utils.ioResult(out.write(op));
                ++numRecordsWritten;
            }
            out.closeAndComplete();
        }
        BKLogWriteHandler blplm = dlm.createWriteHandler(true);
        String completedZNode = blplm.completedLedgerZNode(txid - (long)segmentSize, txid - 1L, 3L);
        LogSegmentMetadata metadata = Utils.ioResult(LogSegmentMetadata.read(zkClient, completedZNode));
        zkClient.get().delete(completedZNode, -1);
        LogSegmentMetadata metadataToChange = metadata.mutator().setLastEntryId(metadata.getLastEntryId() + 100L).setLastTxId(metadata.getLastTxId() + 100L).build();
        metadataToChange.write(zkClient);
        txid += 100L;
        for (long i = 0L; i < 3L; ++i) {
            BKAsyncLogWriter out = (BKAsyncLogWriter)dlm.startAsyncLogSegmentNonPartitioned();
            for (long j = 1L; j <= (long)segmentSize; ++j) {
                LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
                Utils.ioResult(out.write(op));
                ++numRecordsWritten;
            }
            out.closeAndComplete();
        }
        dlm.close();
        return numRecordsWritten;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testHandleInconsistentMetadata() throws Exception {
        String name = "distrlog-inconsistent-metadata-blocking-read";
        long numRecordsWritten = this.createStreamWithInconsistentMetadata(name);
        try (BKDistributedLogManager dlm = this.createNewDLM(conf, name);){
            LogReader reader = dlm.getInputStream(45L);
            long numRecordsRead = 0L;
            LogRecordWithDLSN record = reader.readNext(false);
            long lastTxId = -1L;
            while (numRecordsRead < numRecordsWritten / 2L) {
                if (null != record) {
                    DLMTestUtil.verifyLogRecord(record);
                    Assert.assertTrue((lastTxId < record.getTransactionId() ? 1 : 0) != 0);
                    lastTxId = record.getTransactionId();
                    ++numRecordsRead;
                } else {
                    Thread.sleep(1L);
                }
                record = reader.readNext(false);
            }
            reader.close();
            Assert.assertEquals((long)(numRecordsWritten / 2L), (long)numRecordsRead);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=15000L)
    public void testHandleInconsistentMetadataNonBlocking() throws Exception {
        String name = "distrlog-inconsistent-metadata-nonblocking-read";
        long numRecordsWritten = this.createStreamWithInconsistentMetadata(name);
        try (BKDistributedLogManager dlm = this.createNewDLM(conf, name);){
            LogReader reader = dlm.getInputStream(45L);
            long numRecordsRead = 0L;
            long lastTxId = -1L;
            while (numRecordsRead < numRecordsWritten / 2L) {
                LogRecordWithDLSN record = reader.readNext(false);
                if (record != null) {
                    DLMTestUtil.verifyLogRecord(record);
                    Assert.assertTrue((lastTxId < record.getTransactionId() ? 1 : 0) != 0);
                    lastTxId = record.getTransactionId();
                    ++numRecordsRead;
                    continue;
                }
                Thread.sleep(1L);
            }
            reader.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=15000L)
    public void testHandleInconsistentMetadataDLSNNonBlocking() throws Exception {
        String name = "distrlog-inconsistent-metadata-nonblocking-read-dlsn";
        long numRecordsWritten = this.createStreamWithInconsistentMetadata(name);
        try (BKDistributedLogManager dlm = this.createNewDLM(conf, name);){
            LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
            long numRecordsRead = 0L;
            long lastTxId = -1L;
            while (numRecordsRead < numRecordsWritten) {
                LogRecordWithDLSN record = reader.readNext(false);
                if (record != null) {
                    DLMTestUtil.verifyLogRecord(record);
                    Assert.assertTrue((lastTxId < record.getTransactionId() ? 1 : 0) != 0);
                    lastTxId = record.getTransactionId();
                    ++numRecordsRead;
                    continue;
                }
                Thread.sleep(1L);
            }
            reader.close();
        }
    }

    static {
        conf.setOutputBufferSize(0);
        conf.setImmediateFlushEnabled(true);
    }
}

