package io.confluent.connect.jms.integration;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/jms/integration/BaseJmsSourceIT.class */
/**
 * Base class for JMS source connector integration tests.
 *
 * <p>Before each test an {@link EmbeddedConnectCluster} is started and a default set of
 * connector properties is populated; after each test the cluster is stopped. Subclasses
 * supply the JMS-specific pieces through {@link #doProduceRecords(String, int)} and
 * {@link #config(Map)}, adjust the connector configuration through the {@code set*}
 * methods, and usually drive a scenario with {@link #runConnector(String, int, int, String)}.
 *
 * <p>Every {@code set*} method throws {@link IllegalStateException} once the connector has
 * been deployed, because the configuration is only submitted to the cluster at deploy time.
 */
public abstract class BaseJmsSourceIT {
    private static final Logger log = LoggerFactory.getLogger(BaseJmsSourceIT.class);

    protected static final int DEFAULT_NUM_TOPIC_PARTITIONS = 5;
    protected static final String DEFAULT_CONNECTOR_NAME = "jms-source-conn";
    protected static final int NUM_TASKS = 1;
    protected static final int AWAIT_TASK_STARTUP_MS = 5000;
    protected static final String DEFAULT_KAFKA_TOPIC = "from.jms.to.kafka";
    protected static final String DEFAULT_QUEUE_NAME = "jms-to-kafka-queue";
    protected static final long DEFAULT_CONSUME_MAX_DURATION_MS = TimeUnit.SECONDS.toMillis(120);

    /** Size of the embedded Connect cluster; kept separate from the connector task count. */
    private static final int NUM_WORKERS = 1;
    private static final int NUM_BROKERS = 1;

    private EmbeddedConnectCluster connect;
    protected Map<String, String> connectorProps;
    private String connectorName;
    private boolean deployed = false;
    private int kafkaTopicPartitions;
    private String kafkaTopic;
    private String jmsQueueName;
    protected long maxDurationMillis;

    /**
     * Writes records directly to the JMS broker.
     *
     * @param str name of the JMS queue to write to
     * @param i   number of records to write
     */
    public abstract void doProduceRecords(String str, int i);

    /**
     * Builds the connector configuration object for the given properties. The result is
     * only used for logging when the connector is started.
     *
     * @param map connector properties
     * @return the configuration built from {@code map}
     */
    public abstract AbstractConfig config(Map<String, String> map);

    /**
     * Starts the embedded Connect cluster and resets all per-test state to its defaults.
     *
     * @throws IOException declared for compatibility with subclasses and callers
     */
    @Before
    public void setup() throws IOException {
        this.connect = new EmbeddedConnectCluster.Builder()
                .name("connect-cluster")
                .numWorkers(NUM_WORKERS)
                .numBrokers(NUM_BROKERS)
                .build();
        this.connect.start();
        this.connectorName = DEFAULT_CONNECTOR_NAME;
        this.deployed = false;
        this.maxDurationMillis = DEFAULT_CONSUME_MAX_DURATION_MS;
        this.kafkaTopicPartitions = DEFAULT_NUM_TOPIC_PARTITIONS;
        this.kafkaTopic = DEFAULT_KAFKA_TOPIC;
        this.jmsQueueName = DEFAULT_QUEUE_NAME;
        this.connectorProps = new HashMap<>();
        initializeConnectorProperties();
    }

    /** Stops the embedded Connect cluster, if one was created. */
    @After
    public void close() {
        // setup() may have failed before the cluster was assigned; do not pile an NPE
        // on top of the original failure.
        if (this.connect != null) {
            this.connect.stop();
        }
    }

    /**
     * Writes {@code i} records to the currently configured JMS queue.
     *
     * @param i number of records to write
     */
    protected void produceRecords(int i) {
        log.info("Writing {} records directly to the JMS broker", i);
        doProduceRecords(this.jmsQueueName, i);
        log.info("Wrote {} records directly to the JMS broker", i);
    }

    /**
     * Populates {@link #connectorProps} with the default connector configuration, based on
     * the current topic and queue names and the embedded Kafka bootstrap servers.
     */
    protected void initializeConnectorProperties() {
        this.connectorProps.put("tasks.max", String.valueOf(NUM_TASKS));
        this.connectorProps.put("kafka.topic", this.kafkaTopic);
        this.connectorProps.put("jms.destination.name", this.jmsQueueName);
        this.connectorProps.put("jms.destination.type", "queue");
        this.connectorProps.put("confluent.topic.bootstrap.servers", this.connect.kafka().bootstrapServers());
        this.connectorProps.put("confluent.topic.replication.factor", "1");
        this.connectorProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        this.connectorProps.put("value.converter.schemas.enable", "false");
    }

    /**
     * Sets the {@code connector.class} property.
     *
     * @param str fully qualified connector class name
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void setConnectorClass(String str) {
        checkNotDeployed();
        this.connectorProps.put("connector.class", str);
    }

    /**
     * Sets the {@code connector.class} property from a connector class.
     *
     * @param cls connector class whose name is used
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void setConnectorClass(Class<? extends SourceConnector> cls) {
        setConnectorClass(cls.getName());
    }

    /**
     * Sets the name under which the connector is deployed.
     *
     * @param str connector name
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void setConnectorName(String str) {
        checkNotDeployed();
        this.connectorName = str;
    }

    /**
     * Sets the {@code java.naming.provider.url} property.
     *
     * @param str provider URL
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void setProviderUrl(String str) {
        checkNotDeployed();
        this.connectorProps.put("java.naming.provider.url", str);
    }

    /**
     * Sets the {@code tasks.max} property.
     *
     * @param i maximum number of tasks
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void setMaxTasks(int i) {
        checkNotDeployed();
        this.connectorProps.put("tasks.max", String.valueOf(i));
    }

    /**
     * Sets the {@code java.naming.factory.initial} property.
     *
     * @param str initial context factory class name
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void setInitialContextFactory(String str) {
        checkNotDeployed();
        this.connectorProps.put("java.naming.factory.initial", str);
    }

    /**
     * Sets the {@code java.naming.security.principal} property.
     *
     * @param str security principal
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void setSecurityPrincipal(String str) {
        checkNotDeployed();
        this.connectorProps.put("java.naming.security.principal", str);
    }

    /**
     * Sets the {@code java.naming.security.credentials} property, or removes it when
     * {@code str} is {@code null}.
     *
     * @param str security credentials, or {@code null} to clear the property
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void setSecurityCredentials(String str) {
        checkNotDeployed();
        if (str != null) {
            this.connectorProps.put("java.naming.security.credentials", str);
        } else {
            this.connectorProps.remove("java.naming.security.credentials");
        }
    }

    /**
     * Sets the partition count used by {@link #createKafkaTopic()}.
     *
     * @param i number of partitions
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void setKafkaTopicPartitions(int i) {
        checkNotDeployed();
        this.kafkaTopicPartitions = i;
    }

    /**
     * Sets the Kafka topic, both for this test and in the {@code kafka.topic} property.
     *
     * @param str Kafka topic name
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void setKafkaTopic(String str) {
        checkNotDeployed();
        this.kafkaTopic = str;
        this.connectorProps.put("kafka.topic", str);
    }

    /**
     * Sets the JMS queue, both for this test and in the {@code jms.destination.name}
     * property.
     *
     * @param str JMS queue name
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void setJmsQueueName(String str) {
        checkNotDeployed();
        this.jmsQueueName = str;
        this.connectorProps.put("jms.destination.name", str);
    }

    /** Creates the configured Kafka topic with the configured number of partitions. */
    protected void createKafkaTopic() {
        this.connect.kafka().createTopic(this.kafkaTopic, this.kafkaTopicPartitions);
    }

    /**
     * Deploys the connector with the current properties and waits up to
     * {@link #AWAIT_TASK_STARTUP_MS} milliseconds until the connector status reports as
     * many tasks as {@code tasks.max}. Does nothing if the connector is already deployed.
     *
     * @throws IOException          declared for compatibility with subclasses and callers
     * @throws InterruptedException if interrupted while waiting for the tasks
     */
    protected void startConnectorAndWait() throws IOException, InterruptedException {
        if (this.deployed) {
            log.info("Connector {} already running", this.connectorName);
            return;
        }
        final int expectedTasks = Integer.parseInt(this.connectorProps.get("tasks.max"));
        log.info("Starting connector {} with configs: {}", this.connectorName, config(this.connectorProps));
        this.connect.configureConnector(this.connectorName, this.connectorProps);
        // Mark as deployed before waiting, so a start-up timeout still leaves the
        // connector eligible for deleteConnector().
        this.deployed = true;
        TestUtils.waitForCondition(
                safely(() -> this.connect.connectorStatus(this.connectorName).tasks().size() == expectedTasks),
                AWAIT_TASK_STARTUP_MS,
                "Timed out waiting for connector task to be assigned a partition.");
        log.info("Started connector {}", this.connectorName);
    }

    /**
     * Deletes the connector from the cluster if it is currently deployed.
     *
     * @throws IOException declared for compatibility with subclasses and callers
     */
    protected void deleteConnector() throws IOException {
        if (this.deployed) {
            log.info("Deleting connector {}", this.connectorName);
            this.connect.deleteConnector(this.connectorName);
            this.deployed = false;
            log.info("Deleted connector {}", this.connectorName);
        }
    }

    /**
     * Guards configuration changes.
     *
     * @throws IllegalStateException if the connector has already been deployed
     */
    protected void checkNotDeployed() {
        if (this.deployed) {
            throw new IllegalStateException("The connector has already been deployed");
        }
    }

    /**
     * Consumes records from the configured Kafka topic, using {@link #maxDurationMillis}
     * as the time limit.
     *
     * @param i number of records to consume
     * @return the consumed records
     */
    protected ConsumerRecords<byte[], byte[]> consume(int i) {
        return consume(i, this.maxDurationMillis);
    }

    /**
     * Consumes records from the configured Kafka topic.
     *
     * @param i number of records to consume
     * @param j maximum duration in milliseconds
     * @return the consumed records
     */
    protected ConsumerRecords<byte[], byte[]> consume(int i, long j) {
        return this.connect.kafka().consume(i, j, this.kafkaTopic);
    }

    /**
     * Runs a full scenario: creates the Kafka topic, writes {@code i} records to JMS,
     * starts the connector, writes the remaining {@code i2 - i} records, consumes
     * {@code i2} records from Kafka and finally deletes the connector.
     *
     * @param str  JMS queue name
     * @param i    number of records written before the connector starts
     * @param i2   total number of records; must be at least {@code i}
     * @param str2 Kafka topic name
     * @throws Exception if any step of the scenario fails
     */
    protected void runConnector(String str, int i, int i2, String str2) throws Exception {
        Assert.assertTrue("Total records should be larger than initial records", i2 >= i);
        setJmsQueueName(str);
        setKafkaTopic(str2);
        createKafkaTopic();
        if (i > 0) {
            produceRecords(i);
        }
        startConnectorAndWait();
        if (i2 > i) {
            produceRecords(i2 - i);
        }
        log.info("Waiting for records to be produced into Kafka");
        ConsumerRecords<byte[], byte[]> consumed = consume(i2);
        // Decode keys and values only when they will actually be logged.
        if (log.isDebugEnabled()) {
            for (ConsumerRecord<byte[], byte[]> consumerRecord : consumed) {
                log.debug("Received >> {}, {} -> {}, {}",
                        consumerRecord.topic(),
                        consumerRecord.offset(),
                        utf8OrNull(consumerRecord.key()),
                        utf8OrNull(consumerRecord.value()));
            }
        }
        log.info("Consumed {} records from Kafka", consumed.count());
        deleteConnector();
    }

    /** Decodes {@code bytes} as UTF-8, passing {@code null} (absent key or value) through. */
    private static String utf8OrNull(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Wraps a supplier as a {@link TestCondition} that reports {@code false} instead of
     * failing when the supplier throws a {@link RuntimeException} (or returns
     * {@code null}), so that polling simply retries.
     */
    private static TestCondition safely(Supplier<Boolean> supplier) {
        return () -> {
            try {
                return supplier.get();
            } catch (RuntimeException ignored) {
                // Deliberate best-effort: a failed probe counts as "condition not met yet".
                return false;
            }
        };
    }
}
