package org.apache.samza.migration;

import java.util.Map;
import org.I0Itec.zkclient.ZkClient;
import org.apache.samza.checkpoint.kafka.KafkaCheckpointManager;
import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory;
import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory$;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig$;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.KafkaConfig$;
import org.apache.samza.config.KafkaProducerConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
import org.apache.samza.coordinator.stream.messages.SetMigrationMetaMessage;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.storage.ChangelogPartitionManager;
import org.apache.samza.util.ClientUtilTopicMetadataStore;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.ExponentialSleepStrategy$;
import org.apache.samza.util.KafkaUtil;
import org.apache.samza.util.KafkaUtil$;
import org.apache.samza.util.Logging;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.apache.samza.util.TopicMetadataStore;
import org.slf4j.Logger;
import scala.Function0;
import scala.Option$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaCheckpointMigration.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Ea\u0001B\u0001\u0003\u0001-\u0011\u0001dS1gW\u0006\u001c\u0005.Z2la>Lg\u000e^'jOJ\fG/[8o\u0015\t\u0019A!A\u0005nS\u001e\u0014\u0018\r^5p]*\u0011QAB\u0001\u0006g\u0006l'0\u0019\u0006\u0003\u000f!\ta!\u00199bG\",'\"A\u0005\u0002\u0007=\u0014xm\u0001\u0001\u0014\t\u0001aA\u0003\u0007\t\u0003\u001bIi\u0011A\u0004\u0006\u0003\u001fA\tA\u0001\\1oO*\t\u0011#\u0001\u0003kCZ\f\u0017BA\n\u000f\u0005\u0019y%M[3diB\u0011QCF\u0007\u0002\u0005%\u0011qC\u0001\u0002\u000e\u001b&<'/\u0019;j_:\u0004F.\u00198\u0011\u0005eaR\"\u0001\u000e\u000b\u0005m!\u0011\u0001B;uS2L!!\b\u000e\u0003\u000f1{wmZ5oO\")q\u0004\u0001C\u0001A\u00051A(\u001b8jiz\"\u0012!\t\t\u0003+\u0001Aqa\t\u0001C\u0002\u0013\u0005A%\u0001\u0004t_V\u00148-Z\u000b\u0002KA\u0011QBJ\u0005\u0003O9\u0011aa\u0015;sS:<\u0007BB\u0015\u0001A\u0003%Q%A\u0004t_V\u00148-\u001a\u0011\t\u000f-\u0002!\u0019!C\u0001I\u0005aQ.[4sCRLwN\\&fs\"1Q\u0006\u0001Q\u0001\n\u0015\nQ\"\\5he\u0006$\u0018n\u001c8LKf\u0004\u0003bB\u0018\u0001\u0005\u0004%\t\u0001J\u0001\r[&<'/\u0019;j_:4\u0016\r\u001c\u0005\u0007c\u0001\u0001\u000b\u0011B\u0013\u0002\u001b5LwM]1uS>tg+\u00197!\u0011\u001d\u0019\u0004\u00011A\u0005\u0002Q\n\u0011bY8o]\u0016\u001cGOW6\u0016\u0003U\u00022AN\u001d<\u001b\u00059$\"\u0001\u001d\u0002\u000bM\u001c\u0017\r\\1\n\u0005i:$!\u0003$v]\u000e$\u0018n\u001c81!\ta\u0014)D\u0001>\u0015\tqt(\u0001\u0005{W\u000ed\u0017.\u001a8u\u0015\t\u0001\u0005\"\u0001\u0004Ja%#XmY\u0005\u0003\u0005v\u0012\u0001BW6DY&,g\u000e\u001e\u0005\b\t\u0002\u0001\r\u0011\"\u0001F\u00035\u0019wN\u001c8fGRT6n\u0018\u0013fcR\u0011a)\u0013\t\u0003m\u001dK!\u0001S\u001c\u0003\tUs\u0017\u000e\u001e\u0005\b\u0015\u000e\u000b\t\u00111\u00016\u0003\rAH%\r\u0005\u0007\u0019\u0002\u0001\u000b\u0015B\u001b\u0002\u0015\r|gN\\3dij[\u0007\u0005C\u0003O\u0001\u0011%q*A\fhKR\u001c\u0005.Z2la>Lg\u000e^*zgR,WNT1nKR\u0011\u0001K\u0016\t\u0003#Rs!A\u000e*\n\u0005M;\u0014A\u0002)sK\u0012,g-\u0003\u0002(+*\u00111k\u000e\u0005\u0006/6\u0003\r
\u0001W\u0001\u0007G>tg-[4\u0011\u0005e[V\"\u0001.\u000b\u0005]#\u0011B\u0001/[\u0005\u0019\u0019uN\u001c4jO\")a\f\u0001C\u0005?\u0006Yq-\u001a;DY&,g\u000e^%e)\t\u0001\u0006\rC\u0003X;\u0002\u0007\u0001\fC\u0003c\u0001\u0011%1-A\u000bhKR$v\u000e]5d\u001b\u0016$\u0018\rZ1uCN#xN]3\u0015\u0005\u0011<\u0007CA\rf\u0013\t1'D\u0001\nU_BL7-T3uC\u0012\fG/Y*u_J,\u0007\"B,b\u0001\u0004A\u0006\"B5\u0001\t\u0013Q\u0017\u0001D4fi\u000e{gN\\3dij[GCA\u001bl\u0011\u00159\u0006\u000e1\u0001Y\u0011\u0015i\u0007\u0001\"\u0011o\u0003\u001di\u0017n\u001a:bi\u0016$\"AR8\t\u000b]c\u0007\u0019\u0001-\t\u000bE\u0004A\u0011\u0001:\u0002+5LwM]1uS>tg+\u001a:jM&\u001c\u0017\r^5p]R\u00111O\u001e\t\u0003mQL!!^\u001c\u0003\u000f\t{w\u000e\\3b]\")q\u000f\u001da\u0001q\u0006I2m\\8sI&t\u0017\r^8s'f\u001cH/Z7D_:\u001cX/\\3s!\tIh0D\u0001{\u0015\tYH0\u0001\u0004tiJ,\u0017-\u001c\u0006\u0003{\u0012\t1bY8pe\u0012Lg.\u0019;pe&\u0011qP\u001f\u0002 \u0007>|'\u000fZ5oCR|'o\u0015;sK\u0006l7+_:uK6\u001cuN\\:v[\u0016\u0014\bbBA\u0002\u0001\u0011\u0005\u0011QA\u0001\u0018[&<'/\u0019;j_:\u001cu.\u001c9mKRLwN\\'be.$2ARA\u0004\u0011!\tI!!\u0001A\u0002\u0005-\u0011!G2p_J$\u0017N\\1u_J\u001c\u0016p\u001d;f[B\u0013x\u000eZ;dKJ\u00042!_A\u0007\u0013\r\tyA\u001f\u0002 \u0007>|'\u000fZ5oCR|'o\u0015;sK\u0006l7+_:uK6\u0004&o\u001c3vG\u0016\u0014\b")
/* loaded from: input_file:org/apache/samza/migration/KafkaCheckpointMigration.class */
/*
 * Migration plan that reads the changelog partition mapping from the Kafka
 * checkpoint topic, hands it to a ChangelogPartitionManager built on the
 * coordinator-stream producer/consumer, and finally sends a
 * SetMigrationMetaMessage(source, migrationKey, migrationVal) as a completion
 * marker.
 *
 * NOTE(review): this file is decompiler output of a Scala class (see the
 * "compiled from" header). Several constructs below -- `?? r0`, calls of the
 * form `Logging.class.xxx(this, ...)`, and `final` fields assigned outside the
 * constructor -- are not valid Java as written. Treat the file as a reading
 * aid, not as buildable source.
 */
public class KafkaCheckpointMigration implements MigrationPlan, Logging {
    // Source tag passed to SetMigrationMetaMessage and ChangelogPartitionManager; set to "CHECKPOINTMIGRATION" in the constructor.
    private final String source;
    // Key of the migration marker message; set to "CheckpointMigration09to10" in the constructor.
    private final String migrationKey;
    // Value of the migration marker message; set to "true" in the constructor.
    private final String migrationVal;
    // Mutable ZkClient factory exposed through connectZk()/connectZk_$eq(); initialised to null.
    // NOTE(review): nothing in this class reads this field other than its getter -- migrate() uses getConnectZk(config) instead.
    private Function0<ZkClient> connectZk;
    // Written by the Logging setter below, even though it is declared final here (decompiler artifact).
    private final String loggerName;
    // Lazily initialised by logger$lzycompute(); also declared final by the decompiler.
    private final Logger logger;
    // Flag recording whether `logger` has been initialised.
    private volatile boolean bitmap$0;

    /** Returns the logger name stored by the Logging setter. */
    public String loggerName() {
        return this.loggerName;
    }

    /**
     * Initialises {@code logger} at most once while holding the monitor of
     * {@code this}, then returns it. The flag {@code bitmap$0} is re-checked
     * inside the synchronized block before the assignment.
     *
     * NOTE(review): {@code ?? r0}, {@code r0 = r0;} and the unused
     * {@code boxedUnit} local are decompiler residue (see the JADX warnings).
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                // presumably a call into the Scala trait implementation class for Logging, as rendered by the decompiler -- TODO confirm
                this.logger = Logging.class.logger(this);
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.logger;
        }
    }

    /**
     * Returns the logger, taking the unsynchronized fast path when
     * {@code bitmap$0} is already set and otherwise delegating to
     * {@link #logger$lzycompute()}.
     */
    public Logger logger() {
        return this.bitmap$0 ? this.logger : logger$lzycompute();
    }

    /**
     * Setter used to store the logger name.
     * NOTE(review): presumably invoked from the Logging trait initialiser called in the constructor -- confirm.
     */
    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        this.loggerName = str;
    }

    // ---------------------------------------------------------------------
    // Logging forwarders. Each method below only forwards its arguments,
    // together with `this`, to the same-named Logging helper.
    // ---------------------------------------------------------------------

    /** Forwards a lazily evaluated message to the Logging helper's trace. */
    public void trace(Function0<Object> function0) {
        Logging.class.trace(this, function0);
    }

    /** Forwards a lazily evaluated message and throwable to the Logging helper's trace. */
    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.class.trace(this, function0, function02);
    }

    /** Forwards a lazily evaluated message to the Logging helper's debug. */
    public void debug(Function0<Object> function0) {
        Logging.class.debug(this, function0);
    }

    /** Forwards a lazily evaluated message and throwable to the Logging helper's debug. */
    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.class.debug(this, function0, function02);
    }

    /** Forwards a lazily evaluated message to the Logging helper's info. */
    public void info(Function0<Object> function0) {
        Logging.class.info(this, function0);
    }

    /** Forwards a lazily evaluated message and throwable to the Logging helper's info. */
    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.class.info(this, function0, function02);
    }

    /** Forwards a lazily evaluated message to the Logging helper's warn. */
    public void warn(Function0<Object> function0) {
        Logging.class.warn(this, function0);
    }

    /** Forwards a lazily evaluated message and throwable to the Logging helper's warn. */
    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.class.warn(this, function0, function02);
    }

    /** Forwards a lazily evaluated message to the Logging helper's error. */
    public void error(Function0<Object> function0) {
        Logging.class.error(this, function0);
    }

    /** Forwards a lazily evaluated message and throwable to the Logging helper's error. */
    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.class.error(this, function0, function02);
    }

    /** Forwards two lazily evaluated strings to the Logging helper's putMDC (presumably key and value -- confirm). */
    public void putMDC(Function0<String> function0, Function0<String> function02) {
        Logging.class.putMDC(this, function0, function02);
    }

    /** Forwards a lazily evaluated string to the Logging helper's getMDC and returns its result. */
    public String getMDC(Function0<String> function0) {
        return Logging.class.getMDC(this, function0);
    }

    /** Forwards a lazily evaluated string to the Logging helper's removeMDC. */
    public void removeMDC(Function0<String> function0) {
        Logging.class.removeMDC(this, function0);
    }

    /** Forwards to the Logging helper's clearMDC. */
    public void clearMDC() {
        Logging.class.clearMDC(this);
    }

    /** Returns the source tag ("CHECKPOINTMIGRATION"). */
    public String source() {
        return this.source;
    }

    /** Returns the migration marker key ("CheckpointMigration09to10"). */
    public String migrationKey() {
        return this.migrationKey;
    }

    /** Returns the migration marker value ("true"). */
    public String migrationVal() {
        return this.migrationVal;
    }

    /** Returns the currently stored ZkClient factory; null unless replaced via {@link #connectZk_$eq}. */
    public Function0<ZkClient> connectZk() {
        return this.connectZk;
    }

    /** Replaces the stored ZkClient factory. */
    public void connectZk_$eq(Function0<ZkClient> function0) {
        this.connectZk = function0;
    }

    /**
     * Returns the checkpoint system name from the Kafka view of {@code config}.
     * When the option is empty, the result comes from a generated closure
     * whose body is not visible in this file.
     */
    private String getCheckpointSystemName(Config config) {
        return (String) KafkaConfig$.MODULE$.Config2Kafka(config).getCheckpointSystem().getOrElse(new KafkaCheckpointMigration$$anonfun$getCheckpointSystemName$1(this));
    }

    /** Asks KafkaUtil for a client id, passing the literal "samza-checkpoint-manager" and {@code config}. */
    private String getClientId(Config config) {
        return KafkaUtil$.MODULE$.getClientId("samza-checkpoint-manager", config);
    }

    /**
     * Builds a ClientUtilTopicMetadataStore for the checkpoint system from:
     * the producer config's bootstrap servers, the client id, and the
     * consumer config's socket timeout.
     */
    private TopicMetadataStore getTopicMetadataStore(Config config) {
        String clientId = getClientId(config);
        String checkpointSystemName = getCheckpointSystemName(config);
        // Producer config is requested with the factory's INJECTED_PRODUCER_PROPERTIES.
        KafkaProducerConfig kafkaSystemProducerConfig = KafkaConfig$.MODULE$.Config2Kafka(config).getKafkaSystemProducerConfig(checkpointSystemName, clientId, KafkaCheckpointManagerFactory$.MODULE$.INJECTED_PRODUCER_PROPERTIES());
        KafkaConfig Config2Kafka = KafkaConfig$.MODULE$.Config2Kafka(config);
        // The $default$3/$default$4 calls supply the consumer-config method's own default arguments.
        return new ClientUtilTopicMetadataStore(kafkaSystemProducerConfig.bootsrapServers(), clientId, Config2Kafka.getKafkaSystemConsumerConfig(checkpointSystemName, clientId, Config2Kafka.getKafkaSystemConsumerConfig$default$3(), Config2Kafka.getKafkaSystemConsumerConfig$default$4()).socketTimeoutMs());
    }

    /**
     * Returns a ZkClient factory closure built from the consumer config's
     * {@code zkConnect} value for the checkpoint system. The value is wrapped
     * with {@code Option.apply}; when that yields an empty option the value is
     * taken from a generated fallback closure whose body is not visible here.
     */
    private Function0<ZkClient> getConnectZk(Config config) {
        String clientId = getClientId(config);
        String checkpointSystemName = getCheckpointSystemName(config);
        KafkaConfig Config2Kafka = KafkaConfig$.MODULE$.Config2Kafka(config);
        return new KafkaCheckpointMigration$$anonfun$getConnectZk$1(this, (String) Option$.MODULE$.apply(Config2Kafka.getKafkaSystemConsumerConfig(checkpointSystemName, clientId, Config2Kafka.getKafkaSystemConsumerConfig$default$3(), Config2Kafka.getKafkaSystemConsumerConfig$default$4()).zkConnect()).getOrElse(new KafkaCheckpointMigration$$anonfun$1(this)));
    }

    /**
     * Runs the migration for the job described by {@code config}.
     *
     * Flow, as visible in this method:
     * 1. Resolve job name and job id (fallbacks live in generated closures not
     *    shown here) and derive the checkpoint topic name.
     * 2. Create the coordinator-stream consumer/producer, a
     *    KafkaCheckpointManager and a KafkaUtil.
     * 3. If the checkpoint topic exists: validate its partition count, return
     *    early when {@link #migrationVerification} reports the marker is
     *    already present, otherwise read the changelog partition mapping from
     *    the checkpoint manager and write it through a ChangelogPartitionManager.
     * 4. Unless step 3 returned early, send the completion marker via
     *    {@link #migrationCompletionMark} -- this also happens when the topic
     *    does not exist.
     *
     * Log message texts are produced by generated closure classes not shown here.
     */
    public void migrate(Config config) {
        String str = (String) JobConfig$.MODULE$.Config2Job(config).getName().getOrElse(new KafkaCheckpointMigration$$anonfun$2(this));
        String str2 = (String) JobConfig$.MODULE$.Config2Job(config).getJobId().getOrElse(new KafkaCheckpointMigration$$anonfun$3(this));
        String checkpointTopic = KafkaUtil$.MODULE$.getCheckpointTopic(str, str2);
        CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
        CoordinatorStreamSystemConsumer coordinatorStreamSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
        CoordinatorStreamSystemProducer coordinatorStreamSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap());
        // The checkpoint manager is created before the topic-existence check, so it exists even when no migration is needed.
        KafkaCheckpointManager kafkaCheckpointManager = (KafkaCheckpointManager) new KafkaCheckpointManagerFactory().getCheckpointManager(config, new NoOpMetricsRegistry());
        // ExponentialSleepStrategy is built with its three constructor defaults.
        KafkaUtil kafkaUtil = new KafkaUtil(new ExponentialSleepStrategy(ExponentialSleepStrategy$.MODULE$.$lessinit$greater$default$1(), ExponentialSleepStrategy$.MODULE$.$lessinit$greater$default$2(), ExponentialSleepStrategy$.MODULE$.$lessinit$greater$default$3()), getConnectZk(config));
        if (kafkaUtil.topicExists(checkpointTopic)) {
            // The literal 1 is presumably the expected partition count -- TODO confirm against KafkaUtil.validateTopicPartitionCount.
            kafkaUtil.validateTopicPartitionCount(checkpointTopic, getCheckpointSystemName(config), getTopicMetadataStore(config), 1, JobConfig$.MODULE$.Config2Job(config).failOnCheckpointValidation());
            if (migrationVerification(coordinatorStreamSystemConsumer)) {
                info(new KafkaCheckpointMigration$$anonfun$migrate$1(this));
                // NOTE(review): on this early return no stop() is called here on the consumer
                // (started inside migrationVerification) or on kafkaCheckpointManager --
                // confirm whether that leaves resources open.
                return;
            }
            info(new KafkaCheckpointMigration$$anonfun$migrate$2(this));
            info(new KafkaCheckpointMigration$$anonfun$migrate$3(this, checkpointTopic));
            Map<TaskName, Integer> readChangeLogPartitionMapping = kafkaCheckpointManager.readChangeLogPartitionMapping();
            kafkaCheckpointManager.stop();
            info(new KafkaCheckpointMigration$$anonfun$migrate$4(this, str, str2));
            // Reuses the consumer that migrationVerification already registered, started and bootstrapped.
            ChangelogPartitionManager changelogPartitionManager = new ChangelogPartitionManager(coordinatorStreamSystemProducer, coordinatorStreamSystemConsumer, source());
            changelogPartitionManager.start();
            changelogPartitionManager.writeChangeLogPartitionMapping(readChangeLogPartitionMapping);
            changelogPartitionManager.stop();
        }
        // Reached both after a completed migration and when the checkpoint topic does not exist.
        migrationCompletionMark(coordinatorStreamSystemProducer);
    }

    /**
     * Registers, starts and bootstraps the given consumer, then reports whether
     * its bootstrapped "set-migration-info" stream contains a
     * SetMigrationMetaMessage built from this plan's source, key and value.
     *
     * The consumer is not stopped by this method.
     *
     * @param coordinatorStreamSystemConsumer consumer to bootstrap and inspect
     * @return true when the marker message is contained in the bootstrapped stream
     */
    public boolean migrationVerification(CoordinatorStreamSystemConsumer coordinatorStreamSystemConsumer) {
        coordinatorStreamSystemConsumer.register();
        coordinatorStreamSystemConsumer.start();
        coordinatorStreamSystemConsumer.bootstrap();
        return coordinatorStreamSystemConsumer.getBootstrappedStream("set-migration-info").contains(new SetMigrationMetaMessage(source(), migrationKey(), migrationVal()));
    }

    /**
     * Logs an info message, then starts the given producer, sends a
     * SetMigrationMetaMessage built from this plan's source, key and value,
     * and stops the producer.
     *
     * NOTE(review): stop() is not in a finally block, so it is skipped if
     * start() or send() throws -- confirm whether that matters to callers.
     *
     * @param coordinatorStreamSystemProducer producer used to send the marker
     */
    public void migrationCompletionMark(CoordinatorStreamSystemProducer coordinatorStreamSystemProducer) {
        info(new KafkaCheckpointMigration$$anonfun$migrationCompletionMark$1(this));
        SetMigrationMetaMessage setMigrationMetaMessage = new SetMigrationMetaMessage(source(), migrationKey(), migrationVal());
        coordinatorStreamSystemProducer.start();
        coordinatorStreamSystemProducer.send(setMigrationMetaMessage);
        coordinatorStreamSystemProducer.stop();
    }

    /**
     * Runs the Logging initialiser for this instance, then sets the fixed
     * source tag, migration key and migration value, and leaves the ZkClient
     * factory field null.
     */
    public KafkaCheckpointMigration() {
        Logging.class.$init$(this);
        this.source = "CHECKPOINTMIGRATION";
        this.migrationKey = "CheckpointMigration09to10";
        this.migrationVal = "true";
        this.connectZk = null;
    }
}
