package org.apache.beam.sdk.extensions.sql.meta.provider.hcatalog;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.io.hcatalog.test.EmbeddedMetastoreService;
import org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.SystemUtils;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/hcatalog/BeamSqlHiveSchemaTest.class */
/**
 * Tests that run Beam SQL queries against a Hive table exposed through {@link
 * HCatalogTableProvider}.
 *
 * <p>An {@link EmbeddedMetastoreService} rooted in a temporary folder is started once for the
 * class. Each test re-creates the Hive table, fills it via {@link
 * HCatalogIOTestUtils#insertTestData}, and then queries it with {@link SqlTransform}, registering
 * the Hive provider either under the explicit {@code hive} schema or as the default provider.
 *
 * <p>The class is {@link Serializable}; every {@link TestPipeline} rule is therefore declared
 * {@code transient} so the pipelines are never part of the serialized form.
 */
public class BeamSqlHiveSchemaTest implements Serializable {
    /** Schema of the in-memory rows that are joined against the Hive table. */
    private static final Schema ROW_SCHEMA =
            Schema.builder().addInt32Field("f_int").addStringField("f_string").build();

    // NOTE(review): these presumably have to match the database, table and record count used by
    // HCatalogIOTestUtils.insertTestData / getExpectedRecordsAsKV - confirm against that class.
    private static final String HIVE_DATABASE = "default";
    private static final String HIVE_TABLE = "mytable";
    private static final int EXPECTED_RECORD_COUNT = 1000;

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

    @Rule
    public final transient TestPipeline defaultPipeline = TestPipeline.create();

    @Rule
    public final transient TestPipeline readAfterWritePipeline = TestPipeline.create();

    // Declared transient like the other pipeline rules: this class is Serializable and the
    // pipeline must not be dragged into its serialized form.
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    private static EmbeddedMetastoreService service;

    /** Converts a two-column {@link Row} (string, then integer) into a {@code KV<String, Integer>}. */
    public static class ToKV extends DoFn<Row, KV<String, Integer>> {
        /**
         * Emits the row's first value as the key and its second value as the value.
         *
         * @param context the element context supplying the input row and receiving the output
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            Row row = context.element();
            String key = row.getValue(0);
            Integer value = row.getValue(1);
            context.output(KV.of(key, value));
        }
    }

    /**
     * Starts the embedded metastore inside {@link #TMP_FOLDER}.
     *
     * <p>The whole class is skipped when running on Java 9 or newer.
     * NOTE(review): presumably because the embedded Hive metastore does not run there - confirm.
     *
     * @throws IOException if the metastore service cannot be created
     */
    @BeforeClass
    public static void setupEmbeddedMetastoreService() throws IOException {
        Assume.assumeFalse(SystemUtils.isJavaVersionAtLeast(1.9f));
        service = new EmbeddedMetastoreService(TMP_FOLDER.getRoot().getAbsolutePath());
    }

    /**
     * Drops the test table and closes the metastore, if it was started.
     *
     * @throws Exception if dropping the table or closing the service fails
     */
    @AfterClass
    public static void shutdownEmbeddedMetastoreService() throws Exception {
        // The service is null when the Java-version assumption skipped the setup.
        if (service == null) {
            return;
        }
        try {
            service.executeQuery("drop table " + HIVE_TABLE);
        } finally {
            // Always release the metastore, even if the cleanup query failed.
            service.close();
        }
    }

    /** Selects from the table using the fully qualified {@code hive.<db>.<table>} path. */
    @Test
    public void testSelectFromHCatalog() throws Exception {
        initializeHCatalog();
        assertSelectReturnsAllRecords(
                SqlTransform.query(
                                String.format(
                                        "SELECT f_str, f_int FROM `hive`.`%s`.`%s`",
                                        HIVE_DATABASE, HIVE_TABLE))
                        .withTableProvider("hive", hiveTableProvider()));
    }

    /** Selects from the table with the database omitted: {@code hive.<table>}. */
    @Test
    public void testSelectFromImplicitDefaultDb() throws Exception {
        initializeHCatalog();
        assertSelectReturnsAllRecords(
                SqlTransform.query(String.format("SELECT f_str, f_int FROM `hive`.`%s`", HIVE_TABLE))
                        .withTableProvider("hive", hiveTableProvider()));
    }

    /** Selects via {@code <db>.<table>} with Hive registered as the default table provider. */
    @Test
    public void testSelectFromImplicitDefaultSchema() throws Exception {
        initializeHCatalog();
        assertSelectReturnsAllRecords(
                SqlTransform.query(
                                String.format(
                                        "SELECT f_str, f_int FROM `%s`.`%s`", HIVE_DATABASE, HIVE_TABLE))
                        .withDefaultTableProvider("hive", hiveTableProvider()));
    }

    /** Selects via the bare table name with Hive registered as the default table provider. */
    @Test
    public void testSelectFromImplicitDefaultSchemaAndDB() throws Exception {
        initializeHCatalog();
        assertSelectReturnsAllRecords(
                SqlTransform.query(String.format("SELECT f_str, f_int FROM `%s`", HIVE_TABLE))
                        .withDefaultTableProvider("hive", hiveTableProvider()));
    }

    /** Joins the main input {@code PCOLLECTION} with the Hive table on {@code f_int}. */
    @Test
    public void testJoinPCollectionWithHCatalog() throws Exception {
        initializeHCatalog();

        PCollection<Row> mainInput =
                pipeline.apply("pcollection", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
        String query =
                "SELECT hive.f_int, (hive.f_str || ' ' || pcollection.f_string) AS f_string \n"
                        + "FROM `hive`.`" + HIVE_DATABASE + "`.`" + HIVE_TABLE + "` AS hive \n"
                        + "   INNER JOIN \n"
                        + " PCOLLECTION AS pcollection \n"
                        + "   ON pcollection.f_int = hive.f_int";
        PCollection<Row> joined =
                mainInput.apply(SqlTransform.query(query).withTableProvider("hive", hiveTableProvider()));

        PAssert.that(joined)
                .containsInAnyOrder(row(1, "record 1 pcollection_1"), row(2, "record 2 pcollection_2"));
        pipeline.run();
    }

    /** Three-way join addressing the Hive table as {@code hive.<db>.<table>}. */
    @Test
    public void testJoinMultipleExtraProvidersWithMain() throws Exception {
        initializeHCatalog();
        assertJoinAcrossProviders(String.format("`hive`.`%s`.`%s`", HIVE_DATABASE, HIVE_TABLE));
    }

    /** Three-way join addressing the Hive table as {@code hive.<table>} (database omitted). */
    @Test
    public void testJoinMultipleExtraProvidersWithImplicitHiveDB() throws Exception {
        initializeHCatalog();
        assertJoinAcrossProviders(String.format("`hive`.`%s`", HIVE_TABLE));
    }

    /**
     * Applies {@code query} to {@link #readAfterWritePipeline}, maps the rows to key/value pairs
     * with {@link ToKV}, asserts they equal the expected test records and runs the pipeline.
     *
     * @param query the fully configured SQL transform (query text plus table provider)
     */
    private void assertSelectReturnsAllRecords(SqlTransform query) throws Exception {
        PCollection<KV<String, Integer>> records =
                readAfterWritePipeline.apply(query).apply(ParDo.of(new ToKV()));
        PAssert.that(records)
                .containsInAnyOrder(HCatalogIOTestUtils.getExpectedRecordsAsKV(EXPECTED_RECORD_COUNT));
        readAfterWritePipeline.run();
    }

    /**
     * Joins an extra PCollection-backed table, the Hive table and the main input on {@code f_int},
     * asserts the concatenated result rows and runs {@link #pipeline}.
     *
     * @param hiveTableReference back-quoted SQL path used to address the Hive table in the query
     */
    private void assertJoinAcrossProviders(String hiveTableReference) throws Exception {
        PCollection<Row> mainInput =
                pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
        PCollection<Row> extraInput =
                pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));

        String query =
                "SELECT \n"
                        + "   x_tbl.f_int as f_int, \n"
                        + "   (p_tbl.f_string || x_tbl.f_string || ' ' || h_tbl.f_str) AS f_string \n"
                        + "FROM \n"
                        + "     `extraSchema`.`extraTable` AS x_tbl \n"
                        + "  INNER JOIN \n"
                        + "     " + hiveTableReference + " AS h_tbl \n"
                        + "        ON h_tbl.f_int = x_tbl.f_int \n"
                        + "  INNER JOIN \n"
                        + "     PCOLLECTION AS p_tbl \n"
                        + "        ON p_tbl.f_int = x_tbl.f_int";

        PCollection<Row> joined =
                mainInput.apply(
                        SqlTransform.query(query)
                                .withTableProvider("extraSchema", extraTableProvider("extraTable", extraInput))
                                .withTableProvider("hive", hiveTableProvider()));

        PAssert.that(joined)
                .containsInAnyOrder(
                        row(1, "pcollection_1_extra_table_1 record 1"),
                        row(2, "pcollection_2_extra_table_2 record 2"));
        pipeline.run();
    }

    /** Drops the test table and creates it again with columns {@code f_str} and {@code f_int}. */
    private void reCreateTestTable() {
        service.executeQuery("drop table " + HIVE_TABLE);
        service.executeQuery("create table " + HIVE_TABLE + "(f_str string, f_int int)");
    }

    /** Re-creates the test table and loads the shared test data into it. */
    private void initializeHCatalog() throws Exception {
        reCreateTestTable();
        HCatalogIOTestUtils.insertTestData(service.getHiveConfAsMap());
    }

    /** Returns a Hive table provider configured from the embedded metastore's Hive configuration. */
    private TableProvider hiveTableProvider() {
        return HCatalogTableProvider.create(service.getHiveConfAsMap());
    }

    /** Builds a {@link #ROW_SCHEMA} row from the given integer and string. */
    private Row row(int i, String str) {
        return Row.withSchema(ROW_SCHEMA).addValues(i, str).build();
    }

    /** Wraps {@code pCollection} in a read-only table provider exposing it under the name {@code str}. */
    private TableProvider extraTableProvider(String str, PCollection<Row> pCollection) {
        return new ReadOnlyTableProvider(
                "testExtraTableProvider", ImmutableMap.of(str, new BeamPCollectionTable<>(pCollection)));
    }

    /** Returns a root transform producing the given rows with {@link #ROW_SCHEMA} attached. */
    private PTransform<PBegin, PCollection<Row>> create(Row... rowArr) {
        return Create.of(Arrays.asList(rowArr)).withRowSchema(ROW_SCHEMA);
    }
}
