package org.apache.druid.tests.indexer;

import com.google.common.base.Throwables;
import com.google.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode$Disabled$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/* loaded from: input_file:org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.class */
public class AbstractKafkaIndexerTest extends AbstractIndexerTest {
    private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class);
    private static final String INDEXER_FILE = "/indexer/kafka_supervisor_spec.json";
    private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
    private static final String TOPIC_NAME = "kafka_indexing_service_topic";
    private static final int NUM_EVENTS_TO_SEND = 60;
    private static final long WAIT_TIME_MILLIS = 120000;
    public static final String testPropertyPrefix = "kafka.test.property.";
    private String supervisorId;
    private ZkClient zkClient;
    private ZkUtils zkUtils;
    private boolean segmentsExist;
    private DateTime dtFirst;
    private DateTime dtLast;

    @Inject
    private TestQueryHelper queryHelper;

    @Inject
    private IntegrationTestingConfig config;
    private String fullDatasourceName;
    final String event_template = "{\"timestamp\": \"%s\",\"page\": \"Gypsy Danger\",\"language\" : \"en\",\"user\" : \"nuclear\",\"unpatrolled\" : \"true\",\"newPage\" : \"true\",\"robot\": \"false\",\"anonymous\": \"false\",\"namespace\":\"article\",\"continent\":\"North America\",\"country\":\"United States\",\"region\":\"Bay Area\",\"city\":\"San Francisco\",\"added\":%d,\"deleted\":%d,\"delta\":%d}";
    private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
    private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");

    /* JADX INFO: Access modifiers changed from: package-private */
    public void doKafkaIndexTest(String str, boolean z) {
        this.fullDatasourceName = str + this.config.getExtraDatasourceNameSuffix();
        try {
            String zookeeperHosts = this.config.getZookeeperHosts();
            this.zkClient = new ZkClient(zookeeperHosts, 10000, 10000, ZKStringSerializer$.MODULE$);
            this.zkUtils = new ZkUtils(this.zkClient, new ZkConnection(zookeeperHosts, 10000), false);
            if (this.config.manageKafkaTopic()) {
                AdminUtils.createTopic(this.zkUtils, TOPIC_NAME, 4, 1, new Properties(), RackAwareMode$Disabled$.MODULE$);
            }
            try {
                LOG.info("supervisorSpec name: [%s]", new Object[]{INDEXER_FILE});
                Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
                Properties properties = new Properties();
                properties.putAll(consumerProperties);
                properties.put("bootstrap.servers", this.config.getKafkaInternalHost());
                String replace = StringUtils.replace(StringUtils.replace(StringUtils.replace(getResourceAsString(INDEXER_FILE), "%%DATASOURCE%%", this.fullDatasourceName), "%%TOPIC%%", TOPIC_NAME), "%%CONSUMER_PROPERTIES%%", this.jsonMapper.writeValueAsString(properties));
                LOG.info("supervisorSpec: [%s]\n", new Object[]{replace});
                this.supervisorId = this.indexer.submitSupervisor(replace);
                LOG.info("Submitted supervisor", new Object[0]);
                Properties properties2 = new Properties();
                addFilteredProperties(this.config, properties2);
                properties2.put("bootstrap.servers", this.config.getKafkaHost());
                LOG.info("Kafka bootstrap.servers: [%s]", new Object[]{this.config.getKafkaHost()});
                properties2.put("acks", "all");
                properties2.put("retries", "3");
                properties2.put("key.serializer", ByteArraySerializer.class.getName());
                properties2.put("value.serializer", ByteArraySerializer.class.getName());
                if (z) {
                    properties2.put("enable.idempotence", "true");
                    properties2.put("transactional.id", RandomIdUtils.getRandomId());
                }
                KafkaProducer kafkaProducer = new KafkaProducer(properties2, new StringSerializer(), new StringSerializer());
                DateTimeZone inferTzFromString = DateTimes.inferTzFromString("UTC");
                DateTimeFormatter forPattern = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
                DateTime dateTime = new DateTime(inferTzFromString);
                this.dtFirst = dateTime;
                this.dtLast = dateTime;
                int i = 0;
                int i2 = 0;
                if (z) {
                    kafkaProducer.initTransactions();
                    kafkaProducer.beginTransaction();
                }
                while (i2 < NUM_EVENTS_TO_SEND) {
                    i2++;
                    i += i2;
                    String format = StringUtils.format("{\"timestamp\": \"%s\",\"page\": \"Gypsy Danger\",\"language\" : \"en\",\"user\" : \"nuclear\",\"unpatrolled\" : \"true\",\"newPage\" : \"true\",\"robot\": \"false\",\"anonymous\": \"false\",\"namespace\":\"article\",\"continent\":\"North America\",\"country\":\"United States\",\"region\":\"Bay Area\",\"city\":\"San Francisco\",\"added\":%d,\"deleted\":%d,\"delta\":%d}", new Object[]{forPattern.print(dateTime), Integer.valueOf(i2), 0, Integer.valueOf(i2)});
                    LOG.info("sending event: [%s]", new Object[]{format});
                    try {
                        kafkaProducer.send(new ProducerRecord(TOPIC_NAME, format)).get();
                        this.dtLast = dateTime;
                        dateTime = new DateTime(inferTzFromString);
                    } catch (Exception e) {
                        throw Throwables.propagate(e);
                    }
                }
                if (z) {
                    kafkaProducer.commitTransaction();
                }
                kafkaProducer.close();
                LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", new Object[]{Long.valueOf(WAIT_TIME_MILLIS)});
                try {
                    Thread.sleep(WAIT_TIME_MILLIS);
                    InputStream resourceAsStream = AbstractKafkaIndexerTest.class.getResourceAsStream(QUERIES_FILE);
                    if (null == resourceAsStream) {
                        throw new ISE("could not open query file: %s", new Object[]{QUERIES_FILE});
                    }
                    try {
                        String replace2 = StringUtils.replace(StringUtils.replace(StringUtils.replace(StringUtils.replace(StringUtils.replace(StringUtils.replace(StringUtils.replace(StringUtils.replace(StringUtils.replace(IOUtils.toString(resourceAsStream, "UTF-8"), "%%DATASOURCE%%", this.fullDatasourceName), "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", this.TIMESTAMP_FMT.print(this.dtFirst)), "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", this.TIMESTAMP_FMT.print(this.dtLast)), "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", this.TIMESTAMP_FMT.print(this.dtFirst)), "%%TIMESERIES_QUERY_START%%", this.INTERVAL_FMT.print(this.dtFirst)), "%%TIMESERIES_QUERY_END%%", this.INTERVAL_FMT.print(this.dtLast.plusMinutes(2))), "%%TIMESERIES_RESPONSE_TIMESTAMP%%", this.TIMESTAMP_FMT.print(this.dtFirst)), "%%TIMESERIES_ADDED%%", Integer.toString(i)), "%%TIMESERIES_NUMEVENTS%%", Integer.toString(i2));
                        try {
                            this.queryHelper.testQueriesFromString(replace2, 2);
                            LOG.info("Shutting down Kafka Supervisor", new Object[0]);
                            this.indexer.shutdownSupervisor(this.supervisorId);
                            LOG.info("Waiting for all kafka indexing tasks to finish", new Object[0]);
                            RetryUtil.retryUntilTrue(new Callable<Boolean>() { // from class: org.apache.druid.tests.indexer.AbstractKafkaIndexerTest.1
                                /* JADX WARN: Can't rename method to resolve collision */
                                @Override // java.util.concurrent.Callable
                                public Boolean call() {
                                    return Boolean.valueOf((AbstractKafkaIndexerTest.this.indexer.getPendingTasks().size() + AbstractKafkaIndexerTest.this.indexer.getRunningTasks().size()) + AbstractKafkaIndexerTest.this.indexer.getWaitingTasks().size() == 0);
                                }
                            }, "Waiting for Tasks Completion");
                            try {
                                RetryUtil.retryUntil(new Callable<Boolean>() { // from class: org.apache.druid.tests.indexer.AbstractKafkaIndexerTest.2
                                    /* JADX WARN: Can't rename method to resolve collision */
                                    @Override // java.util.concurrent.Callable
                                    public Boolean call() {
                                        return Boolean.valueOf(AbstractKafkaIndexerTest.this.coordinator.areSegmentsLoaded(AbstractKafkaIndexerTest.this.fullDatasourceName));
                                    }
                                }, true, 10000L, 30, "Real-time generated segments loaded");
                                LOG.info("segments are present", new Object[0]);
                                this.segmentsExist = true;
                                try {
                                    this.queryHelper.testQueriesFromString(replace2, 2);
                                } catch (Exception e2) {
                                    throw Throwables.propagate(e2);
                                }
                            } catch (Exception e3) {
                                throw Throwables.propagate(e3);
                            }
                        } catch (Exception e4) {
                            throw Throwables.propagate(e4);
                        }
                    } catch (IOException e5) {
                        throw new ISE(e5, "could not read query file: %s", new Object[]{QUERIES_FILE});
                    }
                } catch (InterruptedException e6) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e6);
                }
            } catch (Exception e7) {
                LOG.error("could not read file [%s]", new Object[]{INDEXER_FILE});
                throw new ISE(e7, "could not read file [%s]", new Object[]{INDEXER_FILE});
            }
        } catch (Exception e8) {
            throw new ISE(e8, "could not create kafka topic", new Object[0]);
        }
    }

    private void addFilteredProperties(IntegrationTestingConfig integrationTestingConfig, Properties properties) {
        for (Map.Entry entry : integrationTestingConfig.getProperties().entrySet()) {
            if (((String) entry.getKey()).startsWith(testPropertyPrefix)) {
                properties.put(((String) entry.getKey()).substring(testPropertyPrefix.length()), entry.getValue());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void doTearDown() {
        if (this.config.manageKafkaTopic()) {
            AdminUtils.deleteTopic(this.zkUtils, TOPIC_NAME);
        }
        if (!this.segmentsExist || this.fullDatasourceName == null) {
            return;
        }
        unloadAndKillData(this.fullDatasourceName);
    }
}
