/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.transaction.coordinator.impl;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionLog;
import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MLTransactionLogImpl
implements TransactionLog {
    private static final Logger log = LoggerFactory.getLogger(MLTransactionLogImpl.class);
    private final ManagedLedger managedLedger;
    public static final String TRANSACTION_LOG_PREFIX = NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-";
    private final ManagedCursor cursor;
    private static final String TRANSACTION_SUBSCRIPTION_NAME = "transaction.subscription";
    private final SpscArrayQueue<Entry> entryQueue;
    private final PositionImpl lastConfirmedEntry;
    private PositionImpl currentLoadPosition;
    private final long tcId;
    private final String topicName;

    public MLTransactionLogImpl(TransactionCoordinatorID tcID, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig) throws Exception {
        this.topicName = TRANSACTION_LOG_PREFIX + tcID;
        this.tcId = tcID.getId();
        this.managedLedger = managedLedgerFactory.open(this.topicName, managedLedgerConfig);
        this.cursor = this.managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME, PulsarApi.CommandSubscribe.InitialPosition.Earliest);
        this.currentLoadPosition = (PositionImpl)this.cursor.getMarkDeletedPosition();
        this.entryQueue = new SpscArrayQueue(2000);
        this.lastConfirmedEntry = (PositionImpl)this.managedLedger.getLastConfirmedEntry();
    }

    @Override
    public void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback) {
        new TransactionLogReplayer(transactionLogReplayCallback).start();
    }

    private void readAsync(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback readEntriesCallback) {
        this.cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, System.nanoTime());
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        final CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        this.managedLedger.asyncClose(new AsyncCallbacks.CloseCallback(){

            @Override
            public void closeComplete(Object ctx) {
                log.info("Transaction log with tcId : {} close managedLedger successful!", (Object)MLTransactionLogImpl.this.tcId);
                completableFuture.complete(null);
            }

            @Override
            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                log.error("Transaction log with tcId : {} close managedLedger fail!", (Object)MLTransactionLogImpl.this.tcId);
                completableFuture.completeExceptionally(exception);
            }
        }, null);
        return completableFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Position> append(PulsarTransactionMetadata.TransactionMetadataEntry transactionMetadataEntry) {
        int transactionMetadataEntrySize = transactionMetadataEntry.getSerializedSize();
        final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
        ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(buf);
        final CompletableFuture<Position> completableFuture = new CompletableFuture<Position>();
        try {
            transactionMetadataEntry.writeTo(outStream);
            this.managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback(){

                @Override
                public void addComplete(Position position, Object ctx) {
                    buf.release();
                    completableFuture.complete(position);
                }

                @Override
                public void addFailed(ManagedLedgerException exception, Object ctx) {
                    log.error("Transaction log write transaction operation error", (Throwable)exception);
                    buf.release();
                    completableFuture.completeExceptionally(exception);
                }
            }, null);
        }
        catch (IOException e) {
            log.error("Transaction log write transaction operation error", (Throwable)e);
            completableFuture.completeExceptionally(e);
        }
        finally {
            outStream.recycle();
        }
        return completableFuture;
    }

    public CompletableFuture<Void> deletePosition(List<Position> positions) {
        final CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        this.cursor.asyncDelete(positions, new AsyncCallbacks.DeleteCallback(){

            @Override
            public void deleteComplete(Object position) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Deleted message at {}", new Object[]{MLTransactionLogImpl.this.topicName, MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, position});
                }
                completableFuture.complete(null);
            }

            @Override
            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
                log.warn("[{}][{}] Failed to delete message at {}", new Object[]{MLTransactionLogImpl.this.topicName, MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, ctx, exception});
                completableFuture.completeExceptionally(exception);
            }
        }, null);
        return completableFuture;
    }

    class FillEntryQueueCallback
    implements AsyncCallbacks.ReadEntriesCallback {
        private final AtomicLong outstandingReadsRequests = new AtomicLong(0L);

        FillEntryQueueCallback() {
        }

        void fillQueue() {
            if (MLTransactionLogImpl.this.entryQueue.size() < MLTransactionLogImpl.this.entryQueue.capacity() && this.outstandingReadsRequests.get() == 0L && MLTransactionLogImpl.this.cursor.hasMoreEntries()) {
                this.outstandingReadsRequests.incrementAndGet();
                MLTransactionLogImpl.this.readAsync(100, this);
            }
        }

        @Override
        public void readEntriesComplete(final List<Entry> entries, Object ctx) {
            MLTransactionLogImpl.this.entryQueue.fill((MessagePassingQueue.Supplier)new MessagePassingQueue.Supplier<Entry>(){
                private int i = 0;

                public Entry get() {
                    Entry entry = (Entry)entries.get(this.i);
                    ++this.i;
                    return entry;
                }
            }, entries.size());
            this.outstandingReadsRequests.decrementAndGet();
        }

        @Override
        public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
            log.error("Transaction log init fail error!", (Throwable)exception);
            this.outstandingReadsRequests.decrementAndGet();
        }
    }

    class TransactionLogReplayer {
        private final FillEntryQueueCallback fillEntryQueueCallback;
        private final TransactionLogReplayCallback transactionLogReplayCallback;

        TransactionLogReplayer(TransactionLogReplayCallback transactionLogReplayCallback) {
            this.fillEntryQueueCallback = new FillEntryQueueCallback();
            this.transactionLogReplayCallback = transactionLogReplayCallback;
        }

        public void start() {
            while (MLTransactionLogImpl.this.lastConfirmedEntry.compareTo(MLTransactionLogImpl.this.currentLoadPosition) > 0) {
                this.fillEntryQueueCallback.fillQueue();
                Entry entry = (Entry)MLTransactionLogImpl.this.entryQueue.poll();
                if (entry != null) {
                    PulsarTransactionMetadata.TransactionMetadataEntry transactionMetadataEntry;
                    ByteBuf buffer = entry.getDataBuffer();
                    MLTransactionLogImpl.this.currentLoadPosition = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                    ByteBufCodedInputStream stream = ByteBufCodedInputStream.get(buffer);
                    PulsarTransactionMetadata.TransactionMetadataEntry.Builder transactionMetadataEntryBuilder = PulsarTransactionMetadata.TransactionMetadataEntry.newBuilder();
                    try {
                        transactionMetadataEntry = transactionMetadataEntryBuilder.mergeFrom(stream, null).build();
                    }
                    catch (IOException e) {
                        log.error(e.getMessage(), (Throwable)e);
                        throw new RuntimeException("TransactionLog convert entry error : ", e);
                    }
                    this.transactionLogReplayCallback.handleMetadataEntry(entry.getPosition(), transactionMetadataEntry);
                    entry.release();
                    transactionMetadataEntry.recycle();
                    transactionMetadataEntryBuilder.recycle();
                    stream.recycle();
                    continue;
                }
                try {
                    Thread.sleep(1L);
                }
                catch (InterruptedException interruptedException) {}
            }
            this.transactionLogReplayCallback.replayComplete();
        }
    }
}

