/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.kafkaconnector.common.services.kafkaconnect;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.DefaultKafkaConnectPropertyFactory;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectPropertyFactory;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaConnectRunner {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectRunner.class);
    private final String bootstrapServer;
    private final KafkaConnectPropertyFactory kafkaConnectPropertyFactory;
    private final List<ConnectorPropertyFactory> connectorPropertyFactories = new ArrayList<ConnectorPropertyFactory>();
    private Connect connect;
    private Herder herder;

    public KafkaConnectRunner(String bootstrapServer) {
        this.bootstrapServer = bootstrapServer;
        this.kafkaConnectPropertyFactory = new DefaultKafkaConnectPropertyFactory(bootstrapServer);
    }

    private void init() {
        LOG.info("Started worked initialization");
        Time time = Time.SYSTEM;
        WorkerInfo initInfo = new WorkerInfo();
        initInfo.logAll();
        Properties props = this.kafkaConnectPropertyFactory.getProperties();
        Map standAloneProperties = Utils.propsToStringMap((Properties)props);
        Plugins plugins = new Plugins(standAloneProperties);
        StandaloneConfig config = new StandaloneConfig(standAloneProperties);
        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId((WorkerConfig)config);
        AllConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
        RestServer rest = new RestServer((WorkerConfig)config);
        rest.initializeServer();
        Worker worker = new Worker(this.bootstrapServer, time, plugins, (WorkerConfig)config, (OffsetBackingStore)new FileOffsetBackingStore(), (ConnectorClientConfigOverridePolicy)allConnectorClientConfigOverridePolicy);
        this.herder = new StandaloneHerder(worker, kafkaClusterId, (ConnectorClientConfigOverridePolicy)allConnectorClientConfigOverridePolicy);
        this.connect = new Connect(this.herder, rest);
        LOG.info("Finished initializing the worker");
    }

    public List<ConnectorPropertyFactory> getConnectorPropertyProducers() {
        return this.connectorPropertyFactories;
    }

    public void initializeConnector(ConnectorPropertyFactory connectorPropertyFactory, Consumer<ConnectorInitState> callback) throws ExecutionException, InterruptedException {
        Properties connectorProps = connectorPropertyFactory.getProperties();
        FutureCallback cb = new FutureCallback((error, info) -> callback.accept(new ConnectorInitState(((ConnectorInfo)info.result()).config(), info.created(), error)));
        this.herder.putConnectorConfig(connectorProps.getProperty("name"), Utils.propsToStringMap((Properties)connectorProps), false, (Callback)cb);
        cb.get();
    }

    public <T> void initializeConnector(ConnectorPropertyFactory connectorPropertyFactory, BiConsumer<ConnectorInitState, T> callback, T payload) throws ExecutionException, InterruptedException {
        Properties connectorProps = connectorPropertyFactory.getProperties();
        FutureCallback cb = new FutureCallback((error, info) -> callback.accept(new ConnectorInitState(((ConnectorInfo)info.result()).config(), info.created(), error), payload));
        this.herder.putConnectorConfig(connectorProps.getProperty("name"), Utils.propsToStringMap((Properties)connectorProps), false, (Callback)cb);
        cb.get();
    }

    public boolean run(CountDownLatch latch) {
        try {
            this.init();
            LOG.info("Starting the connect interface");
            this.connect.start();
            LOG.info("Started the connect interface");
        }
        catch (Throwable t) {
            LOG.error("Container init or start has failed due to: ", t);
        }
        finally {
            latch.countDown();
        }
        this.connect.awaitStop();
        return true;
    }

    public void stop() {
        if (this.connect != null) {
            this.connect.stop();
        } else {
            LOG.warn("Trying to stop an uninitialized Kafka Connect Runner");
        }
    }

    public class ConnectorInitState {
        private Map<String, String> configs;
        private boolean created;
        private Throwable error;

        public ConnectorInitState(Map<String, String> configs, boolean created, Throwable error) {
            this.configs = configs;
            this.created = created;
            this.error = error;
        }

        public Map<String, String> getConfigs() {
            return this.configs;
        }

        public boolean isCreated() {
            return this.created;
        }

        public Throwable getError() {
            return this.error;
        }
    }
}

