/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRelBoundedVsBoundedTest;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSideInputJoinRelTest;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.hamcrest.core.StringContains;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class BeamSideInputLookupJoinRelTest
extends BaseRelTest {
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private static final boolean nullable = true;

    @BeforeClass
    public static void prepare() {
        BeamSideInputJoinRelTest.registerUnboundedTable();
        BeamSideInputLookupJoinRelTest.registerTable("ORDER_DETAILS1", (BeamSqlTable)BeamCoGBKJoinRelBoundedVsBoundedTest.ORDER_DETAILS1);
        BeamSideInputLookupJoinRelTest.registerTable("SITE_LKP", (BeamSqlTable)new SiteLookupTable(TestTableUtils.buildBeamSqlNullableSchema((Object[])new Object[]{Schema.FieldType.INT32, "site_id", true, Schema.FieldType.STRING, "site_name", true})));
    }

    @Test
    public void testBoundedTableInnerJoinWithLookupTable() throws Exception {
        String sql = "SELECT o1.order_id, o2.site_name FROM  ORDER_DETAILS1 o1  JOIN SITE_LKP o2  on  o1.site_id=o2.site_id  WHERE o1.site_id=2 ";
        PCollection<Row> rows = BeamSideInputLookupJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that((PCollection)((PCollection)rows.apply((PTransform)ParDo.of((DoFn)new TestUtils.BeamSqlRow2StringDoFn())))).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.FieldType.INT32, "order_id", Schema.FieldType.STRING, "site_name").addRows(1, "SITE1").getStringRows());
        this.pipeline.run();
    }

    @Test
    public void testLookupTableInnerJoinWithBoundedTable() throws Exception {
        String sql = "SELECT o1.order_id, o2.site_name FROM  SITE_LKP o2  JOIN ORDER_DETAILS1 o1  on  o1.site_id=o2.site_id  WHERE o1.site_id=2 ";
        PCollection<Row> rows = BeamSideInputLookupJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that((PCollection)((PCollection)rows.apply((PTransform)ParDo.of((DoFn)new TestUtils.BeamSqlRow2StringDoFn())))).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.FieldType.INT32, "order_id", Schema.FieldType.STRING, "site_name").addRows(1, "SITE1").getStringRows());
        this.pipeline.run();
    }

    @Test
    public void testUnboundedTableInnerJoinWithLookupTable() throws Exception {
        String sql = "SELECT o1.order_id, o2.site_name FROM (select order_id, site_id FROM ORDER_DETAILS           GROUP BY order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  JOIN  SITE_LKP o2  on  o1.site_id=o2.site_id WHERE o1.site_id=2 ";
        PCollection<Row> rows = BeamSideInputLookupJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that((PCollection)((PCollection)rows.apply((PTransform)ParDo.of((DoFn)new TestUtils.BeamSqlRow2StringDoFn())))).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.FieldType.INT32, "order_id", Schema.FieldType.STRING, "site_name").addRows(1, "SITE1").addRows(2, "SITE1").getStringRows());
        this.pipeline.run();
    }

    @Test
    public void testLookupTableInnerJoinWithUnboundedTable() throws Exception {
        String sql = "SELECT o1.order_id, o2.site_name FROM  SITE_LKP o2  JOIN (select order_id, site_id FROM ORDER_DETAILS           GROUP BY order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  on  o1.site_id=o2.site_id WHERE o1.site_id=2 ";
        PCollection<Row> rows = BeamSideInputLookupJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that((PCollection)((PCollection)rows.apply((PTransform)ParDo.of((DoFn)new TestUtils.BeamSqlRow2StringDoFn())))).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.FieldType.INT32, "order_id", Schema.FieldType.STRING, "site_name").addRows(1, "SITE1").addRows(2, "SITE1").getStringRows());
        this.pipeline.run();
    }

    @Test
    public void testLookupTableRightOuterJoinWithBoundedTable() throws Exception {
        String sql = "SELECT o1.order_id, o2.site_name FROM  SITE_LKP o2  RIGHT OUTER JOIN  ORDER_DETAILS1 o1  on  o1.site_id=o2.site_id ";
        PCollection<Row> rows = BeamSideInputLookupJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that((PCollection)((PCollection)rows.apply((PTransform)ParDo.of((DoFn)new TestUtils.BeamSqlRow2StringDoFn())))).containsInAnyOrder(TestUtils.RowsBuilder.ofNullable(Schema.FieldType.INT32, "order_id", true, Schema.FieldType.STRING, "site_name", true).addRows(1, "SITE1").addRows(2, null).addRows(3, null).getStringRows());
        this.pipeline.run();
    }

    @Test
    public void testUnboundedTableLeftOuterJoinWithLookupTable() throws Exception {
        String sql = "SELECT o1.order_id, o2.site_name FROM (select order_id, site_id FROM ORDER_DETAILS           GROUP BY order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  LEFT OUTER JOIN  SITE_LKP o2  on  o1.site_id=o2.site_id";
        PCollection<Row> rows = BeamSideInputLookupJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that((PCollection)((PCollection)rows.apply((PTransform)ParDo.of((DoFn)new TestUtils.BeamSqlRow2StringDoFn())))).containsInAnyOrder(TestUtils.RowsBuilder.ofNullable(Schema.FieldType.INT32, "order_id", true, Schema.FieldType.STRING, "site_name", true).addRows(1, "SITE1").addRows(2, "SITE1").addRows(1, null).addRows(2, null).addRows(3, null).getStringRows());
        this.pipeline.run();
    }

    @Test
    public void testLookupTableLeftOuterJoinWithBoundedTableError() throws Exception {
        String sql = "SELECT o1.order_id, o2.site_name FROM  SITE_LKP o2  LEFT OUTER JOIN  ORDER_DETAILS1 o1  on  o1.site_id=o2.site_id ";
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(StringContains.containsString((String)"OUTER JOIN must be a non Seekable table"));
        BeamSideInputLookupJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }

    @Test
    public void testUnboundedTableFullOuterJoinWithLookupTableError() throws Exception {
        String sql = "SELECT o1.order_id, o2.site_name FROM (select order_id, site_id FROM ORDER_DETAILS           GROUP BY order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  FULL OUTER JOIN  SITE_LKP o2  on  o1.site_id=o2.site_id";
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(StringContains.containsString((String)"not supported"));
        BeamSideInputLookupJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }

    @Test
    public void testUnboundedTableRightOuterJoinWithLookupTableError() throws Exception {
        String sql = "SELECT o1.order_id, o2.site_name FROM (select order_id, site_id FROM ORDER_DETAILS           GROUP BY order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  RIGHT OUTER JOIN  SITE_LKP o2  on  o1.site_id=o2.site_id";
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(StringContains.containsString((String)"OUTER JOIN must be a non Seekable table"));
        BeamSideInputLookupJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }

    public static class SiteLookupTable
    extends SchemaBaseBeamTable
    implements BeamSqlSeekableTable {
        public SiteLookupTable(Schema schema) {
            super(schema);
        }

        public PCollection.IsBounded isBounded() {
            return PCollection.IsBounded.BOUNDED;
        }

        public PCollection<Row> buildIOReader(PBegin begin) {
            throw new UnsupportedOperationException();
        }

        public POutput buildIOWriter(PCollection<Row> input) {
            throw new UnsupportedOperationException();
        }

        public List<Row> seekRow(Row lookupSubRow) {
            if (lookupSubRow.getInt32("site_id") == 2) {
                return Arrays.asList(Row.withSchema((Schema)this.getSchema()).addValues(new Object[]{2, "SITE1"}).build());
            }
            return Arrays.asList(Row.nullRow((Schema)this.getSchema()));
        }
    }
}

