package org.apache.beam.sdk.io.snowflake.test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.io.snowflake.enums.StreamingLogLevel;
import org.apache.beam.sdk.io.snowflake.test.TestUtils;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/io/snowflake/test/StreamingSnowflakeIOIT.class */
public class StreamingSnowflakeIOIT {
    private static final int TIMEOUT = 900000;
    private static final int INTERVAL = 30000;
    private static final String TABLE = "STREAMING_IOIT";
    private static final List<TestRow> testRows = Lists.newArrayList();

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private static TestUtils.SnowflakeIOITPipelineOptions options;
    private static SnowflakeIO.DataSourceConfiguration dc;
    private static String stagingBucketName;
    private static String storageIntegrationName;

    @BeforeClass
    public static void setupAll() throws SQLException {
        PipelineOptionsFactory.register(TestUtils.SnowflakeIOITPipelineOptions.class);
        options = (TestUtils.SnowflakeIOITPipelineOptions) TestPipeline.testingPipelineOptions().as(TestUtils.SnowflakeIOITPipelineOptions.class);
        dc = SnowflakeIO.DataSourceConfiguration.create().withKeyPairPathAuth(options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase()).withServerName(options.getServerName()).withDatabase(options.getDatabase()).withRole(options.getRole()).withWarehouse(options.getWarehouse()).withSchema(options.getSchema());
        stagingBucketName = options.getStagingBucketName();
        storageIntegrationName = options.getStorageIntegrationName();
        for (int i = 0; i < options.getNumberOfRecords().intValue(); i++) {
            testRows.add(TestRow.create(Integer.valueOf(i), String.format("TestRow%s:%s", Integer.valueOf(i), UUID.randomUUID())));
        }
        TestUtils.runConnectionWithStatement(dc.buildDatasource(), String.format("CREATE OR REPLACE TABLE %s(id INTEGER, name STRING)", TABLE));
    }

    @AfterClass
    public static void cleanUp() throws Exception {
        String replaceAll = stagingBucketName.replaceAll(".+//", "");
        String[] split = replaceAll.split("/", -1);
        String str = split[0];
        String str2 = null;
        if (split.length > 1) {
            str2 = replaceAll.replace(str + "/", "");
        }
        TestUtils.clearStagingBucket(str, str2);
        TestUtils.runConnectionWithStatement(dc.buildDatasource(), String.format("DROP TABLE %s", TABLE));
    }

    @Test
    public void writeStreamThenRead() throws SQLException, InterruptedException {
        writeStreamToSnowflake();
        readStreamFromSnowflakeAndVerify();
    }

    private void writeStreamToSnowflake() {
        this.pipeline.apply(TestStream.create(SerializableCoder.of(TestRow.class)).advanceWatermarkTo(Instant.now()).addElements(testRows.get(0), (TestRow[]) testRows.subList(1, testRows.size()).toArray(new TestRow[0])).advanceWatermarkToInfinity()).apply("Write SnowflakeIO", SnowflakeIO.write().withDataSourceConfiguration(dc).withUserDataMapper(TestUtils.getTestRowDataMapper()).withSnowPipe(options.getSnowPipe()).withStorageIntegrationName(storageIntegrationName).withStagingBucketName(stagingBucketName).withFlushTimeLimit(Duration.millis(18000L)).withFlushRowLimit(50000).withDebugMode(StreamingLogLevel.ERROR));
        this.pipeline.run(options).waitUntilFinish();
    }

    private void readStreamFromSnowflakeAndVerify() throws SQLException, InterruptedException {
        for (int i = TIMEOUT; i > 0; i -= 30000) {
            Set<TestRow> readDataFromStream = readDataFromStream();
            if (readDataFromStream.size() >= testRows.size()) {
                MatcherAssert.assertThat(readDataFromStream, Matchers.containsInAnyOrder((TestRow[]) testRows.toArray(new TestRow[0])));
                return;
            }
            Thread.sleep(30000L);
        }
        throw new RuntimeException("Could not read data from table");
    }

    private Set<TestRow> readDataFromStream() throws SQLException {
        Connection connection = dc.buildDatasource().getConnection();
        PreparedStatement prepareStatement = connection.prepareStatement(String.format("SELECT * FROM %s", TABLE));
        ResultSet executeQuery = prepareStatement.executeQuery();
        Set<TestRow> resultSetToJavaSet = resultSetToJavaSet(executeQuery);
        executeQuery.close();
        prepareStatement.close();
        connection.close();
        return resultSetToJavaSet;
    }

    private Set<TestRow> resultSetToJavaSet(ResultSet resultSet) throws SQLException {
        HashSet newHashSet = Sets.newHashSet();
        while (resultSet.next()) {
            newHashSet.add(TestRow.create(Integer.valueOf(resultSet.getInt(1)), resultSet.getString(2).replace("'", "")));
        }
        return newHashSet;
    }
}
