package kafka.network;

import java.io.EOFException;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import kafka.network.RequestChannel;
import kafka.utils.SystemTime$;
import kafka.utils.Time;
import scala.Function0;
import scala.ScalaObject;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.ObjectRef;

/* compiled from: SocketServer.scala */
@ScalaSignature(bytes = "\u0006\u0001M4\u0011\"\u0001\u0002\u0005\u0002\u0003\u0005\t\u0001\u0002\u0004\u0003\u0013A\u0013xnY3tg>\u0014(BA\u0002\u0005\u0003\u001dqW\r^<pe.T\u0011!B\u0001\u0006W\u000647.Y\n\u0004\u0001\u001dY\u0001C\u0001\u0005\n\u001b\u0005\u0011\u0011B\u0001\u0006\u0003\u0005Q\t%m\u001d;sC\u000e$8+\u001a:wKJ$\u0006N]3bIB\u0011AbD\u0007\u0002\u001b)\ta\"A\u0003tG\u0006d\u0017-\u0003\u0002\u0011\u001b\tY1kY1mC>\u0013'.Z2u\u0011!\u0011\u0002A!b\u0001\n\u0003!\u0012AA5e\u0007\u0001)\u0012!\u0006\t\u0003\u0019YI!aF\u0007\u0003\u0007%sG\u000f\u0003\u0005\u001a\u0001\t\u0005\t\u0015!\u0003\u0016\u0003\rIG\r\t\u0005\t7\u0001\u0011)\u0019!C\u00019\u0005!A/[7f+\u0005i\u0002C\u0001\u0010\"\u001b\u0005y\"B\u0001\u0011\u0005\u0003\u0015)H/\u001b7t\u0013\t\u0011sD\u0001\u0003US6,\u0007\u0002\u0003\u0013\u0001\u0005\u0003\u0005\u000b\u0011B\u000f\u0002\u000bQLW.\u001a\u0011\t\u0011\u0019\u0002!Q1A\u0005\u0002Q\ta\"\\1y%\u0016\fX/Z:u'&TX\r\u0003\u0005)\u0001\t\u0005\t\u0015!\u0003\u0016\u0003=i\u0017\r\u001f*fcV,7\u000f^*ju\u0016\u0004\u0003\u0002\u0003\u0016\u0001\u0005\u000b\u0007I\u0011A\u0016\u0002\u001dI,\u0017/^3ti\u000eC\u0017M\u001c8fYV\tA\u0006\u0005\u0002\t[%\u0011aF\u0001\u0002\u000f%\u0016\fX/Z:u\u0007\"\fgN\\3m\u0011!\u0001\u0004A!A!\u0002\u0013a\u0013a\u0004:fcV,7\u000f^\"iC:tW\r\u001c\u0011\t\u000bI\u0002A\u0011A\u001a\u0002\rqJg.\u001b;?)\u0015!TGN\u001c9!\tA\u0001\u0001C\u0003\u0013c\u0001\u0007Q\u0003C\u0003\u001cc\u0001\u0007Q\u0004C\u0003'c\u0001\u0007Q\u0003C\u0003+c\u0001\u0007A\u0006C\u0004;\u0001\t\u0007I\u0011B\u001e\u0002\u001d9,woQ8o]\u0016\u001cG/[8ogV\tA\bE\u0002>\t\u001ak\u0011A\u0010\u0006\u0003\u007f\u0001\u000b!bY8oGV\u0014(/\u001a8u\u0015\t\t%)\u0001\u0003vi&d'\"A\"\u0002\t)\fg/Y\u0005\u0003\u000bz\u0012QcQ8oGV\u0014(/\u001a8u\u0019&t7.\u001a3Rk\u0016,X\r\u0005\u0002H\u00196\t\u0001J\u0003\u0002J\u0015\u0006A1\r[1o]\u0016d7O\u0003\u0002L\u0005\u0006\u0019a.[8\n\u00055C%!D*pG.,Go\u00115b]:,G\u000e\u0003\u0004P\u0001\u0001\u0006I\u0001P\u0001\u0010]\u0016<8i\u001c8oK\u000e$\u0018n\u001c8tA!)\u0011\u000b\u0001C!%\u0006\u0019!/\u001e8\u0015\u0003M\u0003\"\u0001\u0004+\n\u0005Uk!\u0001B+oSRDQa\u0016\u0001\u0005\nI\u000b1\u0003\u001d:pG\u0016\u001c8OT3x%\u0016\u001c\bo\u001c8tKNDQ!\u0017\u0001\u0005\ni\u000bQa\u00197pg\u0016$\"aU.\t\u000bqC\u0006\u0019A/\u0002\u0007-,\u0017\u0010\u0005\u0002H=&\u0011q\f\u0013\u0002\r'\u0016dWm\u0019;j_:\\U-\u001f\u0005\u0006C\u0002!\tAY\u0001\u0007C\u000e\u001cW\r\u001d;\u0015\u0005M\u001b\u0007\"\u00023a\u0001\u00041\u0015!D:pG.,Go\u00115b]:,G\u000eC\u0003g\u0001\u0011%!+A\fd_:4\u0017nZ;sK:+woQ8o]\u0016\u001cG/[8og\")\u0001\u000e\u0001C\u0001S\u0006!!/Z1e)\t\u0019&\u000eC\u0003]O\u0002\u0007Q\fC\u0003m\u0001\u0011\u0005Q.A\u0003xe&$X\r\u0006\u0002T]\")Al\u001ba\u0001;\")\u0001\u000f\u0001C\u0005c\u0006Q1\r[1o]\u0016dgi\u001c:\u0015\u0005\u0019\u0013\b\"\u0002/p\u0001\u0004i\u0006")
/* loaded from: input_file:kafka/network/Processor.class */
public class Processor extends AbstractServerThread implements ScalaObject {
    private final int id;
    private final Time time;
    private final int maxRequestSize;
    private final RequestChannel requestChannel;
    private final ConcurrentLinkedQueue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();

    public int id() {
        return this.id;
    }

    public Time time() {
        return this.time;
    }

    public int maxRequestSize() {
        return this.maxRequestSize;
    }

    public RequestChannel requestChannel() {
        return this.requestChannel;
    }

    private ConcurrentLinkedQueue<SocketChannel> newConnections() {
        return this.newConnections;
    }

    @Override // java.lang.Runnable
    public void run() {
        startupComplete();
        loop0: while (isRunning()) {
            configureNewConnections();
            processNewResponses();
            long milliseconds = SystemTime$.MODULE$.milliseconds();
            int select = selector().select(300L);
            trace((Function0<String>) new Processor$$anonfun$run$7(this, milliseconds));
            if (select > 0) {
                Iterator<SelectionKey> it = selector().selectedKeys().iterator();
                while (it.hasNext() && isRunning()) {
                    ObjectRef objectRef = new ObjectRef((Object) null);
                    try {
                        objectRef.elem = it.next();
                        it.remove();
                        if (((SelectionKey) objectRef.elem).isReadable()) {
                            read((SelectionKey) objectRef.elem);
                        } else if (((SelectionKey) objectRef.elem).isWritable()) {
                            write((SelectionKey) objectRef.elem);
                        } else {
                            if (((SelectionKey) objectRef.elem).isValid()) {
                                throw new IllegalStateException("Unrecognized key state for processor thread.");
                                break loop0;
                            }
                            close((SelectionKey) objectRef.elem);
                        }
                    } catch (EOFException e) {
                        info((Function0<String>) new Processor$$anonfun$run$8(this, objectRef));
                        close((SelectionKey) objectRef.elem);
                    } catch (InvalidRequestException e2) {
                        info((Function0<String>) new Processor$$anonfun$run$9(this, objectRef, e2));
                        close((SelectionKey) objectRef.elem);
                    } catch (Throwable th) {
                        error(new Processor$$anonfun$run$10(this, objectRef), new Processor$$anonfun$run$11(this, th));
                        close((SelectionKey) objectRef.elem);
                    }
                }
            }
        }
        debug((Function0<String>) new Processor$$anonfun$run$12(this));
        swallowError(new Processor$$anonfun$run$3(this));
        shutdownComplete();
    }

    /*  JADX ERROR: JadxRuntimeException in pass: BlockSplitter
        jadx.core.utils.exceptions.JadxRuntimeException: Unexpected missing predecessor for block: B:3:0x0016
        	at jadx.core.dex.visitors.blocks.BlockSplitter.addTempConnectionsForExcHandlers(BlockSplitter.java:275)
        	at jadx.core.dex.visitors.blocks.BlockSplitter.visit(BlockSplitter.java:68)
        */
    private void processNewResponses() {
        /*
            Method dump skipped, instructions count: 226
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: kafka.network.Processor.processNewResponses():void");
    }

    private void close(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        debug((Function0<String>) new Processor$$anonfun$close$4(this, socketChannel));
        swallowError(new Processor$$anonfun$close$1(this, socketChannel));
        swallowError(new Processor$$anonfun$close$2(this, socketChannel));
        selectionKey.attach(null);
        swallowError(new Processor$$anonfun$close$3(this, selectionKey));
    }

    public void accept(SocketChannel socketChannel) {
        newConnections().add(socketChannel);
        wakeup();
    }

    private void configureNewConnections() {
        while (newConnections().size() > 0) {
            SocketChannel poll = newConnections().poll();
            debug((Function0<String>) new Processor$$anonfun$configureNewConnections$1(this, poll));
            poll.register(selector(), 1);
        }
    }

    public void read(SelectionKey selectionKey) {
        SocketChannel kafka$network$Processor$$channelFor = kafka$network$Processor$$channelFor(selectionKey);
        Receive receive = (Receive) selectionKey.attachment();
        if (selectionKey.attachment() == null) {
            receive = new BoundedByteBufferReceive(maxRequestSize());
            selectionKey.attach(receive);
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        int readFrom = receive.readFrom(kafka$network$Processor$$channelFor);
        SocketAddress remoteSocketAddress = kafka$network$Processor$$channelFor.socket().getRemoteSocketAddress();
        trace((Function0<String>) new Processor$$anonfun$read$1(this, readFrom, remoteSocketAddress));
        if (readFrom < 0) {
            close(selectionKey);
            return;
        }
        if (!receive.complete()) {
            trace((Function0<String>) new Processor$$anonfun$read$2(this, kafka$network$Processor$$channelFor));
            selectionKey.interestOps(1);
            wakeup();
        } else {
            requestChannel().sendRequest(new RequestChannel.Request(id(), selectionKey, receive.buffer(), time().milliseconds(), remoteSocketAddress));
            selectionKey.attach(null);
            selectionKey.interestOps(selectionKey.interestOps() & (1 ^ (-1)));
        }
    }

    public void write(SelectionKey selectionKey) {
        SocketChannel kafka$network$Processor$$channelFor = kafka$network$Processor$$channelFor(selectionKey);
        RequestChannel.Response response = (RequestChannel.Response) selectionKey.attachment();
        Send responseSend = response.responseSend();
        if (responseSend == null) {
            throw new IllegalStateException("Registered for write interest but no response attached to key.");
        }
        trace((Function0<String>) new Processor$$anonfun$write$1(this, selectionKey, kafka$network$Processor$$channelFor, responseSend.writeTo(kafka$network$Processor$$channelFor)));
        if (!responseSend.complete()) {
            trace((Function0<String>) new Processor$$anonfun$write$3(this, kafka$network$Processor$$channelFor));
            selectionKey.interestOps(4);
            wakeup();
        } else {
            response.request().updateRequestMetrics();
            selectionKey.attach(null);
            trace((Function0<String>) new Processor$$anonfun$write$2(this, kafka$network$Processor$$channelFor));
            selectionKey.interestOps(1);
        }
    }

    public final SocketChannel kafka$network$Processor$$channelFor(SelectionKey selectionKey) {
        return (SocketChannel) selectionKey.channel();
    }

    public Processor(int i, Time time, int i2, RequestChannel requestChannel) {
        this.id = i;
        this.time = time;
        this.maxRequestSize = i2;
        this.requestChannel = requestChannel;
    }
}
