package org.apache.pinot.compat.tests;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.pinot.compat.tests.BaseOp;
import org.apache.pinot.controller.helix.ControllerRequestURLBuilder;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.integration.tests.ClusterTest;
import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.StringUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compatibility-test operation that acts on a Kafka stream.
 *
 * <p>Two operations are supported (see {@link Op}):
 * <ul>
 *   <li>{@code CREATE}: creates the Kafka topic described by the stream config file.</li>
 *   <li>{@code PRODUCE}: reads rows from a CSV input file, publishes {@code numRows} of them to the
 *       topic as JSON, and then waits until the table's total document count has grown by
 *       {@code numRows}.</li>
 * </ul>
 *
 * <p>The bean properties are populated through the setters; unknown properties are ignored.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamOp extends BaseOp {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamOp.class);

    // Keys read from the stream config file.
    private static final String TOPIC_NAME = "topicName";
    private static final String NUM_PARTITIONS = "numPartitions";
    private static final String PARTITION_COLUMN = "partitionColumn";

    // Keys read from the broker query response.
    private static final String EXCEPTIONS = "exceptions";
    private static final String NUM_SERVERS_QUERIED = "numServersQueried";
    private static final String NUM_SERVERS_RESPONDED = "numServersResponded";
    private static final String TOTAL_DOCS = "totalDocs";

    // Key under which the JSON form of a GenericRow exposes its column values.
    private static final String FIELD_TO_VALUE_MAP = "fieldToValueMap";

    private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:19092";
    private static final int KAFKA_REQUEST_TIMEOUT_MS = 15000;
    private static final short KAFKA_REPLICATION_FACTOR = 1;

    private static final long DOCS_LOADED_CHECK_INTERVAL_MS = 100L;
    private static final long DOCS_LOADED_TIMEOUT_MS = 60000L;

    private Op _op;
    private String _streamConfigFileName;
    private int _numRows;
    private String _inputDataFileName;
    private String _tableConfigFileName;
    private String _recordReaderConfigFileName;

    /** The stream operations this op can perform. */
    public enum Op {
        CREATE,
        PRODUCE
    }

    public StreamOp() {
        super(BaseOp.OpType.STREAM_OP);
    }

    public Op getOp() {
        return _op;
    }

    public void setOp(Op op) {
        _op = op;
    }

    public String getStreamConfigFileName() {
        return _streamConfigFileName;
    }

    public void setStreamConfigFileName(String streamConfigFileName) {
        _streamConfigFileName = streamConfigFileName;
    }

    public int getNumRows() {
        return _numRows;
    }

    public void setNumRows(int numRows) {
        _numRows = numRows;
    }

    public String getInputDataFileName() {
        return _inputDataFileName;
    }

    public void setInputDataFileName(String inputDataFileName) {
        _inputDataFileName = inputDataFileName;
    }

    public String getTableConfigFileName() {
        return _tableConfigFileName;
    }

    public void setTableConfigFileName(String tableConfigFileName) {
        _tableConfigFileName = tableConfigFileName;
    }

    public String getRecordReaderConfigFileName() {
        return _recordReaderConfigFileName;
    }

    public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
        _recordReaderConfigFileName = recordReaderConfigFileName;
    }

    /**
     * Dispatches to the handler of the configured {@link Op}.
     *
     * @return the handler's result; {@code false} when no op has been configured; {@code true} for
     *     an op value without a handler
     */
    @Override
    boolean runOp() {
        if (_op == null) {
            // Report a failed op instead of letting the switch below throw a NullPointerException.
            LOGGER.error("No op configured for stream operation");
            return false;
        }
        switch (_op) {
            case CREATE:
                return createKafkaTopic();
            case PRODUCE:
                return produceData();
            default:
                return true;
        }
    }

    /**
     * Creates the Kafka topic named in the stream config file, using the partition count from the
     * same file and a replication factor of {@link #KAFKA_REPLICATION_FACTOR}.
     *
     * @return {@code true} once topic creation has completed, {@code false} if anything failed
     */
    private boolean createKafkaTopic() {
        try {
            Properties streamConfig = JsonUtils.fileToObject(new File(_streamConfigFileName), Properties.class);
            String topicName = streamConfig.getProperty(TOPIC_NAME);
            int numPartitions = Integer.parseInt(streamConfig.getProperty(NUM_PARTITIONS));

            Map<String, Object> adminConfig = new HashMap<>();
            adminConfig.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
            adminConfig.put("client.id", "Kafka2AdminClient-" + UUID.randomUUID().toString());
            adminConfig.put("request.timeout.ms", KAFKA_REQUEST_TIMEOUT_MS);

            // Close the admin client once the topic is created so it is not leaked on every call.
            try (AdminClient adminClient = KafkaAdminClient.create(adminConfig)) {
                NewTopic topic = new NewTopic(topicName, numPartitions, KAFKA_REPLICATION_FACTOR);
                adminClient.createTopics(Collections.singletonList(topic)).all().get();
            }
            return true;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for whoever is running this op.
                Thread.currentThread().interrupt();
            }
            LOGGER.error("Failed to create Kafka topic with stream config file: {}", _streamConfigFileName, e);
            return false;
        }
    }

    /**
     * Publishes {@code numRows} rows from the input data file to the configured topic and waits
     * until the table's total document count has increased by that amount.
     *
     * @return {@code true} when the new rows were verified, {@code false} if anything failed
     */
    private boolean produceData() {
        try {
            Properties streamConfig = JsonUtils.fileToObject(new File(_streamConfigFileName), Properties.class);
            String topicName = streamConfig.getProperty(TOPIC_NAME);
            String partitionColumn = streamConfig.getProperty(PARTITION_COLUMN);

            TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
            String tableName = tableConfig.getTableName();
            // Baseline taken before producing so the expected total can be computed afterwards.
            long existingTotalDocs = fetchExistingTotalDocs(tableName);

            Properties producerProps = new Properties();
            producerProps.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
            producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
            producerProps.put("request.required.acks", "1");
            StreamDataProducer producer =
                StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, producerProps);
            try {
                publishRows(producer, topicName, partitionColumn, tableConfig);
            } finally {
                // Release the producer before polling for the documents.
                producer.close();
            }

            waitForDocsLoaded(tableName, existingTotalDocs + _numRows, DOCS_LOADED_TIMEOUT_MS);
            LOGGER.info("Verified {} new rows in table: {}", _numRows, tableName);
            return true;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOGGER.error("Failed to ingest stream data", e);
            return false;
        }
    }

    /**
     * Reads rows from the CSV input file (rewinding when the file is exhausted), stamps each row's
     * time column with the current time in the format declared by the table's schema, and publishes
     * the row's column values as a JSON object.
     *
     * @param producer producer used to publish; not closed by this method
     * @param topicName topic to publish to
     * @param partitionColumn value of the {@code partitionColumn} stream config entry, or
     *     {@code null} to publish without a key
     * @param tableConfig config of the table the rows are destined for
     * @throws Exception if the reader config, the schema, or the input data cannot be read, or
     *     publishing fails
     */
    private void publishRows(StreamDataProducer producer, String topicName, @Nullable String partitionColumn,
        TableConfig tableConfig) throws Exception {
        File inputDataFile = new File(_inputDataFileName);
        CSVRecordReaderConfig readerConfig =
            JsonUtils.fileToObject(new File(_recordReaderConfigFileName), CSVRecordReaderConfig.class);

        // String.split takes a regex; quote the delimiter so characters such as '|' or '.' are literal.
        String delimiterRegex = Pattern.quote(Character.toString(readerConfig.getDelimiter()));
        Set<String> columnNames = new HashSet<>();
        Collections.addAll(columnNames, readerConfig.getHeader().split(delimiterRegex));

        String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
        String rawTableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName());
        String schemaUrl =
            ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forSchemaGet(rawTableName);
        Schema schema = JsonUtils.stringToObject(ControllerTest.sendGetRequest(schemaUrl), Schema.class);
        DateTimeFormatSpec timeFormatSpec =
            new DateTimeFormatSpec(schema.getSpecForTimeColumn(timeColumnName).getFormat());

        // try-with-resources so the reader is closed even when producing fails part-way.
        try (RecordReader reader =
            RecordReaderFactory.getRecordReader(FileFormat.CSV, inputDataFile, columnNames, readerConfig)) {
            for (int produced = 0; produced < _numRows; produced++) {
                if (!reader.hasNext()) {
                    // Cycle through the file again when numRows exceeds the rows available.
                    reader.rewind();
                }
                GenericRow row = reader.next();
                row.putValue(timeColumnName, timeFormatSpec.fromMillisToFormat(System.currentTimeMillis()));

                // Keep only the column values from the row's JSON form.
                JsonNode columnValues = JsonUtils.stringToJsonNode(row.toString()).get(FIELD_TO_VALUE_MAP);
                ObjectNode message = JsonUtils.newObjectNode();
                for (String column : row.getFieldToValueMap().keySet()) {
                    message.set(column, columnValues.get(column));
                }
                byte[] payload = StringUtils.encodeUtf8(message.toString());

                if (partitionColumn == null) {
                    producer.produce(topicName, payload);
                } else {
                    // NOTE(review): the key is the partition column's *name*, not the row's value for
                    // that column, so every message gets the same key. Kept as-is; confirm intent.
                    producer.produce(topicName, StringUtils.encodeUtf8(partitionColumn), payload);
                }
            }
        }
    }

    /**
     * Queries the broker for the given table and returns the {@code totalDocs} value of the response.
     *
     * @param tableName table to query
     * @return the total document count reported by the broker
     * @throws Exception if the query fails; a {@link RuntimeException} is thrown when the response is
     *     missing, contains exceptions, is partial, or carries no {@code totalDocs}
     */
    public long fetchExistingTotalDocs(String tableName) throws Exception {
        String query = "SELECT count(*) FROM " + tableName;
        JsonNode response = ClusterTest.postQuery(query, ClusterDescriptor.BROKER_URL, false, "sql");

        if (response == null) {
            throw queryFailure(String.format("Failed to query Table: %s", tableName));
        }
        if (response.has(EXCEPTIONS) && response.get(EXCEPTIONS).size() > 0) {
            throw queryFailure(
                String.format("Failed when running query: %s; the response contains exceptions", query));
        }
        boolean hasServerCounts = response.has(NUM_SERVERS_QUERIED) && response.has(NUM_SERVERS_RESPONDED);
        if (hasServerCounts
            && response.get(NUM_SERVERS_QUERIED).asInt() > response.get(NUM_SERVERS_RESPONDED).asInt()) {
            throw queryFailure(
                String.format("Failed when running query: %s; the response contains partial results", query));
        }
        if (!response.has(TOTAL_DOCS)) {
            throw queryFailure(
                String.format("Failed when running query: %s; the response contains no docs", query));
        }
        return response.get(TOTAL_DOCS).asLong();
    }

    /** Logs the message at error level and returns the exception for the caller to throw. */
    private static RuntimeException queryFailure(String message) {
        LOGGER.error(message);
        return new RuntimeException(message);
    }

    /**
     * Polls the table's total document count until it equals {@code expectedTotalDocs} or the
     * timeout elapses.
     *
     * @param tableName table to poll
     * @param expectedTotalDocs exact document count to wait for
     * @param timeoutMs maximum time to wait, in milliseconds
     */
    private void waitForDocsLoaded(final String tableName, final long expectedTotalDocs, long timeoutMs) {
        LOGGER.info("Wait Doc to load ...");
        TestUtils.waitForCondition(new Function<Void, Boolean>() {
            @Nullable
            @Override
            public Boolean apply(@Nullable Void aVoid) {
                try {
                    return fetchExistingTotalDocs(tableName) == expectedTotalDocs;
                } catch (Exception e) {
                    // A failed poll yields null rather than an exception, as before.
                    LOGGER.debug("Failed to fetch total docs for table: {}", tableName, e);
                    return null;
                }
            }
        }, DOCS_LOADED_CHECK_INTERVAL_MS, timeoutMs, "Failed to load " + expectedTotalDocs + " documents", true);
    }
}
