package io.confluent.connect.replicator;

import io.confluent.connect.replicator.exec.EmbeddedHerder;
import io.confluent.connect.replicator.exec.ReplicatorCli;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/replicator/ReplicatorApp.class */
/**
 * Command-line entry point that runs Replicator inside an embedded Kafka Connect worker.
 *
 * <p>The lifecycle driven by {@link #main(String[])} is, in order: {@link #validate(String[])},
 * {@link #registerReplicatorPlugin()}, {@link #config()} and {@link #start()}. The later steps
 * read state ({@code appProps}, {@code plugins}, {@code workerConfig}) that the earlier steps
 * populate, so they are expected to be invoked in that order.
 */
public class ReplicatorApp {
    private static final Logger log = LoggerFactory.getLogger(ReplicatorApp.class);

    /** Property key under which the plugin search path is stored in {@code appProps}. */
    private static final String PLUGIN_PATH_CONFIG = "plugin.path";

    private final ReplicatorCli cli;
    private final Map<String, String> appProps;
    private final Time time;
    private Plugins plugins;
    private DistributedConfig workerConfig;
    private EmbeddedHerder herder;
    private Connect app;
    private RestServer rest;
    private String appInstanceName;

    /** Creates an application that measures elapsed time with {@link Time#SYSTEM}. */
    public ReplicatorApp() {
        this(Time.SYSTEM);
    }

    /**
     * Creates an application with an explicit time source.
     *
     * @param time clock used for the timing figures that are logged by each lifecycle step and
     *     handed to the Connect components built in {@link #config()}
     */
    public ReplicatorApp(Time time) {
        this.time = time;
        this.cli = new ReplicatorCli();
        this.appProps = new HashMap<>();
    }

    /**
     * Creates an application around pre-built Connect components.
     *
     * <p>Note that {@link #config()} replaces all three injected components with freshly
     * constructed ones, so callers of this constructor would normally skip that step.
     *
     * @param time clock used for logged timing figures
     * @param connect the Connect instance that {@link #start()} starts, stops and awaits
     * @param restServer the REST server to associate with this application
     * @param embeddedHerder the herder that receives the connector configuration in
     *     {@link #start()}
     */
    public ReplicatorApp(Time time, Connect connect, RestServer restServer, EmbeddedHerder embeddedHerder) {
        this(time);
        this.app = connect;
        this.rest = restServer;
        this.herder = embeddedHerder;
    }

    /**
     * Runs the full lifecycle: argument validation, plugin scanning, configuration and start.
     *
     * @param strArr command-line arguments, passed to {@link ReplicatorCli#parse}
     */
    public static void main(String[] strArr) {
        ReplicatorApp replicatorApp = new ReplicatorApp();
        replicatorApp.validate(strArr);
        replicatorApp.registerReplicatorPlugin();
        replicatorApp.config();
        replicatorApp.start();
    }

    /**
     * Returns the display name used in this class's log messages.
     *
     * @return the constant {@code "Replicator"}
     */
    public static String name() {
        return "Replicator";
    }

    /**
     * Parses the command-line arguments and merges the resulting properties into the
     * application properties.
     *
     * @param strArr command-line arguments
     */
    public void validate(String[] strArr) {
        log.info("Validating input arguments");
        long startMs = this.time.hiResClockMs();
        this.appProps.putAll(this.cli.parse(strArr));
        log.info("{} input argument validation took {}ms", name(), this.time.hiResClockMs() - startMs);
    }

    /**
     * Resolves {@code plugin.path}, creates the {@link Plugins} instance and verifies that the
     * Replicator source connector class was not discovered more than once.
     *
     * <p>If {@code plugin.path} was not supplied, it is derived from the location this class was
     * loaded from (two directory levels up). A failure to derive it is logged and otherwise
     * ignored.
     *
     * @throws ConnectException if more than one plugin with the class name of
     *     {@code ReplicatorSourceConnector} is found
     */
    public void registerReplicatorPlugin() {
        log.info("Scanning for {} plugin classes. This might take a moment ...", name());
        long startMs = this.time.hiResClockMs();
        if (this.appProps.containsKey(PLUGIN_PATH_CONFIG)) {
            log.info("Setting '{}' to '{}' using the supplied configuration.", PLUGIN_PATH_CONFIG, this.appProps.get(PLUGIN_PATH_CONFIG));
        } else {
            try {
                Path parent = Paths.get(getClass().getProtectionDomain().getCodeSource().getLocation().getPath())
                        .getParent()
                        .getParent();
                log.info("Setting '{}' to '{}' using the location of {}.", PLUGIN_PATH_CONFIG, parent, name());
                this.appProps.put(PLUGIN_PATH_CONFIG, parent.toString());
            } catch (Exception e) {
                // Best effort only: continue without a derived plugin.path.
                log.warn("Unable to extract '{}' from '{}' location: ", PLUGIN_PATH_CONFIG, getClass().getName(), e);
            }
        }
        this.plugins = new Plugins(this.appProps);

        String replicatorClassName = ReplicatorSourceConnector.class.getName();
        int matches = 0;
        for (PluginDesc<?> desc : this.plugins.connectors()) {
            if (desc.className().equals(replicatorClassName)) {
                matches++;
            }
        }
        if (matches > 1) {
            // workerConfig is not created until config() runs, so the path must be read from
            // appProps here; dereferencing workerConfig would fail with a NullPointerException.
            throw new ConnectException("More than one classes of plugin '" + replicatorClassName + "' were found in the plugin.path: " + this.appProps.get(PLUGIN_PATH_CONFIG) + " and the CLASSPATH");
        }
        log.info("{} plugin scanning took {}ms", name(), this.time.hiResClockMs() - startMs);
    }

    /**
     * Builds the worker configuration, REST server, backing stores, worker, herder and the
     * {@link Connect} instance from the application properties.
     *
     * <p>Uses the {@code plugins} field, so {@link #registerReplicatorPlugin()} must have been
     * called first. Overwrites any {@code rest}, {@code herder} and {@code app} set earlier.
     */
    public void config() {
        long startMs = this.time.hiResClockMs();
        this.appInstanceName = this.appProps.get("name");
        this.workerConfig = new DistributedConfig(this.appProps);
        this.rest = new RestServer(this.workerConfig);
        this.rest.initializeServer();
        URI advertisedUrl = this.rest.advertisedUrl();
        // The worker id is the advertised "host:port" of the REST endpoint.
        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
        offsetBackingStore.configure(this.workerConfig);
        ConnectorClientConfigOverridePolicy overridePolicy = this.plugins.newPlugin(
                this.workerConfig.getString("connector.client.config.override.policy"),
                this.workerConfig,
                ConnectorClientConfigOverridePolicy.class);
        Worker worker = new Worker(workerId, this.time, this.plugins, this.workerConfig, offsetBackingStore, overridePolicy);
        WorkerConfigTransformer configTransformer = worker.configTransformer();
        KafkaStatusBackingStore statusBackingStore = new KafkaStatusBackingStore(this.time, worker.getInternalValueConverter());
        statusBackingStore.configure(this.workerConfig);
        KafkaConfigBackingStore configBackingStore =
                new KafkaConfigBackingStore(worker.getInternalValueConverter(), this.workerConfig, configTransformer);
        this.herder = new EmbeddedHerder(
                this.appInstanceName,
                this.workerConfig,
                worker,
                ConnectUtils.lookupKafkaClusterId(this.workerConfig),
                statusBackingStore,
                configBackingStore,
                advertisedUrl.toString(),
                this.time,
                overridePolicy);
        this.app = new Connect(this.herder, this.rest);
        log.info("{} initialization took {}ms", name(), this.time.hiResClockMs() - startMs);
    }

    /**
     * Starts the Connect application, submits the connector configuration to the herder and
     * then blocks until the application stops.
     *
     * <p>If submitting the configuration fails with a cause of {@link NotLeaderException}, this
     * is logged at info level and the application keeps running. Any other failure is logged and
     * stops the application.
     */
    public void start() {
        try {
            this.app.start();
        } catch (Exception e) {
            log.error("Failed to start {} worker", name(), e);
            this.app.stop();
            // NOTE(review): execution continues and the connector config is still submitted to
            // the herder after the app was stopped; confirm this is intended.
        }
        try {
            // Declared as FutureCallback (not Callback) because get() is needed below to wait
            // for the herder to finish processing the request.
            FutureCallback<Herder.Created<ConnectorInfo>> futureCallback =
                    new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
                        @Override
                        public void onCompletion(Throwable th, Herder.Created<ConnectorInfo> created) {
                            if (th != null) {
                                // Pass the throwable so the failure cause is not lost.
                                log.error("Failed to create job for {}", name(), th);
                            } else {
                                log.info("Created {}", created.result().name());
                            }
                        }
                    });
            this.herder.putConnectorConfig(this.appInstanceName, this.appProps, true, futureCallback);
            futureCallback.get();
        } catch (Throwable th) {
            if (th.getCause() instanceof NotLeaderException) {
                log.info("This instance of Replicator is not cluster leader. Will not submit a configuration");
            } else {
                log.error("Stopping after {} error", name(), th);
                this.app.stop();
            }
        }
        this.app.awaitStop();
    }
}
