package org.apache.samza.checkpoint.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemStreamPartitionIterator;
import org.apache.samza.system.kafka.KafkaStreamSpec;
import org.apache.samza.util.Logging;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.StringContext;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;

/* compiled from: KafkaCheckpointManager.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\rc\u0001B\u0001\u0003\u00015\u0011acS1gW\u0006\u001c\u0005.Z2la>Lg\u000e^'b]\u0006<WM\u001d\u0006\u0003\u0007\u0011\tQa[1gW\u0006T!!\u0002\u0004\u0002\u0015\rDWmY6q_&tGO\u0003\u0002\b\u0011\u0005)1/Y7{C*\u0011\u0011BC\u0001\u0007CB\f7\r[3\u000b\u0003-\t1a\u001c:h\u0007\u0001\u0019B\u0001\u0001\b\u00175A\u0011q\u0002F\u0007\u0002!)\u0011\u0011CE\u0001\u0005Y\u0006twMC\u0001\u0014\u0003\u0011Q\u0017M^1\n\u0005U\u0001\"AB(cU\u0016\u001cG\u000f\u0005\u0002\u001815\tA!\u0003\u0002\u001a\t\t\t2\t[3dWB|\u0017N\u001c;NC:\fw-\u001a:\u0011\u0005mqR\"\u0001\u000f\u000b\u0005u1\u0011\u0001B;uS2L!a\b\u000f\u0003\u000f1{wmZ5oO\"A\u0011\u0005\u0001B\u0001B\u0003%!%\u0001\bdQ\u0016\u001c7\u000e]8j]R\u001c\u0006/Z2\u0011\u0005\r:S\"\u0001\u0013\u000b\u0005\r)#B\u0001\u0014\u0007\u0003\u0019\u0019\u0018p\u001d;f[&\u0011\u0001\u0006\n\u0002\u0010\u0017\u000647.Y*ue\u0016\fWn\u00159fG\"A!\u0006\u0001B\u0001B\u0003%1&A\u0007tsN$X-\u001c$bGR|'/\u001f\t\u0003Y5j\u0011!J\u0005\u0003]\u0015\u0012QbU=ti\u0016lg)Y2u_JL\b\u0002\u0003\u0019\u0001\u0005\u0003\u0005\u000b\u0011B\u0019\u0002%Y\fG.\u001b3bi\u0016\u001c\u0005.Z2la>Lg\u000e\u001e\t\u0003eUj\u0011a\r\u0006\u0002i\u0005)1oY1mC&\u0011ag\r\u0002\b\u0005>|G.Z1o\u0011!A\u0004A!A!\u0002\u0013I\u0014AB2p]\u001aLw\r\u0005\u0002;y5\t1H\u0003\u00029\r%\u0011Qh\u000f\u0002\u0007\u0007>tg-[4\t\u0011}\u0002!\u0011!Q\u0001\n\u0001\u000bq\"\\3ue&\u001c7OU3hSN$(/\u001f\t\u0003\u0003\u0012k\u0011A\u0011\u0006\u0003\u0007\u001a\tq!\\3ue&\u001c7/\u0003\u0002F\u0005\nyQ*\u001a;sS\u000e\u001c(+Z4jgR\u0014\u0018\u0010\u0003\u0005H\u0001\t\u0005\t\u0015!\u0003I\u0003I\u0019\u0007.Z2la>Lg\u000e^'tON+'\u000fZ3\u0011\u0007%ce*D\u0001K\u0015\tYe!A\u0006tKJL\u0017\r\\5{KJ\u001c\u0018BA'K\u0005\u0015\u0019VM\u001d3f!\t9r*\u0003\u0002Q\t\tQ1\t[3dWB|\u0017N\u001c;\t\u0011I\u0003!\u0011!Q\u0001\nM\u000b!c\u00195fG.\u0004x.\u001b8u\u0017\u0016L8+\u001a:eKB\u0019\u0011\n\u0014+\u0011\u0005U3V\"\u0001\u0002\n\u0005]\u0013!!F&bM.\
f7\t[3dWB|\u0017N\u001c;M_\u001e\\U-\u001f\u0005\u00063\u0002!\tAW\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0011mcVLX0aC\n\u0004\"!\u0016\u0001\t\u000b\u0005B\u0006\u0019\u0001\u0012\t\u000b)B\u0006\u0019A\u0016\t\u000bAB\u0006\u0019A\u0019\t\u000baB\u0006\u0019A\u001d\t\u000b}B\u0006\u0019\u0001!\t\u000f\u001dC\u0006\u0013!a\u0001\u0011\"9!\u000b\u0017I\u0001\u0002\u0004\u0019\u0006b\u00023\u0001\u0001\u0004%\t!Z\u0001\u0019\u001b\u0006D(+\u001a;ss\u0012+(/\u0019;j_:Le.T5mY&\u001cX#\u00014\u0011\u0005I:\u0017B\u000154\u0005\u0011auN\\4\t\u000f)\u0004\u0001\u0019!C\u0001W\u0006aR*\u0019=SKR\u0014\u0018\u0010R;sCRLwN\\%o\u001b&dG.[:`I\u0015\fHC\u00017p!\t\u0011T.\u0003\u0002og\t!QK\\5u\u0011\u001d\u0001\u0018.!AA\u0002\u0019\f1\u0001\u001f\u00132\u0011\u0019\u0011\b\u0001)Q\u0005M\u0006IR*\u0019=SKR\u0014\u0018\u0010R;sCRLwN\\%o\u001b&dG.[:!\u0011\u001d!\bA1A\u0005\u0002U\f\u0001c\u00195fG.\u0004x.\u001b8u'f\u001cH/Z7\u0016\u0003Y\u0004\"a\u001e@\u000f\u0005ad\bCA=4\u001b\u0005Q(BA>\r\u0003\u0019a$o\\8u}%\u0011QpM\u0001\u0007!J,G-\u001a4\n\u0007}\f\tA\u0001\u0004TiJLgn\u001a\u0006\u0003{NBq!!\u0002\u0001A\u0003%a/A\tdQ\u0016\u001c7\u000e]8j]R\u001c\u0016p\u001d;f[\u0002B\u0001\"!\u0003\u0001\u0005\u0004%\t!^\u0001\u0010G\",7m\u001b9pS:$Hk\u001c9jG\"9\u0011Q\u0002\u0001!\u0002\u00131\u0018\u0001E2iK\u000e\\\u0007o\\5oiR{\u0007/[2!\u0011%\t\t\u0002\u0001b\u0001\n\u0003\t\u0019\"A\u0007dQ\u0016\u001c7\u000e]8j]R\u001c6\u000f]\u000b\u0003\u0003+\u00012\u0001LA\f\u0013\r\tI\"\n\u0002\u0016'f\u001cH/Z7TiJ,\u0017-\u001c)beRLG/[8o\u0011!\ti\u0002\u0001Q\u0001\n\u0005U\u0011AD2iK\u000e\\\u0007o\\5oiN\u001b\b\u000f\t\u0005\t\u0003C\u0001!\u0019!C\u0001k\u00061R\r\u001f9fGR,Gm\u0012:pkB,'OR1di>\u0014\u0018\u0010C\u0004\u0002&\u0001\u0001\u000b\u0011\u0002<\u0002/\u0015D\b/Z2uK\u0012<%o\\;qKJ4\u0015m\u0019;pef\u0004\u0003\"CA\u0015\u0001\t\u0007I\u0011AA\u0016\u00039\u0019\u0018p\u001d;f[\u000e{gn];nKJ,\"!!\f\u0011\u00071\ny#C\u0002\u00022\u0015\u0012abU=ti\u0016l7i\u001c8tk6,'\
u000f\u0003\u0005\u00026\u0001\u0001\u000b\u0011BA\u0017\u0003=\u0019\u0018p\u001d;f[\u000e{gn];nKJ\u0004\u0003\"CA\u001d\u0001\t\u0007I\u0011AA\u001e\u0003-\u0019\u0018p\u001d;f[\u0006#W.\u001b8\u0016\u0005\u0005u\u0002c\u0001\u0017\u0002@%\u0019\u0011\u0011I\u0013\u0003\u0017MK8\u000f^3n\u0003\u0012l\u0017N\u001c\u0005\t\u0003\u000b\u0002\u0001\u0015!\u0003\u0002>\u0005a1/_:uK6\fE-\\5oA!I\u0011\u0011\n\u0001A\u0002\u0013\u0005\u00111J\u0001\ni\u0006\u001c8NT1nKN,\"!!\u0014\u0011\u000b]\fy%a\u0015\n\t\u0005E\u0013\u0011\u0001\u0002\u0004'\u0016$\b\u0003BA+\u00037j!!a\u0016\u000b\u0007\u0005ec!A\u0005d_:$\u0018-\u001b8fe&!\u0011QLA,\u0005!!\u0016m]6OC6,\u0007\"CA1\u0001\u0001\u0007I\u0011AA2\u00035!\u0018m]6OC6,7o\u0018\u0013fcR\u0019A.!\u001a\t\u0013A\fy&!AA\u0002\u00055\u0003\u0002CA5\u0001\u0001\u0006K!!\u0014\u0002\u0015Q\f7o\u001b(b[\u0016\u001c\b\u0005C\u0006\u0002n\u0001\u0001\r\u00111A\u0005\u0002\u0005=\u0014A\u0006;bg.t\u0015-\\3t)>\u001c\u0005.Z2la>Lg\u000e^:\u0016\u0005\u0005E\u0004CB<\u0002t\u0005Mc*\u0003\u0003\u0002v\u0005\u0005!aA'ba\"Y\u0011\u0011\u0010\u0001A\u0002\u0003\u0007I\u0011AA>\u0003i!\u0018m]6OC6,7\u000fV8DQ\u0016\u001c7\u000e]8j]R\u001cx\fJ3r)\ra\u0017Q\u0010\u0005\na\u0006]\u0014\u0011!a\u0001\u0003cB\u0001\"!!\u0001A\u0003&\u0011\u0011O\u0001\u0018i\u0006\u001c8NT1nKN$vn\u00115fG.\u0004x.\u001b8ug\u0002B\u0011\"!\"\u0001\u0005\u0004%\t!a\"\u0002\u0017A\u0014x\u000eZ;dKJ\u0014VMZ\u000b\u0003\u0003\u0013\u0003b!a#\u0002\u0018\u0006mUBAAG\u0015\u0011\ty)!%\u0002\r\u0005$x.\\5d\u0015\u0011\t\u0019*!&\u0002\u0015\r|gnY;se\u0016tGO\u0003\u0002\u001e%%!\u0011\u0011TAG\u0005=\tEo\\7jGJ+g-\u001a:f]\u000e,\u0007c\u0001\u0017\u0002\u001e&\u0019\u0011qT\u0013\u0003\u001dMK8\u000f^3n!J|G-^2fe\"A\u00111\u0015\u0001!\u0002\u0013\tI)\u0001\u0007qe>$WoY3s%\u00164\u0007\u0005C\u0005\u0002(\u0002\u0011\r\u0011\"\u0001\u0002*\u0006!\u0002O]8ek\u000e,'o\u0011:fCRLwN\u001c'pG.,\u0012A\u0004\u0005\b\u0003[\u0003\u0001\u0015!\u0003\u000f\u0003U\u0001(o\u001c3
vG\u0016\u00148I]3bi&|g\u000eT8dW\u0002Bq!!-\u0001\t\u0003\n\u0019,A\bde\u0016\fG/\u001a*fg>,(oY3t)\u0005a\u0007bBA\\\u0001\u0011\u0005\u00131W\u0001\u0006gR\f'\u000f\u001e\u0005\b\u0003w\u0003A\u0011IA_\u0003!\u0011XmZ5ti\u0016\u0014Hc\u00017\u0002@\"A\u0011\u0011YA]\u0001\u0004\t\u0019&\u0001\u0005uCN\\g*Y7f\u0011\u001d\t)\r\u0001C!\u0003\u000f\f!C]3bI2\u000b7\u000f^\"iK\u000e\\\u0007o\\5oiR\u0019a*!3\t\u0011\u0005\u0005\u00171\u0019a\u0001\u0003'Bq!!4\u0001\t\u0003\ny-A\bxe&$Xm\u00115fG.\u0004x.\u001b8u)\u0015a\u0017\u0011[Aj\u0011!\t\t-a3A\u0002\u0005M\u0003BB\u0003\u0002L\u0002\u0007a\nC\u0004\u0002X\u0002!\t%a-\u0002!\rdW-\u0019:DQ\u0016\u001c7\u000e]8j]R\u001c\bbBAn\u0001\u0011\u0005\u00131W\u0001\u0005gR|\u0007\u000fC\u0004\u0002`\u0002!\t!!9\u0002#\u001d,GoU=ti\u0016l\u0007K]8ek\u000e,'\u000f\u0006\u0002\u0002\u001c\"\"\u0011Q\\As!\u0011\t9/!?\u000e\u0005\u0005%(\u0002BAv\u0003[\f1\"\u00198o_R\fG/[8og*!\u0011q^Ay\u0003\u0019\u0019w.\\7p]*!\u00111_A{\u0003\u00199wn\\4mK*\u0011\u0011q_\u0001\u0004G>l\u0017\u0002BA~\u0003S\u0014\u0011CV5tS\ndWMR8s)\u0016\u001cH/\u001b8h\u0011\u001d\ty\u0010\u0001C\u0005\u0005\u0003\tqB]3bI\u000eCWmY6q_&tGo\u001d\u000b\u0003\u0003cBqA!\u0002\u0001\t\u0013\u00119!A\bhKR|E\u000eZ3ti>3gm]3u)\r1(\u0011\u0002\u0005\t\u0005\u0017\u0011\u0019\u00011\u0001\u0002\u0016\u0005\u00191o\u001d9\b\u0013\t=!!!A\t\u0002\tE\u0011AF&bM.\f7\t[3dWB|\u0017N\u001c;NC:\fw-\u001a:\u0011\u0007U\u0013\u0019B\u0002\u0005\u0002\u0005\u0005\u0005\t\u0012\u0001B\u000b'\u0011\u0011\u0019Ba\u0006\u0011\u0007I\u0012I\"C\u0002\u0003\u001cM\u0012a!\u00118z%\u00164\u0007bB-\u0003\u0014\u0011\u0005!q\u0004\u000b\u0003\u0005#A!Ba\t\u0003\u0014E\u0005I\u0011\u0001B\u0013\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%mU\u0011!q\u0005\u0016\u0004\u0011\n%2F\u0001B\u0016!\u0011\u0011iCa\u000e\u000e\u0005\t=\"\u0002\u0002B\u0019\u0005g\t\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\tU2'\u0001\u0006b]:|G/\u0019;j_:LAA!\u000f\u00030\t\tRO\\2i
K\u000e\\W\r\u001a,be&\fgnY3\t\u0015\tu\"1CI\u0001\n\u0003\u0011y$A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$HeN\u000b\u0003\u0005\u0003R3a\u0015B\u0015\u0001")
/* loaded from: input_file:org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.class */
public class KafkaCheckpointManager implements CheckpointManager, Logging {
    // Constructor arguments, kept for the lifetime of the manager.
    private final KafkaStreamSpec checkpointSpec;
    private final SystemFactory systemFactory;
    private final boolean validateCheckpoint;
    private final Config config;
    private final MetricsRegistry metricsRegistry;
    private final Serde<Checkpoint> checkpointMsgSerde;
    private final Serde<KafkaCheckpointLogKey> checkpointKeySerde;
    // Upper bound on how long writeCheckpoint keeps retrying; mutable through its setter.
    private long MaxRetryDurationInMillis;
    // Values derived from checkpointSpec / config inside the constructor.
    private final String checkpointSystem;
    private final String checkpointTopic;
    private final SystemStreamPartition checkpointSsp;
    private final String expectedGrouperFactory;
    private final SystemConsumer systemConsumer;
    private final SystemAdmin systemAdmin;
    // Tasks registered so far; replaced with a new immutable set on every register() call.
    private Set<TaskName> taskNames;
    // Checkpoints read so far; stays null until readLastCheckpoint populates it.
    private Map<TaskName, Checkpoint> taskNamesToCheckpoints;
    private final AtomicReference<SystemProducer> producerRef;
    private final Object producerCreationLock;
    // Not final: these two are never assigned in the constructor and are written through the
    // org$apache$samza$util$Logging$_setter_$..._$eq methods, which Java forbids for final fields.
    private String loggerName;
    private Logger logger;
    private String startupLoggerName;
    private Logger startupLogger;
    // Bit 0 set => logger initialised; bit 1 set => startupLogger initialised.
    private volatile byte bitmap$0;

    /**
     * Forwards to the static {@code Logging.startupLog$} helper, passing this instance and the thunk.
     *
     * @param function0 deferred message, handed to the helper unchanged
     */
    public void startupLog(Function0<Object> function0) {
        Logging.startupLog$(this, function0);
    }

    /**
     * Forwards to the static {@code Logging.trace$} helper, passing this instance and the thunk.
     *
     * @param function0 deferred message, handed to the helper unchanged
     */
    public void trace(Function0<Object> function0) {
        Logging.trace$(this, function0);
    }

    /**
     * Forwards to the two-thunk overload of {@code Logging.trace$}.
     *
     * @param function0 deferred message, handed to the helper unchanged
     * @param function02 deferred {@link Throwable}, handed to the helper unchanged
     */
    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.trace$(this, function0, function02);
    }

    /**
     * Forwards to the static {@code Logging.debug$} helper, passing this instance and the thunk.
     *
     * @param function0 deferred message, handed to the helper unchanged
     */
    public void debug(Function0<Object> function0) {
        Logging.debug$(this, function0);
    }

    /**
     * Forwards to the two-thunk overload of {@code Logging.debug$}.
     *
     * @param function0 deferred message, handed to the helper unchanged
     * @param function02 deferred {@link Throwable}, handed to the helper unchanged
     */
    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.debug$(this, function0, function02);
    }

    /**
     * Forwards to the static {@code Logging.info$} helper, passing this instance and the thunk.
     *
     * @param function0 deferred message, handed to the helper unchanged
     */
    public void info(Function0<Object> function0) {
        Logging.info$(this, function0);
    }

    /**
     * Forwards to the two-thunk overload of {@code Logging.info$}.
     *
     * @param function0 deferred message, handed to the helper unchanged
     * @param function02 deferred {@link Throwable}, handed to the helper unchanged
     */
    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.info$(this, function0, function02);
    }

    /**
     * Forwards to the static {@code Logging.warn$} helper, passing this instance and the thunk.
     *
     * @param function0 deferred message, handed to the helper unchanged
     */
    public void warn(Function0<Object> function0) {
        Logging.warn$(this, function0);
    }

    /**
     * Forwards to the two-thunk overload of {@code Logging.warn$}.
     *
     * @param function0 deferred message, handed to the helper unchanged
     * @param function02 deferred {@link Throwable}, handed to the helper unchanged
     */
    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.warn$(this, function0, function02);
    }

    /**
     * Forwards to the static {@code Logging.error$} helper, passing this instance and the thunk.
     *
     * @param function0 deferred message, handed to the helper unchanged
     */
    public void error(Function0<Object> function0) {
        Logging.error$(this, function0);
    }

    /**
     * Forwards to the two-thunk overload of {@code Logging.error$}.
     *
     * @param function0 deferred message, handed to the helper unchanged
     * @param function02 deferred {@link Throwable}, handed to the helper unchanged
     */
    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.error$(this, function0, function02);
    }

    /**
     * Forwards to the static {@code Logging.putMDC$} helper with this instance and both thunks.
     *
     * @param function0 first deferred string (presumably the MDC key — confirm against Logging)
     * @param function02 second deferred string (presumably the MDC value — confirm against Logging)
     */
    public void putMDC(Function0<String> function0, Function0<String> function02) {
        Logging.putMDC$(this, function0, function02);
    }

    /**
     * Forwards to the static {@code Logging.getMDC$} helper and returns its result.
     *
     * @param function0 deferred string (presumably the MDC key — confirm against Logging)
     * @return whatever {@code Logging.getMDC$} returns for this instance and thunk
     */
    public String getMDC(Function0<String> function0) {
        return Logging.getMDC$(this, function0);
    }

    /**
     * Forwards to the static {@code Logging.removeMDC$} helper with this instance and the thunk.
     *
     * @param function0 deferred string (presumably the MDC key — confirm against Logging)
     */
    public void removeMDC(Function0<String> function0) {
        Logging.removeMDC$(this, function0);
    }

    /** Forwards to the static {@code Logging.clearMDC$} helper with this instance. */
    public void clearMDC() {
        Logging.clearMDC$(this);
    }

    /** @return the current value of the {@code loggerName} field */
    public String loggerName() {
        return this.loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.checkpoint.kafka.KafkaCheckpointManager] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.logger = Logging.logger$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logger;
    }

    /**
     * Returns the lazily created logger, taking the synchronized slow path only while bit 0 of
     * {@code bitmap$0} is still clear.
     *
     * @return the logger held in the {@code logger} field
     */
    public Logger logger() {
        boolean alreadyInitialised = (this.bitmap$0 & 1) != 0;
        if (alreadyInitialised) {
            return this.logger;
        }
        return logger$lzycompute();
    }

    /** @return the current value of the {@code startupLoggerName} field */
    public String startupLoggerName() {
        return this.startupLoggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.checkpoint.kafka.KafkaCheckpointManager] */
    private Logger startupLogger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.startupLogger = Logging.startupLogger$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.startupLogger;
    }

    /**
     * Returns the lazily created startup logger, taking the synchronized slow path only while
     * bit 1 of {@code bitmap$0} is still clear.
     *
     * @return the logger held in the {@code startupLogger} field
     */
    public Logger startupLogger() {
        boolean alreadyInitialised = (this.bitmap$0 & 2) != 0;
        if (alreadyInitialised) {
            return this.startupLogger;
        }
        return startupLogger$lzycompute();
    }

    /**
     * Stores the given value in the {@code loggerName} field.
     * NOTE(review): presumably invoked by {@code Logging.$init$} during construction — confirm.
     *
     * @param str the logger name to store
     */
    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        this.loggerName = str;
    }

    /**
     * Stores the given value in the {@code startupLoggerName} field.
     * NOTE(review): presumably invoked by {@code Logging.$init$} during construction — confirm.
     *
     * @param str the startup logger name to store
     */
    public void org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq(String str) {
        this.startupLoggerName = str;
    }

    /**
     * @return the retry budget, in milliseconds, that {@code writeCheckpoint} compares elapsed
     *         time against; the constructor sets it to 15 minutes
     */
    public long MaxRetryDurationInMillis() {
        return this.MaxRetryDurationInMillis;
    }

    /**
     * Replaces the retry budget used by {@code writeCheckpoint}.
     *
     * @param j new budget in milliseconds
     */
    public void MaxRetryDurationInMillis_$eq(long j) {
        this.MaxRetryDurationInMillis = j;
    }

    /** @return the system name taken from the checkpoint stream spec in the constructor */
    public String checkpointSystem() {
        return this.checkpointSystem;
    }

    /** @return the physical name taken from the checkpoint stream spec in the constructor */
    public String checkpointTopic() {
        return this.checkpointTopic;
    }

    /**
     * @return the system-stream-partition built in the constructor from the checkpoint system,
     *         the checkpoint topic and partition 0
     */
    public SystemStreamPartition checkpointSsp() {
        return this.checkpointSsp;
    }

    /**
     * @return the grouper factory name obtained in the constructor from
     *         {@code JobConfig.getSystemStreamPartitionGrouperFactory()}
     */
    public String expectedGrouperFactory() {
        return this.expectedGrouperFactory;
    }

    /** @return the consumer obtained from the system factory in the constructor */
    public SystemConsumer systemConsumer() {
        return this.systemConsumer;
    }

    /** @return the admin obtained from the system factory in the constructor */
    public SystemAdmin systemAdmin() {
        return this.systemAdmin;
    }

    /** @return the set of task names held in the {@code taskNames} field */
    public Set<TaskName> taskNames() {
        return this.taskNames;
    }

    /**
     * Replaces the {@code taskNames} field.
     *
     * @param set the new set of task names
     */
    public void taskNames_$eq(Set<TaskName> set) {
        this.taskNames = set;
    }

    /**
     * @return the map held in the {@code taskNamesToCheckpoints} field; the constructor does not
     *         assign it, so it is {@code null} until a caller sets it
     */
    public Map<TaskName, Checkpoint> taskNamesToCheckpoints() {
        return this.taskNamesToCheckpoints;
    }

    /**
     * Replaces the {@code taskNamesToCheckpoints} field.
     *
     * @param map the new task-name-to-checkpoint map
     */
    public void taskNamesToCheckpoints_$eq(Map<TaskName, Checkpoint> map) {
        this.taskNamesToCheckpoints = map;
    }

    /** @return the atomic reference that holds the producer currently used for checkpoint writes */
    public AtomicReference<SystemProducer> producerRef() {
        return this.producerRef;
    }

    /** @return the monitor object {@code writeCheckpoint} synchronizes on when swapping producers */
    public Object producerCreationLock() {
        return this.producerCreationLock;
    }

    /**
     * Starts the system admin, asks it to create the checkpoint stream and, when
     * {@code validateCheckpoint} is set, asks it to validate that stream.
     */
    public void createResources() {
        Preconditions.checkNotNull(systemAdmin());
        systemAdmin().start();

        info(() -> "Creating checkpoint stream: " + this.checkpointSpec.getPhysicalName()
            + " with partition count: " + this.checkpointSpec.getPartitionCount());
        systemAdmin().createStream(this.checkpointSpec);

        if (!this.validateCheckpoint) {
            return;
        }
        info(() -> "Validating checkpoint stream");
        systemAdmin().validateStream(this.checkpointSpec);
    }

    /**
     * Starts the current producer, then registers the consumer on the checkpoint partition at
     * that partition's oldest offset and starts the consumer.
     */
    public void start() {
        info(() -> "Starting the checkpoint SystemProducer");
        producerRef().get().start();

        final String startingOffset = getOldestOffset(checkpointSsp());
        info(() -> "Starting the checkpoint SystemConsumer from oldest offset " + startingOffset);
        systemConsumer().register(checkpointSsp(), startingOffset);
        systemConsumer().start();
    }

    /**
     * Registers the task with the current producer and adds it to the set of known task names.
     *
     * @param taskName task to register
     */
    public void register(TaskName taskName) {
        debug(() -> "Registering taskName: " + taskName);
        String producerSource = taskName.getTaskName();
        producerRef().get().register(producerSource);
        // $plus returns a new immutable set; the field is swapped to point at it.
        Set<TaskName> withNewTask = (Set) taskNames().$plus(taskName);
        taskNames_$eq(withNewTask);
    }

    /**
     * Reads newly available checkpoint messages, merges them into the cached map and returns the
     * entry for the given task.
     *
     * @param taskName a task previously passed to {@link #register(TaskName)}
     * @return the checkpoint recorded for the task, or {@code null} when the map has no entry
     * @throws SamzaException if the task is not in the registered task set
     */
    public Checkpoint readLastCheckpoint(TaskName taskName) {
        boolean registered = taskNames().contains(taskName);
        if (!registered) {
            throw new SamzaException("Task: " + taskName + " is not registered with this CheckpointManager");
        }
        info(() -> "Reading checkpoint for taskName " + taskName);

        if (taskNamesToCheckpoints() != null) {
            debug(() -> "Updating existing checkpoint mappings");
            // Entries from the fresh read replace same-keyed entries already cached.
            taskNamesToCheckpoints_$eq(taskNamesToCheckpoints().$plus$plus(readCheckpoints()));
        } else {
            debug(() -> "Reading checkpoints for the first time");
            taskNamesToCheckpoints_$eq(readCheckpoints());
        }

        final Checkpoint lastCheckpoint = (Checkpoint) taskNamesToCheckpoints().getOrElse(taskName, () -> null);
        info(() -> "Got checkpoint state for taskName - " + taskName + ": " + lastCheckpoint);
        return lastCheckpoint;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v27, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v28, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v31, types: [boolean] */
    public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
        KafkaCheckpointLogKey kafkaCheckpointLogKey = new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE, taskName, expectedGrouperFactory());
        try {
            try {
                OutgoingMessageEnvelope outgoingMessageEnvelope = new OutgoingMessageEnvelope(checkpointSsp(), this.checkpointKeySerde.toBytes(kafkaCheckpointLogKey), this.checkpointMsgSerde.toBytes(checkpoint));
                long currentTimeMillis = System.currentTimeMillis();
                long j = 1000;
                Exception exc = null;
                while (System.currentTimeMillis() - currentTimeMillis <= MaxRetryDurationInMillis()) {
                    SystemProducer systemProducer = producerRef().get();
                    try {
                        systemProducer.send(taskName.getTaskName(), outgoingMessageEnvelope);
                        systemProducer.flush(taskName.getTaskName());
                        debug(() -> {
                            return new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Wrote checkpoint: ", " for task: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{checkpoint, taskName}));
                        });
                        return;
                    } catch (Exception e) {
                        exc = e;
                        warn(() -> {
                            return new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Retrying failed checkpoint write to key: ", ", checkpoint: ", " for task: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{kafkaCheckpointLogKey, checkpoint, taskName}));
                        }, () -> {
                            return e;
                        });
                        SystemProducer systemProducer2 = getSystemProducer();
                        ?? producerCreationLock = producerCreationLock();
                        synchronized (producerCreationLock) {
                            producerCreationLock = producerRef().compareAndSet(systemProducer, systemProducer2);
                            if (producerCreationLock != 0) {
                                info(() -> {
                                    return new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Stopping the checkpoint SystemProducer"})).s(Nil$.MODULE$);
                                });
                                systemProducer.stop();
                                info(() -> {
                                    return new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Recreating the checkpoint SystemProducer"})).s(Nil$.MODULE$);
                                });
                                taskNames().foreach(taskName2 -> {
                                    $anonfun$writeCheckpoint$6(this, systemProducer2, taskName2);
                                    return BoxedUnit.UNIT;
                                });
                                systemProducer2.start();
                            } else {
                                info(() -> {
                                    return "Producer instance was recreated by other thread. Retrying with it.";
                                });
                                systemProducer2.stop();
                            }
                            j = Math.min(j * 2, 10000L);
                            Thread.sleep(j);
                        }
                    }
                }
                throw new SamzaException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Exception when writing checkpoint: ", " for task: ", "."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{checkpoint, taskName})), exc);
            } catch (Exception e2) {
                throw new SamzaException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Exception when writing checkpoint for ", ": ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{taskName, checkpoint})), e2);
            }
        } catch (Exception e3) {
            throw new SamzaException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Exception when writing checkpoint-key for ", ": ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{taskName, checkpoint})), e3);
        }
    }

    /** Logs the checkpoint topic and system, then asks the system admin to clear the checkpoint stream. */
    public void clearCheckpoints() {
        info(() -> String.format("Clear checkpoint stream %s in system %s",
            this.checkpointTopic(), this.checkpointSystem()));
        systemAdmin().clearStream(this.checkpointSpec);
    }

    /**
     * Stops the system admin, the current producer and the consumer, in that order, logging
     * before each step and once at the end.
     */
    public void stop() {
        info(() -> "Stopping system admin.");
        systemAdmin().stop();

        info(() -> "Stopping system producer.");
        SystemProducer activeProducer = producerRef().get();
        activeProducer.stop();

        info(() -> "Stopping system consumer.");
        systemConsumer().stop();

        info(() -> "CheckpointManager stopped.");
    }

    /**
     * Asks the system factory for a producer on the checkpoint system, using the config and
     * metrics registry supplied to the constructor.
     *
     * @return the producer returned by {@code systemFactory.getProducer}
     */
    @VisibleForTesting
    public SystemProducer getSystemProducer() {
        return this.systemFactory.getProducer(checkpointSystem(), this.config, this.metricsRegistry);
    }

    /**
     * Drains the messages currently available on the checkpoint partition and builds a map from
     * task name to checkpoint. A later message for the same task overwrites an earlier one.
     *
     * <p>Messages whose key cannot be deserialized are skipped with a warning unless
     * {@code validateCheckpoint} is set, in which case the failure is rethrown. Messages whose key
     * type is not {@code KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE} are ignored.
     *
     * @return an immutable snapshot of the checkpoints read by this call
     * @throws SamzaException on a null key, on a key deserialization failure or grouper mismatch
     *         while {@code validateCheckpoint} is set, or on a checkpoint deserialization failure
     */
    private Map<TaskName, Checkpoint> readCheckpoints() {
        scala.collection.mutable.Map checkpoints = Map$.MODULE$.apply(Nil$.MODULE$);
        SystemStreamPartitionIterator iterator = new SystemStreamPartitionIterator(systemConsumer(), checkpointSsp());
        // IntRef rather than int so the logging lambdas can read the running count.
        IntRef numMessagesRead = IntRef.create(0);

        while (iterator.hasNext()) {
            IncomingMessageEnvelope envelope = iterator.next();
            String offset = envelope.getOffset();

            numMessagesRead.elem++;
            if (numMessagesRead.elem % 1000 == 0) {
                info(() -> "Read " + numMessagesRead.elem + " from topic: " + this.checkpointTopic()
                    + ". Current offset: " + offset);
            }

            byte[] keyBytes = (byte[]) envelope.getKey();
            if (keyBytes == null) {
                // Report the actual topic name; the message previously contained the literal
                // text "$checkpointTopic".
                throw new SamzaException("Encountered a checkpoint message with null key. Topic:" + checkpointTopic()
                    + " Offset:" + offset);
            }

            KafkaCheckpointLogKey checkpointKey;
            try {
                checkpointKey = (KafkaCheckpointLogKey) this.checkpointKeySerde.fromBytes(keyBytes);
            } catch (Exception e) {
                if (this.validateCheckpoint) {
                    throw new SamzaException("Exception while serializing checkpoint-key. "
                        + "Topic: " + checkpointTopic() + " Offset: " + offset, e);
                }
                warn(() -> "Ignoring exception while serializing checkpoint-key. Topic: " + this.checkpointTopic()
                    + " Offset: " + offset, () -> e);
                checkpointKey = null;
            }
            if (checkpointKey == null) {
                // Unreadable key with validation off: skip this message.
                continue;
            }

            String grouperFactoryClassName = checkpointKey.getGrouperFactoryClassName();
            if (!expectedGrouperFactory().equals(grouperFactoryClassName)) {
                warn(() -> "Grouper mismatch. Configured: " + this.expectedGrouperFactory()
                    + " Actual: " + grouperFactoryClassName + " ");
                if (this.validateCheckpoint) {
                    throw new SamzaException("SSPGrouperFactory in the checkpoint topic does not match the configured value"
                        + "Configured value: " + expectedGrouperFactory() + "; Actual value: " + grouperFactoryClassName
                        + " Offset: " + offset);
                }
            }

            if (KafkaCheckpointLogKey.CHECKPOINT_KEY_TYPE.equals(checkpointKey.getType())) {
                try {
                    checkpoints.put(checkpointKey.getTaskName(),
                        (Checkpoint) this.checkpointMsgSerde.fromBytes((byte[]) envelope.getMessage()));
                } catch (Exception e) {
                    throw new SamzaException("Exception while serializing checkpoint-message. "
                        + "Topic: " + checkpointTopic() + " Offset: " + offset, e);
                }
            }
        }

        info(() -> "Read " + numMessagesRead.elem + " messages from system:" + this.checkpointSystem()
            + " topic:" + this.checkpointTopic());
        return checkpoints.toMap(Predef$.MODULE$.$conforms());
    }

    /**
     * Looks up, through the system admin, the oldest offset recorded for the given partition.
     *
     * @param systemStreamPartition partition whose stream metadata is queried
     * @return the oldest offset reported in the partition's metadata
     * @throws SamzaException if the admin returns no metadata for the stream or for the partition
     */
    private String getOldestOffset(SystemStreamPartition systemStreamPartition) {
        final String topic = systemStreamPartition.getSystemStream().getStream();
        final Partition partition = systemStreamPartition.getPartition();

        SystemStreamMetadata streamMetadata =
            (SystemStreamMetadata) systemAdmin().getSystemStreamMetadata(Collections.singleton(topic)).get(topic);
        if (streamMetadata == null) {
            throw new SamzaException("Got null metadata for system:" + checkpointSystem() + ", topic:" + topic);
        }

        SystemStreamMetadata.SystemStreamPartitionMetadata partitionMetadata =
            (SystemStreamMetadata.SystemStreamPartitionMetadata) streamMetadata.getSystemStreamPartitionMetadata().get(partition);
        if (partitionMetadata == null) {
            throw new SamzaException("Got a null partition metadata for system:" + checkpointSystem() + ", topic:" + topic);
        }

        return partitionMetadata.getOldestOffset();
    }

    /**
     * Synthetic loop body used by {@code writeCheckpoint}: logs the task name at debug level on the
     * given manager and registers that task name with the given producer.
     */
    public static final /* synthetic */ void $anonfun$writeCheckpoint$6(KafkaCheckpointManager kafkaCheckpointManager, SystemProducer systemProducer, TaskName taskName) {
        kafkaCheckpointManager.debug(() -> "Registering the taskName: " + taskName + " with SystemProducer");
        String producerSource = taskName.getTaskName();
        systemProducer.register(producerSource);
    }

    /**
     * Builds a checkpoint manager for the stream described by {@code kafkaStreamSpec}, obtaining
     * the consumer, admin and initial producer from {@code systemFactory}.
     *
     * @param kafkaStreamSpec spec of the checkpoint stream; supplies the system and topic names
     * @param systemFactory factory used to obtain the consumer, admin and producers
     * @param z whether checkpoint validation is enabled (stored as {@code validateCheckpoint})
     * @param config job configuration passed to the factory and used to read the grouper factory
     * @param metricsRegistry registry passed to the factory for the consumer and producers
     * @param serde serde for checkpoint messages
     * @param serde2 serde for checkpoint log keys
     */
    public KafkaCheckpointManager(KafkaStreamSpec kafkaStreamSpec, SystemFactory systemFactory, boolean z, Config config, MetricsRegistry metricsRegistry, Serde<Checkpoint> serde, Serde<KafkaCheckpointLogKey> serde2) {
        this.checkpointSpec = kafkaStreamSpec;
        this.systemFactory = systemFactory;
        this.validateCheckpoint = z;
        this.config = config;
        this.metricsRegistry = metricsRegistry;
        this.checkpointMsgSerde = serde;
        this.checkpointKeySerde = serde2;
        Logging.$init$(this);
        this.MaxRetryDurationInMillis = TimeUnit.MINUTES.toMillis(15L);
        this.checkpointSystem = kafkaStreamSpec.getSystemName();
        this.checkpointTopic = kafkaStreamSpec.getPhysicalName();
        // Log only after the topic and system fields are assigned; logging earlier printed "null"
        // for both values.
        info(() -> "Creating KafkaCheckpointManager for checkpointTopic:" + this.checkpointTopic()
            + ", systemName:" + this.checkpointSystem() + " "
            + "validateCheckpoints:" + this.validateCheckpoint);
        this.checkpointSsp = new SystemStreamPartition(checkpointSystem(), checkpointTopic(), new Partition(0));
        this.expectedGrouperFactory = new JobConfig(config).getSystemStreamPartitionGrouperFactory();
        this.systemConsumer = systemFactory.getConsumer(checkpointSystem(), config, metricsRegistry);
        this.systemAdmin = systemFactory.getAdmin(checkpointSystem(), config);
        this.taskNames = Predef$.MODULE$.Set().apply(Nil$.MODULE$);
        // getSystemProducer() reads systemFactory, checkpointSystem, config and metricsRegistry,
        // all of which are assigned above.
        this.producerRef = new AtomicReference<>(getSystemProducer());
        this.producerCreationLock = new Object();
    }
}
