/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.client;

import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.pulsar.shade.com.google.common.util.concurrent.RateLimiter;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.AsyncCallback;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerEntry;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerFragment;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.MetadataUpdateLoop;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerManager;
import org.apache.pulsar.shade.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.Counter;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.ByteBufList;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.MathUtils;
import org.apache.pulsar.shade.org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@StatsDoc(name="replication_worker", help="Ledger fragment replicator related stats")
public class LedgerFragmentReplicator {
    private BookKeeper bkc;
    private StatsLogger statsLogger;
    @StatsDoc(name="NUM_ENTRIES_READ", help="Number of entries read by the replicator")
    private final Counter numEntriesRead;
    @StatsDoc(name="NUM_BYTES_READ", help="The distribution of size of entries read by the replicator")
    private final OpStatsLogger numBytesRead;
    @StatsDoc(name="NUM_ENTRIES_WRITTEN", help="Number of entries written by the replicator")
    private final Counter numEntriesWritten;
    @StatsDoc(name="NUM_BYTES_WRITTEN", help="The distribution of size of entries written by the replicator")
    private final OpStatsLogger numBytesWritten;
    @StatsDoc(name="READ_DATA_LATENCY", help="The distribution of latency of read entries by the replicator")
    private final OpStatsLogger readDataLatency;
    @StatsDoc(name="WRITE_DATA_LATENCY", help="The distribution of latency of write entries by the replicator")
    private final OpStatsLogger writeDataLatency;
    protected Throttler replicationThrottle = null;
    private AtomicInteger averageEntrySize;
    private static final int INITIAL_AVERAGE_ENTRY_SIZE = 1024;
    private static final double AVERAGE_ENTRY_SIZE_RATIO = 0.8;
    private static final Logger LOG = LoggerFactory.getLogger(LedgerFragmentReplicator.class);

    public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger, ClientConfiguration conf) {
        this.bkc = bkc;
        this.statsLogger = statsLogger;
        this.numEntriesRead = this.statsLogger.getCounter("NUM_ENTRIES_READ");
        this.numBytesRead = this.statsLogger.getOpStatsLogger("NUM_BYTES_READ");
        this.numEntriesWritten = this.statsLogger.getCounter("NUM_ENTRIES_WRITTEN");
        this.numBytesWritten = this.statsLogger.getOpStatsLogger("NUM_BYTES_WRITTEN");
        this.readDataLatency = this.statsLogger.getOpStatsLogger("READ_DATA_LATENCY");
        this.writeDataLatency = this.statsLogger.getOpStatsLogger("WRITE_DATA_LATENCY");
        if (conf.getReplicationRateByBytes() > 0) {
            this.replicationThrottle = new Throttler(conf.getReplicationRateByBytes());
        }
        this.averageEntrySize = new AtomicInteger(1024);
    }

    public LedgerFragmentReplicator(BookKeeper bkc, ClientConfiguration conf) {
        this(bkc, NullStatsLogger.INSTANCE, conf);
    }

    private void replicateFragmentInternal(LedgerHandle lh, LedgerFragment lf, AsyncCallback.VoidCallback ledgerFragmentMcb, Set<BookieId> newBookies, BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException {
        if (!lf.isClosed()) {
            LOG.error("Trying to replicate an unclosed fragment; This is not safe {}", (Object)lf);
            ledgerFragmentMcb.processResult(-103, null, null);
            return;
        }
        Long startEntryId = lf.getFirstStoredEntryId();
        Long endEntryId = lf.getLastStoredEntryId();
        if (startEntryId == -1L ^ endEntryId == -1L) {
            LOG.error("For LedgerFragment: {}, seeing inconsistent firstStoredEntryId: {} and lastStoredEntryId: {}", new Object[]{lf, startEntryId, endEntryId});
            assert (false);
        }
        if (startEntryId > endEntryId || endEntryId <= -1L) {
            ledgerFragmentMcb.processResult(0, null, null);
            return;
        }
        LinkedList<Long> entriesToReplicate = new LinkedList<Long>();
        long lastStoredEntryId = lf.getLastStoredEntryId();
        for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId; ++i) {
            entriesToReplicate.add(i);
        }
        BookkeeperInternalCallbacks.MultiCallback ledgerFragmentEntryMcb = new BookkeeperInternalCallbacks.MultiCallback(entriesToReplicate.size(), ledgerFragmentMcb, null, 0, -10);
        for (Long entryId : entriesToReplicate) {
            this.recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb, newBookies, onReadEntryFailureCallback);
        }
    }

    void replicate(LedgerHandle lh, LedgerFragment lf, AsyncCallback.VoidCallback ledgerFragmentMcb, Set<BookieId> targetBookieAddresses, BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException {
        Set<LedgerFragment> partionedFragments = LedgerFragmentReplicator.splitIntoSubFragments(lh, lf, this.bkc.getConf().getRereplicationEntryBatchSize());
        LOG.info("Replicating fragment {} in {} sub fragments.", (Object)lf, (Object)partionedFragments.size());
        this.replicateNextBatch(lh, partionedFragments.iterator(), ledgerFragmentMcb, targetBookieAddresses, onReadEntryFailureCallback);
    }

    private void replicateNextBatch(final LedgerHandle lh, final Iterator<LedgerFragment> fragments, final AsyncCallback.VoidCallback ledgerFragmentMcb, final Set<BookieId> targetBookieAddresses, final BiConsumer<Long, Long> onReadEntryFailureCallback) {
        if (fragments.hasNext()) {
            try {
                this.replicateFragmentInternal(lh, fragments.next(), new AsyncCallback.VoidCallback(){

                    @Override
                    public void processResult(int rc, String v, Object ctx) {
                        if (rc != 0) {
                            ledgerFragmentMcb.processResult(rc, null, null);
                        } else {
                            LedgerFragmentReplicator.this.replicateNextBatch(lh, fragments, ledgerFragmentMcb, targetBookieAddresses, onReadEntryFailureCallback);
                        }
                    }
                }, targetBookieAddresses, onReadEntryFailureCallback);
            }
            catch (InterruptedException e) {
                ledgerFragmentMcb.processResult(-15, null, null);
                Thread.currentThread().interrupt();
            }
        } else {
            ledgerFragmentMcb.processResult(0, null, null);
        }
    }

    static Set<LedgerFragment> splitIntoSubFragments(LedgerHandle lh, LedgerFragment ledgerFragment, long rereplicationEntryBatchSize) {
        long numberOfEntriesToReplicate;
        long splitsWithFullEntries;
        long lastEntryId;
        HashSet<LedgerFragment> fragments = new HashSet<LedgerFragment>();
        if (rereplicationEntryBatchSize <= 0L) {
            fragments.add(ledgerFragment);
            return fragments;
        }
        long firstEntryId = ledgerFragment.getFirstStoredEntryId();
        if (firstEntryId == -1L ^ (lastEntryId = ledgerFragment.getLastStoredEntryId()) == -1L) {
            LOG.error("For LedgerFragment: {}, seeing inconsistent firstStoredEntryId: {} and lastStoredEntryId: {}", new Object[]{ledgerFragment, firstEntryId, lastEntryId});
            assert (false);
        }
        if ((splitsWithFullEntries = (numberOfEntriesToReplicate = lastEntryId - firstEntryId + 1L) / rereplicationEntryBatchSize) == 0L) {
            fragments.add(ledgerFragment);
            return fragments;
        }
        long fragmentSplitLastEntry = 0L;
        int i = 0;
        while ((long)i < splitsWithFullEntries) {
            fragmentSplitLastEntry = firstEntryId + rereplicationEntryBatchSize - 1L;
            fragments.add(new LedgerFragment(lh, firstEntryId, fragmentSplitLastEntry, ledgerFragment.getBookiesIndexes()));
            firstEntryId = fragmentSplitLastEntry + 1L;
            ++i;
        }
        long lastSplitWithPartialEntries = numberOfEntriesToReplicate % rereplicationEntryBatchSize;
        if (lastSplitWithPartialEntries > 0L) {
            fragments.add(new LedgerFragment(lh, firstEntryId, firstEntryId + lastSplitWithPartialEntries - 1L, ledgerFragment.getBookiesIndexes()));
        }
        return fragments;
    }

    private void recoverLedgerFragmentEntry(final Long entryId, LedgerHandle lh, final AsyncCallback.VoidCallback ledgerFragmentEntryMcb, final Set<BookieId> newBookies, final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException {
        final long ledgerId = lh.getId();
        final AtomicInteger numCompleted = new AtomicInteger(0);
        final AtomicBoolean completed = new AtomicBoolean(false);
        if (this.replicationThrottle != null) {
            this.replicationThrottle.acquire(this.averageEntrySize.get());
        }
        final BookkeeperInternalCallbacks.WriteCallback multiWriteCallback = new BookkeeperInternalCallbacks.WriteCallback(){

            @Override
            public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
                if (rc != 0) {
                    LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}", new Object[]{ledgerId, entryId, addr, BKException.create(rc)});
                    if (completed.compareAndSet(false, true)) {
                        ledgerFragmentEntryMcb.processResult(rc, null, null);
                    }
                } else {
                    LedgerFragmentReplicator.this.numEntriesWritten.inc();
                    if (ctx instanceof Long) {
                        LedgerFragmentReplicator.this.numBytesWritten.registerSuccessfulValue((Long)ctx);
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!", new Object[]{ledgerId, entryId, addr});
                    }
                    if (numCompleted.incrementAndGet() == newBookies.size() && completed.compareAndSet(false, true)) {
                        ledgerFragmentEntryMcb.processResult(rc, null, null);
                    }
                }
            }
        };
        final long startReadEntryTime = MathUtils.nowInNano();
        lh.asyncReadEntries(entryId, entryId, new AsyncCallback.ReadCallback(){

            @Override
            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                if (rc != 0) {
                    LOG.error("BK error reading ledger entry: " + entryId, (Throwable)BKException.create(rc));
                    onReadEntryFailureCallback.accept(ledgerId, entryId);
                    ledgerFragmentEntryMcb.processResult(rc, null, null);
                    return;
                }
                LedgerFragmentReplicator.this.readDataLatency.registerSuccessfulEvent(MathUtils.elapsedNanos(startReadEntryTime), TimeUnit.NANOSECONDS);
                LedgerEntry entry = seq.nextElement();
                byte[] data = entry.getEntry();
                long dataLength = data.length;
                LedgerFragmentReplicator.this.numEntriesRead.inc();
                LedgerFragmentReplicator.this.numBytesRead.registerSuccessfulValue(dataLength);
                ByteBufList toSend = lh.getDigestManager().computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), entry.getLength(), Unpooled.wrappedBuffer(data, 0, data.length));
                if (LedgerFragmentReplicator.this.replicationThrottle != null) {
                    LedgerFragmentReplicator.this.updateAverageEntrySize(toSend.readableBytes());
                }
                for (BookieId newBookie : newBookies) {
                    long startWriteEntryTime = MathUtils.nowInNano();
                    LedgerFragmentReplicator.this.bkc.getBookieClient().addEntry(newBookie, lh.getId(), lh.getLedgerKey(), entryId, ByteBufList.clone(toSend), multiWriteCallback, dataLength, 2, false, WriteFlag.NONE);
                    LedgerFragmentReplicator.this.writeDataLatency.registerSuccessfulEvent(MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS);
                }
                toSend.release();
            }
        }, null);
    }

    private void updateAverageEntrySize(int toSendSize) {
        this.averageEntrySize.updateAndGet(value -> (int)((double)value * 0.8 + 0.19999999999999996 * (double)toSendSize));
    }

    private static void updateEnsembleInfo(LedgerManager ledgerManager, AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId, LedgerHandle lh, Map<BookieId, BookieId> oldBookie2NewBookie) {
        MetadataUpdateLoop updateLoop = new MetadataUpdateLoop(ledgerManager, lh.getId(), lh::getVersionedLedgerMetadata, metadata -> {
            List ensemble = (List)metadata.getAllEnsembles().get(fragmentStartId);
            return oldBookie2NewBookie.keySet().stream().anyMatch(ensemble::contains);
        }, currentMetadata -> {
            List<BookieId> newEnsemble = ((List)currentMetadata.getAllEnsembles().get(fragmentStartId)).stream().map(bookie -> oldBookie2NewBookie.getOrDefault(bookie, (BookieId)bookie)).collect(Collectors.toList());
            return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(fragmentStartId, newEnsemble).build();
        }, lh::setLedgerMetadata);
        updateLoop.run().whenComplete((result, ex) -> {
            if (ex == null) {
                LOG.info("Updated ZK to point ledger fragments from old bookies to new bookies: {}", (Object)oldBookie2NewBookie);
                ensembleUpdatedCb.processResult(0, null, null);
            } else {
                LOG.error("Error updating ledger config metadata for ledgerId {}", (Object)lh.getId(), ex);
                ensembleUpdatedCb.processResult(BKException.getExceptionCode(ex, -999), null, null);
            }
        });
    }

    static class Throttler {
        private final RateLimiter rateLimiter;

        Throttler(int throttleBytes) {
            this.rateLimiter = RateLimiter.create(throttleBytes);
        }

        void acquire(int permits) {
            this.rateLimiter.acquire(permits);
        }
    }

    static class SingleFragmentCallback
    implements AsyncCallback.VoidCallback {
        final AsyncCallback.VoidCallback ledgerFragmentsMcb;
        final LedgerHandle lh;
        final LedgerManager ledgerManager;
        final long fragmentStartId;
        final Map<BookieId, BookieId> oldBookie2NewBookie;

        SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb, LedgerHandle lh, LedgerManager ledgerManager, long fragmentStartId, Map<BookieId, BookieId> oldBookie2NewBookie) {
            this.ledgerFragmentsMcb = ledgerFragmentsMcb;
            this.lh = lh;
            this.ledgerManager = ledgerManager;
            this.fragmentStartId = fragmentStartId;
            this.oldBookie2NewBookie = oldBookie2NewBookie;
        }

        @Override
        public void processResult(int rc, String path, Object ctx) {
            if (rc != 0) {
                LOG.error("BK error replicating ledger fragments for ledger: " + this.lh.getId(), (Throwable)BKException.create(rc));
                this.ledgerFragmentsMcb.processResult(rc, null, null);
                return;
            }
            LedgerFragmentReplicator.updateEnsembleInfo(this.ledgerManager, this.ledgerFragmentsMcb, this.fragmentStartId, this.lh, this.oldBookie2NewBookie);
        }
    }
}

