package org.apache.pinot.integration.tests;

import cloud.localstack.Localstack;
import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor;
import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
import cloud.localstack.docker.command.Command;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.google.common.base.Function;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.activation.UnsupportedDataTypeException;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.client.ResultSet;
import org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.utils.AttributeMap;

@LocalstackDockerProperties(services = {RealtimeKinesisIntegrationTest.STREAM_TYPE}, imageTag = "0.12.15")
/* loaded from: input_file:org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.class */
public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSet {
    private static final String STREAM_NAME = "kinesis-test";
    private static final String STREAM_TYPE = "kinesis";
    public static final int MAX_RECORDS_TO_FETCH = Integer.MAX_VALUE;
    public static final String REGION = "us-east-1";
    public static final String LOCALSTACK_KINESIS_ENDPOINT = "http://localhost:4566";
    public static final int NUM_SHARDS = 10;
    public static final String SCHEMA_FILE_PATH = "kinesis/airlineStats_data_reduced.schema";
    public static final String DATA_FILE_PATH = "kinesis/airlineStats_data_reduced.json";
    private final Localstack _localstackDocker = Localstack.INSTANCE;
    private long _totalRecordsPushedInStream = 0;
    List<String> _h2FieldNameAndTypes = new ArrayList();
    private boolean _skipTestNoDockerInstalled = false;
    private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeKinesisIntegrationTest.class);
    private static final LocalstackDockerAnnotationProcessor PROCESSOR = new LocalstackDockerAnnotationProcessor();
    private static KinesisClient _kinesisClient = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pinot.integration.tests.RealtimeKinesisIntegrationTest$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$com$fasterxml$jackson$databind$node$JsonNodeType = new int[JsonNodeType.values().length];

        static {
            try {
                $SwitchMap$com$fasterxml$jackson$databind$node$JsonNodeType[JsonNodeType.NUMBER.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$fasterxml$jackson$databind$node$JsonNodeType[JsonNodeType.STRING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$fasterxml$jackson$databind$node$JsonNodeType[JsonNodeType.BOOLEAN.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest$DockerInfoCommand.class */
    public static class DockerInfoCommand extends Command {
        public void execute() {
            if (this.dockerExe.execute(Collections.singletonList("info")).toLowerCase().contains("error")) {
                throw new IllegalStateException("Docker daemon is not running!");
            }
        }
    }

    /* loaded from: input_file:org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest$StopAllLocalstackDockerCommand.class */
    public static class StopAllLocalstackDockerCommand extends Command {
        public void execute() {
            String execute = this.dockerExe.execute(Arrays.asList("ps", "-a", "-q", "-f", "ancestor=localstack/localstack"));
            if (!StringUtils.isNotBlank(execute) || execute.toLowerCase().contains("error")) {
                return;
            }
            for (String str : execute.split("\n")) {
                this.dockerExe.execute(Arrays.asList("stop", str));
            }
        }
    }

    @BeforeClass(enabled = false)
    public void setUp() throws Exception {
        try {
            new DockerInfoCommand().execute();
            TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir});
            startZk();
            startController();
            startBroker();
            startServer();
            startKinesis();
            addSchema(createKinesisSchema());
            addTableConfig(createKinesisTableConfig());
            createH2ConnectionAndTable();
            publishRecordsToKinesis();
            waitForAllDocsLoadedKinesis(120000L);
        } catch (IllegalStateException e) {
            this._skipTestNoDockerInstalled = true;
            LOGGER.warn("Skipping test! Docker is not found running", e);
            throw new SkipException(e.getMessage());
        }
    }

    public Schema createKinesisSchema() throws Exception {
        URL resource = BaseClusterIntegrationTest.class.getClassLoader().getResource(SCHEMA_FILE_PATH);
        Assert.assertNotNull(resource);
        return Schema.fromFile(new File(resource.getFile()));
    }

    protected void waitForAllDocsLoadedKinesis(long j) throws Exception {
        waitForAllDocsLoadedKinesis(j, true);
    }

    protected void waitForAllDocsLoadedKinesis(long j, boolean z) {
        TestUtils.waitForCondition(new Function<Void, Boolean>() { // from class: org.apache.pinot.integration.tests.RealtimeKinesisIntegrationTest.1
            @Nullable
            public Boolean apply(@Nullable Void r6) {
                try {
                    return Boolean.valueOf(RealtimeKinesisIntegrationTest.this.getCurrentCountStarResult() >= RealtimeKinesisIntegrationTest.this._totalRecordsPushedInStream);
                } catch (Exception e) {
                    RealtimeKinesisIntegrationTest.LOGGER.warn("Could not fetch current number of rows in pinot table " + RealtimeKinesisIntegrationTest.this.getTableName(), e);
                    return null;
                }
            }
        }, 1000L, j, "Failed to load " + this._totalRecordsPushedInStream + " documents", z);
    }

    public TableConfig createKinesisTableConfig() {
        return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getTableName()).setTimeColumnName("DaysSinceEpoch").setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setLLC(true).setStreamConfigs(createKinesisStreamConfig()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
    }

    public Map<String, String> createKinesisStreamConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", STREAM_TYPE);
        hashMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, "topic.name"), STREAM_NAME);
        hashMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, "fetch.timeout.millis"), "30000");
        hashMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, "consumer.type"), StreamConfig.ConsumerType.LOWLEVEL.toString());
        hashMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, "consumer.factory.class.name"), KinesisConsumerFactory.class.getName());
        hashMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, "decoder.class.name"), "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
        hashMap.put("region", REGION);
        hashMap.put("maxRecordsToFetch", String.valueOf(MAX_RECORDS_TO_FETCH));
        hashMap.put("shardIteratorType", ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString());
        hashMap.put("endpoint", LOCALSTACK_KINESIS_ENDPOINT);
        hashMap.put("accessKey", getLocalAWSCredentials().resolveCredentials().accessKeyId());
        hashMap.put("secretKey", getLocalAWSCredentials().resolveCredentials().secretAccessKey());
        hashMap.put("realtime.segment.flush.threshold.rows", Integer.toString(200));
        hashMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, "consumer.prop.auto.offset.reset"), "smallest");
        return hashMap;
    }

    public void startKinesis() throws Exception {
        LocalstackDockerConfiguration process = PROCESSOR.process(getClass());
        new StopAllLocalstackDockerCommand().execute();
        this._localstackDocker.startup(process);
        _kinesisClient = (KinesisClient) KinesisClient.builder().httpClient(new ApacheSdkHttpService().createHttpClientBuilder().buildWithDefaults(AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE).build())).credentialsProvider(getLocalAWSCredentials()).region(Region.of(REGION)).endpointOverride(new URI(LOCALSTACK_KINESIS_ENDPOINT)).build();
        _kinesisClient.createStream((CreateStreamRequest) CreateStreamRequest.builder().streamName(STREAM_NAME).shardCount(10).build());
        TestUtils.waitForCondition(new Function<Void, Boolean>() { // from class: org.apache.pinot.integration.tests.RealtimeKinesisIntegrationTest.2
            @Nullable
            public Boolean apply(@Nullable Void r5) {
                try {
                    return Boolean.valueOf(RealtimeKinesisIntegrationTest._kinesisClient.describeStream((DescribeStreamRequest) DescribeStreamRequest.builder().streamName(RealtimeKinesisIntegrationTest.STREAM_NAME).build()).streamDescription().streamStatusAsString().contentEquals("ACTIVE"));
                } catch (Exception e) {
                    RealtimeKinesisIntegrationTest.LOGGER.warn("Could not fetch kinesis stream status", e);
                    return null;
                }
            }
        }, 1000L, 30000L, "Kinesis stream kinesis-test is not created or is not in active state", true);
    }

    public void stopKinesis() {
        if (this._localstackDocker.isRunning()) {
            this._localstackDocker.stop();
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:21:0x011e. Please report as an issue. */
    private void publishRecordsToKinesis() {
        try {
            StringBuilder sb = new StringBuilder("?");
            for (int i = 0; i < this._h2FieldNameAndTypes.size() - 1; i++) {
                sb.append(",?");
            }
            PreparedStatement prepareStatement = this._h2Connection.prepareStatement("INSERT INTO " + getTableName() + " VALUES (" + sb.toString() + ")");
            InputStream resourceAsStream = RealtimeKinesisIntegrationTest.class.getClassLoader().getResourceAsStream(DATA_FILE_PATH);
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resourceAsStream, StandardCharsets.UTF_8));
            while (true) {
                try {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        bufferedReader.close();
                        resourceAsStream.close();
                        return;
                    }
                    JsonNode stringToJsonNode = JsonUtils.stringToJsonNode(readLine);
                    PutRecordResponse putRecord = _kinesisClient.putRecord((PutRecordRequest) PutRecordRequest.builder().streamName(STREAM_NAME).data(SdkBytes.fromUtf8String(readLine)).partitionKey(stringToJsonNode.get("Origin").textValue()).build());
                    if (putRecord.sdkHttpResponse().statusCode() == 200 && StringUtils.isNotBlank(putRecord.sequenceNumber()) && StringUtils.isNotBlank(putRecord.shardId())) {
                        this._totalRecordsPushedInStream++;
                        int i2 = 1;
                        Iterator<String> it = this._h2FieldNameAndTypes.iterator();
                        while (it.hasNext()) {
                            String[] split = it.next().split(" ");
                            String str = split[0];
                            String str2 = split[1];
                            boolean z = -1;
                            switch (str2.hashCode()) {
                                case 104431:
                                    if (str2.equals("int")) {
                                        z = false;
                                        break;
                                    }
                                    break;
                                case 423566365:
                                    if (str2.equals("varchar(128)")) {
                                        z = true;
                                        break;
                                    }
                                    break;
                            }
                            switch (z) {
                                case false:
                                    int i3 = i2;
                                    i2++;
                                    prepareStatement.setObject(i3, Integer.valueOf(stringToJsonNode.get(str).intValue()));
                                    break;
                                case true:
                                    int i4 = i2;
                                    i2++;
                                    prepareStatement.setObject(i4, stringToJsonNode.get(str).textValue());
                                    break;
                            }
                        }
                        prepareStatement.execute();
                    }
                } finally {
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Could not publish records to Kinesis Stream", e);
        }
    }

    private static AwsCredentialsProvider getLocalAWSCredentials() {
        return StaticCredentialsProvider.create(AwsBasicCredentials.create("access", "secret"));
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:12:0x00ec. Please report as an issue. */
    @Test(enabled = false)
    public void testRecords() throws Exception {
        Assert.assertNotEquals(Long.valueOf(this._totalRecordsPushedInStream), 0);
        ResultSet resultSet = getPinotConnection().execute("SELECT * FROM " + getTableName() + " ORDER BY Origin LIMIT 10000").getResultSet(0);
        Assert.assertNotEquals(Integer.valueOf(resultSet.getRowCount()), 0);
        Statement createStatement = this._h2Connection.createStatement(1003, 1007);
        createStatement.execute("SELECT * FROM " + getTableName() + " ORDER BY Origin");
        java.sql.ResultSet resultSet2 = createStatement.getResultSet();
        Assert.assertFalse(resultSet2.isLast());
        resultSet2.beforeFirst();
        int i = 0;
        HashMap hashMap = new HashMap();
        for (int i2 = 0; i2 < this._h2FieldNameAndTypes.size(); i2++) {
            hashMap.put(resultSet.getColumnName(i2), Integer.valueOf(i2));
        }
        while (resultSet2.next()) {
            Iterator<String> it = this._h2FieldNameAndTypes.iterator();
            while (it.hasNext()) {
                String[] split = it.next().split(" ");
                String str = split[0];
                String str2 = split[1];
                boolean z = -1;
                switch (str2.hashCode()) {
                    case 104431:
                        if (str2.equals("int")) {
                            z = false;
                            break;
                        }
                        break;
                    case 423566365:
                        if (str2.equals("varchar(128)")) {
                            z = true;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        Assert.assertEquals(resultSet2.getInt(str), resultSet.getInt(i, ((Integer) hashMap.get(str)).intValue()));
                        break;
                    case true:
                        Assert.assertEquals(resultSet2.getString(str), resultSet.getString(i, ((Integer) hashMap.get(str)).intValue()));
                        break;
                }
            }
            i++;
            if (i >= resultSet.getRowCount()) {
                int i3 = 0;
                while (resultSet2.next()) {
                    i3++;
                }
                Assert.assertEquals(i3, 0);
                return;
            }
        }
    }

    @Test(enabled = false)
    public void testCountRecords() {
        Assert.assertEquals(getPinotConnection().execute("SELECT COUNT(*) FROM " + getTableName()).getResultSet(0).getLong(0), this._totalRecordsPushedInStream);
    }

    public void createH2ConnectionAndTable() throws Exception {
        String str;
        Assert.assertNull(this._h2Connection);
        Class.forName("org.h2.Driver");
        this._h2Connection = DriverManager.getConnection("jdbc:h2:mem:");
        this._h2Connection.prepareCall("DROP TABLE IF EXISTS " + getTableName()).execute();
        this._h2FieldNameAndTypes = new ArrayList();
        InputStream resourceAsStream = RealtimeKinesisIntegrationTest.class.getClassLoader().getResourceAsStream(DATA_FILE_PATH);
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resourceAsStream, StandardCharsets.UTF_8));
            try {
                String readLine = bufferedReader.readLine();
                if (readLine != null) {
                }
                bufferedReader.close();
                if (StringUtils.isNotBlank(readLine)) {
                    Iterator fields = JsonUtils.stringToJsonNode(readLine).fields();
                    while (fields.hasNext()) {
                        Map.Entry entry = (Map.Entry) fields.next();
                        String str2 = (String) entry.getKey();
                        JsonNodeType nodeType = ((JsonNode) entry.getValue()).getNodeType();
                        switch (AnonymousClass3.$SwitchMap$com$fasterxml$jackson$databind$node$JsonNodeType[nodeType.ordinal()]) {
                            case 1:
                                str = "int";
                                break;
                            case SimpleMinionClusterIntegrationTest.NUM_TASKS /* 2 */:
                                str = "varchar(128)";
                                break;
                            case 3:
                                str = "boolean";
                                break;
                            default:
                                throw new UnsupportedDataTypeException("Kinesis Integration test doesn't support datatype: " + nodeType.name());
                        }
                        this._h2FieldNameAndTypes.add(str2 + " " + str);
                    }
                }
                this._h2Connection.prepareCall("CREATE TABLE " + getTableName() + "(" + StringUtil.join(",", (String[]) this._h2FieldNameAndTypes.toArray(new String[this._h2FieldNameAndTypes.size()])) + ")").execute();
            } finally {
            }
        } finally {
            resourceAsStream.close();
        }
    }

    @AfterClass(enabled = false)
    public void tearDown() throws Exception {
        if (this._skipTestNoDockerInstalled) {
            return;
        }
        dropRealtimeTable(getTableName());
        stopServer();
        stopBroker();
        stopController();
        stopZk();
        stopKinesis();
        FileUtils.deleteDirectory(this._tempDir);
    }
}
