package org.apache.samza.system.kafka_deprecated;

import java.lang.Thread;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import kafka.api.FetchResponse;
import kafka.api.FetchResponsePartitionData;
import kafka.api.Request$;
import kafka.common.ErrorMapping$;
import kafka.common.NotLeaderForPartitionException;
import kafka.common.TopicAndPartition;
import kafka.common.UnknownTopicOrPartitionException;
import kafka.message.MessageAndOffset;
import kafka.message.MessageSet;
import org.apache.samza.SamzaException;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.KafkaUtil$;
import org.apache.samza.util.Logging;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.concurrent.Map;
import scala.collection.immutable.List;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.runtime.LongRef;
import scala.runtime.Nothing$;

/* compiled from: BrokerProxy.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\u0005d\u0001B\u0001\u0003\u00015\u00111B\u0011:pW\u0016\u0014\bK]8ys*\u00111\u0001B\u0001\u0011W\u000647.Y0eKB\u0014XmY1uK\u0012T!!\u0002\u0004\u0002\rML8\u000f^3n\u0015\t9\u0001\"A\u0003tC6T\u0018M\u0003\u0002\n\u0015\u00051\u0011\r]1dQ\u0016T\u0011aC\u0001\u0004_J<7\u0001A\n\u0005\u00019!\u0002\u0004\u0005\u0002\u0010%5\t\u0001CC\u0001\u0012\u0003\u0015\u00198-\u00197b\u0013\t\u0019\u0002C\u0001\u0004B]f\u0014VM\u001a\t\u0003+Yi\u0011AA\u0005\u0003/\t\u0011A\u0001V8tgB\u0011\u0011\u0004H\u0007\u00025)\u00111DB\u0001\u0005kRLG.\u0003\u0002\u001e5\t9Aj\\4hS:<\u0007\u0002C\u0010\u0001\u0005\u000b\u0007I\u0011\u0001\u0011\u0002\t!|7\u000f^\u000b\u0002CA\u0011!%\u000b\b\u0003G\u001d\u0002\"\u0001\n\t\u000e\u0003\u0015R!A\n\u0007\u0002\rq\u0012xn\u001c;?\u0013\tA\u0003#\u0001\u0004Qe\u0016$WMZ\u0005\u0003U-\u0012aa\u0015;sS:<'B\u0001\u0015\u0011\u0011!i\u0003A!A!\u0002\u0013\t\u0013!\u00025pgR\u0004\u0003\u0002C\u0018\u0001\u0005\u000b\u0007I\u0011\u0001\u0019\u0002\tA|'\u000f^\u000b\u0002cA\u0011qBM\u0005\u0003gA\u00111!\u00138u\u0011!)\u0004A!A!\u0002\u0013\t\u0014!\u00029peR\u0004\u0003\u0002C\u0003\u0001\u0005\u000b\u0007I\u0011\u0001\u0011\t\u0011a\u0002!\u0011!Q\u0001\n\u0005\nqa]=ti\u0016l\u0007\u0005\u0003\u0005;\u0001\t\u0015\r\u0011\"\u0001!\u0003!\u0019G.[3oi&#\u0005\u0002\u0003\u001f\u0001\u0005\u0003\u0005\u000b\u0011B\u0011\u0002\u0013\rd\u0017.\u001a8u\u0013\u0012\u0003\u0003\u0002\u0003 \u0001\u0005\u000b\u0007I\u0011A \u0002\u000f5,GO]5dgV\t\u0001\t\u0005\u0002\u0016\u0003&\u0011!I\u0001\u0002\u001b\u0017\u000647.Y*zgR,WnQ8ogVlWM]'fiJL7m\u001d\u0005\t\t\u0002\u0011\t\u0011)A\u0005\u0001\u0006AQ.\u001a;sS\u000e\u001c\b\u0005\u0003\u0005G\u0001\t\u0015\r\u0011\"\u0001H\u0003-iWm]:bO\u0016\u001c\u0016N\\6\u0016\u0003!\u0003\"!F%\n\u0005)\u0013!aC'fgN\fw-Z*j].D\u0001\u0002\u0014\u0001\u0003\u0002\u0003\u0006I\u0001S\u0001\r[\u0016\u001c8/Y4f'&t7\u000e\t\u0005\t\u001d\u0002\u0011)\u0019!C\u0001a\u00059A/[7f_V$\b\u0002\u0003)\u0001\u0005\u0003\u0005\u000b\u0011B\u0019\u0002\u0011QLW.Z8vi\u0002B\u0001B\u0015\u0001\u0003\u0006\u0004%\t\u0001M\u0001\u000bEV4g-\u001a:TSj,\u0007\u0002\u0003+\u0001\u0005\u0003\u0005\u000b\u0011B\u0019\u0002\u0017\t,hMZ3s'&TX\r\t\u0005\t-\u0002\u0011)\u0019!C\u0001/\u0006Ia-\u001a;dQNK'0Z\u000b\u00021B\u0011Q#W\u0005\u00035\n\u0011\u0001c\u0015;sK\u0006lg)\u001a;dQNK'0Z:\t\u0011q\u0003!\u0011!Q\u0001\na\u000b!BZ3uG\"\u001c\u0016N_3!\u0011!q\u0006A!b\u0001\n\u0003\u0001\u0014aD2p]N,X.\u001a:NS:\u001c\u0016N_3\t\u0011\u0001\u0004!\u0011!Q\u0001\nE\n\u0001cY8ogVlWM]'j]NK'0\u001a\u0011\t\u0011\t\u0004!Q1A\u0005\u0002A\nqbY8ogVlWM]'bq^\u000b\u0017\u000e\u001e\u0005\tI\u0002\u0011\t\u0011)A\u0005c\u0005\u00012m\u001c8tk6,'/T1y/\u0006LG\u000f\t\u0005\tM\u0002\u0011\t\u0011)A\u0005O\u0006aqN\u001a4tKR<U\r\u001e;feB\u0011Q\u0003[\u0005\u0003S\n\u0011\u0011bR3u\u001f\u001a47/\u001a;\t\u000b-\u0004A\u0011\u00017\u0002\rqJg.\u001b;?)5ign\u001c9reN$XO^<ysB\u0011Q\u0003\u0001\u0005\u0006?)\u0004\r!\t\u0005\u0006_)\u0004\r!\r\u0005\u0006\u000b)\u0004\r!\t\u0005\u0006u)\u0004\r!\t\u0005\u0006})\u0004\r\u0001\u0011\u0005\u0006\r*\u0004\r\u0001\u0013\u0005\b\u001d*\u0004\n\u00111\u00012\u0011\u001d\u0011&\u000e%AA\u0002EBqA\u00166\u0011\u0002\u0003\u0007\u0001\fC\u0004_UB\u0005\t\u0019A\u0019\t\u000f\tT\u0007\u0013!a\u0001c!9aM\u001bI\u0001\u0002\u00049\u0007bB>\u0001\u0005\u0004%\t\u0001M\u0001\u001eg2,W\r]'T/\"LG.\u001a(p)>\u0004\u0018n\u0019)beRLG/[8og\"1Q\u0010\u0001Q\u0001\nE\nad\u001d7fKBl5k\u00165jY\u0016tu\u000eV8qS\u000e\u0004\u0016M\u001d;ji&|gn\u001d\u0011\t\u0011}\u0004!\u0019!C\u0001\u0003\u0003\t1B\\3yi>3gm]3ugV\u0011\u00111\u0001\t\t\u0003\u000b\ty!a\u0005\u0002$5\u0011\u0011q\u0001\u0006\u0005\u0003\u0013\tY!\u0001\u0006d_:\u001cWO\u001d:f]RT1!!\u0004\u0011\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0005\u0003#\t9AA\u0002NCB\u0004B!!\u0006\u0002 5\u0011\u0011q\u0003\u0006\u0005\u00033\tY\"\u0001\u0004d_6lwN\u001c\u0006\u0003\u0003;\tQa[1gW\u0006LA!!\t\u0002\u0018\t\tBk\u001c9jG\u0006sG\rU1si&$\u0018n\u001c8\u0011\u0007=\t)#C\u0002\u0002(A\u0011A\u0001T8oO\"A\u00111\u0006\u0001!\u0002\u0013\t\u0019!\u0001\u0007oKb$xJ\u001a4tKR\u001c\b\u0005C\u0005\u00020\u0001\u0011\r\u0011\"\u0001\u00022\u0005\u0001b-\u001b:ti\u000e\u000bG\u000e\u001c\"beJLWM]\u000b\u0003\u0003g\u0001B!!\u000e\u0002@5\u0011\u0011q\u0007\u0006\u0005\u0003\u0013\tIDC\u0002\u001c\u0003wQ!!!\u0010\u0002\t)\fg/Y\u0005\u0005\u0003\u0003\n9D\u0001\bD_VtG\u000fR8x]2\u000bGo\u00195\t\u0011\u0005\u0015\u0003\u0001)A\u0005\u0003g\t\u0011CZ5sgR\u001c\u0015\r\u001c7CCJ\u0014\u0018.\u001a:!\u0011%\tI\u0005\u0001a\u0001\n\u0003\tY%A\u0005gSJ\u001cHoQ1mYV\u0011\u0011Q\n\t\u0004\u001f\u0005=\u0013bAA)!\t9!i\\8mK\u0006t\u0007\"CA+\u0001\u0001\u0007I\u0011AA,\u000351\u0017N]:u\u0007\u0006dGn\u0018\u0013fcR!\u0011\u0011LA0!\ry\u00111L\u0005\u0004\u0003;\u0002\"\u0001B+oSRD!\"!\u0019\u0002T\u0005\u0005\t\u0019AA'\u0003\rAH%\r\u0005\t\u0003K\u0002\u0001\u0015)\u0003\u0002N\u0005Qa-\u001b:ti\u000e\u000bG\u000e\u001c\u0011\t\u0013\u0005%\u0004\u00011A\u0005\u0002\u0005-\u0014AD:j[BdWmQ8ogVlWM]\u000b\u0003\u0003[\u00022!FA8\u0013\r\t\tH\u0001\u0002\u001b\t\u00164\u0017-\u001e7u\r\u0016$8\r[*j[BdWmQ8ogVlWM\u001d\u0005\n\u0003k\u0002\u0001\u0019!C\u0001\u0003o\n!c]5na2,7i\u001c8tk6,'o\u0018\u0013fcR!\u0011\u0011LA=\u0011)\t\t'a\u001d\u0002\u0002\u0003\u0007\u0011Q\u000e\u0005\t\u0003{\u0002\u0001\u0015)\u0003\u0002n\u0005y1/[7qY\u0016\u001cuN\\:v[\u0016\u0014\b\u0005C\u0004\u0002\u0002\u0002!\t!a!\u0002)\r\u0014X-\u0019;f'&l\u0007\u000f\\3D_:\u001cX/\\3s)\t\ti\u0007C\u0004\u0002\b\u0002!\t!!#\u0002#\u0005$G\rV8qS\u000e\u0004\u0016M\u001d;ji&|g\u000eF\u00032\u0003\u0017\u000by\t\u0003\u0005\u0002\u000e\u0006\u0015\u0005\u0019AA\n\u0003\t!\b\u000f\u0003\u0005\u0002\u0012\u0006\u0015\u0005\u0019AAJ\u0003)qW\r\u001f;PM\u001a\u001cX\r\u001e\t\u0005\u001f\u0005U\u0015%C\u0002\u0002\u0018B\u0011aa\u00149uS>t\u0007bBAN\u0001\u0011\u0005\u0011QT\u0001\u0015e\u0016lwN^3U_BL7\rU1si&$\u0018n\u001c8\u0015\t\u0005}\u0015\u0011\u0015\t\u0006\u001f\u0005U\u00151\u0005\u0005\t\u0003\u001b\u000bI\n1\u0001\u0002\u0014!I\u0011Q\u0015\u0001C\u0002\u0013\u0005\u0011qU\u0001\u0007i\"\u0014X-\u00193\u0016\u0005\u0005%\u0006\u0003BAV\u0003ck!!!,\u000b\t\u0005=\u00161H\u0001\u0005Y\u0006tw-\u0003\u0003\u00024\u00065&A\u0002+ie\u0016\fG\r\u0003\u0005\u00028\u0002\u0001\u000b\u0011BAU\u0003\u001d!\bN]3bI\u0002Bq!a/\u0001\t\u0013\ti,A\u0007gKR\u001c\u0007.T3tg\u0006<Wm\u001d\u000b\u0003\u00033Bq!!1\u0001\t\u0003\t\u0019-\u0001\u0005bE\u0012L7-\u0019;f)\u0011\tI&!2\t\u0011\u00055\u0015q\u0018a\u0001\u0003'Aq!!3\u0001\t\u0003\tY-A\u0006bE\u0012L7-\u0019;f\u00032dWCAA-\u0011\u001d\ty\r\u0001C\u0001\u0003#\fA\u0002[1oI2,WI\u001d:peN$b!!\u0017\u0002T\u0006=\b\u0002CAk\u0003\u001b\u0004\r!a6\u0002\u001d\u0015\u0014(o\u001c:SKN\u0004xN\\:fgB)!%!7\u0002^&\u0019\u00111\\\u0016\u0003\u0007M+G\u000fE\u0004\u0010\u0003?\f\u0019\"a9\n\u0007\u0005\u0005\bC\u0001\u0004UkBdWM\r\t\u0005\u0003K\fY/\u0004\u0002\u0002h*!\u0011\u0011^A\u000e\u0003\r\t\u0007/[\u0005\u0005\u0003[\f9O\u0001\u000eGKR\u001c\u0007NU3ta>t7/\u001a)beRLG/[8o\t\u0006$\u0018\r\u0003\u0005\u0002r\u00065\u0007\u0019AAz\u0003!\u0011Xm\u001d9p]N,\u0007\u0003BAs\u0003kLA!a>\u0002h\nia)\u001a;dQJ+7\u000f]8og\u0016Dq!a?\u0001\t\u0003\ti0\u0001\rn_Z,W*Z:tC\u001e,7\u000fV8UQ\u0016L'/U;fk\u0016$b!a@\u0003\u0006\t\u001d\u0001cA\b\u0003\u0002%\u0019!1\u0001\t\u0003\r\u0005s\u0017PV1m\u0011!\ti)!?A\u0002\u0005M\u0001\u0002\u0003B\u0005\u0003s\u0004\r!a9\u0002\t\u0011\fG/\u0019\u0005\b\u0005\u001b\u0001A\u0011\tB\b\u0003!!xn\u0015;sS:<G#A\u0011\t\u000f\tM\u0001\u0001\"\u0001\u0002L\u0006)1\u000f^1si\"9!q\u0003\u0001\u0005\u0002\u0005-\u0017\u0001B:u_BDqAa\u0007\u0001\t\u0013\tY-A\u000bsK\u001a\u0014Xm\u001d5MCR,gnY=NKR\u0014\u0018nY:\b\u0013\t}!!!A\t\u0002\t\u0005\u0012a\u0003\"s_.,'\u000f\u0015:pqf\u00042!\u0006B\u0012\r!\t!!!A\t\u0002\t\u00152c\u0001B\u0012\u001d!91Na\t\u0005\u0002\t%BC\u0001B\u0011\u0011)\u0011iCa\t\u0012\u0002\u0013\u0005!qF\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000fJ\u001c\u0016\u0005\tE\"fA\u0019\u00034-\u0012!Q\u0007\t\u0005\u0005o\u0011\t%\u0004\u0002\u0003:)!!1\bB\u001f\u0003%)hn\u00195fG.,GMC\u0002\u0003@A\t!\"\u00198o_R\fG/[8o\u0013\u0011\u0011\u0019E!\u000f\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\r\u0003\u0006\u0003H\t\r\u0012\u0013!C\u0001\u0005_\t1\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012B\u0004B\u0003B&\u0005G\t\n\u0011\"\u0001\u0003N\u0005YB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uIe*\"Aa\u0014+\u0007a\u0013\u0019\u0004\u0003\u0006\u0003T\t\r\u0012\u0013!C\u0001\u0005_\tA\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012\n\u0004\u0007\u0003\u0006\u0003X\t\r\u0012\u0013!C\u0001\u0005_\tA\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012\n\u0014\u0007\u0003\u0006\u0003\\\t\r\u0012\u0013!C\u0001\u0005;\nA\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012\n$'\u0006\u0002\u0003`)\u001aqMa\r")
/* loaded from: input_file:org/apache/samza/system/kafka_deprecated/BrokerProxy.class */
public class BrokerProxy implements Toss, Logging {
    private final String host;
    private final int port;
    private final String system;
    private final String clientID;
    private final KafkaSystemConsumerMetrics metrics;
    private final MessageSink messageSink;
    private final int timeout;
    private final int bufferSize;
    private final StreamFetchSizes fetchSize;
    private final int consumerMinSize;
    private final int consumerMaxWait;
    private final GetOffset offsetGetter;
    private final int sleepMSWhileNoTopicPartitions;
    private final Map<TopicAndPartition, Object> nextOffsets;
    private final CountDownLatch firstCallBarrier;
    private boolean firstCall;
    private DefaultFetchSimpleConsumer simpleConsumer;
    private final Thread thread;
    private final String loggerName;
    private Logger logger;
    private final String startupLoggerName;
    private Logger startupLogger;
    private volatile byte bitmap$0;

    public void startupLog(Function0<Object> function0) {
        Logging.startupLog$(this, function0);
    }

    public void trace(Function0<Object> function0) {
        Logging.trace$(this, function0);
    }

    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.trace$(this, function0, function02);
    }

    public void debug(Function0<Object> function0) {
        Logging.debug$(this, function0);
    }

    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.debug$(this, function0, function02);
    }

    public void info(Function0<Object> function0) {
        Logging.info$(this, function0);
    }

    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.info$(this, function0, function02);
    }

    public void warn(Function0<Object> function0) {
        Logging.warn$(this, function0);
    }

    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.warn$(this, function0, function02);
    }

    public void error(Function0<Object> function0) {
        Logging.error$(this, function0);
    }

    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.error$(this, function0, function02);
    }

    public void putMDC(Function0<String> function0, Function0<String> function02) {
        Logging.putMDC$(this, function0, function02);
    }

    public String getMDC(Function0<String> function0) {
        return Logging.getMDC$(this, function0);
    }

    public void removeMDC(Function0<String> function0) {
        Logging.removeMDC$(this, function0);
    }

    public void clearMDC() {
        Logging.clearMDC$(this);
    }

    @Override // org.apache.samza.system.kafka_deprecated.Toss
    public Nothing$ toss(String str) {
        Nothing$ ssVar;
        ssVar = toss(str);
        return ssVar;
    }

    public String loggerName() {
        return this.loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.kafka_deprecated.BrokerProxy] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.logger = Logging.logger$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? logger$lzycompute() : this.logger;
    }

    public String startupLoggerName() {
        return this.startupLoggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.kafka_deprecated.BrokerProxy] */
    private Logger startupLogger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.startupLogger = Logging.startupLogger$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.startupLogger;
    }

    public Logger startupLogger() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? startupLogger$lzycompute() : this.startupLogger;
    }

    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        this.loggerName = str;
    }

    public void org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq(String str) {
        this.startupLoggerName = str;
    }

    public String host() {
        return this.host;
    }

    public int port() {
        return this.port;
    }

    public String system() {
        return this.system;
    }

    public String clientID() {
        return this.clientID;
    }

    public KafkaSystemConsumerMetrics metrics() {
        return this.metrics;
    }

    public MessageSink messageSink() {
        return this.messageSink;
    }

    public int timeout() {
        return this.timeout;
    }

    public int bufferSize() {
        return this.bufferSize;
    }

    public StreamFetchSizes fetchSize() {
        return this.fetchSize;
    }

    public int consumerMinSize() {
        return this.consumerMinSize;
    }

    public int consumerMaxWait() {
        return this.consumerMaxWait;
    }

    public int sleepMSWhileNoTopicPartitions() {
        return this.sleepMSWhileNoTopicPartitions;
    }

    public Map<TopicAndPartition, Object> nextOffsets() {
        return this.nextOffsets;
    }

    public CountDownLatch firstCallBarrier() {
        return this.firstCallBarrier;
    }

    public boolean firstCall() {
        return this.firstCall;
    }

    public void firstCall_$eq(boolean z) {
        this.firstCall = z;
    }

    public DefaultFetchSimpleConsumer simpleConsumer() {
        return this.simpleConsumer;
    }

    public void simpleConsumer_$eq(DefaultFetchSimpleConsumer defaultFetchSimpleConsumer) {
        this.simpleConsumer = defaultFetchSimpleConsumer;
    }

    public DefaultFetchSimpleConsumer createSimpleConsumer() {
        String format = new StringOps(Predef$.MODULE$.augmentString("%s:%d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{host(), BoxesRunTime.boxToInteger(port())}));
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Creating new SimpleConsumer for host %s for system %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{format, this.system()}));
        });
        return new DefaultFetchSimpleConsumer(host(), port(), timeout(), bufferSize(), clientID(), fetchSize(), consumerMinSize(), consumerMaxWait());
    }

    public int addTopicPartition(TopicAndPartition topicAndPartition, Option<String> option) {
        long resetOffset;
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Adding new topic and partition %s to queue for %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicAndPartition, this.host()}));
        });
        if (((java.util.Map) JavaConverters$.MODULE$.mapAsJavaConcurrentMapConverter(nextOffsets()).asJava()).containsKey(topicAndPartition)) {
            throw toss(new StringOps(Predef$.MODULE$.augmentString("Already consuming TopicPartition %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicAndPartition})));
        }
        if (option.isDefined() && this.offsetGetter.isValidOffset(simpleConsumer(), topicAndPartition, (String) option.get())) {
            resetOffset = new StringOps(Predef$.MODULE$.augmentString((String) option.get())).toLong();
        } else {
            warn(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{option, topicAndPartition}));
            });
            resetOffset = this.offsetGetter.getResetOffset(simpleConsumer(), topicAndPartition);
        }
        long j = resetOffset;
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got offset %s for new topic and partition %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(j), topicAndPartition}));
        });
        nextOffsets().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicAndPartition), BoxesRunTime.boxToLong(j)));
        return BoxesRunTime.unboxToInt(metrics().topicPartitions().get(new Tuple2(host(), BoxesRunTime.boxToInteger(port()))).set(BoxesRunTime.boxToInteger(nextOffsets().size())));
    }

    public Option<Object> removeTopicPartition(TopicAndPartition topicAndPartition) {
        if (!((java.util.Map) JavaConverters$.MODULE$.mapAsJavaConcurrentMapConverter(nextOffsets()).asJava()).containsKey(topicAndPartition)) {
            warn(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Asked to remove topic and partition %s, but not in map (keys = %s)")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicAndPartition, this.nextOffsets().keys().mkString(",")}));
            });
            return None$.MODULE$;
        }
        Option<Object> remove = nextOffsets().remove(topicAndPartition);
        metrics().topicPartitions().get(new Tuple2(host(), BoxesRunTime.boxToInteger(port()))).set(BoxesRunTime.boxToInteger(nextOffsets().size()));
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Removed %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicAndPartition}));
        });
        return remove;
    }

    public Thread thread() {
        return this.thread;
    }

    public void org$apache$samza$system$kafka_deprecated$BrokerProxy$$fetchMessages() {
        List list = nextOffsets().filterKeys(topicAndPartition -> {
            return BoxesRunTime.boxToBoolean($anonfun$fetchMessages$1(this, topicAndPartition));
        }).toList();
        if (list.size() <= 0) {
            refreshLatencyMetrics();
            debug(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.host(), BoxesRunTime.boxToInteger(this.port()), BoxesRunTime.boxToInteger(this.sleepMSWhileNoTopicPartitions())}));
            });
            metrics().brokerSkippedFetchRequests().get(new Tuple2(host(), BoxesRunTime.boxToInteger(port()))).inc();
            Thread.sleep(sleepMSWhileNoTopicPartitions());
            return;
        }
        metrics().brokerReads().get(new Tuple2(host(), BoxesRunTime.boxToInteger(port()))).inc();
        FetchResponse defaultFetch = simpleConsumer().defaultFetch(list);
        firstCall_$eq(false);
        firstCallBarrier().countDown();
        Tuple2 partition = defaultFetch.data().toSet().partition(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$fetchMessages$2(tuple2));
        });
        if (partition == null) {
            throw new MatchError(partition);
        }
        Tuple2 tuple22 = new Tuple2((Set) partition._1(), (Set) partition._2());
        Set set = (Set) tuple22._1();
        handleErrors((Set) tuple22._2(), defaultFetch);
        set.foreach(tuple23 -> {
            if (tuple23 != null) {
                return this.moveMessagesToTheirQueue((TopicAndPartition) tuple23._1(), (FetchResponsePartitionData) tuple23._2());
            }
            throw new MatchError(tuple23);
        });
    }

    public void abdicate(TopicAndPartition topicAndPartition) {
        Some removeTopicPartition = removeTopicPartition(topicAndPartition);
        if (removeTopicPartition instanceof Some) {
            messageSink().abdicate(topicAndPartition, BoxesRunTime.unboxToLong(removeTopicPartition.value()));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!None$.MODULE$.equals(removeTopicPartition)) {
                throw new MatchError(removeTopicPartition);
            }
            warn(() -> {
                return "Tried to abdicate for topic partition not in map. Removed in interim?";
            });
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public void abdicateAll() {
        info(() -> {
            return "Abdicating all topic partitions.";
        });
        nextOffsets().toMap(Predef$.MODULE$.$conforms()).keySet().foreach(topicAndPartition -> {
            this.abdicate(topicAndPartition);
            return BoxedUnit.UNIT;
        });
    }

    public void handleErrors(Set<Tuple2<TopicAndPartition, FetchResponsePartitionData>> set, FetchResponse fetchResponse) {
        new LazyRef();
        Tuple2 partition = ((Set) set.withFilter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$handleErrors$1(tuple2));
        }).flatMap(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError(tuple22);
            }
            TopicAndPartition topicAndPartition = (TopicAndPartition) tuple22._1();
            return Option$.MODULE$.option2Iterable(Option$.MODULE$.apply(fetchResponse.error(topicAndPartition.topic(), topicAndPartition.partition())).map(errors -> {
                return new BrokerProxy$Error$3(this, topicAndPartition, errors.code(), errors.exception());
            }));
        }, Set$.MODULE$.canBuildFrom())).partition(brokerProxy$Error$3 -> {
            return BoxesRunTime.boxToBoolean($anonfun$handleErrors$4(brokerProxy$Error$3));
        });
        if (partition == null) {
            throw new MatchError(partition);
        }
        Tuple2 tuple23 = new Tuple2((Set) partition._1(), (Set) partition._2());
        Set set2 = (Set) tuple23._1();
        Tuple2 partition2 = ((Set) tuple23._2()).partition(brokerProxy$Error$32 -> {
            return BoxesRunTime.boxToBoolean($anonfun$handleErrors$5(brokerProxy$Error$32));
        });
        if (partition2 == null) {
            throw new MatchError(partition2);
        }
        Tuple2 tuple24 = new Tuple2((Set) partition2._1(), (Set) partition2._2());
        Set set3 = (Set) tuple24._1();
        Set set4 = (Set) tuple24._2();
        set4.foreach(brokerProxy$Error$33 -> {
            $anonfun$handleErrors$6(this, set4, brokerProxy$Error$33);
            return BoxedUnit.UNIT;
        });
        set2.foreach(brokerProxy$Error$34 -> {
            $anonfun$handleErrors$8(this, brokerProxy$Error$34);
            return BoxedUnit.UNIT;
        });
        set3.foreach(brokerProxy$Error$35 -> {
            this.warn(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Received OffsetOutOfRange exception for %s. Current offset = %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{brokerProxy$Error$35.tp(), this.nextOffsets().getOrElse(brokerProxy$Error$35.tp(), () -> {
                    return "not found in map, likely removed in the interim";
                })}));
            });
            try {
                return this.nextOffsets().replace(brokerProxy$Error$35.tp(), BoxesRunTime.boxToLong(this.offsetGetter.getResetOffset(this.simpleConsumer(), brokerProxy$Error$35.tp())));
            } catch (Throwable th) {
                if (!(th instanceof UnknownTopicOrPartitionException ? true : th instanceof NotLeaderForPartitionException)) {
                    throw th;
                }
                this.warn(() -> {
                    return new StringOps(Predef$.MODULE$.augmentString("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToShort(brokerProxy$Error$35.code()), brokerProxy$Error$35.tp()}));
                });
                this.abdicate(brokerProxy$Error$35.tp());
                return BoxedUnit.UNIT;
            }
        });
    }

    public Object moveMessagesToTheirQueue(TopicAndPartition topicAndPartition, FetchResponsePartitionData fetchResponsePartitionData) {
        MessageSet messages = fetchResponsePartitionData.messages();
        LongRef create = LongRef.create(BoxesRunTime.unboxToLong(nextOffsets().apply(topicAndPartition)));
        messageSink().setIsAtHighWatermark(topicAndPartition, fetchResponsePartitionData.hw() == 0 || fetchResponsePartitionData.hw() == create.elem);
        Predef$.MODULE$.require(messages != null);
        messages.iterator().foreach(messageAndOffset -> {
            $anonfun$moveMessagesToTheirQueue$1(this, topicAndPartition, fetchResponsePartitionData, create, messageAndOffset);
            return BoxedUnit.UNIT;
        });
        nextOffsets().replace(topicAndPartition, BoxesRunTime.boxToLong(create.elem));
        long hw = fetchResponsePartitionData.hw();
        if (hw >= 0) {
            metrics().highWatermark().get(topicAndPartition).set(BoxesRunTime.boxToLong(hw));
            return metrics().lag().get(topicAndPartition).set(BoxesRunTime.boxToLong(hw - create.elem));
        }
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got a high water mark less than 0 (%d) for %s, so skipping.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(hw), topicAndPartition}));
        });
        return BoxedUnit.UNIT;
    }

    public String toString() {
        return new StringOps(Predef$.MODULE$.augmentString("BrokerProxy for %s:%d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{host(), BoxesRunTime.boxToInteger(port())}));
    }

    public void start() {
        if (thread().isAlive()) {
            debug(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Tried to start an already started broker proxy (%s). Ignoring.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.toString()}));
            });
            return;
        }
        info(() -> {
            return "Starting " + this.toString();
        });
        thread().setDaemon(true);
        thread().setName("Samza BrokerProxy " + thread().getName());
        thread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(this) { // from class: org.apache.samza.system.kafka_deprecated.BrokerProxy$$anon$2
            private final /* synthetic */ BrokerProxy $outer;

            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                this.$outer.error(() -> {
                    return "Uncaught exception in broker proxy:";
                }, () -> {
                    return th;
                });
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        });
        thread().start();
    }

    public void stop() {
        info(() -> {
            return "Shutting down " + this.toString();
        });
        if (simpleConsumer() != null) {
            info(() -> {
                return "closing simple consumer...";
            });
            simpleConsumer().close();
        }
        thread().interrupt();
        thread().join();
    }

    private void refreshLatencyMetrics() {
        nextOffsets().foreach(tuple2 -> {
            Object obj;
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            TopicAndPartition topicAndPartition = (TopicAndPartition) tuple2._1();
            long _2$mcJ$sp = tuple2._2$mcJ$sp();
            long earliestOrLatestOffset = this.simpleConsumer().earliestOrLatestOffset(topicAndPartition, -1L, Request$.MODULE$.OrdinaryConsumerId());
            this.trace(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("latest offset of %s is %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicAndPartition, BoxesRunTime.boxToLong(earliestOrLatestOffset)}));
            });
            if (earliestOrLatestOffset >= 0) {
                if (this.metrics().highWatermark().containsKey(topicAndPartition)) {
                    this.metrics().highWatermark().get(topicAndPartition).set(BoxesRunTime.boxToLong(earliestOrLatestOffset));
                } else {
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                }
                obj = this.metrics().lag().containsKey(topicAndPartition) ? this.metrics().lag().get(topicAndPartition).set(BoxesRunTime.boxToLong(earliestOrLatestOffset - _2$mcJ$sp)) : BoxedUnit.UNIT;
            } else {
                obj = BoxedUnit.UNIT;
            }
            return obj;
        });
    }

    public static final /* synthetic */ boolean $anonfun$fetchMessages$1(BrokerProxy brokerProxy, TopicAndPartition topicAndPartition) {
        return brokerProxy.messageSink().needsMoreMessages(topicAndPartition);
    }

    public static final /* synthetic */ boolean $anonfun$fetchMessages$2(Tuple2 tuple2) {
        return ((FetchResponsePartitionData) tuple2._2()).error().code() == ErrorMapping$.MODULE$.NoError();
    }

    private final /* synthetic */ BrokerProxy$Error$4$ Error$lzycompute$1(LazyRef lazyRef) {
        BrokerProxy$Error$4$ brokerProxy$Error$4$;
        synchronized (lazyRef) {
            brokerProxy$Error$4$ = lazyRef.initialized() ? (BrokerProxy$Error$4$) lazyRef.value() : (BrokerProxy$Error$4$) lazyRef.initialize(new BrokerProxy$Error$4$(this));
        }
        return brokerProxy$Error$4$;
    }

    private final BrokerProxy$Error$4$ Error$2(LazyRef lazyRef) {
        return lazyRef.initialized() ? (BrokerProxy$Error$4$) lazyRef.value() : Error$lzycompute$1(lazyRef);
    }

    public static final /* synthetic */ boolean $anonfun$handleErrors$1(Tuple2 tuple2) {
        return tuple2 != null;
    }

    public static final /* synthetic */ boolean $anonfun$handleErrors$4(BrokerProxy$Error$3 brokerProxy$Error$3) {
        return brokerProxy$Error$3.code() == ErrorMapping$.MODULE$.NotLeaderForPartitionCode() || brokerProxy$Error$3.code() == ErrorMapping$.MODULE$.UnknownTopicOrPartitionCode();
    }

    public static final /* synthetic */ boolean $anonfun$handleErrors$5(BrokerProxy$Error$3 brokerProxy$Error$3) {
        return brokerProxy$Error$3.code() == ErrorMapping$.MODULE$.OffsetOutOfRangeCode();
    }

    public static final /* synthetic */ void $anonfun$handleErrors$6(BrokerProxy brokerProxy, Set set, BrokerProxy$Error$3 brokerProxy$Error$3) {
        brokerProxy.warn(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{set.mkString(",")}));
        });
        KafkaUtil$.MODULE$.maybeThrowException(brokerProxy$Error$3.exception());
    }

    public static final /* synthetic */ void $anonfun$handleErrors$8(BrokerProxy brokerProxy, BrokerProxy$Error$3 brokerProxy$Error$3) {
        brokerProxy.warn(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToShort(brokerProxy$Error$3.code()), brokerProxy$Error$3.tp()}));
        });
        brokerProxy.abdicate(brokerProxy$Error$3.tp());
    }

    public static final /* synthetic */ void $anonfun$moveMessagesToTheirQueue$1(BrokerProxy brokerProxy, TopicAndPartition topicAndPartition, FetchResponsePartitionData fetchResponsePartitionData, LongRef longRef, MessageAndOffset messageAndOffset) {
        brokerProxy.messageSink().addMessage(topicAndPartition, messageAndOffset, fetchResponsePartitionData.hw());
        longRef.elem = messageAndOffset.nextOffset();
        int payloadSize = messageAndOffset.message().payloadSize() + messageAndOffset.message().keySize();
        brokerProxy.metrics().reads().get(topicAndPartition).inc();
        brokerProxy.metrics().bytesRead().get(topicAndPartition).inc(payloadSize);
        brokerProxy.metrics().brokerBytesRead().get(new Tuple2(brokerProxy.host(), BoxesRunTime.boxToInteger(brokerProxy.port()))).inc(payloadSize);
        brokerProxy.metrics().offsets().get(topicAndPartition).set(longRef.elem);
    }

    public BrokerProxy(String str, int i, String str2, String str3, KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics, MessageSink messageSink, int i2, int i3, StreamFetchSizes streamFetchSizes, int i4, int i5, GetOffset getOffset) {
        this.host = str;
        this.port = i;
        this.system = str2;
        this.clientID = str3;
        this.metrics = kafkaSystemConsumerMetrics;
        this.messageSink = messageSink;
        this.timeout = i2;
        this.bufferSize = i3;
        this.fetchSize = streamFetchSizes;
        this.consumerMinSize = i4;
        this.consumerMaxWait = i5;
        this.offsetGetter = getOffset;
        Toss.$init$(this);
        Logging.$init$(this);
        this.sleepMSWhileNoTopicPartitions = 100;
        this.nextOffsets = (Map) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(new ConcurrentHashMap()).asScala();
        this.firstCallBarrier = new CountDownLatch(1);
        this.firstCall = true;
        this.simpleConsumer = createSimpleConsumer();
        kafkaSystemConsumerMetrics.registerBrokerProxy(str, i);
        this.thread = new Thread(new Runnable(this) { // from class: org.apache.samza.system.kafka_deprecated.BrokerProxy$$anon$1
            private final /* synthetic */ BrokerProxy $outer;

            @Override // java.lang.Runnable
            public void run() {
                BooleanRef create = BooleanRef.create(false);
                try {
                    new ExponentialSleepStrategy().run(retryLoop -> {
                        $anonfun$run$1(this, create, retryLoop);
                        return BoxedUnit.UNIT;
                    }, (exc, retryLoop2) -> {
                        $anonfun$run$3(this, create, exc, retryLoop2);
                        return BoxedUnit.UNIT;
                    });
                } catch (InterruptedException e) {
                    this.$outer.info(() -> {
                        return "Got interrupt exception in broker proxy thread.";
                    });
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } catch (OutOfMemoryError e2) {
                    throw new SamzaException("Got out of memory error in broker proxy thread.");
                } catch (StackOverflowError e3) {
                    throw new SamzaException("Got stack overflow error in broker proxy thread.");
                } catch (ClosedByInterruptException e4) {
                    this.$outer.info(() -> {
                        return "Got closed by interrupt exception in broker proxy thread.";
                    });
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                }
                if (Thread.currentThread().isInterrupted()) {
                    this.$outer.info(() -> {
                        return "Shutting down due to interrupt.";
                    });
                }
            }

            public static final /* synthetic */ void $anonfun$run$1(BrokerProxy$$anon$1 brokerProxy$$anon$1, BooleanRef booleanRef, ExponentialSleepStrategy.RetryLoop retryLoop) {
                if (booleanRef.elem) {
                    brokerProxy$$anon$1.$outer.metrics().reconnects().get(new Tuple2(brokerProxy$$anon$1.$outer.host(), BoxesRunTime.boxToInteger(brokerProxy$$anon$1.$outer.port()))).inc();
                    brokerProxy$$anon$1.$outer.simpleConsumer().close();
                    brokerProxy$$anon$1.$outer.simpleConsumer_$eq(brokerProxy$$anon$1.$outer.createSimpleConsumer());
                }
                while (!Thread.currentThread().isInterrupted()) {
                    brokerProxy$$anon$1.$outer.messageSink().refreshDropped();
                    if (brokerProxy$$anon$1.$outer.nextOffsets().size() == 0) {
                        brokerProxy$$anon$1.$outer.debug(() -> {
                            return "No TopicPartitions to fetch. Sleeping.";
                        });
                        Thread.sleep(brokerProxy$$anon$1.$outer.sleepMSWhileNoTopicPartitions());
                    } else {
                        brokerProxy$$anon$1.$outer.org$apache$samza$system$kafka_deprecated$BrokerProxy$$fetchMessages();
                        retryLoop.reset();
                    }
                }
            }

            public static final /* synthetic */ void $anonfun$run$3(BrokerProxy$$anon$1 brokerProxy$$anon$1, BooleanRef booleanRef, Exception exc, ExponentialSleepStrategy.RetryLoop retryLoop) {
                brokerProxy$$anon$1.$outer.warn(() -> {
                    return new StringOps(Predef$.MODULE$.augmentString("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{exc}));
                });
                brokerProxy$$anon$1.$outer.debug(() -> {
                    return "Exception detail:";
                }, () -> {
                    return exc;
                });
                brokerProxy$$anon$1.$outer.abdicateAll();
                booleanRef.elem = true;
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        }, new StringOps(Predef$.MODULE$.augmentString("BrokerProxy thread pointed at %s:%d for client %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str, BoxesRunTime.boxToInteger(i), str3})));
    }
}
