/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.transaction.coordinator.impl;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.io.netty.util.Recycler;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.Timer;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.transaction.coordinator.impl.TxnBatchedPositionImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes transaction-log records to a {@link ManagedLedger}, optionally grouping several records
 * into one ledger entry.
 *
 * <p>When batching is disabled every record is serialized and written as its own entry. When
 * batching is enabled, records are buffered and written as a single entry once one of the
 * following holds (see {@link #doTrigFlush(boolean, boolean)}):
 * <ul>
 *   <li>the oldest buffered record has waited at least {@code batchedWriteMaxDelayInMillis};</li>
 *   <li>at least {@code batchedWriteMaxRecords} records are buffered;</li>
 *   <li>the buffered records total at least {@code batchedWriteMaxSize} serialized bytes;</li>
 *   <li>the periodic timer task fires, or a flush is forced.</li>
 * </ul>
 *
 * <p>A batched entry starts with a 2-byte magic number
 * ({@link #BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER}) and a 2-byte version
 * ({@link #BATCHED_ENTRY_DATA_PREFIX_VERSION}), followed by the output of
 * {@link DataSerializer#serialize(ArrayList)}.
 *
 * <p>In batched mode, buffering, flushing and the closing work are all submitted to
 * {@code singleThreadExecutorForWrite}; the buffer fields are not otherwise synchronized.
 *
 * @param <T> type of the records being written
 */
public class TxnLogBufferedWriter<T> implements AsyncCallbacks.AddEntryCallback, Closeable {
    private static final Logger log = LoggerFactory.getLogger(TxnLogBufferedWriter.class);

    /** Magic number written at the start of every batched entry (0x0E01). */
    public static final short BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER = 3585;
    /** Number of bytes occupied by the magic number. */
    public static final int BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN = 2;
    /** Version written right after the magic number of every batched entry. */
    public static final short BATCHED_ENTRY_DATA_PREFIX_VERSION = 1;
    /** Number of bytes occupied by the version. */
    public static final short BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN = 2;

    /** Shared exception handed to callbacks whose records are rejected because the writer is closing/closed. */
    private static final ManagedLedgerException BUFFERED_WRITER_CLOSED_EXCEPTION = new ManagedLedgerException.ManagedLedgerFencedException(new Exception("Transaction log buffered write has closed"));

    private final boolean batchEnabled;
    private final ManagedLedger managedLedger;
    private final Timer timer;
    private final ExecutorService singleThreadExecutorForWrite;
    private final DataSerializer<T> dataSerializer;
    /** Handle of the most recently scheduled timing-flush task; {@code null} if none was scheduled. */
    private Timeout timeout;
    private final int batchedWriteMaxRecords;
    private final int batchedWriteMaxSize;
    private final int batchedWriteMaxDelayInMillis;
    /** Records buffered for the next batched entry; kept parallel to {@code flushContext.asyncAddArgsList}. */
    private final ArrayList<T> dataArray;
    /** Callbacks (and, once flushed, the entry buffer) of the batch currently being assembled. */
    private FlushContext flushContext;
    /** Sum of the serialized sizes of the records currently in {@link #dataArray}. */
    private long bytesSize;
    private volatile State state;
    private static final AtomicReferenceFieldUpdater<TxnLogBufferedWriter, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(TxnLogBufferedWriter.class, State.class, "state");

    /** Timer task: unless it was cancelled, requests a scheduled flush on the write executor. */
    private final TimerTask timingFlush = scheduled -> {
        if (scheduled.isCancelled()) {
            return;
        }
        this.trigFlush(false, true);
    };

    /**
     * Creates a writer and, when batching is enabled, schedules the first timing flush.
     *
     * @param managedLedger ledger the entries are appended to
     * @param orderedExecutor executor from which the write thread is chosen, keyed by the ledger
     *        name (or by a random UUID when the ledger has no name)
     * @param timer timer used to schedule the periodic flush
     * @param dataSerializer serializer for single records and for batches
     * @param batchedWriteMaxRecords record-count threshold that triggers a flush
     * @param batchedWriteMaxSize serialized-size threshold (bytes) that triggers a flush; a single
     *        record at or above this size is written on its own, unbatched
     * @param batchedWriteMaxDelayInMillis timer period and maximum age of the oldest buffered record
     * @param batchEnabled whether records are batched at all
     */
    public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor orderedExecutor, Timer timer, DataSerializer<T> dataSerializer, int batchedWriteMaxRecords, int batchedWriteMaxSize, int batchedWriteMaxDelayInMillis, boolean batchEnabled) {
        this.batchEnabled = batchEnabled;
        this.managedLedger = managedLedger;
        this.singleThreadExecutorForWrite = orderedExecutor.chooseThread(managedLedger.getName() == null ? UUID.randomUUID().toString() : managedLedger.getName());
        this.dataSerializer = dataSerializer;
        this.batchedWriteMaxRecords = batchedWriteMaxRecords;
        this.batchedWriteMaxSize = batchedWriteMaxSize;
        this.batchedWriteMaxDelayInMillis = batchedWriteMaxDelayInMillis;
        this.flushContext = FlushContext.newInstance();
        this.dataArray = new ArrayList<>();
        this.state = State.OPEN;
        this.timer = timer;
        if (this.batchEnabled) {
            this.nextTimingTrigger();
        }
    }

    /**
     * Schedules the next timing flush after {@code batchedWriteMaxDelayInMillis}, unless the writer
     * is closing or closed. Scheduling failures are logged and otherwise ignored.
     */
    private void nextTimingTrigger() {
        try {
            if (this.state == State.CLOSING || this.state == State.CLOSED) {
                return;
            }
            this.timeout = this.timer.newTimeout(this.timingFlush, this.batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.error("Start timing flush trigger failed. managedLedger: " + this.managedLedger.getName(), e);
        }
    }

    /**
     * Adds one record to the transaction log.
     *
     * <p>Without batching the record is serialized and written immediately on the calling thread
     * (or failed with the "closed" exception if the writer is closing/closed). With batching the
     * work is handed to the write executor.
     *
     * @param data record to write
     * @param callback notified when the record has been written or has failed
     * @param ctx opaque value passed back to {@code callback}
     */
    public void asyncAddData(T data, AddDataCallback callback, Object ctx) {
        if (!this.batchEnabled) {
            if (this.state == State.CLOSING || this.state == State.CLOSED) {
                callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
                return;
            }
            ByteBuf byteBuf = this.dataSerializer.serialize(data);
            this.managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE, AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
            return;
        }
        this.singleThreadExecutorForWrite.execute(() -> this.internalAsyncAddData(data, callback, ctx));
    }

    /**
     * Batched-mode add, run on the write executor: buffers the record and flushes if a threshold
     * is reached. A record whose serialized size is at least {@code batchedWriteMaxSize} is not
     * buffered; pending records are force-flushed first and the record is written as its own entry.
     */
    private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx) {
        if (this.state == State.CLOSING || this.state == State.CLOSED) {
            callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
            return;
        }
        int len = this.dataSerializer.getSerializedSize(data);
        if (len >= this.batchedWriteMaxSize) {
            // Flush what is already buffered before issuing the oversized record's own write.
            if (!this.flushContext.asyncAddArgsList.isEmpty()) {
                this.doTrigFlush(true, false);
            }
            ByteBuf byteBuf = this.dataSerializer.serialize(data);
            this.managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE, AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
            return;
        }
        this.dataArray.add(data);
        this.flushContext.asyncAddArgsList.add(AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis()));
        this.bytesSize += (long) len;
        this.doTrigFlush(false, false);
    }

    /**
     * Asks the write executor to evaluate the flush conditions.
     *
     * @param force flush whatever is buffered regardless of thresholds
     * @param byScheduleThreads {@code true} when invoked by the timing task; flushes whatever is
     *        buffered and re-arms the timer afterwards
     */
    public void trigFlush(boolean force, boolean byScheduleThreads) {
        this.singleThreadExecutorForWrite.execute(() -> this.doTrigFlush(force, byScheduleThreads));
    }

    /**
     * Flushes the buffer if it is non-empty and either the flush is forced / timer-driven or one
     * of the delay, record-count or size thresholds has been reached. When timer-driven, the next
     * timing trigger is always scheduled, even if nothing was flushed or the flush threw.
     */
    private void doTrigFlush(boolean force, boolean byScheduleThreads) {
        try {
            if (this.flushContext.asyncAddArgsList.isEmpty()) {
                return;
            }
            if (force || byScheduleThreads) {
                this.doFlush();
                return;
            }
            AsyncAddArgs oldest = this.flushContext.asyncAddArgsList.get(0);
            boolean delayReached = System.currentTimeMillis() - oldest.addedTime >= (long) this.batchedWriteMaxDelayInMillis;
            if (delayReached
                    || this.flushContext.asyncAddArgsList.size() >= this.batchedWriteMaxRecords
                    || this.bytesSize >= (long) this.batchedWriteMaxSize) {
                this.doFlush();
            }
        } finally {
            if (byScheduleThreads) {
                this.nextTimingTrigger();
            }
        }
    }

    /**
     * Writes the buffered records as one batched entry (prefix + serialized batch) and starts a
     * fresh, empty batch. If the writer is closing or closed, the buffered callbacks are failed
     * instead and nothing is serialized or written.
     */
    private void doFlush() {
        FlushContext pending = this.flushContext;
        if (State.CLOSING == this.state || State.CLOSED == this.state) {
            this.failureCallbackByContextAndRecycle(pending, BUFFERED_WRITER_CLOSED_EXCEPTION);
        } else {
            // Serialize first: if the serializer throws, no prefix buffer has been allocated yet,
            // so nothing is leaked.
            ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
            ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN + BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN);
            prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
            prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
            ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix, actualContent);
            // The context keeps the buffer so that it is released when the context is recycled.
            pending.byteBuf = pairByteBuf;
            this.managedLedger.asyncAddEntry(pairByteBuf, this, pending);
        }
        this.dataArray.clear();
        this.flushContext = FlushContext.newInstance();
        this.bytesSize = 0L;
    }

    /**
     * Completion callback of a batched entry: notifies every record callback of the batch with a
     * {@link TxnBatchedPositionImpl} carrying the entry position, the batch size and the record's
     * index in the batch, then recycles the batch context. An exception thrown by one callback is
     * logged and does not prevent the remaining callbacks from running.
     */
    @Override
    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
        FlushContext completed = (FlushContext) ctx;
        try {
            int batchSize = completed.asyncAddArgsList.size();
            for (int batchIndex = 0; batchIndex < batchSize; ++batchIndex) {
                AsyncAddArgs asyncAddArgs = completed.asyncAddArgsList.get(batchIndex);
                TxnBatchedPositionImpl txnBatchedPosition = new TxnBatchedPositionImpl(position, batchSize, batchIndex);
                try {
                    asyncAddArgs.callback.addComplete(txnBatchedPosition, asyncAddArgs.ctx);
                } catch (Exception e) {
                    log.error("After writing to the transaction batched log complete, the callback failed. managedLedger: " + this.managedLedger.getName(), e);
                }
            }
        } finally {
            completed.recycle();
        }
    }

    /** Failure callback of a batched entry: fails every record callback of the batch and recycles the context. */
    @Override
    public void addFailed(ManagedLedgerException exception, Object ctx) {
        this.failureCallbackByContextAndRecycle((FlushContext) ctx, exception);
    }

    /**
     * Closes the writer.
     *
     * <p>Without batching the state simply moves from OPEN to CLOSED. With batching the state moves
     * from OPEN to CLOSING, and the write executor then fails all buffered records and cancels the
     * timing task; the state becomes CLOSED only if the timing task is (or gets) cancelled.
     * Calling this on a writer that is not OPEN does nothing.
     */
    @Override
    public void close() {
        if (!this.batchEnabled) {
            STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
            return;
        }
        if (!STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING)) {
            return;
        }
        this.singleThreadExecutorForWrite.execute(() -> {
            if (this.state == State.CLOSED) {
                return;
            }
            if (!this.flushContext.asyncAddArgsList.isEmpty()) {
                // Detach the batch before recycling it: once recycled, the context goes back to the
                // object pool and must no longer be reachable through this writer, otherwise a later
                // flush check here could observe records that belong to whoever reuses it.
                FlushContext pending = this.flushContext;
                this.flushContext = FlushContext.newInstance();
                this.dataArray.clear();
                this.bytesSize = 0L;
                this.failureCallbackByContextAndRecycle(pending, BUFFERED_WRITER_CLOSED_EXCEPTION);
            }
            if (this.timeout == null) {
                log.error("Cancel timeout-task that schedule at fixed rate trig flush failure. The field-timeout is null. managedLedger: " + this.managedLedger.getName());
            } else if (this.timeout.isCancelled() || this.timeout.cancel()) {
                this.state = State.CLOSED;
            } else {
                log.error("Cancel timeout-task that schedule at fixed rate trig flush failure. The state will stay at CLOSING. managedLedger: " + this.managedLedger.getName());
            }
        });
    }

    /**
     * Fails every record callback held by {@code flushContext} with {@code ex}, then recycles the
     * context. A {@code null} context or one without callbacks is left untouched.
     */
    private void failureCallbackByContextAndRecycle(FlushContext flushContext, ManagedLedgerException ex) {
        if (flushContext == null || CollectionUtils.isEmpty(flushContext.asyncAddArgsList)) {
            return;
        }
        try {
            for (AsyncAddArgs asyncAddArgs : flushContext.asyncAddArgsList) {
                // The args are recycled together with the context below, not one by one.
                this.failureCallbackByArgs(asyncAddArgs, ex, false);
            }
        } finally {
            flushContext.recycle();
        }
    }

    /**
     * Invokes {@code addFailed} on a single record callback; an exception thrown by the callback is
     * logged and swallowed.
     *
     * @param recycle whether {@code asyncAddArgs} is recycled afterwards
     */
    private void failureCallbackByArgs(AsyncAddArgs asyncAddArgs, ManagedLedgerException ex, boolean recycle) {
        if (asyncAddArgs == null) {
            return;
        }
        try {
            asyncAddArgs.callback.addFailed(ex, asyncAddArgs.ctx);
        } catch (Exception e) {
            log.error("After writing to the transaction batched log failure, the callback executed also failed. managedLedger: " + this.managedLedger.getName(), e);
        } finally {
            if (recycle) {
                asyncAddArgs.recycle();
            }
        }
    }

    /** Serialization strategy for records of type {@code T}. */
    public static interface DataSerializer<T> {
        /** Returns the serialized size of one record, used for the size-based flush threshold. */
        public int getSerializedSize(T var1);

        /** Serializes one record; used for unbatched writes. */
        public ByteBuf serialize(T var1);

        /** Serializes a batch of records; the result follows the batched-entry prefix. */
        public ByteBuf serialize(ArrayList<T> var1);
    }

    /** Pooled holder of one batch: the per-record callback arguments and the entry buffer. */
    private static class FlushContext {
        private static final Recycler<FlushContext> FLUSH_CONTEXT_RECYCLER = new Recycler<FlushContext>() {

            @Override
            protected FlushContext newObject(Recycler.Handle<FlushContext> handle) {
                return new FlushContext(handle);
            }
        };
        private final Recycler.Handle<FlushContext> handle;
        private final ArrayList<AsyncAddArgs> asyncAddArgsList;
        private ByteBuf byteBuf;

        private FlushContext(Recycler.Handle<FlushContext> handle) {
            this.handle = handle;
            this.asyncAddArgsList = new ArrayList<>(8);
        }

        private static FlushContext newInstance() {
            return FLUSH_CONTEXT_RECYCLER.get();
        }

        /** Recycles all held callback arguments, releases the entry buffer and returns this object to the pool. */
        public void recycle() {
            for (AsyncAddArgs asyncAddArgs : this.asyncAddArgsList) {
                asyncAddArgs.recycle();
            }
            if (this.byteBuf != null) {
                this.byteBuf.release();
                this.byteBuf = null;
            }
            this.asyncAddArgsList.clear();
            this.handle.recycle(this);
        }
    }

    /** Lifecycle of the writer. */
    private static enum State {
        OPEN,
        CLOSING,
        CLOSED;

    }

    /** Callback notified about the outcome of a single {@code asyncAddData} call. */
    public static interface AddDataCallback {
        /** Called when the record has been written; receives the position and the caller's context. */
        public void addComplete(Position var1, Object var2);

        /** Called when the record could not be written; receives the cause and the caller's context. */
        public void addFailed(ManagedLedgerException var1, Object var2);
    }

    /**
     * Ledger callback for records written as their own entry: forwards the outcome to the record
     * callback and always recycles the {@link AsyncAddArgs}, even if the callback throws.
     */
    private static class DisabledBatchCallback implements AsyncCallbacks.AddEntryCallback {
        private static final DisabledBatchCallback INSTANCE = new DisabledBatchCallback();

        private DisabledBatchCallback() {
        }

        @Override
        public void addComplete(Position position, ByteBuf entryData, Object ctx) {
            AsyncAddArgs asyncAddArgs = (AsyncAddArgs) ctx;
            try {
                asyncAddArgs.callback.addComplete(position, asyncAddArgs.ctx);
            } finally {
                asyncAddArgs.recycle();
            }
        }

        @Override
        public void addFailed(ManagedLedgerException exception, Object ctx) {
            AsyncAddArgs asyncAddArgs = (AsyncAddArgs) ctx;
            try {
                asyncAddArgs.callback.addFailed(exception, asyncAddArgs.ctx);
            } finally {
                asyncAddArgs.recycle();
            }
        }
    }

    /** Pooled holder of the arguments of one {@code asyncAddData} call. */
    private static class AsyncAddArgs {
        private static final Recycler<AsyncAddArgs> ASYNC_ADD_ARGS_RECYCLER = new Recycler<AsyncAddArgs>() {

            @Override
            protected AsyncAddArgs newObject(Recycler.Handle<AsyncAddArgs> handle) {
                return new AsyncAddArgs(handle);
            }
        };
        private final Recycler.Handle<AsyncAddArgs> handle;
        private AddDataCallback callback;
        private Object ctx;
        private long addedTime;
        /** Serialized record for unbatched writes; {@code null} for records that are part of a batch. */
        private ByteBuf byteBuf;

        private static AsyncAddArgs newInstance(AddDataCallback callback, Object ctx, long addedTime) {
            AsyncAddArgs asyncAddArgs = ASYNC_ADD_ARGS_RECYCLER.get();
            asyncAddArgs.callback = callback;
            asyncAddArgs.ctx = ctx;
            asyncAddArgs.addedTime = addedTime;
            return asyncAddArgs;
        }

        private static AsyncAddArgs newInstance(AddDataCallback callback, Object ctx, long addedTime, ByteBuf byteBuf) {
            AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, addedTime);
            asyncAddArgs.byteBuf = byteBuf;
            return asyncAddArgs;
        }

        private AsyncAddArgs(Recycler.Handle<AsyncAddArgs> handle) {
            this.handle = handle;
        }

        /** Clears all fields, releases the held buffer (if any) and returns this object to the pool. */
        public void recycle() {
            this.callback = null;
            this.ctx = null;
            this.addedTime = 0L;
            if (this.byteBuf != null) {
                this.byteBuf.release();
                // Drop the reference: a pooled instance reused without a buffer must not release
                // this (already released) buffer again on its next recycle.
                this.byteBuf = null;
            }
            this.handle.recycle(this);
        }

        @Override
        public String toString() {
            return "TxnLogBufferedWriter.AsyncAddArgs(handle=" + this.handle + ", callback=" + this.getCallback() + ", ctx=" + this.getCtx() + ", addedTime=" + this.getAddedTime() + ", byteBuf=" + this.byteBuf + ")";
        }

        public AddDataCallback getCallback() {
            return this.callback;
        }

        public Object getCtx() {
            return this.ctx;
        }

        public long getAddedTime() {
            return this.addedTime;
        }
    }
}

