package org.apache.kafka.connect.mirror;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorMaker.class */
/**
 * Entry point for running MirrorMaker replication flows inside a single process.
 *
 * <p>For every source-&gt;target pair of the supplied {@link MirrorMakerConfig} whose target is one
 * of the selected clusters, a {@link DistributedHerder} is created. {@link #start()} starts all
 * herders and then submits a configuration for each class in {@link #CONNECTOR_CLASSES} to every
 * herder. {@link #stop()} stops the herders again and {@link #awaitStop()} blocks until that has
 * happened.
 */
public class MirrorMaker {
    /** Maximum time the shutdown hook waits for {@link #start()} to get through herder startup. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 60;
    private static final Logger log = LoggerFactory.getLogger(MirrorMaker.class);
    private static final ConnectorClientConfigOverridePolicy CLIENT_CONFIG_OVERRIDE_POLICY = new AllConnectorClientConfigOverridePolicy();
    /** Connectors that are configured on every herder by {@link #start()}. */
    private static final List<Class<?>> CONNECTOR_CLASSES = Arrays.asList(MirrorSourceConnector.class, MirrorHeartbeatConnector.class, MirrorCheckpointConnector.class);

    private final Map<SourceAndTarget, Herder> herders = new HashMap<>();
    /** Counted down once per herder as {@link #start()} works through them; null until started. */
    private CountDownLatch startLatch;
    /** Counted down once per herder as {@link #stop()} works through them; null until started. */
    private CountDownLatch stopLatch;
    /** Set by the first call to {@link #stop()} so that later calls become no-ops. */
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final ShutdownHook shutdownHook;
    private final String advertisedBaseUrl;
    private final Time time;
    private final MirrorMakerConfig config;
    private final Set<String> clusters;
    private final Set<SourceAndTarget> herderPairs;

    /**
     * JVM shutdown hook: waits (bounded) for startup to finish and then stops this MirrorMaker.
     */
    public class ShutdownHook extends Thread {
        private ShutdownHook() {
        }

        @Override
        public void run() {
            try {
                if (!MirrorMaker.this.startLatch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    MirrorMaker.log.error("Timed out in shutdown hook waiting for MirrorMaker startup to finish. Unable to shutdown cleanly.");
                }
            } catch (InterruptedException e) {
                // The interrupt status is intentionally not restored here: this thread ends right
                // after the finally block, and stop() should not run with a pending interrupt.
                MirrorMaker.log.error("Interrupted in shutdown hook while waiting for MirrorMaker startup to finish. Unable to shutdown cleanly.");
            } finally {
                // Always attempt to stop, even if startup did not complete in time.
                MirrorMaker.this.stop();
            }
        }
    }

    /**
     * Creates a MirrorMaker and one herder per selected replication flow.
     *
     * @param mirrorMakerConfig top-level configuration describing clusters and cluster pairs
     * @param list              target clusters to run herders for; when {@code null} or empty,
     *                          all clusters from {@code mirrorMakerConfig} are used
     * @param time              time source handed to the workers and herders
     * @throws IllegalArgumentException if no cluster pair targets any of the selected clusters
     */
    public MirrorMaker(MirrorMakerConfig mirrorMakerConfig, List<String> list, Time time) {
        log.debug("Kafka MirrorMaker instance created");
        this.time = time;
        this.advertisedBaseUrl = "NOTUSED";
        this.config = mirrorMakerConfig;
        if (list == null || list.isEmpty()) {
            this.clusters = mirrorMakerConfig.clusters();
        } else {
            this.clusters = new HashSet<>(list);
        }
        log.info("Targeting clusters {}", this.clusters);
        this.herderPairs = mirrorMakerConfig.clusterPairs().stream()
            .filter(pair -> this.clusters.contains(pair.target()))
            .collect(Collectors.toSet());
        if (this.herderPairs.isEmpty()) {
            throw new IllegalArgumentException("No source->target replication flows.");
        }
        this.herderPairs.forEach(this::addHerder);
        this.shutdownHook = new ShutdownHook();
    }

    /**
     * Creates a MirrorMaker from raw properties.
     *
     * @param map  properties used to build the {@link MirrorMakerConfig}
     * @param list target clusters to run herders for; {@code null} or empty selects all clusters
     * @param time time source handed to the workers and herders
     */
    public MirrorMaker(Map<String, String> map, List<String> list, Time time) {
        this(new MirrorMakerConfig(map), list, time);
    }

    /**
     * Creates a MirrorMaker from raw properties using {@link Time#SYSTEM}.
     *
     * @param map  properties used to build the {@link MirrorMakerConfig}
     * @param list target clusters to run herders for; {@code null} or empty selects all clusters
     */
    public MirrorMaker(Map<String, String> map, List<String> list) {
        this(map, list, Time.SYSTEM);
    }

    /**
     * Creates a MirrorMaker from raw properties for all configured clusters.
     *
     * @param map properties used to build the {@link MirrorMakerConfig}
     */
    public MirrorMaker(Map<String, String> map) {
        this(map, null);
    }

    /**
     * Registers the shutdown hook, starts every herder and submits the connector configurations.
     *
     * @throws IllegalStateException if this instance has already been started
     */
    public void start() {
        log.info("Kafka MirrorMaker starting with {} herders.", herders.size());
        if (startLatch != null) {
            throw new IllegalStateException("MirrorMaker instance already started");
        }
        startLatch = new CountDownLatch(herders.size());
        stopLatch = new CountDownLatch(herders.size());
        Runtime.getRuntime().addShutdownHook(shutdownHook);
        for (Herder herder : herders.values()) {
            try {
                herder.start();
            } finally {
                // Count down even on failure so the latch reflects every herder that was attempted.
                startLatch.countDown();
            }
        }
        log.info("Configuring connectors...");
        herderPairs.forEach(this::configureConnectors);
        log.info("Kafka MirrorMaker started");
    }

    /**
     * Stops all herders. Only the first invocation does any work; later calls return immediately.
     *
     * <p>Every herder is asked to stop even if an earlier one fails, so that the stop latch always
     * reaches zero and {@link #awaitStop()} cannot block forever. The first {@link RuntimeException}
     * raised by a herder is rethrown once all herders have been processed, with any further
     * failures attached as suppressed exceptions.
     */
    public void stop() {
        if (shutdown.getAndSet(true)) {
            return;
        }
        log.info("Kafka MirrorMaker stopping");
        RuntimeException firstFailure = null;
        for (Herder herder : herders.values()) {
            try {
                herder.stop();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            } finally {
                stopLatch.countDown();
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
        log.info("Kafka MirrorMaker stopped.");
    }

    /**
     * Blocks until {@link #stop()} has processed every herder. If the calling thread is
     * interrupted while waiting, the interruption is logged and the interrupt status is restored.
     */
    public void awaitStop() {
        try {
            stopLatch.await();
        } catch (InterruptedException e) {
            log.error("Interrupted waiting for MirrorMaker to shutdown");
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Submits the base configuration of one connector class to the herder of the given flow.
     * The result is reported asynchronously through the herder callback and only logged.
     */
    private void configureConnector(SourceAndTarget sourceAndTarget, Class<?> cls) {
        checkHerder(sourceAndTarget);
        Map<String, String> connectorProps = config.connectorBaseConfig(sourceAndTarget, cls);
        herders.get(sourceAndTarget).putConnectorConfig(cls.getSimpleName(), connectorProps, true, (th, created) -> {
            if (th == null) {
                log.info("Connector {} configured.", sourceAndTarget);
            } else if (th instanceof NotLeaderException) {
                log.info("Connector {} is a follower. Using existing configuration.", sourceAndTarget);
            } else {
                // A real failure must not be reported as a successful configuration.
                log.error("Failed to configure connector {} for {}.", cls.getSimpleName(), sourceAndTarget, th);
            }
        });
    }

    /**
     * Ensures a herder exists for the given flow.
     *
     * @throws IllegalArgumentException if no herder was created for {@code sourceAndTarget}
     */
    private void checkHerder(SourceAndTarget sourceAndTarget) {
        if (!herders.containsKey(sourceAndTarget)) {
            throw new IllegalArgumentException("No herder for " + sourceAndTarget);
        }
    }

    /** Configures every class in {@link #CONNECTOR_CLASSES} for the given flow. */
    private void configureConnectors(SourceAndTarget sourceAndTarget) {
        CONNECTOR_CLASSES.forEach(cls -> configureConnector(sourceAndTarget, cls));
    }

    /**
     * Builds the worker, backing stores and {@link DistributedHerder} for one flow and registers
     * the herder under {@code sourceAndTarget}.
     */
    private void addHerder(SourceAndTarget sourceAndTarget) {
        log.info("creating herder for {}", sourceAndTarget);
        Map<String, String> workerProps = config.workerConfig(sourceAndTarget);
        String advertisedUrl = advertisedBaseUrl + "/" + sourceAndTarget.source();
        String workerId = sourceAndTarget.toString();
        Plugins plugins = new Plugins(workerProps);
        plugins.compareAndSwapWithDelegatingLoader();
        DistributedConfig distributedConfig = new DistributedConfig(workerProps);
        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
        offsetBackingStore.configure(distributedConfig);
        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
        WorkerConfigTransformer configTransformer = worker.configTransformer();
        Converter internalValueConverter = worker.getInternalValueConverter();
        KafkaStatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
        statusBackingStore.configure(distributedConfig);
        KafkaConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, distributedConfig, configTransformer);
        Herder herder = new DistributedHerder(distributedConfig, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY);
        herders.put(sourceAndTarget, herder);
    }

    /**
     * Command line entry point.
     *
     * <p>Expects the path to a readable properties file as positional argument and accepts an
     * optional {@code --clusters} list. Exits with -1 on argument errors, 3 if startup fails and
     * 2 on any other error.
     *
     * @param strArr command line arguments
     */
    public static void main(String[] strArr) {
        ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-mirror-maker");
        parser.description("MirrorMaker 2.0 driver");
        parser.addArgument("config").type(Arguments.fileType().verifyCanRead()).metavar("mm2.properties").required(true).help("MM2 configuration file.");
        parser.addArgument("--clusters").nargs("+").metavar("CLUSTER").required(false).help("Target cluster to use for this node.");
        try {
            Namespace parsedArgs = parser.parseArgs(strArr);
            File configFile = parsedArgs.get("config");
            List<String> targetClusters = parsedArgs.getList(MirrorMakerConfig.CLUSTERS_CONFIG);
            try {
                log.info("Kafka MirrorMaker initializing ...");
                Map<String, String> props = Utils.propsToStringMap(Utils.loadProps(configFile.getPath()));
                MirrorMaker mirrorMaker = new MirrorMaker(props, targetClusters, Time.SYSTEM);
                try {
                    mirrorMaker.start();
                } catch (Exception e) {
                    log.error("Failed to start MirrorMaker", e);
                    mirrorMaker.stop();
                    Exit.exit(3);
                }
                mirrorMaker.awaitStop();
            } catch (Throwable th) {
                log.error("Stopping due to error", th);
                Exit.exit(2);
            }
        } catch (ArgumentParserException e) {
            parser.handleError(e);
            Exit.exit(-1);
        }
    }
}
