/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.hcatalog;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.hcatalog.HCatalogTableProvider;
import org.apache.beam.sdk.io.hcatalog.test.EmbeddedMetastoreService;
import org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.SystemUtils;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests that query a Hive table through Beam SQL using {@link HCatalogTableProvider}.
 *
 * <p>An {@link EmbeddedMetastoreService} rooted in a temporary folder is started once for the
 * whole class. Every test re-creates the test table and repopulates it before building its
 * pipeline. The tests cover the different ways the Hive table can be addressed in SQL (fully
 * qualified, implicit database, implicit schema) and joins against in-memory
 * {@link PCollection}s.
 *
 * <p>The class is {@link Serializable}; all {@link TestPipeline} rule fields are therefore
 * declared {@code transient}.
 */
public class BeamSqlHiveSchemaTest implements Serializable {
    /** Hive database that holds the test table. */
    private static final String TEST_DATABASE = "default";

    /** Name of the Hive table that every test re-creates. */
    private static final String TEST_TABLE = "mytable";

    /** Number of records the single-table SELECT tests expect to read back. */
    private static final int TEST_RECORDS_COUNT = 1000;

    /** Schema of the in-memory rows that are joined against the Hive table. */
    private static final Schema ROW_SCHEMA =
            Schema.builder().addInt32Field("f_int").addStringField("f_string").build();

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

    // NOTE(review): not referenced by any test visible in this class; kept because it is a public
    // rule field.
    @Rule
    public final transient TestPipeline defaultPipeline = TestPipeline.create();

    /** Pipeline used by the single-table SELECT tests. */
    @Rule
    public final transient TestPipeline readAfterWritePipeline = TestPipeline.create();

    /** Pipeline used by the join tests. */
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    private static EmbeddedMetastoreService service;

    /**
     * Starts the embedded metastore under {@link #TMP_FOLDER}.
     *
     * <p>The whole class is skipped (JUnit assumption) when
     * {@code SystemUtils.isJavaVersionAtLeast(1.9f)} reports true.
     *
     * @throws IOException if the metastore service cannot be created
     */
    @BeforeClass
    public static void setupEmbeddedMetastoreService() throws IOException {
        Assume.assumeFalse(SystemUtils.isJavaVersionAtLeast(1.9f));
        service = new EmbeddedMetastoreService(TMP_FOLDER.getRoot().getAbsolutePath());
    }

    /**
     * Drops the test table and closes the embedded metastore.
     *
     * <p>{@code service} is {@code null} when setup was skipped by the assumption, in which case
     * there is nothing to clean up.
     *
     * @throws Exception if dropping the table or closing the service fails
     */
    @AfterClass
    public static void shutdownEmbeddedMetastoreService() throws Exception {
        if (service == null) {
            return;
        }
        try {
            service.executeQuery("drop table " + TEST_TABLE);
        } finally {
            // Close even if the drop fails so the metastore is not leaked.
            service.close();
        }
    }

    /** Selects from the table using the fully qualified {@code hive.<db>.<table>} path. */
    @Test
    public void testSelectFromHCatalog() throws Exception {
        initializeHCatalog();
        String sql =
                String.format(
                        "SELECT f_str, f_int FROM `hive`.`%s`.`%s`", TEST_DATABASE, TEST_TABLE);
        assertSelectReturnsAllTestRecords(
                SqlTransform.query(sql).withTableProvider("hive", hiveTableProvider()));
    }

    /** Selects from {@code hive.<table>}, omitting the database name. */
    @Test
    public void testSelectFromImplicitDefaultDb() throws Exception {
        initializeHCatalog();
        String sql = String.format("SELECT f_str, f_int FROM `hive`.`%s`", TEST_TABLE);
        assertSelectReturnsAllTestRecords(
                SqlTransform.query(sql).withTableProvider("hive", hiveTableProvider()));
    }

    /** Selects from {@code <db>.<table>} with the Hive provider registered as the default. */
    @Test
    public void testSelectFromImplicitDefaultSchema() throws Exception {
        initializeHCatalog();
        String sql =
                String.format("SELECT f_str, f_int FROM `%s`.`%s`", TEST_DATABASE, TEST_TABLE);
        assertSelectReturnsAllTestRecords(
                SqlTransform.query(sql).withDefaultTableProvider("hive", hiveTableProvider()));
    }

    /** Selects from the bare table name with the Hive provider registered as the default. */
    @Test
    public void testSelectFromImplicitDefaultSchemaAndDB() throws Exception {
        initializeHCatalog();
        String sql = String.format("SELECT f_str, f_int FROM `%s`", TEST_TABLE);
        assertSelectReturnsAllTestRecords(
                SqlTransform.query(sql).withDefaultTableProvider("hive", hiveTableProvider()));
    }

    /** Joins the main input {@code PCOLLECTION} with the Hive table on {@code f_int}. */
    @Test
    public void testJoinPCollectionWithHCatalog() throws Exception {
        initializeHCatalog();

        PCollection<Row> inputMain =
                pipeline.apply("pcollection", create(row(1, "pcollection_1"), row(2, "pcollection_2")));

        PCollection<Row> result =
                inputMain.apply(
                        SqlTransform.query(
                                        "SELECT hive.f_int, (hive.f_str || ' ' || pcollection.f_string) AS f_string \n"
                                                + "FROM `hive`.`default`.`mytable` AS hive \n"
                                                + "   INNER JOIN \n"
                                                + " PCOLLECTION AS pcollection \n"
                                                + "   ON pcollection.f_int = hive.f_int")
                                .withTableProvider("hive", hiveTableProvider()));

        PAssert.that(result)
                .containsInAnyOrder(row(1, "record 1 pcollection_1"), row(2, "record 2 pcollection_2"));
        pipeline.run();
    }

    /**
     * Joins three sources: an extra table provider, the fully qualified Hive table, and the main
     * input {@code PCOLLECTION}.
     */
    @Test
    public void testJoinMultipleExtraProvidersWithMain() throws Exception {
        initializeHCatalog();

        PCollection<Row> inputMain =
                pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
        PCollection<Row> inputExtra =
                pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));

        PCollection<Row> result =
                inputMain.apply(
                        SqlTransform.query(
                                        "SELECT \n"
                                                + "   x_tbl.f_int as f_int, \n"
                                                + "   (p_tbl.f_string || x_tbl.f_string || ' ' || h_tbl.f_str) AS f_string \n"
                                                + "FROM \n"
                                                + "     `extraSchema`.`extraTable` AS x_tbl \n"
                                                + "  INNER JOIN \n"
                                                + "     `hive`.`default`.`mytable` AS h_tbl \n"
                                                + "        ON h_tbl.f_int = x_tbl.f_int \n"
                                                + "  INNER JOIN \n"
                                                + "     PCOLLECTION AS p_tbl \n"
                                                + "        ON p_tbl.f_int = x_tbl.f_int")
                                .withTableProvider("extraSchema", extraTableProvider("extraTable", inputExtra))
                                .withTableProvider("hive", hiveTableProvider()));

        PAssert.that(result)
                .containsInAnyOrder(
                        row(1, "pcollection_1_extra_table_1 record 1"),
                        row(2, "pcollection_2_extra_table_2 record 2"));
        pipeline.run();
    }

    /**
     * Same three-way join as {@link #testJoinMultipleExtraProvidersWithMain()}, but the Hive table
     * is addressed as {@code hive.mytable} without the database name.
     */
    @Test
    public void testJoinMultipleExtraProvidersWithImplicitHiveDB() throws Exception {
        initializeHCatalog();

        PCollection<Row> inputMain =
                pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
        PCollection<Row> inputExtra =
                pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));

        PCollection<Row> result =
                inputMain.apply(
                        SqlTransform.query(
                                        "SELECT \n"
                                                + "   x_tbl.f_int as f_int, \n"
                                                + "   (p_tbl.f_string || x_tbl.f_string || ' ' || h_tbl.f_str) AS f_string \n"
                                                + "FROM \n"
                                                + "     `extraSchema`.`extraTable` AS x_tbl \n"
                                                + "  INNER JOIN \n"
                                                + "     `hive`.`mytable` AS h_tbl \n"
                                                + "        ON h_tbl.f_int = x_tbl.f_int \n"
                                                + "  INNER JOIN \n"
                                                + "     PCOLLECTION AS p_tbl \n"
                                                + "        ON p_tbl.f_int = x_tbl.f_int")
                                .withTableProvider("extraSchema", extraTableProvider("extraTable", inputExtra))
                                .withTableProvider("hive", hiveTableProvider()));

        PAssert.that(result)
                .containsInAnyOrder(
                        row(1, "pcollection_1_extra_table_1 record 1"),
                        row(2, "pcollection_2_extra_table_2 record 2"));
        pipeline.run();
    }

    /**
     * Applies {@code query} to {@link #readAfterWritePipeline}, converts each result row to a
     * {@link KV} via {@link ToKV}, asserts the output equals
     * {@code HCatalogIOTestUtils.getExpectedRecordsAsKV(TEST_RECORDS_COUNT)} in any order, and runs
     * the pipeline.
     *
     * <p>Callers must have called {@link #initializeHCatalog()} beforehand.
     */
    private void assertSelectReturnsAllTestRecords(SqlTransform query) {
        PCollection<KV<String, Integer>> output =
                readAfterWritePipeline.apply(query).apply(ParDo.of(new ToKV()));
        PAssert.that(output)
                .containsInAnyOrder(HCatalogIOTestUtils.getExpectedRecordsAsKV(TEST_RECORDS_COUNT));
        readAfterWritePipeline.run();
    }

    /** Drops the test table and creates it again with columns {@code f_str} and {@code f_int}. */
    private void reCreateTestTable() {
        service.executeQuery("drop table " + TEST_TABLE);
        service.executeQuery("create table " + TEST_TABLE + "(f_str string, f_int int)");
    }

    /**
     * Re-creates the test table and inserts the test data through
     * {@code HCatalogIOTestUtils.insertTestData}, using the metastore's Hive configuration.
     */
    private void initializeHCatalog() throws Exception {
        reCreateTestTable();
        HCatalogIOTestUtils.insertTestData(service.getHiveConfAsMap());
    }

    /** Creates a table provider configured with the embedded metastore's Hive configuration. */
    private TableProvider hiveTableProvider() {
        return HCatalogTableProvider.create(service.getHiveConfAsMap());
    }

    /** Builds a {@link Row} conforming to {@link #ROW_SCHEMA}. */
    private Row row(int fIntValue, String fStringValue) {
        return Row.withSchema(ROW_SCHEMA).addValues(fIntValue, fStringValue).build();
    }

    /** Wraps {@code rows} as a read-only table provider exposing one table named {@code tableName}. */
    private TableProvider extraTableProvider(String tableName, PCollection<Row> rows) {
        return new ReadOnlyTableProvider(
                "testExtraTableProvider", ImmutableMap.of(tableName, new BeamPCollectionTable<>(rows)));
    }

    /** Returns a transform that creates a {@code PCollection<Row>} with {@link #ROW_SCHEMA}. */
    private PTransform<PBegin, PCollection<Row>> create(Row... rows) {
        return Create.of(Arrays.asList(rows)).withRowSchema(ROW_SCHEMA);
    }

    /**
     * Converts a two-field {@link Row} into a {@link KV}: field 0 becomes the {@code String} key
     * and field 1 the {@code Integer} value.
     */
    public static class ToKV extends DoFn<Row, KV<String, Integer>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Row row = c.element();
            String key = row.getValue(0);
            Integer value = row.getValue(1);
            c.output(KV.of(key, value));
        }
    }
}

