package kafka.network;

import java.io.EOFException;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import kafka.network.RequestChannel;
import kafka.utils.SystemTime$;
import kafka.utils.Time;
import scala.Function0;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.ObjectRef;

/* compiled from: SocketServer.scala */
@ScalaSignature(bytes = "\u0006\u0001A4Q!\u0001\u0002\u0001\t\u0019\u0011\u0011\u0002\u0015:pG\u0016\u001c8o\u001c:\u000b\u0005\r!\u0011a\u00028fi^|'o\u001b\u0006\u0002\u000b\u0005)1.\u00194lCN\u0011\u0001a\u0002\t\u0003\u0011%i\u0011AA\u0005\u0003\u0015\t\u0011A#\u00112tiJ\f7\r^*feZ,'\u000f\u00165sK\u0006$\u0007\u0002\u0003\u0007\u0001\u0005\u000b\u0007I\u0011\u0001\b\u0002\u0005%$7\u0001A\u000b\u0002\u001fA\u0011\u0001cE\u0007\u0002#)\t!#A\u0003tG\u0006d\u0017-\u0003\u0002\u0015#\t\u0019\u0011J\u001c;\t\u0011Y\u0001!\u0011!Q\u0001\n=\t1!\u001b3!\u0011!A\u0002A!b\u0001\n\u0003I\u0012\u0001\u0002;j[\u0016,\u0012A\u0007\t\u00037yi\u0011\u0001\b\u0006\u0003;\u0011\tQ!\u001e;jYNL!a\b\u000f\u0003\tQKW.\u001a\u0005\tC\u0001\u0011\t\u0011)A\u00055\u0005)A/[7fA!A1\u0005\u0001BC\u0002\u0013\u0005a\"\u0001\bnCb\u0014V-];fgR\u001c\u0016N_3\t\u0011\u0015\u0002!\u0011!Q\u0001\n=\tq\"\\1y%\u0016\fX/Z:u'&TX\r\t\u0005\tO\u0001\u0011)\u0019!C\u0001Q\u0005q!/Z9vKN$8\t[1o]\u0016dW#A\u0015\u0011\u0005!Q\u0013BA\u0016\u0003\u00059\u0011V-];fgR\u001c\u0005.\u00198oK2D\u0001\"\f\u0001\u0003\u0002\u0003\u0006I!K\u0001\u0010e\u0016\fX/Z:u\u0007\"\fgN\\3mA!)q\u0006\u0001C\u0001a\u00051A(\u001b8jiz\"R!\r\u001a4iU\u0002\"\u0001\u0003\u0001\t\u000b1q\u0003\u0019A\b\t\u000baq\u0003\u0019\u0001\u000e\t\u000b\rr\u0003\u0019A\b\t\u000b\u001dr\u0003\u0019A\u0015\t\u000f]\u0002!\u0019!C\u0005q\u0005qa.Z<D_:tWm\u0019;j_:\u001cX#A\u001d\u0011\u0007i\n5)D\u0001<\u0015\taT(\u0001\u0006d_:\u001cWO\u001d:f]RT!AP 
\u0002\tU$\u0018\u000e\u001c\u0006\u0002\u0001\u0006!!.\u0019<b\u0013\t\u00115HA\u000bD_:\u001cWO\u001d:f]Rd\u0015N\\6fIF+X-^3\u0011\u0005\u0011KU\"A#\u000b\u0005\u0019;\u0015\u0001C2iC:tW\r\\:\u000b\u0005!{\u0014a\u00018j_&\u0011!*\u0012\u0002\u000e'>\u001c7.\u001a;DQ\u0006tg.\u001a7\t\r1\u0003\u0001\u0015!\u0003:\u0003=qWm^\"p]:,7\r^5p]N\u0004\u0003\"\u0002(\u0001\t\u0003z\u0015a\u0001:v]R\t\u0001\u000b\u0005\u0002\u0011#&\u0011!+\u0005\u0002\u0005+:LG\u000fC\u0003U\u0001\u0011%q*A\nqe>\u001cWm]:OK^\u0014Vm\u001d9p]N,7\u000fC\u0003W\u0001\u0011%q+A\u0003dY>\u001cX\r\u0006\u0002Q1\")\u0011,\u0016a\u00015\u0006\u00191.Z=\u0011\u0005\u0011[\u0016B\u0001/F\u00051\u0019V\r\\3di&|gnS3z\u0011\u0015q\u0006\u0001\"\u0001`\u0003\u0019\t7mY3qiR\u0011\u0001\u000b\u0019\u0005\u0006Cv\u0003\raQ\u0001\u000eg>\u001c7.\u001a;DQ\u0006tg.\u001a7\t\u000b\r\u0004A\u0011B(\u0002/\r|gNZ5hkJ,g*Z<D_:tWm\u0019;j_:\u001c\b\"B3\u0001\t\u00031\u0017\u0001\u0002:fC\u0012$\"\u0001U4\t\u000be#\u0007\u0019\u0001.\t\u000b%\u0004A\u0011\u00016\u0002\u000b]\u0014\u0018\u000e^3\u0015\u0005A[\u0007\"B-i\u0001\u0004Q\u0006\"B7\u0001\t\u0013q\u0017AC2iC:tW\r\u001c$peR\u00111i\u001c\u0005\u000632\u0004\rA\u0017")
/* loaded from: input_file:kafka/network/Processor.class */
public class Processor extends AbstractServerThread {
    // Identifier of this processor; passed as the first argument of each RequestChannel.Request built in read().
    private final int id;
    // Clock used to timestamp requests in read() via time().milliseconds().
    private final Time time;
    // Size limit handed to every BoundedByteBufferReceive created in read().
    private final int maxRequestSize;
    // Channel that completed requests are pushed to via sendRequest() in read().
    private final RequestChannel requestChannel;
    // Sockets queued by accept() and drained by configureNewConnections(), which registers them with the selector.
    private final ConcurrentLinkedQueue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();

    /**
     * Returns the identifier this processor was constructed with.
     *
     * @return the processor id
     */
    public int id() {
        return id;
    }

    /**
     * Returns the clock supplied at construction time.
     *
     * @return the {@code Time} instance used by this processor
     */
    public Time time() {
        return time;
    }

    /**
     * Returns the request size limit supplied at construction time.
     *
     * @return the value passed to each new {@code BoundedByteBufferReceive}
     */
    public int maxRequestSize() {
        return maxRequestSize;
    }

    /**
     * Returns the request channel supplied at construction time.
     *
     * @return the channel that completed requests are sent to
     */
    public RequestChannel requestChannel() {
        return requestChannel;
    }

    /**
     * Returns the queue of accepted sockets that still have to be registered with the selector.
     *
     * @return the pending-connection queue (never {@code null}; it is initialised with the field)
     */
    private ConcurrentLinkedQueue<SocketChannel> newConnections() {
        return newConnections;
    }

    /**
     * Main loop of the processor thread.
     *
     * <p>After signalling start-up, each iteration while {@code isRunning()} holds:
     * <ol>
     *   <li>registers newly accepted sockets ({@link #configureNewConnections()});</li>
     *   <li>handles queued responses ({@code processNewResponses()});</li>
     *   <li>calls {@code select(300L)} on the selector and dispatches every selected key to
     *       {@link #read(SelectionKey)} or {@link #write(SelectionKey)}.</li>
     * </ol>
     * A key that is neither readable nor writable is closed when it is no longer valid; a valid key
     * in that state raises an {@link IllegalStateException}. Any exception raised while handling a
     * single key is logged and the key's connection is closed, so one bad connection does not stop
     * the loop. When the loop ends, a final clean-up closure runs under {@code swallowError} and
     * shutdown is signalled.
     */
    @Override // java.lang.Runnable
    public void run() {
        startupComplete();
        while (isRunning()) {
            configureNewConnections();
            processNewResponses();
            long startSelectTime = SystemTime$.MODULE$.milliseconds();
            // NOTE(review): selector() is presumably a java.nio Selector, making 300 a timeout in ms - confirm.
            int ready = selector().select(300L);
            trace((Function0<String>) new Processor$$anonfun$run$7(this, startSelectTime));
            if (ready > 0) {
                Iterator<SelectionKey> keys = selector().selectedKeys().iterator();
                while (keys.hasNext() && isRunning()) {
                    // The key lives in an ObjectRef because the logging closures below capture it.
                    ObjectRef<SelectionKey> keyRef = new ObjectRef<SelectionKey>(null);
                    try {
                        keyRef.elem = keys.next();
                        // Selected keys must be removed explicitly or they are reported again.
                        keys.remove();
                        SelectionKey key = keyRef.elem;
                        if (key.isReadable()) {
                            read(key);
                        } else if (key.isWritable()) {
                            write(key);
                        } else if (!key.isValid()) {
                            close(key);
                        } else {
                            throw new IllegalStateException("Unrecognized key state for processor thread.");
                        }
                    } catch (EOFException e) {
                        info((Function0<String>) new Processor$$anonfun$run$8(this, keyRef));
                        close(keyRef.elem);
                    } catch (InvalidRequestException e2) {
                        info((Function0<String>) new Processor$$anonfun$run$9(this, keyRef, e2));
                        close(keyRef.elem);
                    } catch (Throwable th) {
                        error(new Processor$$anonfun$run$10(this, keyRef), new Processor$$anonfun$run$11(this, th));
                        close(keyRef.elem);
                    }
                }
            }
        }
        debug((Function0<String>) new Processor$$anonfun$run$12(this));
        // NOTE(review): the closure body is not visible here; presumably it closes the selector - confirm.
        swallowError(new Processor$$anonfun$run$3(this));
        shutdownComplete();
    }

    /*
     * Placeholder for the response-handling step of the processor loop.
     *
     * The decompiler could not reconstruct this method (see the JADX error below), so its real
     * logic is NOT present in this file. As written, every call throws
     * UnsupportedOperationException. run() invokes this method on each loop iteration, outside its
     * per-key try/catch, so the exception would propagate out of run().
     * TODO(review): restore the body from the original SocketServer.scala before using this class.
     */
    /*  JADX ERROR: JadxRuntimeException in pass: BlockSplitter
        jadx.core.utils.exceptions.JadxRuntimeException: Unexpected missing predecessor for block: B:3:0x0016
        	at jadx.core.dex.visitors.blocks.BlockSplitter.addTempConnectionsForExcHandlers(BlockSplitter.java:275)
        	at jadx.core.dex.visitors.blocks.BlockSplitter.visit(BlockSplitter.java:68)
        */
    private void processNewResponses() {
        /*
            Method dump skipped, instructions count: 374
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: kafka.network.Processor.processNewResponses():void");
    }

    /**
     * Tears down the connection behind the given selection key.
     *
     * <p>Logs at debug level, runs two clean-up closures over the socket channel and one over the
     * key, each under {@code swallowError}, and clears the key's attachment in between.
     * NOTE(review): the closure bodies are not visible here; presumably they close the socket and
     * the channel and cancel the key - confirm.
     *
     * @param selectionKey key whose channel is a {@link SocketChannel}
     */
    private void close(SelectionKey selectionKey) {
        final SocketChannel channel = (SocketChannel) selectionKey.channel();
        debug((Function0<String>) new Processor$$anonfun$close$4(this, channel));

        // Channel-level clean-up first; the order of these steps is preserved deliberately.
        swallowError(new Processor$$anonfun$close$1(this, channel));
        swallowError(new Processor$$anonfun$close$2(this, channel));

        // Drop any pending Receive/Response before the key-level clean-up runs.
        selectionKey.attach(null);
        swallowError(new Processor$$anonfun$close$3(this, selectionKey));
    }

    /**
     * Hands a newly accepted socket to this processor.
     *
     * <p>The channel is queued and {@code wakeup()} is called; the processor loop registers the
     * channel with the selector later, in {@code configureNewConnections()}.
     *
     * @param socketChannel the accepted connection to queue
     */
    public void accept(SocketChannel socketChannel) {
        newConnections.add(socketChannel);
        wakeup();
    }

    /**
     * Registers every queued connection with the selector for read interest.
     *
     * <p>The queue is drained with {@code poll()} until it returns {@code null}. This avoids
     * {@code ConcurrentLinkedQueue.size()}, which traverses the whole queue on every call, and it
     * never dereferences a {@code null} result from {@code poll()}.
     */
    private void configureNewConnections() {
        SocketChannel channel;
        while ((channel = newConnections().poll()) != null) {
            debug((Function0<String>) new Processor$$anonfun$configureNewConnections$1(this, channel));
            channel.register(selector(), SelectionKey.OP_READ);
        }
    }

    /**
     * Reads available bytes for the connection behind {@code selectionKey}.
     *
     * <p>The in-progress {@code Receive} is kept as the key's attachment; one bounded by
     * {@link #maxRequestSize()} is created and attached when none exists yet. Outcomes:
     * <ul>
     *   <li>negative read count: the connection is closed;</li>
     *   <li>request incomplete: the key stays registered for read and {@code wakeup()} is called;</li>
     *   <li>request complete: it is sent to the request channel, the attachment is cleared and
     *       read interest is removed from the key.</li>
     * </ul>
     *
     * @param selectionKey key whose channel is a {@link SocketChannel}
     */
    public void read(SelectionKey selectionKey) {
        SocketChannel channel = kafka$network$Processor$$channelFor(selectionKey);
        Receive receive = (Receive) selectionKey.attachment();
        if (receive == null) {
            receive = new BoundedByteBufferReceive(maxRequestSize());
            selectionKey.attach(receive);
        }
        int bytesRead = receive.readFrom(channel);
        SocketAddress remoteAddress = channel.socket().getRemoteSocketAddress();
        trace((Function0<String>) new Processor$$anonfun$read$1(this, bytesRead, remoteAddress));
        if (bytesRead < 0) {
            close(selectionKey);
            return;
        }
        if (receive.complete()) {
            requestChannel().sendRequest(new RequestChannel.Request(id(), selectionKey, receive.buffer(), time().milliseconds(), remoteAddress));
            selectionKey.attach(null);
            // Remove read interest while keeping any other interest bits intact.
            selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_READ);
        } else {
            // More bytes are needed before the request is complete.
            trace((Function0<String>) new Processor$$anonfun$read$2(this, channel));
            selectionKey.interestOps(SelectionKey.OP_READ);
            wakeup();
        }
    }

    /**
     * Writes the pending response attached to {@code selectionKey} to its socket.
     *
     * <p>If the send is still incomplete after this write, the key is set to write interest and
     * {@code wakeup()} is called. Once the send completes, request metrics are updated, the
     * attachment is cleared and the key is switched back to read interest.
     *
     * @param selectionKey key whose attachment is expected to be a {@code RequestChannel.Response}
     * @throws IllegalStateException if the key has no response attached, or the attached response
     *         carries no send
     */
    public void write(SelectionKey selectionKey) {
        SocketChannel channel = kafka$network$Processor$$channelFor(selectionKey);
        RequestChannel.Response response = (RequestChannel.Response) selectionKey.attachment();
        // Check the attachment itself first so a missing response is reported as the documented
        // IllegalStateException rather than a NullPointerException.
        Send responseSend = (response == null) ? null : response.responseSend();
        if (responseSend == null) {
            throw new IllegalStateException("Registered for write interest but no response attached to key.");
        }
        trace((Function0<String>) new Processor$$anonfun$write$1(this, selectionKey, channel, responseSend.writeTo(channel)));
        if (responseSend.complete()) {
            response.request().updateRequestMetrics();
            selectionKey.attach(null);
            trace((Function0<String>) new Processor$$anonfun$write$2(this, channel));
            selectionKey.interestOps(SelectionKey.OP_READ);
        } else {
            // Part of the response is still unsent; stay registered for write.
            trace((Function0<String>) new Processor$$anonfun$write$3(this, channel));
            selectionKey.interestOps(SelectionKey.OP_WRITE);
            wakeup();
        }
    }

    /**
     * Returns the channel behind a selection key, cast to {@link SocketChannel}.
     *
     * @param selectionKey key to look up
     * @return the key's channel
     * @throws ClassCastException if the key's channel is not a {@code SocketChannel}
     */
    public SocketChannel kafka$network$Processor$$channelFor(SelectionKey selectionKey) {
        final SocketChannel channel = (SocketChannel) selectionKey.channel();
        return channel;
    }

    /**
     * Creates a processor.
     *
     * @param i              identifier of this processor, returned by {@code id()}
     * @param time           clock returned by {@code time()}
     * @param i2             request size limit, returned by {@code maxRequestSize()}
     * @param requestChannel channel returned by {@code requestChannel()}
     */
    public Processor(int i, Time time, int i2, RequestChannel requestChannel) {
        super();
        this.id = i;
        this.time = time;
        this.maxRequestSize = i2;
        this.requestChannel = requestChannel;
    }
}
