/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.distributedlog.BKLogSegmentWriter;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.common.util.PermitLimiter;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.WriteCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.lock.ZKDistributedLock;
import org.apache.distributedlog.lock.ZKSessionLockFactory;
import org.apache.distributedlog.util.ConfUtils;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.shade.org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

public class TestBKLogSegmentWriter
extends TestDistributedLogBase {
    @Rule
    public TestName runtime = new TestName();
    private OrderedScheduler scheduler;
    private OrderedScheduler lockStateExecutor;
    private ZooKeeperClient zkc;
    private ZooKeeperClient zkc0;
    private BookKeeperClient bkc;

    @Override
    @Before
    public void setup() throws Exception {
        super.setup();
        this.scheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().numThreads(1).build();
        this.lockStateExecutor = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().numThreads(1).build();
        URI uri = this.createDLMURI("");
        this.zkc = TestZooKeeperClientBuilder.newBuilder(conf).name("test-zkc").uri(uri).build();
        this.zkc0 = TestZooKeeperClientBuilder.newBuilder(conf).name("test-zkc0").uri(uri).build();
        BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(this.zkc, uri);
        this.bkc = BookKeeperClientBuilder.newBuilder().dlConfig(conf).name("test-bkc").ledgersPath(bkdlConfig.getBkLedgersPath()).zkServers(BKNamespaceDriver.getZKServersFromDLUri(uri)).build();
    }

    @Override
    @After
    public void teardown() throws Exception {
        if (null != this.bkc) {
            this.bkc.close();
        }
        if (null != this.zkc) {
            this.zkc.close();
        }
        if (null != this.lockStateExecutor) {
            this.lockStateExecutor.shutdown();
        }
        if (null != this.scheduler) {
            this.scheduler.shutdown();
        }
        super.teardown();
    }

    private DistributedLogConfiguration newLocalConf() {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        return confLocal;
    }

    private ZKDistributedLock createLock(String path, ZooKeeperClient zkClient, boolean acquireLock) throws Exception {
        try {
            Utils.ioResult(Utils.zkAsyncCreateFullPathOptimistic(zkClient, path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
        }
        catch (ZKException zKException) {
            // empty catch block
        }
        ZKSessionLockFactory lockFactory = new ZKSessionLockFactory(zkClient, "test-lock", this.lockStateExecutor, 0, Long.MAX_VALUE, conf.getZKSessionTimeoutMilliseconds(), NullStatsLogger.INSTANCE);
        ZKDistributedLock lock = new ZKDistributedLock(this.lockStateExecutor, lockFactory, path, Long.MAX_VALUE, NullStatsLogger.INSTANCE);
        if (acquireLock) {
            return Utils.ioResult(lock.asyncAcquire());
        }
        return lock;
    }

    private void closeWriterAndLock(BKLogSegmentWriter writer, ZKDistributedLock lock) throws Exception {
        try {
            Utils.ioResult(writer.asyncClose());
        }
        finally {
            Utils.closeQuietly(lock);
        }
    }

    private void abortWriterAndLock(BKLogSegmentWriter writer, ZKDistributedLock lock) throws IOException {
        try {
            Utils.abort(writer, false);
        }
        finally {
            Utils.closeQuietly(lock);
        }
    }

    private BKLogSegmentWriter createLogSegmentWriter(DistributedLogConfiguration conf, long logSegmentSequenceNumber, long startTxId, ZKDistributedLock lock) throws Exception {
        LedgerHandle lh = this.bkc.get().createLedger(3, 2, 2, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8));
        return new BKLogSegmentWriter(this.runtime.getMethodName(), this.runtime.getMethodName(), conf, LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION, new BKLogSegmentEntryWriter(lh), lock, startTxId, logSegmentSequenceNumber, this.scheduler, NullStatsLogger.INSTANCE, NullStatsLogger.INSTANCE, new AlertStatsLogger(NullStatsLogger.INSTANCE, "test"), PermitLimiter.NULL_PERMIT_LIMITER, new SettableFeatureProvider("", 0), ConfUtils.getConstDynConf(conf));
    }

    private LedgerHandle openLedgerNoRecovery(LedgerHandle lh) throws Exception {
        return this.bkc.get().openLedgerNoRecovery(lh.getId(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8));
    }

    private LedgerHandle openLedger(LedgerHandle lh) throws Exception {
        return this.bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8));
    }

    private void fenceLedger(LedgerHandle lh) throws Exception {
        this.bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8));
    }

    @Test(timeout=60000L)
    public void testCloseShouldFlush() throws Exception {
        DistributedLogConfiguration confLocal = this.newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        ZKDistributedLock lock = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc, true);
        BKLogSegmentWriter writer = this.createLogSegmentWriter(confLocal, 0L, -1L, lock);
        ZKDistributedLock lock0 = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc0, false);
        CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
        int numRecords = 10;
        ArrayList futureList = new ArrayList(numRecords);
        for (int i = 0; i < numRecords; ++i) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
        }
        Assert.assertEquals((String)("Last tx id should be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)"Last acked tx id should be -1", (long)-1L, (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Last DLSN should be " + DLSN.InvalidDLSN), (Object)DLSN.InvalidDLSN, (Object)writer.getLastDLSN());
        Assert.assertEquals((String)("Position should be " + numRecords), (long)10L, (long)writer.getPositionWithinLogSegment());
        this.closeWriterAndLock(writer, lock);
        Utils.ioResult(lockFuture0);
        lock0.checkOwnership();
        Assert.assertEquals((String)("Last tx id should still be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)("Last acked tx id should become " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Position should still be " + numRecords), (long)10L, (long)writer.getPositionWithinLogSegment());
        List dlsns = Utils.ioResult(FutureUtils.collect(futureList));
        Assert.assertEquals((String)"All records should be written", (long)numRecords, (long)dlsns.size());
        for (int i = 0; i < numRecords; ++i) {
            DLSN dlsn = (DLSN)dlsns.get(i);
            Assert.assertEquals((String)"Incorrent ledger sequence number", (long)0L, (long)dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals((String)"Incorrent entry id", (long)0L, (long)dlsn.getEntryId());
            Assert.assertEquals((String)"Inconsistent slot id", (long)i, (long)dlsn.getSlotId());
        }
        Assert.assertEquals((String)("Last DLSN should be " + dlsns.get(dlsns.size() - 1)), dlsns.get(dlsns.size() - 1), (Object)writer.getLastDLSN());
        LedgerHandle lh = this.getLedgerHandle(writer);
        LedgerHandle readLh = this.openLedgerNoRecovery(lh);
        Assert.assertTrue((String)("Ledger " + lh.getId() + " should be closed"), (boolean)readLh.isClosed());
        Assert.assertEquals((String)("There should be two entries in ledger " + lh.getId()), (long)1L, (long)readLh.getLastAddConfirmed());
    }

    @Test(timeout=60000L)
    public void testAbortShouldNotFlush() throws Exception {
        int i;
        DistributedLogConfiguration confLocal = this.newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        ZKDistributedLock lock = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc, true);
        BKLogSegmentWriter writer = this.createLogSegmentWriter(confLocal, 0L, -1L, lock);
        ZKDistributedLock lock0 = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc0, false);
        CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
        int numRecords = 10;
        ArrayList<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
        for (i = 0; i < numRecords; ++i) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
        }
        Assert.assertEquals((String)("Last tx id should be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)"Last acked tx id should be -1", (long)-1L, (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Last DLSN should be " + DLSN.InvalidDLSN), (Object)DLSN.InvalidDLSN, (Object)writer.getLastDLSN());
        Assert.assertEquals((String)("Position should be " + numRecords), (long)10L, (long)writer.getPositionWithinLogSegment());
        this.abortWriterAndLock(writer, lock);
        Utils.ioResult(lockFuture0);
        lock0.checkOwnership();
        Assert.assertEquals((String)("Last tx id should still be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)("Last acked tx id should still be " + (numRecords - 1)), (long)-1L, (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Last DLSN should still be " + DLSN.InvalidDLSN), (Object)DLSN.InvalidDLSN, (Object)writer.getLastDLSN());
        Assert.assertEquals((String)("Position should still be " + numRecords), (long)10L, (long)writer.getPositionWithinLogSegment());
        for (i = 0; i < numRecords; ++i) {
            try {
                Utils.ioResult((CompletableFuture)futureList.get(i));
                Assert.fail((String)("Should be aborted record " + i + " with transmit exception"));
                continue;
            }
            catch (WriteCancelledException wce) {
                Assert.assertTrue((String)("Record " + i + " should be aborted because of ledger fenced"), (boolean)(wce.getCause() instanceof BKTransmitException));
                BKTransmitException bkte = (BKTransmitException)wce.getCause();
                Assert.assertEquals((String)("Record " + i + " should be aborted"), (long)-15L, (long)bkte.getBKResultCode());
            }
        }
        LedgerHandle lh = this.getLedgerHandle(writer);
        LedgerHandle readLh = this.openLedgerNoRecovery(lh);
        Assert.assertTrue((String)("Ledger " + lh.getId() + " should not be closed"), (boolean)readLh.isClosed());
        Assert.assertEquals((String)("There should be no entries in ledger " + lh.getId()), (long)-1L, (long)readLh.getLastAddConfirmed());
    }

    @Test(timeout=60000L)
    public void testCloseShouldNotFlushIfLedgerFenced() throws Exception {
        this.testCloseShouldNotFlushIfInErrorState(-101);
    }

    void testCloseShouldNotFlushIfInErrorState(int rcToFailComplete) throws Exception {
        DistributedLogConfiguration confLocal = this.newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        ZKDistributedLock lock = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc, true);
        BKLogSegmentWriter writer = this.createLogSegmentWriter(confLocal, 0L, -1L, lock);
        ZKDistributedLock lock0 = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc0, false);
        CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
        int numRecords = 10;
        ArrayList<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
        for (int i = 0; i < numRecords; ++i) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
        }
        Assert.assertEquals((String)("Last tx id should be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)"Last acked tx id should be -1", (long)-1L, (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Last DLSN should be " + DLSN.InvalidDLSN), (Object)DLSN.InvalidDLSN, (Object)writer.getLastDLSN());
        Assert.assertEquals((String)("Position should be " + numRecords), (long)10L, (long)writer.getPositionWithinLogSegment());
        writer.setTransmitResult(rcToFailComplete);
        try {
            this.closeWriterAndLock(writer, lock);
            Assert.fail((String)"Close a log segment writer in error state should throw exception");
        }
        catch (BKTransmitException bkte) {
            Assert.assertEquals((String)"Inconsistent rc is thrown", (long)rcToFailComplete, (long)bkte.getBKResultCode());
        }
        Utils.ioResult(lockFuture0);
        lock0.checkOwnership();
        Assert.assertEquals((String)("Last tx id should still be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)("Last acked tx id should still be " + (numRecords - 1)), (long)-1L, (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Last DLSN should still be " + DLSN.InvalidDLSN), (Object)DLSN.InvalidDLSN, (Object)writer.getLastDLSN());
        Assert.assertEquals((String)("Position should still be " + numRecords), (long)10L, (long)writer.getPositionWithinLogSegment());
        for (int i = 0; i < numRecords; ++i) {
            try {
                Utils.ioResult((CompletableFuture)futureList.get(i));
                Assert.fail((String)("Should be aborted record " + i + " with transmit exception"));
                continue;
            }
            catch (WriteCancelledException wce) {
                Assert.assertTrue((String)("Record " + i + " should be aborted because of ledger fenced"), (boolean)(wce.getCause() instanceof BKTransmitException));
                BKTransmitException bkte = (BKTransmitException)wce.getCause();
                Assert.assertEquals((String)("Record " + i + " should be aborted"), (long)rcToFailComplete, (long)bkte.getBKResultCode());
            }
        }
        LedgerHandle lh = this.getLedgerHandle(writer);
        LedgerHandle readLh = this.openLedgerNoRecovery(lh);
        Assert.assertFalse((String)("Ledger " + lh.getId() + " should not be closed"), (boolean)readLh.isClosed());
        Assert.assertEquals((String)("There should be no entries in ledger " + lh.getId()), (long)-1L, (long)readLh.getLastAddConfirmed());
    }

    @Test(timeout=60000L)
    public void testCloseShouldFailIfLedgerFenced() throws Exception {
        DistributedLogConfiguration confLocal = this.newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        ZKDistributedLock lock = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc, true);
        BKLogSegmentWriter writer = this.createLogSegmentWriter(confLocal, 0L, -1L, lock);
        ZKDistributedLock lock0 = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc0, false);
        CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
        int numRecords = 10;
        ArrayList<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
        for (int i = 0; i < numRecords; ++i) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
        }
        Assert.assertEquals((String)("Last tx id should be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)"Last acked tx id should be -1", (long)-1L, (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Last DLSN should be " + DLSN.InvalidDLSN), (Object)DLSN.InvalidDLSN, (Object)writer.getLastDLSN());
        Assert.assertEquals((String)("Position should be " + numRecords), (long)10L, (long)writer.getPositionWithinLogSegment());
        this.fenceLedger(this.getLedgerHandle(writer));
        try {
            this.closeWriterAndLock(writer, lock);
            Assert.fail((String)"Close a log segment writer when ledger is fenced should throw exception");
        }
        catch (BKTransmitException bkte) {
            Assert.assertEquals((String)"Inconsistent rc is thrown", (long)-101L, (long)bkte.getBKResultCode());
        }
        Utils.ioResult(lockFuture0);
        lock0.checkOwnership();
        Assert.assertEquals((String)("Last tx id should still be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)("Last acked tx id should still be " + (numRecords - 1)), (long)-1L, (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Last DLSN should still be " + DLSN.InvalidDLSN), (Object)DLSN.InvalidDLSN, (Object)writer.getLastDLSN());
        Assert.assertEquals((String)("Position should still be " + numRecords), (long)10L, (long)writer.getPositionWithinLogSegment());
        for (int i = 0; i < numRecords; ++i) {
            try {
                Utils.ioResult((CompletableFuture)futureList.get(i));
                Assert.fail((String)("Should be aborted record " + i + " with transmit exception"));
                continue;
            }
            catch (BKTransmitException bkte) {
                Assert.assertEquals((String)("Record " + i + " should be aborted"), (long)-101L, (long)bkte.getBKResultCode());
            }
        }
        LedgerHandle lh = this.getLedgerHandle(writer);
        LedgerHandle readLh = this.openLedgerNoRecovery(lh);
        Assert.assertTrue((String)("Ledger " + lh.getId() + " should be closed"), (boolean)readLh.isClosed());
        Assert.assertEquals((String)("There should be no entries in ledger " + lh.getId()), (long)-1L, (long)readLh.getLastAddConfirmed());
    }

    @Test(timeout=60000L)
    public void testAbortShouldFailAllWrites() throws Exception {
        int i;
        DistributedLogConfiguration confLocal = this.newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        ZKDistributedLock lock = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc, true);
        BKLogSegmentWriter writer = this.createLogSegmentWriter(confLocal, 0L, -1L, lock);
        ZKDistributedLock lock0 = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc0, false);
        CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
        int numRecords = 10;
        ArrayList futureList = new ArrayList(numRecords);
        for (int i2 = 0; i2 < numRecords; ++i2) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i2)));
        }
        Assert.assertEquals((String)("Last tx id should be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)"Last acked tx id should be -1", (long)-1L, (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Last DLSN should be " + DLSN.InvalidDLSN), (Object)DLSN.InvalidDLSN, (Object)writer.getLastDLSN());
        Assert.assertEquals((String)("Position should be " + numRecords), (long)numRecords, (long)writer.getPositionWithinLogSegment());
        CountDownLatch deferLatch = new CountDownLatch(1);
        writer.getFuturePool().submit(() -> {
            try {
                deferLatch.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted on deferring completion : ", (Throwable)e);
            }
        });
        Utils.ioResult(writer.flush());
        ArrayList<CompletableFuture<DLSN>> anotherFutureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
        for (int i3 = numRecords; i3 < 2 * numRecords; ++i3) {
            anotherFutureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i3)));
        }
        Assert.assertEquals((String)("Last tx id should become " + (2 * numRecords - 1)), (long)(2 * numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)("Last acked tx id should become " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Last DLSN should still be " + DLSN.InvalidDLSN), (Object)DLSN.InvalidDLSN, (Object)writer.getLastDLSN());
        Assert.assertEquals((String)("Position should become " + 2 * numRecords), (long)(2 * numRecords), (long)writer.getPositionWithinLogSegment());
        this.abortWriterAndLock(writer, lock);
        Utils.ioResult(lockFuture0);
        lock0.checkOwnership();
        deferLatch.countDown();
        List dlsns = Utils.ioResult(FutureUtils.collect(futureList));
        Assert.assertEquals((String)"All first 10 records should be written", (long)numRecords, (long)dlsns.size());
        for (i = 0; i < numRecords; ++i) {
            DLSN dlsn = (DLSN)dlsns.get(i);
            Assert.assertEquals((String)"Incorrent ledger sequence number", (long)0L, (long)dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals((String)"Incorrent entry id", (long)0L, (long)dlsn.getEntryId());
            Assert.assertEquals((String)"Inconsistent slot id", (long)i, (long)dlsn.getSlotId());
        }
        for (i = 0; i < numRecords; ++i) {
            try {
                Utils.ioResult((CompletableFuture)anotherFutureList.get(i));
                Assert.fail((String)("Should be aborted record " + (numRecords + i) + " with transmit exception"));
                continue;
            }
            catch (WriteCancelledException dlsn) {
                // empty catch block
            }
        }
        Assert.assertEquals((String)("Last tx id should still be " + (2 * numRecords - 1)), (long)(2 * numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)("Last acked tx id should be still " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Last DLSN should become " + futureList.get(futureList.size() - 1)), dlsns.get(futureList.size() - 1), (Object)writer.getLastDLSN());
        Assert.assertEquals((String)("Position should become " + 2 * numRecords), (long)(2 * numRecords), (long)writer.getPositionWithinLogSegment());
        LedgerHandle lh = this.getLedgerHandle(writer);
        LedgerHandle readLh = this.openLedgerNoRecovery(lh);
        Assert.assertTrue((String)("Ledger " + lh.getId() + " should not be closed"), (boolean)readLh.isClosed());
        Assert.assertEquals((String)("Only one entry is written for ledger " + lh.getId()), (long)0L, (long)lh.getLastAddPushed());
        Assert.assertEquals((String)("Only one entry is written for ledger " + lh.getId()), (long)0L, (long)readLh.getLastAddConfirmed());
    }

    @Test(timeout=60000L)
    public void testUpdateLastTxIdForUserRecords() throws Exception {
        DistributedLogConfiguration confLocal = this.newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        ZKDistributedLock lock = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc, true);
        BKLogSegmentWriter writer = this.createLogSegmentWriter(confLocal, 0L, -1L, lock);
        int numRecords = 10;
        ArrayList futureList = new ArrayList(numRecords);
        for (int i = 0; i < numRecords; ++i) {
            futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
        }
        LogRecord controlRecord = DLMTestUtil.getLogRecordInstance(9999L);
        controlRecord.setControl();
        futureList.add(writer.asyncWrite(controlRecord));
        Assert.assertEquals((String)("Last tx id should be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)("Last DLSN should be " + DLSN.InvalidDLSN), (Object)DLSN.InvalidDLSN, (Object)writer.getLastDLSN());
        Assert.assertEquals((String)("Position should be " + numRecords), (long)numRecords, (long)writer.getPositionWithinLogSegment());
        this.closeWriterAndLock(writer, lock);
        List dlsns = Utils.ioResult(FutureUtils.collect(futureList));
        Assert.assertEquals((String)"All 11 records should be written", (long)(numRecords + 1), (long)dlsns.size());
        for (int i = 0; i < numRecords; ++i) {
            DLSN dlsn = (DLSN)dlsns.get(i);
            Assert.assertEquals((String)"Incorrent ledger sequence number", (long)0L, (long)dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals((String)"Incorrent entry id", (long)0L, (long)dlsn.getEntryId());
            Assert.assertEquals((String)"Inconsistent slot id", (long)i, (long)dlsn.getSlotId());
        }
        DLSN dlsn = (DLSN)dlsns.get(numRecords);
        Assert.assertEquals((String)"Incorrent ledger sequence number", (long)0L, (long)dlsn.getLogSegmentSequenceNo());
        Assert.assertEquals((String)"Incorrent entry id", (long)1L, (long)dlsn.getEntryId());
        Assert.assertEquals((String)"Inconsistent slot id", (long)0L, (long)dlsn.getSlotId());
        Assert.assertEquals((String)("Last tx id should be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxId());
        Assert.assertEquals((String)("Last acked tx id should be " + (numRecords - 1)), (long)(numRecords - 1), (long)writer.getLastTxIdAcknowledged());
        Assert.assertEquals((String)("Position should be " + numRecords), (long)numRecords, (long)writer.getPositionWithinLogSegment());
        Assert.assertEquals((String)("Last DLSN should be " + dlsn), dlsns.get(numRecords - 1), (Object)writer.getLastDLSN());
    }

    @Test(timeout=60000L)
    public void testNondurableWriteAfterWriterIsClosed() throws Exception {
        DistributedLogConfiguration confLocal = this.newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setDurableWriteEnabled(false);
        ZKDistributedLock lock = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc, true);
        BKLogSegmentWriter writer = this.createLogSegmentWriter(confLocal, 0L, -1L, lock);
        this.closeWriterAndLock(writer, lock);
        Utils.ioResult(writer.asyncClose());
        try {
            Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1L)));
            Assert.fail((String)"Should fail the write if the writer is closed");
        }
        catch (WriteException writeException) {
            // empty catch block
        }
    }

    @Test(timeout=60000L)
    public void testNondurableWriteAfterEndOfStream() throws Exception {
        DistributedLogConfiguration confLocal = this.newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setDurableWriteEnabled(false);
        ZKDistributedLock lock = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc, true);
        BKLogSegmentWriter writer = this.createLogSegmentWriter(confLocal, 0L, -1L, lock);
        Utils.ioResult(writer.markEndOfStream());
        try {
            Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1L)));
            Assert.fail((String)"Should fail the write if the writer is marked as end of stream");
        }
        catch (EndOfStreamException endOfStreamException) {
            // empty catch block
        }
        this.closeWriterAndLock(writer, lock);
    }

    @Test(timeout=60000L)
    public void testNondurableWriteAfterLedgerIsFenced() throws Exception {
        DistributedLogConfiguration confLocal = this.newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setDurableWriteEnabled(false);
        ZKDistributedLock lock = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc, true);
        BKLogSegmentWriter writer = this.createLogSegmentWriter(confLocal, 0L, -1L, lock);
        this.fenceLedger(this.getLedgerHandle(writer));
        LogRecord record = DLMTestUtil.getLogRecordInstance(1L);
        record.setControl();
        try {
            Utils.ioResult(writer.asyncWrite(record));
            Assert.fail((String)"Should fail the writer if the log segment is already fenced");
        }
        catch (BKTransmitException bkte) {
            Assert.assertEquals((long)-101L, (long)bkte.getBKResultCode());
        }
        try {
            Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2L)));
            Assert.fail((String)"Should fail the writer if the log segment is already fenced");
        }
        catch (WriteException writeException) {
            // empty catch block
        }
        this.abortWriterAndLock(writer, lock);
    }

    @Test(timeout=60000L)
    public void testNondurableWrite() throws Exception {
        DistributedLogConfiguration confLocal = this.newLocalConf();
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setOutputBufferSize(Integer.MAX_VALUE);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
        confLocal.setDurableWriteEnabled(false);
        ZKDistributedLock lock = this.createLock("/test/lock-" + this.runtime.getMethodName(), this.zkc, true);
        BKLogSegmentWriter writer = this.createLogSegmentWriter(confLocal, 0L, -1L, lock);
        Assert.assertEquals((Object)DLSN.InvalidDLSN, (Object)Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2L))));
        Assert.assertEquals((long)-1L, (long)((BKLogSegmentEntryWriter)writer.getEntryWriter()).getLedgerHandle().getLastAddPushed());
        this.closeWriterAndLock(writer, lock);
    }
}

