/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.proto;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.shade.com.google.common.base.Stopwatch;
import org.apache.pulsar.shade.com.google.protobuf.ByteString;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.Channel;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.org.apache.bookkeeper.bookie.Bookie;
import org.apache.pulsar.shade.org.apache.bookkeeper.bookie.BookieException;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.BookieRequestProcessor;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.PacketProcessorBaseV3;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.RequestUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ReadEntryProcessorV3
extends PacketProcessorBaseV3 {
    private static final Logger LOG = LoggerFactory.getLogger(ReadEntryProcessorV3.class);
    protected Stopwatch lastPhaseStartTime;
    private final ExecutorService fenceThreadPool;
    private CompletableFuture<Boolean> fenceResult = null;
    protected final BookkeeperProtocol.ReadRequest readRequest;
    protected final long ledgerId;
    protected final long entryId;
    protected final OpStatsLogger readStats;
    protected final OpStatsLogger reqStats;

    public ReadEntryProcessorV3(BookkeeperProtocol.Request request, Channel channel, BookieRequestProcessor requestProcessor, ExecutorService fenceThreadPool) {
        super(request, channel, requestProcessor);
        requestProcessor.onReadRequestStart(channel);
        this.readRequest = request.getReadRequest();
        this.ledgerId = this.readRequest.getLedgerId();
        this.entryId = this.readRequest.getEntryId();
        if (RequestUtils.isFenceRequest(this.readRequest)) {
            this.readStats = requestProcessor.getRequestStats().getFenceReadEntryStats();
            this.reqStats = requestProcessor.getRequestStats().getFenceReadRequestStats();
        } else if (this.readRequest.hasPreviousLAC()) {
            this.readStats = requestProcessor.getRequestStats().getLongPollReadStats();
            this.reqStats = requestProcessor.getRequestStats().getLongPollReadRequestStats();
        } else {
            this.readStats = requestProcessor.getRequestStats().getReadEntryStats();
            this.reqStats = requestProcessor.getRequestStats().getReadRequestStats();
        }
        this.fenceThreadPool = fenceThreadPool;
        this.lastPhaseStartTime = Stopwatch.createStarted();
    }

    protected Long getPreviousLAC() {
        if (this.readRequest.hasPreviousLAC()) {
            return this.readRequest.getPreviousLAC();
        }
        return null;
    }

    protected void handleReadResultForFenceRead(final ByteBuf entryBody, final BookkeeperProtocol.ReadResponse.Builder readResponseBuilder, final long entryId, final Stopwatch startTimeSw) {
        this.lastPhaseStartTime.reset().start();
        if (null != this.fenceThreadPool) {
            this.fenceResult.whenCompleteAsync(new FutureEventListener<Boolean>(){

                @Override
                public void onSuccess(Boolean result) {
                    ReadEntryProcessorV3.this.sendFenceResponse(readResponseBuilder, entryBody, result, startTimeSw);
                }

                @Override
                public void onFailure(Throwable t) {
                    LOG.error("Fence request for ledgerId {} entryId {} encountered exception", new Object[]{ReadEntryProcessorV3.this.ledgerId, entryId, t});
                    ReadEntryProcessorV3.this.sendFenceResponse(readResponseBuilder, entryBody, false, startTimeSw);
                }
            }, (Executor)this.fenceThreadPool);
        } else {
            boolean success = false;
            try {
                success = this.fenceResult.get(1000L, TimeUnit.MILLISECONDS);
            }
            catch (Throwable t) {
                LOG.error("Fence request for ledgerId {} entryId {} encountered exception : ", new Object[]{this.readRequest.getLedgerId(), this.readRequest.getEntryId(), t});
            }
            this.sendFenceResponse(readResponseBuilder, entryBody, success, startTimeSw);
        }
    }

    protected BookkeeperProtocol.ReadResponse readEntry(BookkeeperProtocol.ReadResponse.Builder readResponseBuilder, long entryId, Stopwatch startTimeSw) throws IOException, BookieException {
        return this.readEntry(readResponseBuilder, entryId, false, startTimeSw);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected BookkeeperProtocol.ReadResponse readEntry(BookkeeperProtocol.ReadResponse.Builder readResponseBuilder, long entryId, boolean readLACPiggyBack, Stopwatch startTimeSw) throws IOException, BookieException {
        ByteBuf entryBody = this.requestProcessor.getBookie().readEntry(this.ledgerId, entryId);
        if (null != this.fenceResult) {
            this.handleReadResultForFenceRead(entryBody, readResponseBuilder, entryId, startTimeSw);
            return null;
        }
        try {
            readResponseBuilder.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
            if (readLACPiggyBack) {
                readResponseBuilder.setEntryId(entryId);
            } else {
                long knownLAC = this.requestProcessor.getBookie().readLastAddConfirmed(this.ledgerId);
                readResponseBuilder.setMaxLAC(knownLAC);
            }
            this.registerSuccessfulEvent(this.readStats, startTimeSw);
            readResponseBuilder.setStatus(BookkeeperProtocol.StatusCode.EOK);
            BookkeeperProtocol.ReadResponse readResponse = readResponseBuilder.build();
            return readResponse;
        }
        finally {
            ReferenceCountUtil.release(entryBody);
        }
    }

    protected BookkeeperProtocol.ReadResponse getReadResponse() {
        Stopwatch startTimeSw = Stopwatch.createStarted();
        BookkeeperProtocol.ReadResponse.Builder readResponse = BookkeeperProtocol.ReadResponse.newBuilder().setLedgerId(this.ledgerId).setEntryId(this.entryId);
        try {
            if (RequestUtils.isFenceRequest(this.readRequest)) {
                LOG.info("Ledger fence request received for ledger: {} from address: {}", (Object)this.ledgerId, (Object)this.channel.remoteAddress());
                if (!this.readRequest.hasMasterKey()) {
                    LOG.error("Fence ledger request received without master key for ledger:{} from address: {}", (Object)this.ledgerId, (Object)this.channel.remoteAddress());
                    throw BookieException.create(-1);
                }
                byte[] masterKey = this.readRequest.getMasterKey().toByteArray();
                this.fenceResult = this.requestProcessor.bookie.fenceLedger(this.ledgerId, masterKey);
            }
            return this.readEntry(readResponse, this.entryId, startTimeSw);
        }
        catch (Bookie.NoLedgerException e) {
            if (RequestUtils.isFenceRequest(this.readRequest)) {
                LOG.info("No ledger found reading entry {} when fencing ledger {}", (Object)this.entryId, (Object)this.ledgerId);
            } else if (this.entryId != -1L) {
                LOG.info("No ledger found while reading entry: {} from ledger: {}", (Object)this.entryId, (Object)this.ledgerId);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("No ledger found while reading entry: {} from ledger: {}", (Object)this.entryId, (Object)this.ledgerId);
            }
            return this.buildResponse(readResponse, BookkeeperProtocol.StatusCode.ENOLEDGER, startTimeSw);
        }
        catch (Bookie.NoEntryException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No entry found while reading entry: {} from ledger: {}", (Object)this.entryId, (Object)this.ledgerId);
            }
            return this.buildResponse(readResponse, BookkeeperProtocol.StatusCode.ENOENTRY, startTimeSw);
        }
        catch (IOException e) {
            LOG.error("IOException while reading entry: {} from ledger {} ", new Object[]{this.entryId, this.ledgerId, e});
            return this.buildResponse(readResponse, BookkeeperProtocol.StatusCode.EIO, startTimeSw);
        }
        catch (BookieException.DataUnknownException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ledger has unknown state for entry: {} from ledger {}", (Object)this.entryId, (Object)this.ledgerId);
            }
            return this.buildResponse(readResponse, BookkeeperProtocol.StatusCode.EUNKNOWNLEDGERSTATE, startTimeSw);
        }
        catch (BookieException e) {
            LOG.error("Unauthorized access to ledger:{} while reading entry:{} in request from address: {}", new Object[]{this.ledgerId, this.entryId, this.channel.remoteAddress()});
            return this.buildResponse(readResponse, BookkeeperProtocol.StatusCode.EUA, startTimeSw);
        }
    }

    @Override
    public void safeRun() {
        this.requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent(MathUtils.elapsedNanos(this.enqueueNanos), TimeUnit.NANOSECONDS);
        if (!this.channel.isOpen()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping read request for closed channel: {}", (Object)this.channel);
            }
            this.requestProcessor.onReadRequestFinish();
            return;
        }
        if (!this.isVersionCompatible()) {
            BookkeeperProtocol.ReadResponse readResponse = BookkeeperProtocol.ReadResponse.newBuilder().setLedgerId(this.ledgerId).setEntryId(this.entryId).setStatus(BookkeeperProtocol.StatusCode.EBADVERSION).build();
            this.sendResponse(readResponse);
            return;
        }
        this.executeOp();
    }

    protected void executeOp() {
        BookkeeperProtocol.ReadResponse readResponse = this.getReadResponse();
        if (null != readResponse) {
            this.sendResponse(readResponse);
        }
    }

    private void getFenceResponse(BookkeeperProtocol.ReadResponse.Builder readResponse, ByteBuf entryBody, boolean fenceResult) {
        BookkeeperProtocol.StatusCode status;
        if (!fenceResult) {
            status = BookkeeperProtocol.StatusCode.EIO;
            this.registerFailedEvent(this.requestProcessor.getRequestStats().getFenceReadWaitStats(), this.lastPhaseStartTime);
        } else {
            status = BookkeeperProtocol.StatusCode.EOK;
            readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
            this.registerSuccessfulEvent(this.requestProcessor.getRequestStats().getFenceReadWaitStats(), this.lastPhaseStartTime);
        }
        if (null != entryBody) {
            ReferenceCountUtil.release(entryBody);
        }
        readResponse.setStatus(status);
    }

    private void sendFenceResponse(BookkeeperProtocol.ReadResponse.Builder readResponse, ByteBuf entryBody, boolean fenceResult, Stopwatch startTimeSw) {
        this.getFenceResponse(readResponse, entryBody, fenceResult);
        this.registerEvent(!fenceResult, this.requestProcessor.getRequestStats().getFenceReadEntryStats(), startTimeSw);
        this.sendResponse(readResponse.build());
    }

    protected BookkeeperProtocol.ReadResponse buildResponse(BookkeeperProtocol.ReadResponse.Builder readResponseBuilder, BookkeeperProtocol.StatusCode statusCode, Stopwatch startTimeSw) {
        this.registerEvent(!statusCode.equals(BookkeeperProtocol.StatusCode.EOK), this.readStats, startTimeSw);
        readResponseBuilder.setStatus(statusCode);
        return readResponseBuilder.build();
    }

    protected void sendResponse(BookkeeperProtocol.ReadResponse readResponse) {
        BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder().setHeader(this.getHeader()).setStatus(readResponse.getStatus()).setReadResponse(readResponse);
        this.sendResponse(response.getStatus(), response.build(), this.reqStats);
        this.requestProcessor.onReadRequestFinish();
    }

    protected void registerSuccessfulEvent(OpStatsLogger statsLogger, Stopwatch startTime) {
        this.registerEvent(false, statsLogger, startTime);
    }

    protected void registerFailedEvent(OpStatsLogger statsLogger, Stopwatch startTime) {
        this.registerEvent(true, statsLogger, startTime);
    }

    protected void registerEvent(boolean failed, OpStatsLogger statsLogger, Stopwatch startTime) {
        if (failed) {
            statsLogger.registerFailedEvent(startTime.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
        } else {
            statsLogger.registerSuccessfulEvent(startTime.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
        }
    }

    @Override
    public String toString() {
        return RequestUtils.toSafeString(this.request);
    }
}

