package org.apache.bookkeeper.client;

import com.carrotsearch.hppc.IntArrayList;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/bookkeeper/client/PendingReadOp.class */
public class PendingReadOp implements Enumeration<LedgerEntry>, BookkeeperInternalCallbacks.ReadEntryCallback {
    private static final Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
    final int speculativeReadTimeout;
    private final ScheduledExecutorService scheduler;
    Queue<LedgerEntryRequest> seq;
    AsyncCallback.ReadCallback cb;
    Object ctx;
    LedgerHandle lh;
    long numPendingEntries;
    long startEntryId;
    long endEntryId;
    long requestTimeNanos;
    OpStatsLogger readOpLogger;
    private ScheduledFuture<?> speculativeTask = null;
    boolean isRecoveryRead = false;
    final int maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize() - getLedgerMetadata().getAckQuorumSize();
    Set<BookieSocketAddress> heardFromHosts = new HashSet();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/bookkeeper/client/PendingReadOp$LedgerEntryRequest.class */
    public class LedgerEntryRequest extends LedgerEntry {
        static final int NOT_FOUND = -1;
        int nextReplicaIndexToReadFrom;
        AtomicBoolean complete;
        int firstError;
        int numMissedEntryReads;
        final ArrayList<BookieSocketAddress> ensemble;
        final IntArrayList writeSet;
        final BitSet sentReplicas;
        final BitSet erroredReplicas;

        LedgerEntryRequest(ArrayList<BookieSocketAddress> arrayList, long j, long j2) {
            super(j, j2);
            this.nextReplicaIndexToReadFrom = 0;
            this.complete = new AtomicBoolean(false);
            this.firstError = 0;
            this.numMissedEntryReads = 0;
            this.ensemble = arrayList;
            this.writeSet = PendingReadOp.this.lh.bk.placementPolicy.reorderReadSequence(arrayList, PendingReadOp.this.lh.distributionSchedule.getWriteSet(this.entryId));
            this.sentReplicas = new BitSet(PendingReadOp.this.lh.getLedgerMetadata().getWriteQuorumSize());
            this.erroredReplicas = new BitSet(PendingReadOp.this.lh.getLedgerMetadata().getWriteQuorumSize());
        }

        private int getReplicaIndex(BookieSocketAddress bookieSocketAddress) {
            int indexOf = this.ensemble.indexOf(bookieSocketAddress);
            if (indexOf == -1) {
                return -1;
            }
            return this.writeSet.indexOf(indexOf);
        }

        private BitSet getSentToBitSet() {
            BitSet bitSet = new BitSet(this.ensemble.size());
            for (int i = 0; i < this.sentReplicas.length(); i++) {
                if (this.sentReplicas.get(i)) {
                    bitSet.set(this.writeSet.get(i));
                }
            }
            return bitSet;
        }

        private BitSet getHeardFromBitSet(Set<BookieSocketAddress> set) {
            BitSet bitSet = new BitSet(this.ensemble.size());
            Iterator<BookieSocketAddress> it = set.iterator();
            while (it.hasNext()) {
                int indexOf = this.ensemble.indexOf(it.next());
                if (indexOf != -1) {
                    bitSet.set(indexOf);
                }
            }
            return bitSet;
        }

        private boolean readsOutstanding() {
            return this.sentReplicas.cardinality() - this.erroredReplicas.cardinality() > 0;
        }

        synchronized BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> set) {
            if (this.nextReplicaIndexToReadFrom >= PendingReadOp.this.getLedgerMetadata().getWriteQuorumSize()) {
                return null;
            }
            BitSet sentToBitSet = getSentToBitSet();
            sentToBitSet.and(getHeardFromBitSet(set));
            if (sentToBitSet.cardinality() == 0) {
                return sendNextRead();
            }
            return null;
        }

        synchronized BookieSocketAddress sendNextRead() {
            if (this.nextReplicaIndexToReadFrom >= PendingReadOp.this.getLedgerMetadata().getWriteQuorumSize()) {
                if (-8 == this.firstError && this.numMissedEntryReads > PendingReadOp.this.maxMissedReadsAllowed) {
                    this.firstError = -13;
                }
                PendingReadOp.this.submitCallback(this.firstError);
                return null;
            }
            int i = this.nextReplicaIndexToReadFrom;
            int i2 = this.writeSet.get(this.nextReplicaIndexToReadFrom);
            this.nextReplicaIndexToReadFrom++;
            try {
                BookieSocketAddress bookieSocketAddress = this.ensemble.get(i2);
                PendingReadOp.this.sendReadTo(bookieSocketAddress, this);
                this.sentReplicas.set(i);
                return bookieSocketAddress;
            } catch (InterruptedException e) {
                PendingReadOp.LOG.error("Interrupted reading entry " + this, e);
                Thread.currentThread().interrupt();
                PendingReadOp.this.submitCallback(-1);
                return null;
            }
        }

        synchronized void logErrorAndReattemptRead(BookieSocketAddress bookieSocketAddress, String str, int i) {
            if (0 == this.firstError || -13 == this.firstError || -7 == this.firstError) {
                this.firstError = i;
            } else if (-8 == this.firstError && -13 != i && -7 != i) {
                this.firstError = i;
            }
            if (-13 == i || -7 == i) {
                this.numMissedEntryReads++;
                if (PendingReadOp.LOG.isDebugEnabled()) {
                    PendingReadOp.LOG.debug("No such entry found on bookie.  L{} E{} bookie: {}", new Object[]{Long.valueOf(PendingReadOp.this.lh.ledgerId), Long.valueOf(this.entryId), bookieSocketAddress});
                }
            } else if (PendingReadOp.LOG.isDebugEnabled()) {
                PendingReadOp.LOG.debug(str + " while reading L{} E{} from bookie: {}", new Object[]{Long.valueOf(PendingReadOp.this.lh.ledgerId), Long.valueOf(this.entryId), bookieSocketAddress});
            }
            int replicaIndex = getReplicaIndex(bookieSocketAddress);
            if (replicaIndex == -1) {
                PendingReadOp.LOG.error("Received error from a host which is not in the ensemble {} {}.", bookieSocketAddress, this.ensemble);
                return;
            }
            this.erroredReplicas.set(replicaIndex);
            if (readsOutstanding()) {
                return;
            }
            sendNextRead();
        }

        boolean complete(BookieSocketAddress bookieSocketAddress, ByteBuf byteBuf) {
            try {
                ByteBuf verifyDigestAndReturnData = PendingReadOp.this.lh.macManager.verifyDigestAndReturnData(this.entryId, byteBuf);
                if (this.complete.getAndSet(true)) {
                    return false;
                }
                this.length = byteBuf.getLong(24);
                this.data = verifyDigestAndReturnData;
                return true;
            } catch (BKException.BKDigestMatchException e) {
                logErrorAndReattemptRead(bookieSocketAddress, "Mac mismatch", -5);
                return false;
            }
        }

        boolean isComplete() {
            return this.complete.get();
        }

        public String toString() {
            return String.format("L%d-E%d", Long.valueOf(this.ledgerId), Long.valueOf(this.entryId));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/bookkeeper/client/PendingReadOp$ReadContext.class */
    public static class ReadContext {
        final BookieSocketAddress to;
        final LedgerEntryRequest entry;

        ReadContext(BookieSocketAddress bookieSocketAddress, LedgerEntryRequest ledgerEntryRequest) {
            this.to = bookieSocketAddress;
            this.entry = ledgerEntryRequest;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PendingReadOp(LedgerHandle ledgerHandle, ScheduledExecutorService scheduledExecutorService, long j, long j2, AsyncCallback.ReadCallback readCallback, Object obj) {
        this.seq = new ArrayBlockingQueue((int) ((j2 + 1) - j));
        this.cb = readCallback;
        this.ctx = obj;
        this.lh = ledgerHandle;
        this.startEntryId = j;
        this.endEntryId = j2;
        this.scheduler = scheduledExecutorService;
        this.numPendingEntries = (j2 - j) + 1;
        this.speculativeReadTimeout = ledgerHandle.bk.getConf().getSpeculativeReadTimeout();
        this.readOpLogger = ledgerHandle.bk.getReadOpLogger();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PendingReadOp enableRecoveryFlag() {
        this.isRecoveryRead = true;
        return this;
    }

    protected LedgerMetadata getLedgerMetadata() {
        return this.lh.metadata;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cancelSpeculativeTask(boolean z) {
        if (this.speculativeTask != null) {
            this.speculativeTask.cancel(z);
            this.speculativeTask = null;
        }
    }

    public void initiate() throws InterruptedException {
        long j = this.startEntryId;
        long j2 = this.startEntryId;
        this.requestTimeNanos = MathUtils.nowInNano();
        ArrayList<BookieSocketAddress> arrayList = null;
        if (this.speculativeReadTimeout > 0) {
            try {
                this.speculativeTask = this.scheduler.scheduleWithFixedDelay(new Runnable() { // from class: org.apache.bookkeeper.client.PendingReadOp.1
                    @Override // java.lang.Runnable
                    public void run() {
                        int i = 0;
                        for (LedgerEntryRequest ledgerEntryRequest : PendingReadOp.this.seq) {
                            if (!ledgerEntryRequest.isComplete()) {
                                if (null == ledgerEntryRequest.maybeSendSpeculativeRead(PendingReadOp.this.heardFromHosts)) {
                                    PendingReadOp.this.cancelSpeculativeTask(false);
                                } else {
                                    PendingReadOp.LOG.debug("Send speculative read for {}. Hosts heard are {}.", ledgerEntryRequest, PendingReadOp.this.heardFromHosts);
                                    i++;
                                }
                            }
                        }
                        if (i <= 0 || !PendingReadOp.LOG.isDebugEnabled()) {
                            return;
                        }
                        PendingReadOp.LOG.debug("Send {} speculative reads for ledger {} ({}, {}). Hosts heard are {}.", new Object[]{Integer.valueOf(i), Long.valueOf(PendingReadOp.this.lh.getId()), Long.valueOf(PendingReadOp.this.startEntryId), Long.valueOf(PendingReadOp.this.endEntryId), PendingReadOp.this.heardFromHosts});
                    }
                }, this.speculativeReadTimeout, this.speculativeReadTimeout, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                LOG.debug("Failed to schedule speculative reads for ledger {} ({}, {}) : ", new Object[]{Long.valueOf(this.lh.getId()), Long.valueOf(this.startEntryId), Long.valueOf(this.endEntryId), e});
            }
        }
        do {
            if (j2 == j) {
                arrayList = getLedgerMetadata().getEnsemble(j2);
                j = getLedgerMetadata().getNextEnsembleChange(j2);
            }
            LedgerEntryRequest ledgerEntryRequest = new LedgerEntryRequest(arrayList, this.lh.ledgerId, j2);
            this.seq.add(ledgerEntryRequest);
            j2++;
            ledgerEntryRequest.sendNextRead();
        } while (j2 <= this.endEntryId);
    }

    void sendReadTo(BookieSocketAddress bookieSocketAddress, LedgerEntryRequest ledgerEntryRequest) throws InterruptedException {
        if (this.lh.throttler != null) {
            this.lh.throttler.acquire();
        }
        this.lh.bk.bookieClient.readEntry(bookieSocketAddress, this.lh.ledgerId, ledgerEntryRequest.entryId, this, new ReadContext(bookieSocketAddress, ledgerEntryRequest), this.isRecoveryRead ? 2 : 0);
    }

    @Override // org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
    public void readEntryComplete(int i, long j, long j2, ByteBuf byteBuf, Object obj) {
        ReadContext readContext = (ReadContext) obj;
        LedgerEntryRequest ledgerEntryRequest = readContext.entry;
        if (i != 0) {
            ledgerEntryRequest.logErrorAndReattemptRead(readContext.to, "Error: " + BKException.getMessage(i), i);
            return;
        }
        this.heardFromHosts.add(readContext.to);
        if (ledgerEntryRequest.complete(readContext.to, byteBuf)) {
            this.numPendingEntries--;
            ledgerEntryRequest.data.retain();
            if (this.numPendingEntries == 0) {
                submitCallback(0);
            } else if (this.cb == null) {
                ledgerEntryRequest.data.release();
                ledgerEntryRequest.data = null;
            }
        }
        if (this.numPendingEntries < 0) {
            LOG.error("Read too many values");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void submitCallback(int i) {
        if (this.cb == null) {
            return;
        }
        long elapsedNanos = MathUtils.elapsedNanos(this.requestTimeNanos);
        if (i != 0) {
            long j = -1;
            for (LedgerEntryRequest ledgerEntryRequest : this.seq) {
                if (!ledgerEntryRequest.isComplete() && j == -1) {
                    j = ledgerEntryRequest.getEntryId();
                }
                ReferenceCountUtil.release(ledgerEntryRequest.data);
                ledgerEntryRequest.data = null;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Read of ledger entry failed: L{} E{}-E{}, Heard from {}. First unread entry is {}", new Object[]{Long.valueOf(this.lh.getId()), Long.valueOf(this.startEntryId), Long.valueOf(this.endEntryId), this.heardFromHosts, Long.valueOf(j)});
            }
            this.readOpLogger.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
        } else {
            this.readOpLogger.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
        }
        cancelSpeculativeTask(true);
        this.cb.readComplete(i, this.lh, this, this.ctx);
        this.cb = null;
    }

    @Override // java.util.Enumeration
    public boolean hasMoreElements() {
        return !this.seq.isEmpty();
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.Enumeration
    public LedgerEntry nextElement() throws NoSuchElementException {
        return this.seq.remove();
    }

    public int size() {
        return this.seq.size();
    }
}
