package org.apache.samza.system.kafka;

import java.lang.Thread;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import kafka.api.FetchResponse;
import kafka.api.FetchResponsePartitionData;
import kafka.api.Request$;
import kafka.common.ErrorMapping$;
import kafka.common.NotLeaderForPartitionException;
import kafka.common.TopicAndPartition;
import kafka.common.UnknownTopicOrPartitionException;
import kafka.message.MessageAndOffset;
import kafka.message.MessageSet;
import org.apache.samza.SamzaException;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.ExponentialSleepStrategy$;
import org.apache.samza.util.KafkaUtil$;
import org.apache.samza.util.Logging;
import org.apache.samza.util.ThreadNamePrefix$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.concurrent.Map;
import scala.collection.immutable.List;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.runtime.LongRef;
import scala.runtime.Nothing$;

/* compiled from: BrokerProxy.scala */
@ScalaSignature(bytes = "\u0006\u0001\t5t!B\u0001\u0003\u0011\u0003i\u0011a\u0003\"s_.,'\u000f\u0015:pqfT!a\u0001\u0003\u0002\u000b-\fgm[1\u000b\u0005\u00151\u0011AB:zgR,WN\u0003\u0002\b\u0011\u0005)1/Y7{C*\u0011\u0011BC\u0001\u0007CB\f7\r[3\u000b\u0003-\t1a\u001c:h\u0007\u0001\u0001\"AD\b\u000e\u0003\t1Q\u0001\u0005\u0002\t\u0002E\u00111B\u0011:pW\u0016\u0014\bK]8ysN\u0011qB\u0005\t\u0003'Yi\u0011\u0001\u0006\u0006\u0002+\u0005)1oY1mC&\u0011q\u0003\u0006\u0002\u0007\u0003:L(+\u001a4\t\u000beyA\u0011\u0001\u000e\u0002\rqJg.\u001b;?)\u0005i\u0001b\u0002\u000f\u0010\u0005\u0004%\t!H\u0001 \u0005J{5*\u0012*`!J{\u0005,W0U\u0011J+\u0015\tR0O\u00036+u\f\u0015*F\r&CV#\u0001\u0010\u0011\u0005}!S\"\u0001\u0011\u000b\u0005\u0005\u0012\u0013\u0001\u00027b]\u001eT\u0011aI\u0001\u0005U\u00064\u0018-\u0003\u0002&A\t11\u000b\u001e:j]\u001eDaaJ\b!\u0002\u0013q\u0012\u0001\t\"S\u001f.+%k\u0018)S\u001fbKv\f\u0016%S\u000b\u0006#uLT!N\u000b~\u0003&+\u0012$J1\u0002Bq!K\b\u0012\u0002\u0013\u0005!&A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$HeN\u000b\u0002W)\u0012Af\f\t\u0003'5J!A\f\u000b\u0003\u0007%sGoK\u00011!\t\td'D\u00013\u0015\t\u0019D'A\u0005v]\u000eDWmY6fI*\u0011Q\u0007F\u0001\u000bC:tw\u000e^1uS>t\u0017BA\u001c3\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a\u0005\bs=\t\n\u0011\"\u0001+\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%q!91hDI\u0001\n\u0003a\u0014a\u0007\u0013mKN\u001c\u0018N\\5uI\u001d\u0014X-\u0019;fe\u0012\"WMZ1vYR$\u0013(F\u0001>U\tqt\u0006\u0005\u0002\u000f\u007f%\u0011\u0001I\u0001\u0002\u0011'R\u0014X-Y7GKR\u001c\u0007nU5{KNDqAQ\b\u0012\u0002\u0013\u0005!&\u0001\u000f%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H%\r\u0019\t\u000f\u0011{\u0011\u0013!C\u0001U\u0005aB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uIE\n\u0004b\u0002$\u0010#\u0003%\taR\u0001\u001dI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000fJ\u00193+\u0005A%FA%0!\tq!*\u0003\u0002L\u0005\tIq)\u001a;PM\u001a\u001cX\r\u001e\u0004\u0005!\t\u0001Qj\u0005\u0003M%9\u000b\u0006C\u0001\bP\u0013\t\u0001&A\u0001\u0003U_N\u001c\bC\u0001*V\u001b\u0005\u0019&B\u0001+\u0007\u0003\u0011)H/\u001b7\n\u0005Y\u001b&a\u0002'pO\u001eLgn\u001a\u0005\t12\u0013)\u0019!C\u00013\u0006!\u0001n\\:u+\u0005Q\u0006CA.c\u001d\ta\u0006\r\u0005\u0002^)5\taL\u0003\u0002`\u0019\u00051AH]8pizJ!!\u0019\u000b\u0002\rA\u0013X\rZ3g\u0013\t)3M\u0003\u0002b)!AQ\r\u0014B\u0001B\u0003%!,A\u0003i_N$\b\u0005\u0003\u0005h\u0019\n\u0015\r\u0011\"\u0001i\u0003\u0011\u0001xN\u001d;\u0016\u00031B\u0001B\u001b'\u0003\u0002\u0003\u0006I\u0001L\u0001\u0006a>\u0014H\u000f\t\u0005\t\u000b1\u0013)\u0019!C\u00013\"AQ\u000e\u0014B\u0001B\u0003%!,A\u0004tsN$X-\u001c\u0011\t\u0011=d%Q1A\u0005\u0002e\u000b\u0001b\u00197jK:$\u0018\n\u0012\u0005\tc2\u0013\t\u0011)A\u00055\u0006I1\r\\5f]RLE\t\t\u0005\tg2\u0013)\u0019!C\u0001i\u00069Q.\u001a;sS\u000e\u001cX#A;\u0011\u000591\u0018BA<\u0003\u0005iY\u0015MZ6b'f\u001cH/Z7D_:\u001cX/\\3s\u001b\u0016$(/[2t\u0011!IHJ!A!\u0002\u0013)\u0018\u0001C7fiJL7m\u001d\u0011\t\u0011md%Q1A\u0005\u0002q\f1\"\\3tg\u0006<WmU5oWV\tQ\u0010\u0005\u0002\u000f}&\u0011qP\u0001\u0002\f\u001b\u0016\u001c8/Y4f'&t7\u000eC\u0005\u0002\u00041\u0013\t\u0011)A\u0005{\u0006aQ.Z:tC\u001e,7+\u001b8lA!I\u0011q\u0001'\u0003\u0006\u0004%\t\u0001[\u0001\bi&lWm\\;u\u0011%\tY\u0001\u0014B\u0001B\u0003%A&\u0001\u0005uS6,w.\u001e;!\u0011%\ty\u0001\u0014BC\u0002\u0013\u0005\u0001.\u0001\u0006ck\u001a4WM]*ju\u0016D\u0011\"a\u0005M\u0005\u0003\u0005\u000b\u0011\u0002\u0017\u0002\u0017\t,hMZ3s'&TX\r\t\u0005\u000b\u0003/a%Q1A\u0005\u0002\u0005e\u0011!\u00034fi\u000eD7+\u001b>f+\u0005q\u0004\"CA\u000f\u0019\n\u0005\t\u0015!\u0003?\u0003)1W\r^2i'&TX\r\t\u0005\n\u0003Ca%Q1A\u0005\u0002!\fqbY8ogVlWM]'j]NK'0\u001a\u0005\n\u0003Ka%\u0011!Q\u0001\n1\n\u0001cY8ogVlWM]'j]NK'0\u001a\u0011\t\u0013\u0005%BJ!b\u0001\n\u0003A\u0017aD2p]N,X.\u001a:NCb<\u0016-\u001b;\t\u0013\u00055BJ!A!\u0002\u0013a\u0013\u0001E2p]N,X.\u001a:NCb<\u0016-\u001b;!\u0011%\t\t\u0004\u0014B\u0001B\u0003%\u0011*\u0001\u0007pM\u001a\u001cX\r^$fiR,'\u000f\u0003\u0004\u001a\u0019\u0012\u0005\u0011Q\u0007\u000b\u001b\u0003o\tI$a\u000f\u0002>\u0005}\u0012\u0011IA\"\u0003\u000b\n9%!\u0013\u0002L\u00055\u0013q\n\t\u0003\u001d1Ca\u0001WA\u001a\u0001\u0004Q\u0006BB4\u00024\u0001\u0007A\u0006\u0003\u0004\u0006\u0003g\u0001\rA\u0017\u0005\u0007_\u0006M\u0002\u0019\u0001.\t\rM\f\u0019\u00041\u0001v\u0011\u0019Y\u00181\u0007a\u0001{\"I\u0011qAA\u001a!\u0003\u0005\r\u0001\f\u0005\n\u0003\u001f\t\u0019\u0004%AA\u00021B\u0011\"a\u0006\u00024A\u0005\t\u0019\u0001 \t\u0013\u0005\u0005\u00121\u0007I\u0001\u0002\u0004a\u0003\"CA\u0015\u0003g\u0001\n\u00111\u0001-\u0011%\t\t$a\r\u0011\u0002\u0003\u0007\u0011\n\u0003\u0005\u0002T1\u0013\r\u0011\"\u0001i\u0003u\u0019H.Z3q\u001bN;\u0006.\u001b7f\u001d>$v\u000e]5d!\u0006\u0014H/\u001b;j_:\u001c\bbBA,\u0019\u0002\u0006I\u0001L\u0001\u001fg2,W\r]'T/\"LG.\u001a(p)>\u0004\u0018n\u0019)beRLG/[8og\u0002B\u0011\"a\u0017M\u0005\u0004%\t!!\u0018\u0002\u00179,\u0007\u0010^(gMN,Go]\u000b\u0003\u0003?\u0002\u0002\"!\u0019\u0002l\u0005=\u0014QP\u0007\u0003\u0003GRA!!\u001a\u0002h\u0005Q1m\u001c8dkJ\u0014XM\u001c;\u000b\u0007\u0005%D#\u0001\u0006d_2dWm\u0019;j_:LA!!\u001c\u0002d\t\u0019Q*\u00199\u0011\t\u0005E\u0014\u0011P\u0007\u0003\u0003gRA!!\u001e\u0002x\u000511m\\7n_:T\u0011aA\u0005\u0005\u0003w\n\u0019HA\tU_BL7-\u00118e!\u0006\u0014H/\u001b;j_:\u00042aEA@\u0013\r\t\t\t\u0006\u0002\u0005\u0019>tw\r\u0003\u0005\u0002\u00062\u0003\u000b\u0011BA0\u00031qW\r\u001f;PM\u001a\u001cX\r^:!\u0011%\tI\t\u0014b\u0001\n\u0003\tY)\u0001\tgSJ\u001cHoQ1mY\n\u000b'O]5feV\u0011\u0011Q\u0012\t\u0005\u0003\u001f\u000b)*\u0004\u0002\u0002\u0012*!\u0011QMAJ\u0015\t!&%\u0003\u0003\u0002\u0018\u0006E%AD\"pk:$Hi\\<o\u0019\u0006$8\r\u001b\u0005\t\u00037c\u0005\u0015!\u0003\u0002\u000e\u0006\tb-\u001b:ti\u000e\u000bG\u000e\u001c\"beJLWM\u001d\u0011\t\u0013\u0005}E\n1A\u0005\u0002\u0005\u0005\u0016!\u00034jeN$8)\u00197m+\t\t\u0019\u000bE\u0002\u0014\u0003KK1!a*\u0015\u0005\u001d\u0011un\u001c7fC:D\u0011\"a+M\u0001\u0004%\t!!,\u0002\u001b\u0019L'o\u001d;DC2dw\fJ3r)\u0011\ty+!.\u0011\u0007M\t\t,C\u0002\u00024R\u0011A!\u00168ji\"Q\u0011qWAU\u0003\u0003\u0005\r!a)\u0002\u0007a$\u0013\u0007\u0003\u0005\u0002<2\u0003\u000b\u0015BAR\u0003)1\u0017N]:u\u0007\u0006dG\u000e\t\u0005\n\u0003\u007fc\u0005\u0019!C\u0001\u0003\u0003\fab]5na2,7i\u001c8tk6,'/\u0006\u0002\u0002DB\u0019a\"!2\n\u0007\u0005\u001d'A\u0001\u000eEK\u001a\fW\u000f\u001c;GKR\u001c\u0007nU5na2,7i\u001c8tk6,'\u000fC\u0005\u0002L2\u0003\r\u0011\"\u0001\u0002N\u0006\u00112/[7qY\u0016\u001cuN\\:v[\u0016\u0014x\fJ3r)\u0011\ty+a4\t\u0015\u0005]\u0016\u0011ZA\u0001\u0002\u0004\t\u0019\r\u0003\u0005\u0002T2\u0003\u000b\u0015BAb\u0003=\u0019\u0018.\u001c9mK\u000e{gn];nKJ\u0004\u0003bBAl\u0019\u0012\u0005\u0011\u0011\\\u0001\u0015GJ,\u0017\r^3TS6\u0004H.Z\"p]N,X.\u001a:\u0015\u0005\u0005\r\u0007bBAo\u0019\u0012\u0005\u0011q\\\u0001\u0012C\u0012$Gk\u001c9jGB\u000b'\u000f^5uS>tG#\u0002\u0017\u0002b\u0006\u0015\b\u0002CAr\u00037\u0004\r!a\u001c\u0002\u0005Q\u0004\b\u0002CAt\u00037\u0004\r!!;\u0002\u00159,\u0007\u0010^(gMN,G\u000f\u0005\u0003\u0014\u0003WT\u0016bAAw)\t1q\n\u001d;j_:Dq!!=M\t\u0003\t\u00190\u0001\u000bsK6|g/\u001a+pa&\u001c\u0007+\u0019:uSRLwN\u001c\u000b\u0005\u0003k\f9\u0010E\u0003\u0014\u0003W\fi\b\u0003\u0005\u0002d\u0006=\b\u0019AA8\u0011%\tY\u0010\u0014b\u0001\n\u0003\ti0\u0001\u0004uQJ,\u0017\rZ\u000b\u0003\u0003\u007f\u00042a\bB\u0001\u0013\r\u0011\u0019\u0001\t\u0002\u0007)\"\u0014X-\u00193\t\u0011\t\u001dA\n)A\u0005\u0003\u007f\fq\u0001\u001e5sK\u0006$\u0007\u0005C\u0004\u0003\f1#IA!\u0004\u0002\u001b\u0019,Go\u00195NKN\u001c\u0018mZ3t)\t\ty\u000bC\u0004\u0003\u00121#\tAa\u0005\u0002\u0011\u0005\u0014G-[2bi\u0016$B!a,\u0003\u0016!A\u00111\u001dB\b\u0001\u0004\ty\u0007C\u0004\u0003\u001a1#\tAa\u0007\u0002\u0017\u0005\u0014G-[2bi\u0016\fE\u000e\\\u000b\u0003\u0003_CqAa\bM\t\u0003\u0011\t#\u0001\u0007iC:$G.Z#se>\u00148\u000f\u0006\u0004\u00020\n\r\"q\b\u0005\t\u0005K\u0011i\u00021\u0001\u0003(\u0005qQM\u001d:peJ+7\u000f]8og\u0016\u001c\b#B.\u0003*\t5\u0012b\u0001B\u0016G\n\u00191+\u001a;\u0011\u000fM\u0011y#a\u001c\u00034%\u0019!\u0011\u0007\u000b\u0003\rQ+\b\u000f\\33!\u0011\u0011)Da\u000f\u000e\u0005\t]\"\u0002\u0002B\u001d\u0003o\n1!\u00199j\u0013\u0011\u0011iDa\u000e\u00035\u0019+Go\u00195SKN\u0004xN\\:f!\u0006\u0014H/\u001b;j_:$\u0015\r^1\t\u0011\t\u0005#Q\u0004a\u0001\u0005\u0007\n\u0001B]3ta>t7/\u001a\t\u0005\u0005k\u0011)%\u0003\u0003\u0003H\t]\"!\u0004$fi\u000eD'+Z:q_:\u001cX\rC\u0004\u0003L1#\tA!\u0014\u000215|g/Z'fgN\fw-Z:U_RCW-\u001b:Rk\u0016,X\r\u0006\u0004\u0003P\tU#q\u000b\t\u0004'\tE\u0013b\u0001B*)\t1\u0011I\\=WC2D\u0001\"a9\u0003J\u0001\u0007\u0011q\u000e\u0005\t\u00053\u0012I\u00051\u0001\u00034\u0005!A-\u0019;b\u0011\u001d\u0011i\u0006\u0014C!\u0005?\n\u0001\u0002^8TiJLgn\u001a\u000b\u00025\"9!1\r'\u0005\u0002\tm\u0011!B:uCJ$\bb\u0002B4\u0019\u0012\u0005!1D\u0001\u0005gR|\u0007\u000fC\u0004\u0003l1#IAa\u0007\u0002+I,gM]3tQ2\u000bG/\u001a8ds6+GO]5dg\u0002")
/* loaded from: input_file:org/apache/samza/system/kafka/BrokerProxy.class */
public class BrokerProxy implements Toss, Logging {
    private final String host;
    private final int port;
    private final String system;
    private final String clientID;
    private final KafkaSystemConsumerMetrics metrics;
    private final MessageSink messageSink;
    private final int timeout;
    private final int bufferSize;
    private final StreamFetchSizes fetchSize;
    private final int consumerMinSize;
    private final int consumerMaxWait;
    private final GetOffset offsetGetter;
    private final int sleepMSWhileNoTopicPartitions;
    private final Map<TopicAndPartition, Object> nextOffsets;
    private final CountDownLatch firstCallBarrier;
    private boolean firstCall;
    private DefaultFetchSimpleConsumer simpleConsumer;
    private final Thread thread;
    private final String loggerName;
    private Logger logger;
    private final String startupLoggerName;
    private Logger startupLogger;
    private volatile byte bitmap$0;

    public static String BROKER_PROXY_THREAD_NAME_PREFIX() {
        return BrokerProxy$.MODULE$.BROKER_PROXY_THREAD_NAME_PREFIX();
    }

    public void startupLog(Function0<Object> function0) {
        Logging.startupLog$(this, function0);
    }

    public void trace(Function0<Object> function0) {
        Logging.trace$(this, function0);
    }

    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.trace$(this, function0, function02);
    }

    public void debug(Function0<Object> function0) {
        Logging.debug$(this, function0);
    }

    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.debug$(this, function0, function02);
    }

    public void info(Function0<Object> function0) {
        Logging.info$(this, function0);
    }

    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.info$(this, function0, function02);
    }

    public void warn(Function0<Object> function0) {
        Logging.warn$(this, function0);
    }

    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.warn$(this, function0, function02);
    }

    public void error(Function0<Object> function0) {
        Logging.error$(this, function0);
    }

    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.error$(this, function0, function02);
    }

    public void putMDC(Function0<String> function0, Function0<String> function02) {
        Logging.putMDC$(this, function0, function02);
    }

    public String getMDC(Function0<String> function0) {
        return Logging.getMDC$(this, function0);
    }

    public void removeMDC(Function0<String> function0) {
        Logging.removeMDC$(this, function0);
    }

    public void clearMDC() {
        Logging.clearMDC$(this);
    }

    @Override // org.apache.samza.system.kafka.Toss
    public Nothing$ toss(String str) {
        Nothing$ ssVar;
        ssVar = toss(str);
        return ssVar;
    }

    public String loggerName() {
        return this.loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.kafka.BrokerProxy] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.logger = Logging.logger$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? logger$lzycompute() : this.logger;
    }

    public String startupLoggerName() {
        return this.startupLoggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.kafka.BrokerProxy] */
    private Logger startupLogger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.startupLogger = Logging.startupLogger$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.startupLogger;
    }

    public Logger startupLogger() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? startupLogger$lzycompute() : this.startupLogger;
    }

    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        this.loggerName = str;
    }

    public void org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq(String str) {
        this.startupLoggerName = str;
    }

    public String host() {
        return this.host;
    }

    public int port() {
        return this.port;
    }

    public String system() {
        return this.system;
    }

    public String clientID() {
        return this.clientID;
    }

    public KafkaSystemConsumerMetrics metrics() {
        return this.metrics;
    }

    public MessageSink messageSink() {
        return this.messageSink;
    }

    public int timeout() {
        return this.timeout;
    }

    public int bufferSize() {
        return this.bufferSize;
    }

    public StreamFetchSizes fetchSize() {
        return this.fetchSize;
    }

    public int consumerMinSize() {
        return this.consumerMinSize;
    }

    public int consumerMaxWait() {
        return this.consumerMaxWait;
    }

    public int sleepMSWhileNoTopicPartitions() {
        return this.sleepMSWhileNoTopicPartitions;
    }

    public Map<TopicAndPartition, Object> nextOffsets() {
        return this.nextOffsets;
    }

    public CountDownLatch firstCallBarrier() {
        return this.firstCallBarrier;
    }

    public boolean firstCall() {
        return this.firstCall;
    }

    public void firstCall_$eq(boolean z) {
        this.firstCall = z;
    }

    public DefaultFetchSimpleConsumer simpleConsumer() {
        return this.simpleConsumer;
    }

    public void simpleConsumer_$eq(DefaultFetchSimpleConsumer defaultFetchSimpleConsumer) {
        this.simpleConsumer = defaultFetchSimpleConsumer;
    }

    public DefaultFetchSimpleConsumer createSimpleConsumer() {
        String format = new StringOps(Predef$.MODULE$.augmentString("%s:%d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{host(), BoxesRunTime.boxToInteger(port())}));
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Creating new SimpleConsumer for host %s for system %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{format, this.system()}));
        });
        return new DefaultFetchSimpleConsumer(host(), port(), timeout(), bufferSize(), clientID(), fetchSize(), consumerMinSize(), consumerMaxWait());
    }

    public int addTopicPartition(TopicAndPartition topicAndPartition, Option<String> option) {
        long resetOffset;
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Adding new topic and partition %s to queue for %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicAndPartition, this.host()}));
        });
        if (((java.util.Map) JavaConverters$.MODULE$.mapAsJavaConcurrentMapConverter(nextOffsets()).asJava()).containsKey(topicAndPartition)) {
            throw toss(new StringOps(Predef$.MODULE$.augmentString("Already consuming TopicPartition %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicAndPartition})));
        }
        if (option.isDefined() && this.offsetGetter.isValidOffset(simpleConsumer(), topicAndPartition, (String) option.get())) {
            resetOffset = new StringOps(Predef$.MODULE$.augmentString((String) option.get())).toLong();
        } else {
            warn(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{option, topicAndPartition}));
            });
            resetOffset = this.offsetGetter.getResetOffset(simpleConsumer(), topicAndPartition);
        }
        long j = resetOffset;
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got offset %s for new topic and partition %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(j), topicAndPartition}));
        });
        nextOffsets().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicAndPartition), BoxesRunTime.boxToLong(j)));
        return BoxesRunTime.unboxToInt(metrics().topicPartitions().get(new Tuple2(host(), BoxesRunTime.boxToInteger(port()))).set(BoxesRunTime.boxToInteger(nextOffsets().size())));
    }

    public Option<Object> removeTopicPartition(TopicAndPartition topicAndPartition) {
        if (!((java.util.Map) JavaConverters$.MODULE$.mapAsJavaConcurrentMapConverter(nextOffsets()).asJava()).containsKey(topicAndPartition)) {
            warn(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Asked to remove topic and partition %s, but not in map (keys = %s)")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicAndPartition, this.nextOffsets().keys().mkString(",")}));
            });
            return None$.MODULE$;
        }
        Option<Object> remove = nextOffsets().remove(topicAndPartition);
        metrics().topicPartitions().get(new Tuple2(host(), BoxesRunTime.boxToInteger(port()))).set(BoxesRunTime.boxToInteger(nextOffsets().size()));
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Removed %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicAndPartition}));
        });
        return remove;
    }

    public Thread thread() {
        return this.thread;
    }

    public void org$apache$samza$system$kafka$BrokerProxy$$fetchMessages() {
        List list = nextOffsets().filterKeys(topicAndPartition -> {
            return BoxesRunTime.boxToBoolean($anonfun$fetchMessages$1(this, topicAndPartition));
        }).toList();
        if (list.size() <= 0) {
            refreshLatencyMetrics();
            debug(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.host(), BoxesRunTime.boxToInteger(this.port()), BoxesRunTime.boxToInteger(this.sleepMSWhileNoTopicPartitions())}));
            });
            metrics().brokerSkippedFetchRequests().get(new Tuple2(host(), BoxesRunTime.boxToInteger(port()))).inc();
            Thread.sleep(sleepMSWhileNoTopicPartitions());
            return;
        }
        metrics().brokerReads().get(new Tuple2(host(), BoxesRunTime.boxToInteger(port()))).inc();
        FetchResponse defaultFetch = simpleConsumer().defaultFetch(list);
        firstCall_$eq(false);
        firstCallBarrier().countDown();
        Tuple2 partition = defaultFetch.data().toSet().partition(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$fetchMessages$2(tuple2));
        });
        if (partition == null) {
            throw new MatchError(partition);
        }
        Tuple2 tuple22 = new Tuple2((Set) partition._1(), (Set) partition._2());
        Set set = (Set) tuple22._1();
        handleErrors((Set) tuple22._2(), defaultFetch);
        set.foreach(tuple23 -> {
            if (tuple23 != null) {
                return this.moveMessagesToTheirQueue((TopicAndPartition) tuple23._1(), (FetchResponsePartitionData) tuple23._2());
            }
            throw new MatchError(tuple23);
        });
    }

    public void abdicate(TopicAndPartition topicAndPartition) {
        Some removeTopicPartition = removeTopicPartition(topicAndPartition);
        if (removeTopicPartition instanceof Some) {
            messageSink().abdicate(topicAndPartition, BoxesRunTime.unboxToLong(removeTopicPartition.value()));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!None$.MODULE$.equals(removeTopicPartition)) {
                throw new MatchError(removeTopicPartition);
            }
            warn(() -> {
                return "Tried to abdicate for topic partition not in map. Removed in interim?";
            });
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public void abdicateAll() {
        nextOffsets().toMap(Predef$.MODULE$.$conforms()).keySet().foreach(topicAndPartition -> {
            this.abdicate(topicAndPartition);
            return BoxedUnit.UNIT;
        });
    }

    public void handleErrors(Set<Tuple2<TopicAndPartition, FetchResponsePartitionData>> set, FetchResponse fetchResponse) {
        new LazyRef();
        Tuple2 partition = ((Set) set.withFilter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$handleErrors$1(tuple2));
        }).flatMap(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError(tuple22);
            }
            TopicAndPartition topicAndPartition = (TopicAndPartition) tuple22._1();
            return Option$.MODULE$.option2Iterable(Option$.MODULE$.apply(BoxesRunTime.boxToShort(fetchResponse.errorCode(topicAndPartition.topic(), topicAndPartition.partition()))).flatMap(obj -> {
                return $anonfun$handleErrors$3(this, topicAndPartition, BoxesRunTime.unboxToShort(obj));
            }));
        }, Set$.MODULE$.canBuildFrom())).partition(brokerProxy$Error$3 -> {
            return BoxesRunTime.boxToBoolean($anonfun$handleErrors$5(brokerProxy$Error$3));
        });
        if (partition == null) {
            throw new MatchError(partition);
        }
        Tuple2 tuple23 = new Tuple2((Set) partition._1(), (Set) partition._2());
        Set set2 = (Set) tuple23._1();
        Tuple2 partition2 = ((Set) tuple23._2()).partition(brokerProxy$Error$32 -> {
            return BoxesRunTime.boxToBoolean($anonfun$handleErrors$6(brokerProxy$Error$32));
        });
        if (partition2 == null) {
            throw new MatchError(partition2);
        }
        Tuple2 tuple24 = new Tuple2((Set) partition2._1(), (Set) partition2._2());
        Set set3 = (Set) tuple24._1();
        Set set4 = (Set) tuple24._2();
        set4.foreach(brokerProxy$Error$33 -> {
            $anonfun$handleErrors$7(this, set4, brokerProxy$Error$33);
            return BoxedUnit.UNIT;
        });
        set2.foreach(brokerProxy$Error$34 -> {
            $anonfun$handleErrors$9(this, brokerProxy$Error$34);
            return BoxedUnit.UNIT;
        });
        set3.foreach(brokerProxy$Error$35 -> {
            this.warn(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Received OffsetOutOfRange exception for %s. Current offset = %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{brokerProxy$Error$35.tp(), this.nextOffsets().getOrElse(brokerProxy$Error$35.tp(), () -> {
                    return "not found in map, likely removed in the interim";
                })}));
            });
            try {
                return this.nextOffsets().replace(brokerProxy$Error$35.tp(), BoxesRunTime.boxToLong(this.offsetGetter.getResetOffset(this.simpleConsumer(), brokerProxy$Error$35.tp())));
            } catch (Throwable th) {
                if (!(th instanceof UnknownTopicOrPartitionException ? true : th instanceof NotLeaderForPartitionException)) {
                    throw th;
                }
                this.warn(() -> {
                    return "Received (UnknownTopicOr|NotLeaderFor)Partition exception. Abdicating";
                });
                this.abdicate(brokerProxy$Error$35.tp());
                return BoxedUnit.UNIT;
            }
        });
    }

    public Object moveMessagesToTheirQueue(TopicAndPartition topicAndPartition, FetchResponsePartitionData fetchResponsePartitionData) {
        MessageSet messages = fetchResponsePartitionData.messages();
        LongRef create = LongRef.create(BoxesRunTime.unboxToLong(nextOffsets().apply(topicAndPartition)));
        messageSink().setIsAtHighWatermark(topicAndPartition, fetchResponsePartitionData.hw() == 0 || fetchResponsePartitionData.hw() == create.elem);
        Predef$.MODULE$.require(messages != null);
        messages.iterator().foreach(messageAndOffset -> {
            $anonfun$moveMessagesToTheirQueue$1(this, topicAndPartition, fetchResponsePartitionData, create, messageAndOffset);
            return BoxedUnit.UNIT;
        });
        nextOffsets().replace(topicAndPartition, BoxesRunTime.boxToLong(create.elem));
        long hw = fetchResponsePartitionData.hw();
        if (hw >= 0) {
            metrics().highWatermark().get(topicAndPartition).set(BoxesRunTime.boxToLong(hw));
            return metrics().lag().get(topicAndPartition).set(BoxesRunTime.boxToLong(hw - create.elem));
        }
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got a high water mark less than 0 (%d) for %s, so skipping.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(hw), topicAndPartition}));
        });
        return BoxedUnit.UNIT;
    }

    public String toString() {
        return new StringOps(Predef$.MODULE$.augmentString("BrokerProxy for %s:%d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{host(), BoxesRunTime.boxToInteger(port())}));
    }

    public void start() {
        if (thread().isAlive()) {
            debug(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Tried to start an already started broker proxy (%s). Ignoring.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.toString()}));
            });
            return;
        }
        info(() -> {
            return "Starting " + this.toString();
        });
        thread().setDaemon(true);
        thread().setName(ThreadNamePrefix$.MODULE$.SAMZA_THREAD_NAME_PREFIX() + BrokerProxy$.MODULE$.BROKER_PROXY_THREAD_NAME_PREFIX() + thread().getName());
        thread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(this) { // from class: org.apache.samza.system.kafka.BrokerProxy$$anon$2
            private final /* synthetic */ BrokerProxy $outer;

            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                this.$outer.error(() -> {
                    return "Uncaught exception in broker proxy:";
                }, () -> {
                    return th;
                });
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        });
        thread().start();
    }

    public void stop() {
        info(() -> {
            return "Shutting down " + this.toString();
        });
        if (simpleConsumer() != null) {
            info(() -> {
                return "closing simple consumer...";
            });
            simpleConsumer().close();
        }
        thread().interrupt();
        thread().join();
    }

    private void refreshLatencyMetrics() {
        nextOffsets().foreach(tuple2 -> {
            Object obj;
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            TopicAndPartition topicAndPartition = (TopicAndPartition) tuple2._1();
            long _2$mcJ$sp = tuple2._2$mcJ$sp();
            long earliestOrLatestOffset = this.simpleConsumer().earliestOrLatestOffset(topicAndPartition, -1L, Request$.MODULE$.OrdinaryConsumerId());
            this.trace(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("latest offset of %s is %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicAndPartition, BoxesRunTime.boxToLong(earliestOrLatestOffset)}));
            });
            if (earliestOrLatestOffset >= 0) {
                if (this.metrics().highWatermark().containsKey(topicAndPartition)) {
                    this.metrics().highWatermark().get(topicAndPartition).set(BoxesRunTime.boxToLong(earliestOrLatestOffset));
                } else {
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                }
                obj = this.metrics().lag().containsKey(topicAndPartition) ? this.metrics().lag().get(topicAndPartition).set(BoxesRunTime.boxToLong(earliestOrLatestOffset - _2$mcJ$sp)) : BoxedUnit.UNIT;
            } else {
                obj = BoxedUnit.UNIT;
            }
            return obj;
        });
    }

    public static final /* synthetic */ boolean $anonfun$fetchMessages$1(BrokerProxy brokerProxy, TopicAndPartition topicAndPartition) {
        return brokerProxy.messageSink().needsMoreMessages(topicAndPartition);
    }

    public static final /* synthetic */ boolean $anonfun$fetchMessages$2(Tuple2 tuple2) {
        return ((FetchResponsePartitionData) tuple2._2()).error() == ErrorMapping$.MODULE$.NoError();
    }

    private final /* synthetic */ BrokerProxy$Error$4$ Error$lzycompute$1(LazyRef lazyRef) {
        BrokerProxy$Error$4$ brokerProxy$Error$4$;
        synchronized (lazyRef) {
            brokerProxy$Error$4$ = lazyRef.initialized() ? (BrokerProxy$Error$4$) lazyRef.value() : (BrokerProxy$Error$4$) lazyRef.initialize(new BrokerProxy$Error$4$(this));
        }
        return brokerProxy$Error$4$;
    }

    private final BrokerProxy$Error$4$ Error$2(LazyRef lazyRef) {
        return lazyRef.initialized() ? (BrokerProxy$Error$4$) lazyRef.value() : Error$lzycompute$1(lazyRef);
    }

    public static final /* synthetic */ boolean $anonfun$handleErrors$1(Tuple2 tuple2) {
        return tuple2 != null;
    }

    public static final /* synthetic */ Option $anonfun$handleErrors$3(BrokerProxy brokerProxy, TopicAndPartition topicAndPartition, short s) {
        return Option$.MODULE$.apply(ErrorMapping$.MODULE$.exceptionFor(s)).map(th -> {
            return new BrokerProxy$Error$3(brokerProxy, topicAndPartition, s, th);
        });
    }

    public static final /* synthetic */ boolean $anonfun$handleErrors$5(BrokerProxy$Error$3 brokerProxy$Error$3) {
        return brokerProxy$Error$3.code() == ErrorMapping$.MODULE$.NotLeaderForPartitionCode() || brokerProxy$Error$3.code() == ErrorMapping$.MODULE$.UnknownTopicOrPartitionCode();
    }

    public static final /* synthetic */ boolean $anonfun$handleErrors$6(BrokerProxy$Error$3 brokerProxy$Error$3) {
        return brokerProxy$Error$3.code() == ErrorMapping$.MODULE$.OffsetOutOfRangeCode();
    }

    public static final /* synthetic */ void $anonfun$handleErrors$7(BrokerProxy brokerProxy, Set set, BrokerProxy$Error$3 brokerProxy$Error$3) {
        brokerProxy.warn(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{set.mkString(",")}));
        });
        KafkaUtil$.MODULE$.maybeThrowException(brokerProxy$Error$3.code());
    }

    public static final /* synthetic */ void $anonfun$handleErrors$9(BrokerProxy brokerProxy, BrokerProxy$Error$3 brokerProxy$Error$3) {
        brokerProxy.abdicate(brokerProxy$Error$3.tp());
    }

    public static final /* synthetic */ void $anonfun$moveMessagesToTheirQueue$1(BrokerProxy brokerProxy, TopicAndPartition topicAndPartition, FetchResponsePartitionData fetchResponsePartitionData, LongRef longRef, MessageAndOffset messageAndOffset) {
        brokerProxy.messageSink().addMessage(topicAndPartition, messageAndOffset, fetchResponsePartitionData.hw());
        longRef.elem = messageAndOffset.nextOffset();
        int payloadSize = messageAndOffset.message().payloadSize() + messageAndOffset.message().keySize();
        brokerProxy.metrics().reads().get(topicAndPartition).inc();
        brokerProxy.metrics().bytesRead().get(topicAndPartition).inc(payloadSize);
        brokerProxy.metrics().brokerBytesRead().get(new Tuple2(brokerProxy.host(), BoxesRunTime.boxToInteger(brokerProxy.port()))).inc(payloadSize);
        brokerProxy.metrics().offsets().get(topicAndPartition).set(longRef.elem);
    }

    public BrokerProxy(String str, int i, String str2, String str3, KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics, MessageSink messageSink, int i2, int i3, StreamFetchSizes streamFetchSizes, int i4, int i5, GetOffset getOffset) {
        this.host = str;
        this.port = i;
        this.system = str2;
        this.clientID = str3;
        this.metrics = kafkaSystemConsumerMetrics;
        this.messageSink = messageSink;
        this.timeout = i2;
        this.bufferSize = i3;
        this.fetchSize = streamFetchSizes;
        this.consumerMinSize = i4;
        this.consumerMaxWait = i5;
        this.offsetGetter = getOffset;
        Toss.$init$(this);
        Logging.$init$(this);
        this.sleepMSWhileNoTopicPartitions = 100;
        this.nextOffsets = (Map) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(new ConcurrentHashMap()).asScala();
        this.firstCallBarrier = new CountDownLatch(1);
        this.firstCall = true;
        this.simpleConsumer = createSimpleConsumer();
        kafkaSystemConsumerMetrics.registerBrokerProxy(str, i);
        this.thread = new Thread(new Runnable(this) { // from class: org.apache.samza.system.kafka.BrokerProxy$$anon$1
            private final /* synthetic */ BrokerProxy $outer;

            @Override // java.lang.Runnable
            public void run() {
                BooleanRef create = BooleanRef.create(false);
                try {
                    new ExponentialSleepStrategy(ExponentialSleepStrategy$.MODULE$.$lessinit$greater$default$1(), ExponentialSleepStrategy$.MODULE$.$lessinit$greater$default$2(), ExponentialSleepStrategy$.MODULE$.$lessinit$greater$default$3()).run(retryLoop -> {
                        $anonfun$run$1(this, create, retryLoop);
                        return BoxedUnit.UNIT;
                    }, (exc, retryLoop2) -> {
                        $anonfun$run$3(this, create, exc, retryLoop2);
                        return BoxedUnit.UNIT;
                    });
                } catch (InterruptedException e) {
                    this.$outer.info(() -> {
                        return "Got interrupt exception in broker proxy thread.";
                    });
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } catch (OutOfMemoryError e2) {
                    throw new SamzaException("Got out of memory error in broker proxy thread.");
                } catch (StackOverflowError e3) {
                    throw new SamzaException("Got stack overflow error in broker proxy thread.");
                } catch (ClosedByInterruptException e4) {
                    this.$outer.info(() -> {
                        return "Got closed by interrupt exception in broker proxy thread.";
                    });
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                }
                if (Thread.currentThread().isInterrupted()) {
                    this.$outer.info(() -> {
                        return "Shutting down due to interrupt.";
                    });
                }
            }

            public static final /* synthetic */ void $anonfun$run$1(BrokerProxy$$anon$1 brokerProxy$$anon$1, BooleanRef booleanRef, ExponentialSleepStrategy.RetryLoop retryLoop) {
                if (booleanRef.elem) {
                    brokerProxy$$anon$1.$outer.metrics().reconnects().get(new Tuple2(brokerProxy$$anon$1.$outer.host(), BoxesRunTime.boxToInteger(brokerProxy$$anon$1.$outer.port()))).inc();
                    brokerProxy$$anon$1.$outer.simpleConsumer().close();
                    brokerProxy$$anon$1.$outer.simpleConsumer_$eq(brokerProxy$$anon$1.$outer.createSimpleConsumer());
                }
                while (!Thread.currentThread().isInterrupted()) {
                    brokerProxy$$anon$1.$outer.messageSink().refreshDropped();
                    if (brokerProxy$$anon$1.$outer.nextOffsets().size() == 0) {
                        brokerProxy$$anon$1.$outer.debug(() -> {
                            return "No TopicPartitions to fetch. Sleeping.";
                        });
                        Thread.sleep(brokerProxy$$anon$1.$outer.sleepMSWhileNoTopicPartitions());
                    } else {
                        brokerProxy$$anon$1.$outer.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages();
                        retryLoop.reset();
                    }
                }
            }

            public static final /* synthetic */ void $anonfun$run$3(BrokerProxy$$anon$1 brokerProxy$$anon$1, BooleanRef booleanRef, Exception exc, ExponentialSleepStrategy.RetryLoop retryLoop) {
                brokerProxy$$anon$1.$outer.warn(() -> {
                    return new StringOps(Predef$.MODULE$.augmentString("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{exc}));
                });
                brokerProxy$$anon$1.$outer.debug(() -> {
                    return "Exception detail:";
                }, () -> {
                    return exc;
                });
                brokerProxy$$anon$1.$outer.abdicateAll();
                booleanRef.elem = true;
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        }, new StringOps(Predef$.MODULE$.augmentString("BrokerProxy thread pointed at %s:%d for client %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str, BoxesRunTime.boxToInteger(i), str3})));
    }
}
