/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.snowflake.test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.io.snowflake.enums.StreamingLogLevel;
import org.apache.beam.sdk.io.snowflake.test.TestUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

public class StreamingSnowflakeIOIT {
    private static final int TIMEOUT = 900000;
    private static final int INTERVAL = 30000;
    private static final String TABLE = "STREAMING_IOIT";
    private static final List<TestRow> testRows = Lists.newArrayList();
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private static TestUtils.SnowflakeIOITPipelineOptions options;
    private static SnowflakeIO.DataSourceConfiguration dc;
    private static String stagingBucketName;
    private static String storageIntegrationName;

    @BeforeClass
    public static void setupAll() throws SQLException {
        PipelineOptionsFactory.register(TestUtils.SnowflakeIOITPipelineOptions.class);
        options = (TestUtils.SnowflakeIOITPipelineOptions)TestPipeline.testingPipelineOptions().as(TestUtils.SnowflakeIOITPipelineOptions.class);
        dc = SnowflakeIO.DataSourceConfiguration.create().withKeyPairPathAuth(options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase()).withServerName(options.getServerName()).withDatabase(options.getDatabase()).withRole(options.getRole()).withWarehouse(options.getWarehouse()).withSchema(options.getSchema());
        stagingBucketName = options.getStagingBucketName();
        storageIntegrationName = options.getStorageIntegrationName();
        for (int i = 0; i < options.getNumberOfRecords(); ++i) {
            testRows.add(TestRow.create((Integer)i, (String)String.format("TestRow%s:%s", i, UUID.randomUUID())));
        }
        TestUtils.runConnectionWithStatement(dc.buildDatasource(), String.format("CREATE OR REPLACE TABLE %s(id INTEGER, name STRING)", TABLE));
    }

    @AfterClass
    public static void cleanUp() throws Exception {
        String bucketNameAndPath = stagingBucketName.replaceAll(".+//", "");
        String[] parts = bucketNameAndPath.split("/", -1);
        String bucketName = parts[0];
        String directory = null;
        if (parts.length > 1) {
            directory = bucketNameAndPath.replace(bucketName + "/", "");
        }
        TestUtils.clearStagingBucket(bucketName, directory);
        TestUtils.runConnectionWithStatement(dc.buildDatasource(), String.format("DROP TABLE %s", TABLE));
    }

    @Test
    public void writeStreamThenRead() throws SQLException, InterruptedException {
        this.writeStreamToSnowflake();
        this.readStreamFromSnowflakeAndVerify();
    }

    private void writeStreamToSnowflake() {
        TestStream stringsStream = TestStream.create((Coder)SerializableCoder.of(TestRow.class)).advanceWatermarkTo(Instant.now()).addElements((Object)testRows.get(0), (Object[])testRows.subList(1, testRows.size()).toArray(new TestRow[0])).advanceWatermarkToInfinity();
        ((PCollection)this.pipeline.apply((PTransform)stringsStream)).apply("Write SnowflakeIO", (PTransform)SnowflakeIO.write().withDataSourceConfiguration(dc).withUserDataMapper(TestUtils.getTestRowDataMapper()).withSnowPipe(options.getSnowPipe()).withStorageIntegrationName(storageIntegrationName).withStagingBucketName(stagingBucketName).withFlushTimeLimit(Duration.millis((long)18000L)).withFlushRowLimit(Integer.valueOf(50000)).withDebugMode(StreamingLogLevel.ERROR));
        PipelineResult pipelineResult = this.pipeline.run((PipelineOptions)options);
        pipelineResult.waitUntilFinish();
    }

    private void readStreamFromSnowflakeAndVerify() throws SQLException, InterruptedException {
        for (int timeout = 900000; timeout > 0; timeout -= 30000) {
            Set<TestRow> fetchedRows = this.readDataFromStream();
            if (fetchedRows.size() >= testRows.size()) {
                MatcherAssert.assertThat(fetchedRows, (Matcher)Matchers.containsInAnyOrder((Object[])testRows.toArray(new TestRow[0])));
                return;
            }
            Thread.sleep(30000L);
        }
        throw new RuntimeException("Could not read data from table");
    }

    private Set<TestRow> readDataFromStream() throws SQLException {
        Connection connection = dc.buildDatasource().getConnection();
        PreparedStatement statement = connection.prepareStatement(String.format("SELECT * FROM %s", TABLE));
        ResultSet resultSet = statement.executeQuery();
        Set<TestRow> testRows = this.resultSetToJavaSet(resultSet);
        resultSet.close();
        statement.close();
        connection.close();
        return testRows;
    }

    private Set<TestRow> resultSetToJavaSet(ResultSet resultSet) throws SQLException {
        HashSet testRows = Sets.newHashSet();
        while (resultSet.next()) {
            testRows.add(TestRow.create((Integer)resultSet.getInt(1), (String)resultSet.getString(2).replace("'", "")));
        }
        return testRows;
    }
}

