package org.apache.bookkeeper.proto;

import java.util.concurrent.RejectedExecutionException;
import org.apache.bookkeeper.bookie.BookKeeperServerStats;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.shade.io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/proto/BookieRequestProcessor.class */
/**
 * Dispatches incoming bookie requests to the matching entry processor.
 *
 * <p>Two request families are handled: protobuf-based {@link BookkeeperProtocol.Request}
 * objects (the "V3" path) and everything else, which is treated as a
 * {@link BookieProtocol.Request}. Each request is either run inline on the calling
 * thread or submitted to an ordered read/write executor keyed by ledger id. When the
 * executor rejects the submission, a "too many requests" response is sent back.
 */
public class BookieRequestProcessor implements RequestProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
    private final ServerConfiguration serverCfg;
    final Bookie bookie;
    /** Executor for read requests; {@code null} when the configured thread count is not positive. */
    private final OrderedSafeExecutor readThreadPool;
    /** Executor for add requests; {@code null} when the configured thread count is not positive. */
    private final OrderedSafeExecutor writeThreadPool;
    private final BKStats bkStats = BKStats.getInstance();
    private final boolean statsEnabled;
    final OpStatsLogger addRequestStats;
    final OpStatsLogger addEntryStats;
    final OpStatsLogger readRequestStats;
    final OpStatsLogger readEntryStats;

    /**
     * Creates the processor, its optional read/write executors and its op-stats loggers.
     *
     * @param serverConfiguration source of worker-thread counts, pending-request limits,
     *                            bookie port (used in thread names) and the statistics flag
     * @param bookie              the bookie that the entry processors operate on
     * @param statsLogger         factory for the add/read op-stats loggers
     */
    public BookieRequestProcessor(ServerConfiguration serverConfiguration, Bookie bookie, StatsLogger statsLogger) {
        this.serverCfg = serverConfiguration;
        this.bookie = bookie;
        this.readThreadPool = createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThread-" + serverConfiguration.getBookiePort(), serverConfiguration.getMaxPendingReadRequestPerThread());
        this.writeThreadPool = createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThread-" + serverConfiguration.getBookiePort(), serverConfiguration.getMaxPendingAddRequestPerThread());
        this.statsEnabled = serverConfiguration.isStatisticsEnabled();
        this.addEntryStats = statsLogger.getOpStatsLogger("ADD_ENTRY");
        this.addRequestStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.ADD_ENTRY_REQUEST);
        this.readEntryStats = statsLogger.getOpStatsLogger("READ_ENTRY");
        this.readRequestStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.READ_ENTRY_REQUEST);
    }

    /** Shuts down the write executor and then the read executor, skipping any that were never created. */
    @Override // org.apache.bookkeeper.processor.RequestProcessor
    public void close() {
        shutdownExecutor(this.writeThreadPool);
        shutdownExecutor(this.readThreadPool);
    }

    /**
     * Builds an ordered executor, or returns {@code null} when no worker threads are requested.
     *
     * @param numThreads         number of worker threads; values {@code <= 0} disable the executor
     * @param nameFormat         name passed to the executor (callers pass a thread-name prefix)
     * @param maxPendingRequests third constructor argument of {@link OrderedSafeExecutor}; callers
     *                           pass the configured per-thread pending-request limit
     * @return the new executor, or {@code null} if {@code numThreads <= 0}
     */
    private OrderedSafeExecutor createExecutor(int numThreads, String nameFormat, int maxPendingRequests) {
        if (numThreads <= 0) {
            return null;
        }
        return new OrderedSafeExecutor(numThreads, nameFormat, maxPendingRequests);
    }

    /** Shuts down {@code executor} if it is non-null; a null executor is ignored. */
    private void shutdownExecutor(OrderedSafeExecutor executor) {
        if (executor != null) {
            executor.shutdown();
        }
    }

    /**
     * Routes a request to the add or read handler of the appropriate protocol family.
     *
     * <p>Unknown operations get an error response written to {@code channel} and, when
     * statistics are enabled, a failed-op increment on stats slot 2.
     *
     * @param obj     the decoded request; a {@link BookkeeperProtocol.Request} takes the V3
     *                path, anything else is cast to {@link BookieProtocol.Request}
     * @param channel channel the response is written to
     */
    @Override // org.apache.bookkeeper.processor.RequestProcessor
    public void processRequest(Object obj, Channel channel) {
        if (!(obj instanceof BookkeeperProtocol.Request)) {
            BookieProtocol.Request legacyRequest = (BookieProtocol.Request) obj;
            switch (legacyRequest.getOpCode()) {
                case 1: // handled as an add-entry request
                    processAddRequest(legacyRequest, channel);
                    return;
                case 2: // handled as a read-entry request
                    processReadRequest(legacyRequest, channel);
                    return;
                default:
                    LOG.error("Unknown op type {}, sending error", legacyRequest.getOpCode());
                    // NOTE(review): 100 is presumably the legacy "bad request" error code - confirm.
                    channel.writeAndFlush(ResponseBuilder.buildErrorResponse(100, legacyRequest));
                    recordUnknownOpFailure();
                    return;
            }
        }
        BookkeeperProtocol.Request v3Request = (BookkeeperProtocol.Request) obj;
        BookkeeperProtocol.BKPacketHeader header = v3Request.getHeader();
        switch (header.getOperation()) {
            case ADD_ENTRY:
                processAddRequestV3(v3Request, channel);
                return;
            case READ_ENTRY:
                processReadRequestV3(v3Request, channel);
                return;
            default:
                LOG.info("Unknown operation type {}", header.getOperation());
                channel.writeAndFlush(BookkeeperProtocol.Response.newBuilder().setHeader(v3Request.getHeader()).setStatus(BookkeeperProtocol.StatusCode.EBADREQ).build());
                recordUnknownOpFailure();
                return;
        }
    }

    /** Increments the failed-op counter of stats slot 2 when statistics are enabled. */
    private void recordUnknownOpFailure() {
        if (this.statsEnabled) {
            // NOTE(review): slot 2 is presumably the "unknown operation" bucket - confirm against BKStats.
            this.bkStats.getOpStats(2).incrementFailedOps();
        }
    }

    /**
     * Handles a V3 add request: runs it inline when there is no write executor or the
     * request is flagged {@code RECOVERY_ADD}; otherwise submits it ordered by ledger id.
     * A rejected submission is answered with {@code ETOOMANYREQUESTS}.
     */
    private void processAddRequestV3(BookkeeperProtocol.Request request, Channel channel) {
        WriteEntryProcessorV3 writeProcessor = new WriteEntryProcessorV3(request, channel, this);
        BookkeeperProtocol.AddRequest addRequest = request.getAddRequest();
        boolean isRecoveryAdd = addRequest.hasFlag() && addRequest.getFlag() == BookkeeperProtocol.AddRequest.Flag.RECOVERY_ADD;
        if (this.writeThreadPool == null || isRecoveryAdd) {
            writeProcessor.run();
            return;
        }
        try {
            this.writeThreadPool.submitOrdered(addRequest.getLedgerId(), writeProcessor);
        } catch (RejectedExecutionException e) {
            // Rejection is reported to the client rather than propagated; only logged at debug.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests", addRequest.getLedgerId(), addRequest.getEntryId());
            }
            BookkeeperProtocol.AddResponse.Builder addResponse = BookkeeperProtocol.AddResponse.newBuilder()
                    .setLedgerId(addRequest.getLedgerId())
                    .setEntryId(addRequest.getEntryId())
                    .setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
            BookkeeperProtocol.Response response = BookkeeperProtocol.Response.newBuilder()
                    .setHeader(writeProcessor.getHeader())
                    .setStatus(addResponse.getStatus())
                    .setAddResponse(addResponse)
                    .build();
            writeProcessor.sendResponse(addResponse.getStatus(), response, this.addRequestStats);
        }
    }

    /**
     * Handles a V3 read request: runs it inline when there is no read executor or the
     * request is flagged {@code RECOVERY_READ} or {@code FENCE_LEDGER}; otherwise submits
     * it ordered by ledger id. A rejected submission is answered with {@code ETOOMANYREQUESTS}.
     */
    private void processReadRequestV3(BookkeeperProtocol.Request request, Channel channel) {
        ReadEntryProcessorV3 readProcessor = new ReadEntryProcessorV3(request, channel, this);
        BookkeeperProtocol.ReadRequest readRequest = request.getReadRequest();
        boolean isHighPriority = readRequest.hasFlag()
                && (readRequest.getFlag() == BookkeeperProtocol.ReadRequest.Flag.RECOVERY_READ
                    || readRequest.getFlag() == BookkeeperProtocol.ReadRequest.Flag.FENCE_LEDGER);
        if (this.readThreadPool == null || isHighPriority) {
            readProcessor.run();
            return;
        }
        try {
            this.readThreadPool.submitOrdered(readRequest.getLedgerId(), readProcessor);
        } catch (RejectedExecutionException e) {
            // Rejection is reported to the client rather than propagated; only logged at debug.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests", readRequest.getLedgerId(), readRequest.getEntryId());
            }
            // The ids must come from the read sub-message: this is a read request, so the
            // add sub-message does not describe the entry the client asked for.
            BookkeeperProtocol.ReadResponse.Builder readResponse = BookkeeperProtocol.ReadResponse.newBuilder()
                    .setLedgerId(readRequest.getLedgerId())
                    .setEntryId(readRequest.getEntryId())
                    .setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
            BookkeeperProtocol.Response response = BookkeeperProtocol.Response.newBuilder()
                    .setHeader(readProcessor.getHeader())
                    .setStatus(readResponse.getStatus())
                    .setReadResponse(readResponse)
                    .build();
            readProcessor.sendResponse(readResponse.getStatus(), response, this.readRequestStats);
        }
    }

    /**
     * Handles a legacy add request: runs it inline when there is no write executor or
     * flag bit {@code 2} is set; otherwise submits it ordered by ledger id. A rejected
     * submission is answered with error code 106.
     */
    private void processAddRequest(BookieProtocol.Request request, Channel channel) {
        WriteEntryProcessor writeProcessor = WriteEntryProcessor.create(request, channel, this);
        // NOTE(review): bit 2 presumably marks a recovery add, mirroring the V3 path - confirm.
        boolean runInline = (request.flags & 2) == 2;
        if (this.writeThreadPool == null || runInline) {
            writeProcessor.run();
            return;
        }
        try {
            this.writeThreadPool.submitOrdered(request.getLedgerId(), writeProcessor);
        } catch (RejectedExecutionException e) {
            // Rejection is reported to the client rather than propagated; only logged at debug.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests", request.ledgerId, request.entryId);
            }
            // NOTE(review): 106 is presumably the legacy "too many requests" code - confirm.
            writeProcessor.sendResponse(106, ResponseBuilder.buildErrorResponse(106, request), this.addRequestStats);
        }
    }

    /**
     * Handles a legacy read request: runs it inline when there is no read executor or
     * flag bit {@code 2} or bit {@code 1} is set; otherwise submits it ordered by ledger
     * id. A rejected submission is answered with error code 106.
     */
    private void processReadRequest(BookieProtocol.Request request, Channel channel) {
        ReadEntryProcessor readProcessor = ReadEntryProcessor.create(request, channel, this);
        // NOTE(review): bits 2 and 1 presumably mark recovery and fencing reads - confirm.
        boolean runInline = (request.flags & 2) == 2 || (request.flags & 1) == 1;
        if (this.readThreadPool == null || runInline) {
            readProcessor.run();
            return;
        }
        try {
            this.readThreadPool.submitOrdered(request.getLedgerId(), readProcessor);
        } catch (RejectedExecutionException e) {
            // Rejection is reported to the client rather than propagated; only logged at debug.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests", request.ledgerId, request.entryId);
            }
            // NOTE(review): 106 is presumably the legacy "too many requests" code - confirm.
            readProcessor.sendResponse(106, ResponseBuilder.buildErrorResponse(106, request), this.readRequestStats);
        }
    }
}
