package org.apache.samza.system.kafka;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.samza.SamzaException;
import org.apache.samza.metrics.Timer;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.kafka.KafkaSystemProducer;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.KafkaUtil$;
import org.apache.samza.util.Logging;
import org.apache.samza.util.TimerUtils;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaSystemProducer.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-h\u0001B\u0001\u0003\u00015\u00111cS1gW\u0006\u001c\u0016p\u001d;f[B\u0013x\u000eZ;dKJT!a\u0001\u0003\u0002\u000b-\fgm[1\u000b\u0005\u00151\u0011AB:zgR,WN\u0003\u0002\b\u0011\u0005)1/Y7{C*\u0011\u0011BC\u0001\u0007CB\f7\r[3\u000b\u0003-\t1a\u001c:h\u0007\u0001\u0019R\u0001\u0001\b\u00175\u0001\u0002\"a\u0004\u000b\u000e\u0003AQ!!\u0005\n\u0002\t1\fgn\u001a\u0006\u0002'\u0005!!.\u0019<b\u0013\t)\u0002C\u0001\u0004PE*,7\r\u001e\t\u0003/ai\u0011\u0001B\u0005\u00033\u0011\u0011abU=ti\u0016l\u0007K]8ek\u000e,'\u000f\u0005\u0002\u001c=5\tAD\u0003\u0002\u001e\r\u0005!Q\u000f^5m\u0013\tyBDA\u0004M_\u001e<\u0017N\\4\u0011\u0005m\t\u0013B\u0001\u0012\u001d\u0005)!\u0016.\\3s+RLGn\u001d\u0005\tI\u0001\u0011\t\u0011)A\u0005K\u0005Q1/_:uK6t\u0015-\\3\u0011\u0005\u0019zcBA\u0014.!\tA3&D\u0001*\u0015\tQC\"\u0001\u0004=e>|GO\u0010\u0006\u0002Y\u0005)1oY1mC&\u0011afK\u0001\u0007!J,G-\u001a4\n\u0005A\n$AB*ue&twM\u0003\u0002/W!A1\u0007\u0001B\u0001B\u0003%A'\u0001\u0007sKR\u0014\u0018PQ1dW>4g\r\u0005\u0002\u001ck%\u0011a\u0007\b\u0002\u0019\u000bb\u0004xN\\3oi&\fGn\u00157fKB\u001cFO]1uK\u001eL\b\u0002\u0003\u001d\u0001\u0005\u0003\u0005\u000b\u0011B\u001d\u0002\u0017\u001d,G\u000f\u0015:pIV\u001cWM\u001d\t\u0004umjT\"A\u0016\n\u0005qZ#!\u0003$v]\u000e$\u0018n\u001c81!\u0011qDI\u0012$\u000e\u0003}R!\u0001Q!\u0002\u0011A\u0014x\u000eZ;dKJT!AQ\"\u0002\u000f\rd\u0017.\u001a8ug*\u00111\u0001C\u0005\u0003\u000b~\u0012\u0001\u0002\u0015:pIV\u001cWM\u001d\t\u0004u\u001dK\u0015B\u0001%,\u0005\u0015\t%O]1z!\tQ$*\u0003\u0002LW\t!!)\u001f;f\u0011!i\u0005A!A!\u0002\u0013q\u0015aB7fiJL7m\u001d\t\u0003\u001fBk\u0011AA\u0005\u0003#\n\u0011!dS1gW\u0006\u001c\u0016p\u001d;f[B\u0013x\u000eZ;dKJlU\r\u001e:jGND\u0001b\u0015\u0001\u0003\u0006\u0004%\t\u0001V\u0001\u0006G2|7m[\u000b\u0002+B\u0019!h\u000f,\u0011\u0005i:\u0016B\u0001-,\u0005\u0011auN\\4\t\u0011i\u0003!\u0011!Q\u0001\nU\u000baa\u00197pG.\u0004\u0003\"\u0002/\u0001\t\u0003i\u0016A\u0002\u001fj]&$h\b\u
0006\u0004_?\u0002\f'm\u0019\t\u0003\u001f\u0002AQ\u0001J.A\u0002\u0015BqaM.\u0011\u0002\u0003\u0007A\u0007C\u000397\u0002\u0007\u0011\bC\u0003N7\u0002\u0007a\nC\u0004T7B\u0005\t\u0019A+\u0007\t\u0015\u0004\u0001A\u001a\u0002\u000b'>,(oY3ECR\f7C\u00013h!\tQ\u0004.\u0003\u0002jW\t1\u0011I\\=SK\u001aDQ\u0001\u00183\u0005\u0002-$\u0012\u0001\u001c\t\u0003[\u0012l\u0011\u0001\u0001\u0005\b_\u0012\u0014\r\u0011\"\u0001q\u0003!\u0019XM\u001c3M_\u000e\\W#\u0001\b\t\rI$\u0007\u0015!\u0003\u000f\u0003%\u0019XM\u001c3M_\u000e\\\u0007\u0005C\u0004uI\u0002\u0007I\u0011A;\u0002\u00191\fG/Z:u\rV$XO]3\u0016\u0003Y\u00042a^>~\u001b\u0005A(BA={\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003;II!\u0001 =\u0003\r\u0019+H/\u001e:f!\tqd0\u0003\u0002��\u007f\tq!+Z2pe\u0012lU\r^1eCR\f\u0007\"CA\u0002I\u0002\u0007I\u0011AA\u0003\u0003Aa\u0017\r^3ti\u001a+H/\u001e:f?\u0012*\u0017\u000f\u0006\u0003\u0002\b\u00055\u0001c\u0001\u001e\u0002\n%\u0019\u00111B\u0016\u0003\tUs\u0017\u000e\u001e\u0005\n\u0003\u001f\t\t!!AA\u0002Y\f1\u0001\u001f\u00132\u0011\u001d\t\u0019\u0002\u001aQ!\nY\fQ\u0002\\1uKN$h)\u001e;ve\u0016\u0004\u0003\u0006BA\t\u0003/\u00012AOA\r\u0013\r\tYb\u000b\u0002\tm>d\u0017\r^5mK\"I\u0011q\u00043A\u0002\u0013\u0005\u0011\u0011E\u0001\u0014Kb\u001cW\r\u001d;j_:LenQ1mY\n\f7m[\u000b\u0003\u0003G\u0001b!!\n\u0002,\u0005=RBAA\u0014\u0015\r\tI\u0003_\u0001\u0007CR|W.[2\n\t\u00055\u0012q\u0005\u0002\u0010\u0003R|W.[2SK\u001a,'/\u001a8dKB!\u0011\u0011GA\u001a\u001b\u00051\u0011bAA\u001b\r\tq1+Y7{C\u0016C8-\u001a9uS>t\u0007\"CA\u001dI\u0002\u0007I\u0011AA\u001e\u0003])\u0007pY3qi&|g.\u00138DC2d'-Y2l?\u0012*\u0017\u000f\u0006\u0003\u0002\b\u0005u\u0002BCA\b\u0003o\t\t\u00111\u0001\u0002$!A\u0011\u0011\t3!B\u0013\t\u0019#\u0001\u000bfq\u000e,\u0007\u000f^5p]&s7)\u00197mE\u0006\u001c7\u000e\t\u0005\t\u0001\u0002\u0001\r\u0011\"\u0001\u0002FU\tQ\bC\u0005\u0002J\u0001\u0001\r\u0011\"\u0001\u0002L\u0005a\u0001O]8ek\u000e,'o\u0018\u0013fcR!\u0011qAA'\u0011%\ty!a\u0012\u0002\u0002\u0003\u0007Q\bC
\u0004\u0002R\u0001\u0001\u000b\u0015B\u001f\u0002\u0013A\u0014x\u000eZ;dKJ\u0004\u0003\u0006BA(\u0003/A\u0001\"a\u0016\u0001\u0001\u0004%\t\u0001]\u0001\raJ|G-^2fe2{7m\u001b\u0005\n\u00037\u0002\u0001\u0019!C\u0001\u0003;\n\u0001\u0003\u001d:pIV\u001cWM\u001d'pG.|F%Z9\u0015\t\u0005\u001d\u0011q\f\u0005\n\u0003\u001f\tI&!AA\u00029Aq!a\u0019\u0001A\u0003&a\"A\u0007qe>$WoY3s\u0019>\u001c7\u000e\t\u0005\n\u0003O\u0002!\u0019!C\u0001\u0003S\nQd\u0015;sK\u0006lg*Y7f\u001dVdGn\u0014:F[B$\u00180\u0012:s_Jl5oZ\u000b\u0003\u0003W\u00022aDA7\u0013\t\u0001\u0004\u0003\u0003\u0005\u0002r\u0001\u0001\u000b\u0011BA6\u0003y\u0019FO]3b[:\u000bW.\u001a(vY2|%/R7qif,%O]8s\u001bN<\u0007\u0005C\u0005\u0002v\u0001\u0011\r\u0011\"\u0001\u0002x\u000591o\\;sG\u0016\u001cXCAA=!\u00159\u00181P\u0013m\u0013\r\ti\b\u001f\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\b\u0002CAA\u0001\u0001\u0006I!!\u001f\u0002\u0011M|WO]2fg\u0002Bq!!\"\u0001\t\u0003\t9)A\u0003ti\u0006\u0014H\u000f\u0006\u0002\u0002\b!9\u00111\u0012\u0001\u0005\u0002\u0005\u001d\u0015\u0001B:u_BDq!a$\u0001\t\u0003\t\t*\u0001\u0005sK\u001eL7\u000f^3s)\u0011\t9!a%\t\u000f\u0005U\u0015Q\u0012a\u0001K\u000511o\\;sG\u0016Dq!!'\u0001\t\u0003\tY*\u0001\u0010dY>\u001cX-\u00118e\u001dVdG.\u001b4z\u0007V\u0014(/\u001a8u!J|G-^2feR!\u0011qAAO\u0011\u001d\ty*a&A\u0002u\nqbY;se\u0016tG\u000f\u0015:pIV\u001cWM\u001d\u0005\b\u0003G\u0003A\u0011AAS\u0003\u0011\u0019XM\u001c3\u0015\r\u0005\u001d\u0011qUAU\u0011\u001d\t)*!)A\u0002\u0015B\u0001\"a+\u0002\"\u0002\u0007\u0011QV\u0001\tK:4X\r\\8qKB\u0019q#a,\n\u0007\u0005EFAA\fPkR<w.\u001b8h\u001b\u0016\u001c8/Y4f\u000b:4X\r\\8qK\"9\u0011Q\u0017\u0001\u0005\u0002\u0005]\u0016!\u00024mkNDG\u0003BA\u0004\u0003sCq!!&\u00024\u0002\u0007QeB\u0005\u0002>\n\t\t\u0011#\u0001\u0002@\u0006\u00192*\u00194lCNK8\u000f^3n!J|G-^2feB\u0019q*!1\u0007\u0011\u0005\u0011\u0011\u0011!E\u0001\u0003\u0007\u001c2!!1h\u0011\u001da\u0016\u0011\u0019C\u0001\u0003\u000f$\"!a0\t\u0015\u0005-\u0017\u0011YI\u0001\n\u0003\
ti-A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$HEM\u000b\u0003\u0003\u001fT3\u0001NAiW\t\t\u0019\u000e\u0005\u0003\u0002V\u0006}WBAAl\u0015\u0011\tI.a7\u0002\u0013Ut7\r[3dW\u0016$'bAAoW\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005\u0005\u0018q\u001b\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0007BCAs\u0003\u0003\f\n\u0011\"\u0001\u0002h\u0006YB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uIU*\"!!;+\u0007U\u000b\t\u000e")
/* loaded from: input_file:org/apache/samza/system/kafka/KafkaSystemProducer.class */
public class KafkaSystemProducer implements SystemProducer, Logging, TimerUtils {
    // System name given to the constructor; interpolated into log and exception messages.
    public final String org$apache$samza$system$kafka$KafkaSystemProducer$$systemName;
    // Factory that send() invokes to build a Kafka producer when none is currently cached.
    private final Function0<Producer<byte[], byte[]>> getProducer;
    // Metrics holder whose counters/timers are updated by send() and flush().
    public final KafkaSystemProducerMetrics org$apache$samza$system$kafka$KafkaSystemProducer$$metrics;
    // Clock function given to the constructor; this class only exposes it through clock().
    private final Function0<Object> clock;
    // Cached Kafka producer; null until send() creates one and again after it has been closed.
    private volatile Producer<byte[], byte[]> producer;
    // Monitor held while the cached producer is created or cleared.
    private Object producerLock;
    // Message of the IllegalArgumentException thrown by send() for a null/empty stream name.
    private final String StreamNameNullOrEmptyErrorMsg;
    // Per-source state, keyed by the source name passed to register().
    private final ConcurrentHashMap<String, SourceData> sources;
    // Not final: assigned outside the constructor by org$apache$samza$util$Logging$_setter_$loggerName_$eq.
    private String loggerName;
    // Lazily created by logger$lzycompute(); guarded by bit 1 of bitmap$0.
    private Logger logger;
    // Not final: assigned outside the constructor by org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq.
    private String startupLoggerName;
    // Lazily created by startupLogger$lzycompute(); guarded by bit 2 of bitmap$0.
    private Logger startupLogger;
    // Lazy-initialisation flags: bit 1 = logger initialised, bit 2 = startupLogger initialised.
    private volatile byte bitmap$0;

    /* compiled from: KafkaSystemProducer.scala */
    /* loaded from: input_file:org/apache/samza/system/kafka/KafkaSystemProducer$SourceData.class */
    /**
     * Mutable state kept for one registered source: a monitor used while sending,
     * the future returned by the most recent send, and a slot holding an exception
     * recorded by a send callback.
     */
    public class SourceData {
        public final /* synthetic */ KafkaSystemProducer $outer;
        private final Object sendLock;
        private volatile Future<RecordMetadata> latestFuture;
        private AtomicReference<SamzaException> exceptionInCallback;

        /**
         * @param kafkaSystemProducer the enclosing producer; a null value raises a NullPointerException
         */
        public SourceData(KafkaSystemProducer kafkaSystemProducer) {
            if (kafkaSystemProducer == null) {
                throw new NullPointerException();
            }
            $outer = kafkaSystemProducer;
            sendLock = new Object();
            latestFuture = null;
            exceptionInCallback = new AtomicReference<>();
        }

        /** @return the enclosing producer instance */
        public /* synthetic */ KafkaSystemProducer org$apache$samza$system$kafka$KafkaSystemProducer$SourceData$$$outer() {
            return $outer;
        }

        /** @return the monitor object held while a record is handed to the Kafka producer */
        public Object sendLock() {
            return sendLock;
        }

        /** @return the future of the most recent send, or null if nothing has been sent yet */
        public Future<RecordMetadata> latestFuture() {
            return latestFuture;
        }

        /** Stores the future returned by the most recent send. */
        public void latestFuture_$eq(Future<RecordMetadata> newFuture) {
            latestFuture = newFuture;
        }

        /** @return the holder of the exception recorded by a send callback (its content may be null) */
        public AtomicReference<SamzaException> exceptionInCallback() {
            return exceptionInCallback;
        }

        /** Replaces the exception holder itself. */
        public void exceptionInCallback_$eq(AtomicReference<SamzaException> newHolder) {
            exceptionInCallback = newHolder;
        }
    }

    /**
     * Forwarder for the {@code TimerUtils} trait: passes this instance, the timer and the
     * function to the static helper {@code TimerUtils.updateTimer$} and returns its result.
     * Presumably the helper runs {@code function0} and records its duration in {@code timer} —
     * confirm against TimerUtils.
     */
    public <T> T updateTimer(Timer timer, Function0<T> function0) {
        return (T) TimerUtils.updateTimer$(this, timer, function0);
    }

    /**
     * Forwarder for the {@code TimerUtils} trait: delegates to
     * {@code TimerUtils.updateTimerAndGetDuration$} and returns the long it produces.
     */
    public long updateTimerAndGetDuration(Timer timer, Function1<Object, BoxedUnit> function1) {
        return TimerUtils.updateTimerAndGetDuration$(this, timer, function1);
    }

    // ---------------------------------------------------------------------------------------
    // Forwarders for the Logging trait. Each method passes this instance and its arguments
    // unchanged to the static Logging.<name>$ helper of the same name; no logic lives here.
    // Message and throwable arguments are Function0 suppliers rather than plain values.
    // ---------------------------------------------------------------------------------------

    /** Delegates to {@code Logging.startupLog$}. */
    public void startupLog(Function0<Object> function0) {
        Logging.startupLog$(this, function0);
    }

    /** Delegates to {@code Logging.trace$} (message only). */
    public void trace(Function0<Object> function0) {
        Logging.trace$(this, function0);
    }

    /** Delegates to {@code Logging.trace$} (message and throwable). */
    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.trace$(this, function0, function02);
    }

    /** Delegates to {@code Logging.debug$} (message only). */
    public void debug(Function0<Object> function0) {
        Logging.debug$(this, function0);
    }

    /** Delegates to {@code Logging.debug$} (message and throwable). */
    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.debug$(this, function0, function02);
    }

    /** Delegates to {@code Logging.info$} (message only). */
    public void info(Function0<Object> function0) {
        Logging.info$(this, function0);
    }

    /** Delegates to {@code Logging.info$} (message and throwable). */
    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.info$(this, function0, function02);
    }

    /** Delegates to {@code Logging.warn$} (message only). */
    public void warn(Function0<Object> function0) {
        Logging.warn$(this, function0);
    }

    /** Delegates to {@code Logging.warn$} (message and throwable). */
    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.warn$(this, function0, function02);
    }

    /** Delegates to {@code Logging.error$} (message only). */
    public void error(Function0<Object> function0) {
        Logging.error$(this, function0);
    }

    /** Delegates to {@code Logging.error$} (message and throwable). */
    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.error$(this, function0, function02);
    }

    /** Delegates to {@code Logging.putMDC$} with the two string suppliers. */
    public void putMDC(Function0<String> function0, Function0<String> function02) {
        Logging.putMDC$(this, function0, function02);
    }

    /** Delegates to {@code Logging.getMDC$} and returns its result. */
    public String getMDC(Function0<String> function0) {
        return Logging.getMDC$(this, function0);
    }

    /** Delegates to {@code Logging.removeMDC$}. */
    public void removeMDC(Function0<String> function0) {
        Logging.removeMDC$(this, function0);
    }

    /** Delegates to {@code Logging.clearMDC$}. */
    public void clearMDC() {
        Logging.clearMDC$(this);
    }

    public String loggerName() {
        return this.loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.kafka.KafkaSystemProducer] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.logger = Logging.logger$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? logger$lzycompute() : this.logger;
    }

    public String startupLoggerName() {
        return this.startupLoggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.kafka.KafkaSystemProducer] */
    private Logger startupLogger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.startupLogger = Logging.startupLogger$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.startupLogger;
    }

    public Logger startupLogger() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? startupLogger$lzycompute() : this.startupLogger;
    }

    /**
     * Setter generated for the {@code Logging} trait's {@code loggerName} value.
     * Presumably invoked from {@code Logging.$init$} during construction — confirm against Logging.
     */
    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        this.loggerName = str;
    }

    /**
     * Setter generated for the {@code Logging} trait's {@code startupLoggerName} value.
     * Presumably invoked from {@code Logging.$init$} during construction — confirm against Logging.
     */
    public void org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq(String str) {
        this.startupLoggerName = str;
    }

    /** @return the clock function supplied to the constructor */
    public Function0<Object> clock() {
        return this.clock;
    }

    /** @return the currently cached Kafka producer, or null when none is cached */
    public Producer<byte[], byte[]> producer() {
        return this.producer;
    }

    /** Replaces the cached Kafka producer (null clears it). The field is volatile. */
    public void producer_$eq(Producer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    /** @return the monitor object used when the cached producer is created or cleared */
    public Object producerLock() {
        return this.producerLock;
    }

    /** Replaces the producer monitor object. */
    public void producerLock_$eq(Object obj) {
        this.producerLock = obj;
    }

    /** @return the error text used by send() when the envelope's stream name is null or empty */
    public String StreamNameNullOrEmptyErrorMsg() {
        return this.StreamNameNullOrEmptyErrorMsg;
    }

    /** @return the live map from registered source name to its {@link SourceData} */
    public ConcurrentHashMap<String, SourceData> sources() {
        return this.sources;
    }

    /** No-op: the Kafka producer is created on demand by send(), not here. */
    public void start() {
    }

    /**
     * Closes the cached Kafka producer, if any, and then flushes every registered source
     * that has no recorded callback exception.
     *
     * <p>Reconstructed from the bytecode dump the decompiler left behind; the previous
     * body unconditionally threw {@link UnsupportedOperationException}. Any
     * {@link Exception} raised along the way is logged and not propagated.
     */
    public void stop() {
        try {
            final Producer<byte[], byte[]> currentProducer = producer();
            if (currentProducer == null) {
                // Nothing was ever created (or it was already closed): nothing to do.
                return;
            }
            synchronized (producerLock()) {
                // Clear the cached reference only if it still is the producer captured above,
                // so a producer installed concurrently is not discarded.
                if (currentProducer.equals(producer())) {
                    producer_$eq(null);
                }
            }
            currentProducer.close();
            sources().forEach((source, sourceData) -> {
                // Sources with a pending callback exception are skipped; flush() would rethrow it.
                if (sourceData.exceptionInCallback().get() == null) {
                    flush(source);
                }
            });
        } catch (Exception e) {
            // NOTE(review): the dump only shows that the message supplier receives the exception;
            // e.getMessage() is assumed here, mirroring the logging done in flush() — confirm.
            error(() -> e.getMessage(), () -> e);
        }
    }

    /**
     * Registers a source with this producer by creating its {@link SourceData} entry.
     *
     * @param str the source name
     * @throws SamzaException if the same source name was registered before
     */
    public void register(String str) {
        final SourceData alreadyPresent = sources().putIfAbsent(str, new SourceData(this));
        if (alreadyPresent == null) {
            return;
        }
        final Object[] messageArgs = {str, this.org$apache$samza$system$kafka$KafkaSystemProducer$$systemName};
        final String message = new StringOps(Predef$.MODULE$.augmentString("%s is already registered with the %s system producer"))
                .format(Predef$.MODULE$.genericWrapArray(messageArgs));
        throw new SamzaException(message);
    }

    /**
     * Closes the given producer and clears the cached producer reference if it still
     * refers to that same producer.
     *
     * <p>Reconstructed from the bytecode dump the decompiler left behind; the previous
     * body unconditionally threw {@link UnsupportedOperationException}, which broke every
     * error path in send() and its callback.
     *
     * @param currentProducer the producer that was in use when the failure happened
     */
    public void closeAndNullifyCurrentProducer(Producer<byte[], byte[]> currentProducer) {
        try {
            currentProducer.close();
        } catch (Exception e) {
            // A failing close is logged only, so the cached reference is still cleared below.
            // NOTE(review): the log text is not present in the dump (the supplier lambda takes
            // no arguments); this wording is a reconstruction — confirm against the Scala source.
            error(() -> "producer close failed", () -> e);
        }
        synchronized (producerLock()) {
            final Producer<byte[], byte[]> cached = producer();
            // Null-safe equality, matching the comparison in the dump.
            final boolean stillCurrent = currentProducer == null
                    ? cached == null
                    : currentProducer.equals(cached);
            if (stillCurrent) {
                // Do not clear a newer producer that another thread may have installed meanwhile.
                producer_$eq(null);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v38, types: [java.lang.Throwable, java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v52, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v53, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v58, types: [org.apache.samza.system.kafka.KafkaSystemProducer] */
    public void send(final String str, OutgoingMessageEnvelope outgoingMessageEnvelope) {
        trace(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Enqueuing message: %s, %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str, outgoingMessageEnvelope}));
        });
        final String stream = outgoingMessageEnvelope.getSystemStream().getStream();
        if (stream == null || (stream != null ? stream.equals("") : "" == 0)) {
            throw new IllegalArgumentException(StreamNameNullOrEmptyErrorMsg());
        }
        final SourceData sourceData = sources().get(str);
        if (sourceData == null) {
            throw new IllegalArgumentException(new StringOps(Predef$.MODULE$.augmentString("Source %s must be registered first before send.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str})));
        }
        SamzaException andSet = sourceData.exceptionInCallback().getAndSet(null);
        if (andSet != null) {
            this.org$apache$samza$system$kafka$KafkaSystemProducer$$metrics.sendFailed().inc();
            throw andSet;
        }
        if (producer() == null) {
            ?? producerLock = producerLock();
            synchronized (producerLock) {
                if (producer() == null) {
                    info(() -> {
                        return new StringOps(Predef$.MODULE$.augmentString("Creating a new producer for system %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.org$apache$samza$system$kafka$KafkaSystemProducer$$systemName}));
                    });
                    producerLock = this;
                    producerLock.producer_$eq((Producer) this.getProducer.apply());
                }
            }
        }
        final Producer<byte[], byte[]> producer = producer();
        if (producer == null) {
            throw new SamzaException("Kafka system producer is not available.");
        }
        final Integer integerPartitionKey = outgoingMessageEnvelope.getPartitionKey() != null ? KafkaUtil$.MODULE$.getIntegerPartitionKey(outgoingMessageEnvelope, producer.partitionsFor(stream)) : null;
        ProducerRecord producerRecord = new ProducerRecord(outgoingMessageEnvelope.getSystemStream().getStream(), integerPartitionKey, (byte[]) outgoingMessageEnvelope.getKey(), (byte[]) outgoingMessageEnvelope.getMessage());
        try {
            ?? sendLock = sourceData.sendLock();
            synchronized (sendLock) {
                sourceData.latestFuture_$eq(producer.send(producerRecord, new Callback(this, str, stream, sourceData, producer, integerPartitionKey) { // from class: org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1
                    private final /* synthetic */ KafkaSystemProducer $outer;
                    private final String source$2;
                    private final String topicName$1;
                    private final KafkaSystemProducer.SourceData sourceData$1;
                    private final Producer currentProducer$1;
                    private final Integer partitionKey$1;

                    public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                        if (exc == null) {
                            this.$outer.org$apache$samza$system$kafka$KafkaSystemProducer$$metrics.sendSuccess().inc();
                            return;
                        }
                        this.$outer.error(() -> {
                            return "Closing the producer because of an exception in callback: ";
                        }, () -> {
                            return exc;
                        });
                        this.$outer.closeAndNullifyCurrentProducer(this.currentProducer$1);
                        this.sourceData$1.exceptionInCallback().compareAndSet(null, new SamzaException(new StringOps(Predef$.MODULE$.augmentString("Unable to send message from %s to system %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.source$2, this.$outer.org$apache$samza$system$kafka$KafkaSystemProducer$$systemName})), exc));
                        this.$outer.org$apache$samza$system$kafka$KafkaSystemProducer$$metrics.sendFailed().inc();
                        this.$outer.error(() -> {
                            return new StringOps(Predef$.MODULE$.augmentString("Unable to send message on Topic:%s Partition:%s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.topicName$1, this.partitionKey$1}));
                        }, () -> {
                            return exc;
                        });
                    }

                    {
                        if (this == null) {
                            throw null;
                        }
                        this.$outer = this;
                        this.source$2 = str;
                        this.topicName$1 = stream;
                        this.sourceData$1 = sourceData;
                        this.currentProducer$1 = producer;
                        this.partitionKey$1 = integerPartitionKey;
                    }
                }));
            }
            this.org$apache$samza$system$kafka$KafkaSystemProducer$$metrics.sends().inc();
        } catch (Exception e) {
            error(() -> {
                return "Closing the producer because of an exception in send: ";
            }, () -> {
                return e;
            });
            closeAndNullifyCurrentProducer(producer);
            this.org$apache$samza$system$kafka$KafkaSystemProducer$$metrics.sendFailed().inc();
            throw new SamzaException(new StringOps(Predef$.MODULE$.augmentString("Failed to send message on Topic:%s Partition:%s Exception:\n %s,")).format(Predef$.MODULE$.genericWrapArray(new Object[]{stream, integerPartitionKey, e})));
        }
    }

    /**
     * Waits until the most recent send of the given source has completed, or until a
     * send callback has recorded an exception for it. The whole operation runs inside
     * {@code updateTimer} with the {@code flushNs} timer.
     *
     * <p>If the source has not sent anything yet, this returns immediately. A recorded
     * callback exception is thrown but not cleared here.
     *
     * @param str the registered source name
     * @throws IllegalArgumentException if the source was never registered
     * @throws SamzaException the exception recorded by a failed send callback
     */
    public void flush(String str) {
        final KafkaSystemProducerMetrics metrics = this.org$apache$samza$system$kafka$KafkaSystemProducer$$metrics;
        updateTimer(metrics.flushNs(), () -> {
            metrics.flushes().inc();

            final SourceData sourceData = sources().get(str);
            if (sourceData == null) {
                // Fail with a descriptive error instead of a bare NullPointerException.
                throw new IllegalArgumentException(String.format("Source %s must be registered first before flush.", str));
            }
            if (sourceData.latestFuture() == null) {
                // No send has stored a future for this source, so there is nothing to wait for.
                return BoxedUnit.UNIT;
            }

            // latestFuture is re-read on every pass because a concurrent send() may replace it.
            while (!sourceData.latestFuture().isDone() && sourceData.exceptionInCallback().get() == null) {
                try {
                    sourceData.latestFuture().get();
                } catch (Throwable th) {
                    // Only logged: the loop condition decides whether to keep waiting.
                    error(() -> th.getMessage(), () -> th);
                }
            }

            // Read the recorded exception once: send() may clear it concurrently, and a second
            // read could yield null and end up in "throw null".
            final SamzaException failure = sourceData.exceptionInCallback().get();
            if (failure != null) {
                metrics.flushFailed().inc();
                throw failure;
            }
            trace(() -> String.format("Flushed %s.", str));
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Synthetic body of the per-source step used by stop(): flushes the source named by
     * the tuple's first element unless its {@link SourceData} (second element) already
     * holds a callback exception.
     */
    public static final /* synthetic */ void $anonfun$stop$1(KafkaSystemProducer kafkaSystemProducer, Tuple2 tuple2) {
        final SourceData data = (SourceData) tuple2._2();
        final boolean hasRecordedFailure = data.exceptionInCallback().get() != null;
        if (hasRecordedFailure) {
            return;
        }
        final String sourceName = (String) tuple2._1();
        kafkaSystemProducer.flush(sourceName);
    }

    /**
     * Creates a producer wrapper with no Kafka producer yet; one is built by send() on demand.
     *
     * @param str the system name, used in log and exception messages
     * @param exponentialSleepStrategy not referenced anywhere in this constructor
     * @param function0 factory for the underlying Kafka producer
     * @param kafkaSystemProducerMetrics metrics holder updated by send() and flush()
     * @param function02 clock function, exposed through clock()
     */
    public KafkaSystemProducer(String str, ExponentialSleepStrategy exponentialSleepStrategy, Function0<Producer<byte[], byte[]>> function0, KafkaSystemProducerMetrics kafkaSystemProducerMetrics, Function0<Object> function02) {
        // Constructor arguments are stored before the trait initialisers run.
        this.org$apache$samza$system$kafka$KafkaSystemProducer$$systemName = str;
        this.getProducer = function0;
        this.org$apache$samza$system$kafka$KafkaSystemProducer$$metrics = kafkaSystemProducerMetrics;
        this.clock = function02;
        // Trait initialisers for the mixed-in Logging and TimerUtils traits.
        Logging.$init$(this);
        TimerUtils.$init$(this);
        // Class body initialisation: no producer yet, fresh lock, empty source registry.
        this.producer = null;
        this.producerLock = new Object();
        this.StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the stream configuration file.";
        this.sources = new ConcurrentHashMap<>();
    }
}
