/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.kafkaconnector.common.test;

import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.test.FunctionalTestMessageProducer;
import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer;
import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CamelSourceTestSupport
extends AbstractKafkaTest {
    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceTestSupport.class);

    protected abstract void produceTestData();

    protected abstract void verifyMessages(TestMessageConsumer<?> var1);

    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException {
        KafkaClient<String, String> kafkaClient = new KafkaClient<String, String>(this.getKafkaService().getBootstrapServers());
        StringMessageConsumer consumer = new StringMessageConsumer(kafkaClient, topic, count);
        this.runTest(connectorPropertyFactory, consumer);
    }

    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException {
        this.runTest(connectorPropertyFactory, consumer, this::produceTestData);
    }

    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer, FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
        connectorPropertyFactory.log();
        LOG.debug("Initialized the connector and put the data for the test execution");
        this.getKafkaConnectService().initializeConnector(connectorPropertyFactory);
        LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
        producer.produceMessages();
        LOG.debug("Creating the Kafka consumer ...");
        consumer.consumeMessages();
        LOG.debug("Ran the Kafka consumer ...");
        LOG.debug("Verifying messages");
        this.verifyMessages(consumer);
        LOG.debug("Verified messages");
    }

    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException {
        KafkaClient<String, String> kafkaClient = new KafkaClient<String, String>(this.getKafkaService().getBootstrapServers());
        StringMessageConsumer consumer = new StringMessageConsumer(kafkaClient, topic, count);
        this.runTestBlocking(connectorPropertyFactory, consumer);
    }

    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException {
        this.runTestBlocking(connectorPropertyFactory, consumer, this::produceTestData);
    }

    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer, FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
        connectorPropertyFactory.log();
        LOG.debug("Initialized the connector and put the data for the test execution");
        this.getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
        LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
        producer.produceMessages();
        LOG.debug("Creating the Kafka consumer ...");
        consumer.consumeMessages();
        LOG.debug("Ran the Kafka consumer ...");
        LOG.debug("Verifying messages");
        this.verifyMessages(consumer);
        LOG.debug("Verified messages");
    }
}

