/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog;

import java.io.ByteArrayInputStream;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.AppendOnlyStreamReader;
import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.BKDistributedLogManager;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestAppendOnlyStreamWriter
extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestAppendOnlyStreamWriter.class);
    @Rule
    public TestName testNames = new TestName();

    @Test(timeout=60000L)
    public void testBasicReadAndWriteBehavior() throws Exception {
        String name = this.testNames.getMethodName();
        BKDistributedLogManager dlmwrite = this.createNewDLM(conf, name);
        BKDistributedLogManager dlmreader = this.createNewDLM(conf, name);
        byte[] byteStream = DLMTestUtil.repeatString("abc", 51).getBytes();
        long txid = 1L;
        AppendOnlyStreamWriter writer = dlmwrite.getAppendOnlyStreamWriter();
        writer.write(DLMTestUtil.repeatString("abc", 11).getBytes());
        writer.write(DLMTestUtil.repeatString("abc", 40).getBytes());
        writer.force(false);
        writer.close();
        AppendOnlyStreamReader reader = dlmreader.getAppendOnlyStreamReader();
        byte[] bytesIn = new byte[byteStream.length];
        int read = reader.read(bytesIn, 0, 23);
        Assert.assertEquals((long)23L, (long)read);
        read = reader.read(bytesIn, 23, 31);
        Assert.assertEquals((long)read, (long)31L);
        byte[] bytesInTemp = new byte[byteStream.length];
        read = reader.read(bytesInTemp, 0, byteStream.length);
        Assert.assertEquals((long)read, (long)(byteStream.length - 23 - 31));
        read = new ByteArrayInputStream(bytesInTemp).read(bytesIn, 54, byteStream.length - 23 - 31);
        Assert.assertEquals((long)read, (long)(byteStream.length - 23 - 31));
        Assert.assertArrayEquals((byte[])bytesIn, (byte[])byteStream);
        reader.close();
        dlmreader.close();
        dlmwrite.close();
    }

    @Test(timeout=60000L)
    public void testWriteFutureDoesNotCompleteUntilWritePersisted() throws Exception {
        String name = this.testNames.getMethodName();
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        conf.setPeriodicFlushFrequencyMilliSeconds(Integer.MAX_VALUE);
        conf.setImmediateFlushEnabled(false);
        BKDistributedLogManager dlmwriter = this.createNewDLM(conf, name);
        BKDistributedLogManager dlmreader = this.createNewDLM(conf, name);
        byte[] byteStream = DLMTestUtil.repeatString("abc", 51).getBytes();
        AppendOnlyStreamWriter writer = dlmwriter.getAppendOnlyStreamWriter();
        CompletableFuture<DLSN> dlsnFuture = writer.write(DLMTestUtil.repeatString("abc", 11).getBytes());
        Thread.sleep(1000L);
        Assert.assertFalse((boolean)dlsnFuture.isDone());
        writer.force(false);
        Utils.ioResult(dlsnFuture, 5L, TimeUnit.SECONDS);
        writer.close();
        dlmwriter.close();
        AppendOnlyStreamReader reader = dlmreader.getAppendOnlyStreamReader();
        byte[] bytesIn = new byte[byteStream.length];
        int read = reader.read(bytesIn, 0, 31);
        Assert.assertEquals((long)31L, (long)read);
        reader.close();
        dlmreader.close();
    }

    @Test(timeout=60000L)
    public void testPositionUpdatesOnlyAfterWriteCompletion() throws Exception {
        String name = this.testNames.getMethodName();
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        conf.setPeriodicFlushFrequencyMilliSeconds(10000);
        conf.setImmediateFlushEnabled(false);
        BKDistributedLogManager dlmwriter = this.createNewDLM(conf, name);
        BKDistributedLogManager dlmreader = this.createNewDLM(conf, name);
        byte[] byteStream = DLMTestUtil.repeatString("abc", 11).getBytes();
        AppendOnlyStreamWriter writer = dlmwriter.getAppendOnlyStreamWriter();
        CompletableFuture<DLSN> dlsnFuture = writer.write(byteStream);
        Thread.sleep(100L);
        Assert.assertFalse((boolean)dlsnFuture.isDone());
        Assert.assertEquals((long)0L, (long)writer.position());
        writer.force(false);
        Assert.assertEquals((long)byteStream.length, (long)writer.position());
        writer.close();
        dlmwriter.close();
        AppendOnlyStreamReader reader = dlmreader.getAppendOnlyStreamReader();
        byte[] bytesIn = new byte[byteStream.length];
        int read = reader.read(bytesIn, 0, byteStream.length);
        Assert.assertEquals((long)byteStream.length, (long)read);
        Assert.assertEquals((long)byteStream.length, (long)reader.position());
        reader.close();
        dlmreader.close();
    }

    @Test(timeout=60000L)
    public void testPositionDoesntUpdateBeforeWriteCompletion() throws Exception {
        String name = this.testNames.getMethodName();
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        conf.setPeriodicFlushFrequencyMilliSeconds(100000);
        conf.setImmediateFlushEnabled(false);
        conf.setOutputBufferSize(0x100000);
        BKDistributedLogManager dlmwriter = this.createNewDLM(conf, name);
        byte[] byteStream = DLMTestUtil.repeatString("abc", 11).getBytes();
        AppendOnlyStreamWriter writer = dlmwriter.getAppendOnlyStreamWriter();
        Assert.assertEquals((long)0L, (long)writer.position());
        Thread.sleep(500L);
        CompletableFuture<DLSN> dlsnFuture = writer.write(byteStream);
        Assert.assertEquals((long)0L, (long)writer.position());
        writer.close();
        dlmwriter.close();
    }

    @Test(timeout=60000L)
    public void testPositionUpdatesOnlyAfterWriteCompletionWithoutFsync() throws Exception {
        String name = this.testNames.getMethodName();
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        conf.setPeriodicFlushFrequencyMilliSeconds(1000);
        conf.setImmediateFlushEnabled(false);
        conf.setOutputBufferSize(0x100000);
        BKDistributedLogManager dlmwriter = this.createNewDLM(conf, name);
        byte[] byteStream = DLMTestUtil.repeatString("abc", 11).getBytes();
        AppendOnlyStreamWriter writer = dlmwriter.getAppendOnlyStreamWriter();
        Assert.assertEquals((long)0L, (long)writer.position());
        Utils.ioResult(writer.write(byteStream));
        Thread.sleep(100L);
        Assert.assertEquals((long)33L, (long)writer.position());
        writer.close();
        dlmwriter.close();
    }

    @Test(timeout=60000L)
    public void testWriterStartsAtTxidZeroForEmptyStream() throws Exception {
        String name = this.testNames.getMethodName();
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        conf.setImmediateFlushEnabled(true);
        conf.setOutputBufferSize(1024);
        BKDistributedLogManager dlm = this.createNewDLM(conf, name);
        URI uri = this.createDLMURI("/" + name);
        Utils.ioResult(dlm.getWriterMetadataStore().getLog(uri, name, true, true));
        AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
        byte[] byteStream = DLMTestUtil.repeatString("a", 1025).getBytes();
        Utils.ioResult(writer.write(byteStream));
        writer.close();
        dlm.close();
    }

    @Test(timeout=60000L)
    public void testOffsetGapAfterSegmentWriterFailure() throws Exception {
        String name = this.testNames.getMethodName();
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        conf.setImmediateFlushEnabled(false);
        conf.setPeriodicFlushFrequencyMilliSeconds(60000);
        conf.setOutputBufferSize(0x100000);
        conf.setLogSegmentSequenceNumberValidationEnabled(false);
        int writeLen = 5;
        int sectionWrites = 10;
        long read = this.writeRecordsAndReadThemBackAfterInjectingAFailedTransmit(conf, name, 5, 10);
        Assert.assertEquals((long)105L, (long)read);
    }

    @Test(timeout=60000L)
    public void testNoOffsetGapAfterSegmentWriterFailure() throws Exception {
        String name = this.testNames.getMethodName();
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        conf.setImmediateFlushEnabled(false);
        conf.setPeriodicFlushFrequencyMilliSeconds(60000);
        conf.setOutputBufferSize(0x100000);
        conf.setDisableRollingOnLogSegmentError(true);
        int writeLen = 5;
        int sectionWrites = 10;
        try {
            this.writeRecordsAndReadThemBackAfterInjectingAFailedTransmit(conf, name, 5, 10);
            Assert.fail((String)"should have thrown");
        }
        catch (BKTransmitException bKTransmitException) {
            // empty catch block
        }
        BKDistributedLogManager dlm = this.createNewDLM(conf, name);
        long length = dlm.getLastTxId();
        long read = this.read(dlm, length);
        Assert.assertEquals((long)length, (long)read);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    long writeRecordsAndReadThemBackAfterInjectingAFailedTransmit(DistributedLogConfiguration conf, String name, int writeLen, int sectionWrites) throws Exception {
        BKDistributedLogManager dlm = this.createNewDLM(conf, name);
        URI uri = this.createDLMURI("/" + name);
        Utils.ioResult(dlm.getWriterMetadataStore().getLog(uri, name, true, true));
        AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
        byte[] byteStream = DLMTestUtil.repeatString("A", writeLen).getBytes();
        for (int i = 0; i < sectionWrites; ++i) {
            writer.write(byteStream);
        }
        writer.force(false);
        long read = this.read(dlm, 1 * sectionWrites * writeLen);
        Assert.assertEquals((long)(1 * sectionWrites * writeLen), (long)read);
        for (int i = 0; i < sectionWrites; ++i) {
            writer.write(byteStream);
        }
        try {
            FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_TransmitFailGetBuffer, FailpointUtils.FailPointActions.FailPointAction_Throw);
            writer.force(false);
            Assert.fail((String)"should have thown \u2299\ufe4f\u2299");
        }
        catch (WriteException i) {
        }
        finally {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_TransmitFailGetBuffer);
        }
        writer.write(byteStream);
        for (int i = 0; i < sectionWrites; ++i) {
            writer.write(byteStream);
        }
        writer.force(false);
        writer.markEndOfStream();
        writer.close();
        long length = dlm.getLastTxId();
        Assert.assertEquals((long)(3 * sectionWrites * writeLen + 5), (long)length);
        read = this.read(dlm, length);
        dlm.close();
        return read;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    long read(DistributedLogManager dlm, long n) throws Exception {
        long offset;
        byte[] bytesIn = new byte[1];
        try (AppendOnlyStreamReader reader = dlm.getAppendOnlyStreamReader();){
            int read;
            for (offset = 0L; offset < n; offset += (long)read) {
                read = reader.read(bytesIn, 0, 1);
            }
        }
        return offset;
    }
}

