/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.zetasketch;

import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.zetasketch.HllCount;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.io.gcp.testing.BigqueryMatcher;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class BigQueryHllSketchCompatibilityIT {
    private static final String APP_NAME;
    private static final String PROJECT_ID;
    private static final String DATASET_ID;
    private static final BigqueryClient BIGQUERY_CLIENT;
    private static final List<String> TEST_DATA;
    private static final String DATA_FIELD_NAME = "data";
    private static final String DATA_FIELD_TYPE = "STRING";
    private static final String QUERY_RESULT_FIELD_NAME = "sketch";
    private static final String DATA_TABLE_ID_NON_EMPTY = "hll_data_non_empty";
    private static final Long EXPECTED_COUNT_NON_EMPTY;
    private static final String DATA_TABLE_ID_EMPTY = "hll_data_empty";
    private static final Long EXPECTED_COUNT_EMPTY;
    private static final String SKETCH_FIELD_NAME = "sketch";
    private static final String SKETCH_FIELD_TYPE = "BYTES";
    private static final String SKETCH_TABLE_ID = "hll_sketch";
    private static final String EXPECTED_CHECKSUM_NON_EMPTY = "f1e31df9806ce94c5bdbbfff9608324930f4d3f1";
    private static final String EXPECTED_CHECKSUM_EMPTY = "1184f5b8d4b6dd08709cf1513f26744167065e0d";

    @BeforeClass
    public static void prepareDatasetAndDataTables() throws Exception {
        BIGQUERY_CLIENT.createNewDataset(PROJECT_ID, DATASET_ID);
        TableSchema dataTableSchema = new TableSchema().setFields(Collections.singletonList(new TableFieldSchema().setName(DATA_FIELD_NAME).setType(DATA_FIELD_TYPE)));
        Table dataTableNonEmpty = new Table().setSchema(dataTableSchema).setTableReference(new TableReference().setProjectId(PROJECT_ID).setDatasetId(DATASET_ID).setTableId(DATA_TABLE_ID_NON_EMPTY));
        BIGQUERY_CLIENT.createNewTable(PROJECT_ID, DATASET_ID, dataTableNonEmpty);
        List rows = TEST_DATA.stream().map(v -> Collections.singletonMap(DATA_FIELD_NAME, v)).collect(Collectors.toList());
        BIGQUERY_CLIENT.insertDataToTable(PROJECT_ID, DATASET_ID, DATA_TABLE_ID_NON_EMPTY, rows);
        Table dataTableEmpty = new Table().setSchema(dataTableSchema).setTableReference(new TableReference().setProjectId(PROJECT_ID).setDatasetId(DATASET_ID).setTableId(DATA_TABLE_ID_EMPTY));
        BIGQUERY_CLIENT.createNewTable(PROJECT_ID, DATASET_ID, dataTableEmpty);
    }

    @AfterClass
    public static void deleteDataset() throws Exception {
        BIGQUERY_CLIENT.deleteDataset(PROJECT_ID, DATASET_ID);
    }

    @Test
    public void testReadNonEmptySketchFromBigQuery() {
        this.readSketchFromBigQuery(DATA_TABLE_ID_NON_EMPTY, EXPECTED_COUNT_NON_EMPTY);
    }

    @Test
    public void testReadEmptySketchFromBigQuery() {
        this.readSketchFromBigQuery(DATA_TABLE_ID_EMPTY, EXPECTED_COUNT_EMPTY);
    }

    private void readSketchFromBigQuery(String tableId, Long expectedCount) {
        String tableSpec = String.format("%s.%s", DATASET_ID, tableId);
        String query = String.format("SELECT HLL_COUNT.INIT(%s) AS %s FROM %s", DATA_FIELD_NAME, "sketch", tableSpec);
        SerializableFunction & Serializable parseQueryResultToByteArray = (SerializableFunction & Serializable)input -> HllCount.getSketchFromByteBuffer((ByteBuffer)((ByteBuffer)input.getRecord().get("sketch")));
        TestPipelineOptions options = (TestPipelineOptions)TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
        Pipeline p = Pipeline.create((PipelineOptions)options);
        PCollection result = (PCollection)((PCollection)((PCollection)p.apply((PTransform)BigQueryIO.read((SerializableFunction)parseQueryResultToByteArray).withFormat(DataFormat.AVRO).fromQuery(query).usingStandardSql().withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).withCoder((Coder)ByteArrayCoder.of()))).apply((PTransform)HllCount.MergePartial.globally())).apply(HllCount.Extract.globally());
        PAssert.thatSingleton((PCollection)result).isEqualTo((Object)expectedCount);
        p.run().waitUntilFinish();
    }

    @Test
    public void testWriteNonEmptySketchToBigQuery() {
        this.writeSketchToBigQuery(TEST_DATA, EXPECTED_CHECKSUM_NON_EMPTY);
    }

    @Test
    public void testWriteEmptySketchToBigQuery() {
        this.writeSketchToBigQuery(Collections.emptyList(), EXPECTED_CHECKSUM_EMPTY);
    }

    private void writeSketchToBigQuery(List<String> testData, String expectedChecksum) {
        String tableSpec = String.format("%s.%s", DATASET_ID, SKETCH_TABLE_ID);
        String query = String.format("SELECT HLL_COUNT.EXTRACT(%s) FROM %s", "sketch", tableSpec);
        TableSchema tableSchema = new TableSchema().setFields(Collections.singletonList(new TableFieldSchema().setName("sketch").setType(SKETCH_FIELD_TYPE)));
        TestPipelineOptions options = (TestPipelineOptions)TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
        Pipeline p = Pipeline.create((PipelineOptions)options);
        SerializableFunction & Serializable formatFn = (SerializableFunction & Serializable)sketch -> new TableRow().set("sketch", (Object)(((byte[])sketch).length == 0 ? null : sketch));
        ((PCollection)((PCollection)p.apply((PTransform)Create.of(testData).withType(TypeDescriptor.of(String.class)))).apply((PTransform)HllCount.Init.forStrings().globally())).apply((PTransform)BigQueryIO.write().to(tableSpec).withSchema(tableSchema).withFormatFunction((SerializableFunction)formatFn).withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
        p.run().waitUntilFinish();
        MatcherAssert.assertThat((Object)BigqueryMatcher.createQueryUsingStandardSql((String)APP_NAME, (String)PROJECT_ID, (String)query), (Matcher)BigqueryMatcher.queryResultHasChecksum((String)expectedChecksum));
    }

    static {
        TEST_DATA = Arrays.asList("Apple", "Orange", "Banana", "Orange");
        EXPECTED_COUNT_NON_EMPTY = 3L;
        EXPECTED_COUNT_EMPTY = 0L;
        ApplicationNameOptions options = (ApplicationNameOptions)TestPipeline.testingPipelineOptions().as(ApplicationNameOptions.class);
        APP_NAME = options.getAppName();
        PROJECT_ID = ((GcpOptions)options.as(GcpOptions.class)).getProject();
        DATASET_ID = String.format("zetasketch_%tY_%<tm_%<td_%<tH_%<tM_%<tS_%<tL", new Date());
        BIGQUERY_CLIENT = BigqueryClient.getClient((String)APP_NAME);
    }
}

