package com.couchbase.kafka;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.CouchbaseCore;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.deps.com.lmax.disruptor.EventHandler;
import com.couchbase.client.deps.com.lmax.disruptor.ExceptionHandler;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import com.couchbase.client.deps.com.lmax.disruptor.dsl.Disruptor;
import com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory;
import com.couchbase.kafka.DefaultCouchbaseKafkaEnvironment;
import com.couchbase.kafka.filter.Filter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.cluster.Broker;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import scala.collection.Iterator;

/* loaded from: input_file:com/couchbase/kafka/CouchbaseKafkaConnector.class */
public class CouchbaseKafkaConnector implements Runnable {
    private static final String DEFAULT_BUCKET = "default";
    private static final String DEFAULT_PASSWORD = "";
    private static final String DEFAULT_TOPIC = "default";
    private static final String DEFAULT_COUCHBASE_NODE = "127.0.0.1";
    private static final String DEFAULT_ZOOKEEPER_NODE = "127.0.0.1:2181";
    private final ClusterFacade core;
    private final ExecutorService disruptorExecutor;
    private final Disruptor<DCPEvent> disruptor;
    private final RingBuffer<DCPEvent> dcpRingBuffer;
    private final KafkaWriter kafkaWriter;
    private final Producer<String, DCPEvent> producer;
    private final CouchbaseReader couchbaseReader;
    private final Filter filter;
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(CouchbaseKafkaConnector.class);
    private static final DCPEventFactory DCP_EVENT_FACTORY = new DCPEventFactory();

    public static CouchbaseKafkaConnector create() {
        return create(DEFAULT_COUCHBASE_NODE, "default", DEFAULT_PASSWORD, DEFAULT_ZOOKEEPER_NODE, "default");
    }

    public static CouchbaseKafkaConnector create(String str, String str2, String str3, String str4, String str5) {
        return create(((DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment.builder().dcpEnabled(true)).m4build(), str, str2, str3, str4, str5);
    }

    public static CouchbaseKafkaConnector create(CouchbaseKafkaEnvironment couchbaseKafkaEnvironment, String str, String str2, String str3, String str4, String str5) {
        return new CouchbaseKafkaConnector(couchbaseKafkaEnvironment, Collections.singletonList(str), str2, str3, str4, str5);
    }

    private CouchbaseKafkaConnector(CouchbaseKafkaEnvironment couchbaseKafkaEnvironment, List<String> list, String str, String str2, String str3, String str4) {
        try {
            this.filter = (Filter) Class.forName(couchbaseKafkaEnvironment.kafkaFilterClass()).newInstance();
            this.core = new CouchbaseCore(couchbaseKafkaEnvironment);
            this.disruptorExecutor = Executors.newFixedThreadPool(2, new DefaultThreadFactory("cb-kafka", true));
            this.disruptor = new Disruptor<>(DCP_EVENT_FACTORY, couchbaseKafkaEnvironment.kafkaEventBufferSize(), this.disruptorExecutor);
            this.disruptor.handleExceptionsWith(new ExceptionHandler() { // from class: com.couchbase.kafka.CouchbaseKafkaConnector.1
                public void handleEventException(Throwable th, long j, Object obj) {
                    CouchbaseKafkaConnector.LOGGER.warn("Exception while Handling DCP Events {}, {}", obj, th);
                }

                public void handleOnStartException(Throwable th) {
                    CouchbaseKafkaConnector.LOGGER.warn("Exception while Starting DCP RingBuffer {}", th);
                }

                public void handleOnShutdownException(Throwable th) {
                    CouchbaseKafkaConnector.LOGGER.info("Exception while shutting down DCP RingBuffer {}", th);
                }
            });
            Properties properties = new Properties();
            ZkClient zkClient = new ZkClient(str3, 4000, 6000, ZKStringSerializer$.MODULE$);
            ArrayList arrayList = new ArrayList();
            Iterator it = ZkUtils.getAllBrokersInCluster(zkClient).iterator();
            while (it.hasNext()) {
                Broker broker = (Broker) it.next();
                arrayList.add(broker.host() + ":" + broker.port());
            }
            properties.put("metadata.broker.list", joinNodes(arrayList));
            properties.put("serializer.class", couchbaseKafkaEnvironment.kafkaValueSerializerClass());
            properties.put("key.serializer.class", couchbaseKafkaEnvironment.kafkaKeySerializerClass());
            this.producer = new Producer<>(new ProducerConfig(properties));
            this.kafkaWriter = new KafkaWriter(str4, this.producer, this.filter);
            this.disruptor.handleEventsWith(new EventHandler[]{this.kafkaWriter});
            this.disruptor.start();
            this.dcpRingBuffer = this.disruptor.getRingBuffer();
            this.couchbaseReader = new CouchbaseReader(this.core, this.dcpRingBuffer, list, str, str2);
            this.couchbaseReader.connect();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot initialize filter class", e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        this.couchbaseReader.run();
    }

    private String joinNodes(List<String> list) {
        StringBuilder sb = new StringBuilder();
        boolean z = true;
        for (String str : list) {
            if (z) {
                z = false;
            } else {
                sb.append(";");
            }
            sb.append(str);
        }
        return sb.toString();
    }
}
