package org.apache.drill.exec.store.kafka;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import org.apache.drill.PlanTestBase;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;

/* loaded from: input_file:org/apache/drill/exec/store/kafka/KafkaTestBase.class */
/**
 * Base class for tests that query the {@code kafka} storage plugin.
 *
 * <p>Class-level setup is skipped (via a JUnit assumption) unless
 * {@link TestKafkaSuit#isRunningSuite()} returns {@code true}. When it runs, it asks
 * {@link TestKafkaSuit} to initialise Kafka and registers an enabled storage plugin that
 * points at the suite's embedded cluster. Class-level teardown is guarded by the same
 * condition so that cluster init and teardown calls stay balanced.
 */
public class KafkaTestBase extends PlanTestBase {
    /** Name under which the Kafka storage plugin is registered. */
    private static final String PLUGIN_NAME = "kafka";

    /** Consumer property key for the broker list. */
    private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";

    /** Consumer property key for the consumer group id. */
    private static final String GROUP_ID_KEY = "group.id";

    /** Consumer group id used by the tests. */
    private static final String TEST_CONSUMER_GROUP = "drill-test-consumer";

    /** Session option naming the record reader class. */
    private static final String RECORD_READER_OPTION = "store.kafka.record.reader";

    /** Record reader class set on the session by {@link #initKafkaStoragePlugin}. */
    private static final String JSON_MESSAGE_READER =
            "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader";

    /** Session option for the poll timeout. */
    private static final String POLL_TIMEOUT_OPTION = "store.kafka.poll.timeout";

    /** Poll timeout value set on the session by {@link #initKafkaStoragePlugin}. */
    private static final int POLL_TIMEOUT_VALUE = 200;

    /** Expected-row-count value that disables the row count assertion. */
    private static final int SKIP_ROW_COUNT_CHECK = -1;

    /** Plugin configuration created by the most recent {@link #initKafkaStoragePlugin} call. */
    protected static KafkaStoragePluginConfig storagePluginConfig;

    /**
     * Initialises Kafka through {@link TestKafkaSuit} and registers the storage plugin.
     *
     * <p>The whole class is skipped when {@link TestKafkaSuit#isRunningSuite()} is
     * {@code false}.
     *
     * @throws Exception if Kafka initialisation or plugin registration fails
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        Assume.assumeTrue(TestKafkaSuit.isRunningSuite());
        TestKafkaSuit.initKafka();
        initKafkaStoragePlugin(TestKafkaSuit.embeddedKafkaCluster);
    }

    /**
     * Creates (or updates) an enabled {@code kafka} storage plugin whose consumer
     * properties point at the given cluster, stores the configuration in
     * {@link #storagePluginConfig}, and sets the record reader and poll timeout
     * session options.
     *
     * @param embeddedKafkaCluster cluster whose broker list is used as
     *                             {@code bootstrap.servers}
     * @throws Exception if the plugin cannot be registered or a session option
     *                   statement fails
     */
    public static void initKafkaStoragePlugin(EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception {
        StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();

        HashMap<String, String> consumerProps = Maps.newHashMap();
        consumerProps.put(BOOTSTRAP_SERVERS_KEY, embeddedKafkaCluster.getKafkaBrokerList());
        consumerProps.put(GROUP_ID_KEY, TEST_CONSUMER_GROUP);

        storagePluginConfig = new KafkaStoragePluginConfig(consumerProps);
        storagePluginConfig.setEnabled(true);
        pluginRegistry.createOrUpdate(PLUGIN_NAME, storagePluginConfig, true);

        testNoResult(
                String.format("alter session set `%s` = '%s'", RECORD_READER_OPTION, JSON_MESSAGE_READER),
                new Object[0]);
        testNoResult(
                String.format("alter session set `%s` = %d", POLL_TIMEOUT_OPTION, POLL_TIMEOUT_VALUE),
                new Object[0]);
    }

    /**
     * Runs the given SQL and returns its result batches.
     *
     * @param str SQL text to execute
     * @return the batches returned by {@code testSqlWithResults}
     * @throws Exception if query execution fails
     */
    public List<QueryDataBatch> runKafkaSQLWithResults(String str) throws Exception {
        return testSqlWithResults(str);
    }

    /**
     * Runs the given SQL, logs the result, and verifies the row count.
     *
     * @param str SQL text to execute
     * @param i   expected row count, or {@code -1} to skip the check
     * @throws Exception if query execution fails
     */
    public void runKafkaSQLVerifyCount(String str, int i) throws Exception {
        List<QueryDataBatch> batches = runKafkaSQLWithResults(str);
        logResultAndVerifyRowCount(batches, i);
    }

    /**
     * Logs the given batches and asserts that the value returned by {@code logResult}
     * equals the expected row count.
     *
     * @param list result batches to log
     * @param i    expected row count, or {@code -1} to skip the assertion
     * @throws SchemaChangeException propagated from {@code logResult}
     */
    public void logResultAndVerifyRowCount(List<QueryDataBatch> list, int i) throws SchemaChangeException {
        int actualRowCount = logResult(list);
        if (i == SKIP_ROW_COUNT_CHECK) {
            return;
        }
        Assert.assertEquals(i, actualRowCount);
    }

    /**
     * Passes the query and one expected string to {@code testPhysicalPlan}, then runs the
     * query through {@code testSql} and asserts that the returned count matches.
     *
     * @param str  SQL text to check and execute
     * @param str2 string handed to {@code testPhysicalPlan} as its only expected entry
     * @param i    expected value of {@code testSql(str)}
     * @throws Exception if the plan check or query execution fails
     */
    public void testHelper(String str, String str2, int i) throws Exception {
        testPhysicalPlan(str, new String[]{str2});
        int actualRowCount = testSql(str);
        String failureMessage = String.format(
                "Received unexpected number of rows in output: expected=%d, received=%s",
                i, actualRowCount);
        Assert.assertEquals(failureMessage, i, actualRowCount);
    }

    /**
     * Tears down the Kafka cluster through {@link TestKafkaSuit}.
     *
     * <p>JUnit still runs {@code @AfterClass} methods when {@code @BeforeClass} aborted on
     * a failed assumption, so the teardown is guarded by the same condition as
     * {@link #setUpBeforeClass()}. Without the guard, the cluster would be torn down for a
     * class that never initialised it.
     *
     * @throws Exception if cluster teardown fails
     */
    @AfterClass
    public static void tearDownKafkaTestBase() throws Exception {
        if (TestKafkaSuit.isRunningSuite()) {
            TestKafkaSuit.tearDownCluster();
        }
    }
}
