package org.apache.samza.system.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.samza.metrics.Timer;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.hdfs.writer.HdfsWriter;
import org.apache.samza.system.hdfs.writer.HdfsWriter$;
import org.apache.samza.util.Logging;
import org.apache.samza.util.TimerUtils;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.collection.Iterable$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: HdfsSystemProducer.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055d\u0001B\u0001\u0003\u00015\u0011!\u0003\u00133ggNK8\u000f^3n!J|G-^2fe*\u00111\u0001B\u0001\u0005Q\u001247O\u0003\u0002\u0006\r\u000511/_:uK6T!a\u0002\u0005\u0002\u000bM\fWN_1\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\u0002\u0001'\u0015\u0001aB\u0006\u000e!!\tyA#D\u0001\u0011\u0015\t\t\"#\u0001\u0003mC:<'\"A\n\u0002\t)\fg/Y\u0005\u0003+A\u0011aa\u00142kK\u000e$\bCA\f\u0019\u001b\u0005!\u0011BA\r\u0005\u00059\u0019\u0016p\u001d;f[B\u0013x\u000eZ;dKJ\u0004\"a\u0007\u0010\u000e\u0003qQ!!\b\u0004\u0002\tU$\u0018\u000e\\\u0005\u0003?q\u0011q\u0001T8hO&tw\r\u0005\u0002\u001cC%\u0011!\u0005\b\u0002\u000b)&lWM]+uS2\u001c\b\u0002\u0003\u0013\u0001\u0005\u0003\u0005\u000b\u0011B\u0013\u0002\u0015ML8\u000f^3n\u001d\u0006lW\r\u0005\u0002'_9\u0011q%\f\t\u0003Q-j\u0011!\u000b\u0006\u0003U1\ta\u0001\u0010:p_Rt$\"\u0001\u0017\u0002\u000bM\u001c\u0017\r\\1\n\u00059Z\u0013A\u0002)sK\u0012,g-\u0003\u00021c\t11\u000b\u001e:j]\u001eT!AL\u0016\t\u0011M\u0002!\u0011!Q\u0001\n\u0015\n\u0001b\u00197jK:$\u0018\n\u001a\u0005\tk\u0001\u0011\t\u0011)A\u0005m\u000511m\u001c8gS\u001e\u0004\"a\u000e\u001d\u000e\u0003\tI!!\u000f\u0002\u0003\u0015!#gm]\"p]\u001aLw\r\u0003\u0005<\u0001\t\u0005\t\u0015!\u0003=\u0003\u001diW\r\u001e:jGN\u0004\"aN\u001f\n\u0005y\u0012!!\u0007%eMN\u001c\u0016p\u001d;f[B\u0013x\u000eZ;dKJlU\r\u001e:jGND\u0001\u0002\u0011\u0001\u0003\u0006\u0004%\t!Q\u0001\u0006G2|7m[\u000b\u0002\u0005B\u00191\t\u0012$\u000e\u0003-J!!R\u0016\u0003\u0013\u0019+hn\u0019;j_:\u0004\u0004CA\"H\u0013\tA5F\u0001\u0003M_:<\u0007\u0002\u0003&\u0001\u0005\u0003\u0005\u000b\u0011\u0002\"\u0002\r\rdwnY6!\u0011\u0015a\u0005\u0001\"\u0001N\u0003\u0019a\u0014N\\5u}Q1aj\u0014)R%N\u0003\"a\u000e\u0001\t\u000b\u0011Z\u0005\u0019A\u0013\t\u000bMZ\u0005\u0019A\u0013\t\u000bUZ\u0005\u0019\u0001\u001c\t\u000bmZ\u0005\u0019\u0001\u001f\t\u000f\u0001[\u0005\u0013!a\u0001\u0005\"9Q\u000b\u0001b\u0001\n\u00031\u0016a\u00013ggV\tq\u000b\u0005\u0002Y;6\t\u0011L\u0003\u0002[7\u0006\u0011am\u001d\u0006\u00039\"\ta\u0001[1e_>\u0004\u0018B\u00010Z\u0005)1\u0015\u000e\\3TsN$X-\u001c\u0005\u0007A\u0002\u0001\u000b\u0011B,\u0002\t\u001147\u000f\t\u0005\bE\u0002\u0011\r\u0011\"\u0001d\u0003\u001d9(/\u001b;feN,\u0012\u0001\u001a\t\u0005K*,C.D\u0001g\u0015\t9\u0007.A\u0004nkR\f'\r\\3\u000b\u0005%\\\u0013AC2pY2,7\r^5p]&\u00111N\u001a\u0002\u0004\u001b\u0006\u0004\bGA7v!\rq\u0017o]\u0007\u0002_*\u0011\u0001OA\u0001\u0007oJLG/\u001a:\n\u0005I|'A\u0003%eMN<&/\u001b;feB\u0011A/\u001e\u0007\u0001\t%1x/!A\u0001\u0002\u000b\u0005\u0011PA\u0002`IEBa\u0001\u001f\u0001!\u0002\u0013!\u0017\u0001C<sSR,'o\u001d\u0011\u0012\u0005il\bCA\"|\u0013\ta8FA\u0004O_RD\u0017N\\4\u0011\u0005\rs\u0018BA@,\u0005\r\te.\u001f\u0005\n\u0003\u0007\u0001!\u0019!C\u0005\u0003\u000b\tA\u0001\\8dWV\ta\u0002C\u0004\u0002\n\u0001\u0001\u000b\u0011\u0002\b\u0002\u000b1|7m\u001b\u0011\t\u000f\u00055\u0001\u0001\"\u0001\u0002\u0010\u0005)1\u000f^1siR\u0011\u0011\u0011\u0003\t\u0004\u0007\u0006M\u0011bAA\u000bW\t!QK\\5u\u0011\u001d\tI\u0002\u0001C\u0001\u0003\u001f\tAa\u001d;pa\"9\u0011Q\u0004\u0001\u0005\u0002\u0005}\u0011\u0001\u0003:fO&\u001cH/\u001a:\u0015\t\u0005E\u0011\u0011\u0005\u0005\b\u0003G\tY\u00021\u0001&\u0003\u0019\u0019x.\u001e:dK\"9\u0011q\u0005\u0001\u0005\u0002\u0005%\u0012!\u00024mkNDG\u0003BA\t\u0003WAq!a\t\u0002&\u0001\u0007Q\u0005C\u0004\u00020\u0001!\t!!\r\u0002\tM,g\u000e\u001a\u000b\u0007\u0003#\t\u0019$!\u000e\t\u000f\u0005\r\u0012Q\u0006a\u0001K!A\u0011qGA\u0017\u0001\u0004\tI$A\u0002p[\u0016\u00042aFA\u001e\u0013\r\ti\u0004\u0002\u0002\u0018\u001fV$xm\\5oO6+7o]1hK\u0016sg/\u001a7pa\u0016<\u0011\"!\u0011\u0003\u0003\u0003E\t!a\u0011\u0002%!#gm]*zgR,W\u000e\u0015:pIV\u001cWM\u001d\t\u0004o\u0005\u0015c\u0001C\u0001\u0003\u0003\u0003E\t!a\u0012\u0014\t\u0005\u0015\u0013\u0011\n\t\u0004\u0007\u0006-\u0013bAA'W\t1\u0011I\\=SK\u001aDq\u0001TA#\t\u0003\t\t\u0006\u0006\u0002\u0002D!Q\u0011QKA##\u0003%\t!a\u0016\u00027\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00136+\t\tIFK\u0002C\u00037Z#!!\u0018\u0011\t\u0005}\u0013\u0011N\u0007\u0003\u0003CRA!a\u0019\u0002f\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003OZ\u0013AC1o]>$\u0018\r^5p]&!\u00111NA1\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a")
/* loaded from: input_file:org/apache/samza/system/hdfs/HdfsSystemProducer.class */
public class HdfsSystemProducer implements SystemProducer, Logging, TimerUtils {
    private final String systemName;
    private final String clientId;
    private final HdfsConfig config;
    private final HdfsSystemProducerMetrics metrics;
    private final Function0<Object> clock;
    private final FileSystem dfs;
    private final Map<String, HdfsWriter<?>> writers;
    private final Object lock;
    private final String loggerName;
    private Logger logger;
    private final String startupLoggerName;
    private Logger startupLogger;
    private volatile byte bitmap$0;

    public <T> T updateTimer(Timer timer, Function0<T> function0) {
        return (T) TimerUtils.updateTimer$(this, timer, function0);
    }

    public long updateTimerAndGetDuration(Timer timer, Function1<Object, BoxedUnit> function1) {
        return TimerUtils.updateTimerAndGetDuration$(this, timer, function1);
    }

    public void startupLog(Function0<Object> function0) {
        Logging.startupLog$(this, function0);
    }

    public void trace(Function0<Object> function0) {
        Logging.trace$(this, function0);
    }

    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.trace$(this, function0, function02);
    }

    public void debug(Function0<Object> function0) {
        Logging.debug$(this, function0);
    }

    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.debug$(this, function0, function02);
    }

    public void info(Function0<Object> function0) {
        Logging.info$(this, function0);
    }

    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.info$(this, function0, function02);
    }

    public void warn(Function0<Object> function0) {
        Logging.warn$(this, function0);
    }

    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.warn$(this, function0, function02);
    }

    public void error(Function0<Object> function0) {
        Logging.error$(this, function0);
    }

    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.error$(this, function0, function02);
    }

    public void putMDC(Function0<String> function0, Function0<String> function02) {
        Logging.putMDC$(this, function0, function02);
    }

    public String getMDC(Function0<String> function0) {
        return Logging.getMDC$(this, function0);
    }

    public void removeMDC(Function0<String> function0) {
        Logging.removeMDC$(this, function0);
    }

    public void clearMDC() {
        Logging.clearMDC$(this);
    }

    public String loggerName() {
        return this.loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.hdfs.HdfsSystemProducer] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.logger = Logging.logger$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? logger$lzycompute() : this.logger;
    }

    public String startupLoggerName() {
        return this.startupLoggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.hdfs.HdfsSystemProducer] */
    private Logger startupLogger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.startupLogger = Logging.startupLogger$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.startupLogger;
    }

    public Logger startupLogger() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? startupLogger$lzycompute() : this.startupLogger;
    }

    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        this.loggerName = str;
    }

    public void org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq(String str) {
        this.startupLoggerName = str;
    }

    public Function0<Object> clock() {
        return this.clock;
    }

    public FileSystem dfs() {
        return this.dfs;
    }

    public Map<String, HdfsWriter<?>> writers() {
        return this.writers;
    }

    private Object lock() {
        return this.lock;
    }

    public void start() {
        info(() -> {
            return "entering HdfsSystemProducer.start() call for system: " + this.systemName + ", client: " + this.clientId;
        });
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable, java.lang.Object] */
    public void stop() {
        info(() -> {
            return "entering HdfsSystemProducer.stop() for system: " + this.systemName + ", client: " + this.clientId;
        });
        ?? lock = lock();
        synchronized (lock) {
            writers().values().map(hdfsWriter -> {
                hdfsWriter.close();
                return BoxedUnit.UNIT;
            }, Iterable$.MODULE$.canBuildFrom());
            dfs().close();
        }
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable, java.lang.Object] */
    public void register(String str) {
        info(() -> {
            return "entering HdfsSystemProducer.register(" + str + ") call for system: " + this.systemName + ", client: " + this.clientId;
        });
        ?? lock = lock();
        synchronized (lock) {
            writers().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str), HdfsWriter$.MODULE$.getInstance(dfs(), this.systemName, this.config)));
        }
    }

    /* JADX WARN: Type inference failed for: r0v6, types: [java.lang.Throwable, java.lang.Object] */
    public void flush(String str) {
        debug(() -> {
            return "entering HdfsSystemProducer.flush(" + str + ") call for system: " + this.systemName + ", client: " + this.clientId;
        });
        this.metrics.flushes().inc();
        ?? lock = lock();
        synchronized (lock) {
            liftedTree1$1(str);
        }
        this.metrics.flushSuccess().inc();
    }

    /* JADX WARN: Type inference failed for: r0v6, types: [java.lang.Throwable, java.lang.Object] */
    public void send(String str, OutgoingMessageEnvelope outgoingMessageEnvelope) {
        debug(() -> {
            return "entering HdfsSystemProducer.send(source = " + str + ", envelope) call for system: " + this.systemName + ", client: " + this.clientId;
        });
        this.metrics.sends().inc();
        ?? lock = lock();
        synchronized (lock) {
            liftedTree2$1(str, outgoingMessageEnvelope);
        }
        this.metrics.sendSuccess().inc();
    }

    private final void liftedTree1$1(String str) {
        try {
            updateTimer(this.metrics.flushMs(), () -> {
                ((HdfsWriter) Option$.MODULE$.option2Iterable(this.writers().get(str)).head()).flush();
            });
        } catch (Exception e) {
            this.metrics.flushFailed().inc();
            warn(() -> {
                return "Exception thrown while client " + this.clientId + " flushed HDFS out stream, msg: " + e.getMessage();
            });
            debug(() -> {
                return "Detailed message from exception thrown by client " + this.clientId + " in HDFS flush: ";
            }, () -> {
                return e;
            });
            ((HdfsWriter) Option$.MODULE$.option2Iterable(writers().get(str)).head()).close();
            throw e;
        }
    }

    private final void liftedTree2$1(String str, OutgoingMessageEnvelope outgoingMessageEnvelope) {
        try {
            updateTimer(this.metrics.sendMs(), () -> {
                ((HdfsWriter) Option$.MODULE$.option2Iterable(this.writers().get(str)).head()).write(outgoingMessageEnvelope);
            });
        } catch (Exception e) {
            this.metrics.sendFailed().inc();
            warn(() -> {
                return "Exception thrown while client " + this.clientId + " wrote to HDFS, msg: " + e.getMessage();
            });
            debug(() -> {
                return "Detailed message from exception thrown by client " + this.clientId + " in HDFS write: ";
            }, () -> {
                return e;
            });
            ((HdfsWriter) Option$.MODULE$.option2Iterable(writers().get(str)).head()).close();
            throw e;
        }
    }

    public HdfsSystemProducer(String str, String str2, HdfsConfig hdfsConfig, HdfsSystemProducerMetrics hdfsSystemProducerMetrics, Function0<Object> function0) {
        this.systemName = str;
        this.clientId = str2;
        this.config = hdfsConfig;
        this.metrics = hdfsSystemProducerMetrics;
        this.clock = function0;
        Logging.$init$(this);
        TimerUtils.$init$(this);
        this.dfs = FileSystem.newInstance(new Configuration(true));
        this.writers = Map$.MODULE$.empty();
        this.lock = new Object();
    }
}
