/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.RelNode;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class BeamCoGBKJoinRelBoundedVsBoundedTest
extends BaseRelTest {
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    public static final TestBoundedTable ORDER_DETAILS1 = TestBoundedTable.of((Object[])new Object[]{Schema.FieldType.INT32, "order_id", Schema.FieldType.INT32, "site_id", Schema.FieldType.INT32, "price"}).addRows(new Object[]{1, 2, 3, 2, 3, 3, 3, 4, 5});
    public static final TestBoundedTable ORDER_DETAILS1_WITH_ARRAY = TestBoundedTable.of((Object[])new Object[]{Schema.FieldType.INT32, "order_id", Schema.FieldType.INT32, "site_id", Schema.FieldType.INT32, "price", Schema.FieldType.array((Schema.FieldType)Schema.FieldType.STRING).withNullable(true), "f_stringArr"}).addRows(new Object[]{1, 2, 3, Arrays.asList("111", "222", "333"), 2, 3, 3, Arrays.asList("222", "333", "333"), 3, 4, 5, Arrays.asList("333", "444", "555")});
    public static final TestBoundedTable ORDER_DETAILS2 = TestBoundedTable.of((Object[])new Object[]{Schema.FieldType.INT32, "order_id", Schema.FieldType.INT32, "site_id", Schema.FieldType.INT32, "price"}).addRows(new Object[]{1, 2, 3, 2, 3, 3, 3, 4, 5});

    @BeforeClass
    public static void prepare() {
        BeamCoGBKJoinRelBoundedVsBoundedTest.registerTable("ORDER_DETAILS1", (BeamSqlTable)ORDER_DETAILS1);
        BeamCoGBKJoinRelBoundedVsBoundedTest.registerTable("ORDER_DETAILS1_WITH_ARRAY", (BeamSqlTable)ORDER_DETAILS1_WITH_ARRAY);
        BeamCoGBKJoinRelBoundedVsBoundedTest.registerTable("ORDER_DETAILS2", (BeamSqlTable)ORDER_DETAILS2);
    }

    @Test
    public void testInnerJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1_WITH_ARRAY o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PCollection<Row> rows = BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.builder().addField("order_id", Schema.FieldType.INT32).addField("site_id", Schema.FieldType.INT32).addField("price", Schema.FieldType.INT32).addNullableField("f_stringArr", Schema.FieldType.array((Schema.FieldType)Schema.FieldType.STRING)).addField("order_id0", Schema.FieldType.INT32).addField("site_id0", Schema.FieldType.INT32).addField("price0", Schema.FieldType.INT32).build()).addRows(2, 3, 3, Arrays.asList("222", "333", "333"), 1, 2, 3).getRows());
        this.pipeline.run();
    }

    @Test
    public void testNodeStatsEstimation() {
        String sql = "SELECT *   FROM ORDER_DETAILS1 o1  JOIN ORDER_DETAILS2 o2  on  o1.order_id=o2.site_id ";
        BeamRelNode root = env.parseQuery(sql);
        while (!(root instanceof BeamCoGBKJoinRel)) {
            root = root.getInput(0);
        }
        NodeStats estimate = BeamSqlRelUtils.getNodeStats((RelNode)root, (BeamRelMetadataQuery)((BeamRelMetadataQuery)root.getCluster().getMetadataQuery()));
        NodeStats leftEstimate = BeamSqlRelUtils.getNodeStats((RelNode)((BeamCoGBKJoinRel)root).getLeft(), (BeamRelMetadataQuery)((BeamRelMetadataQuery)root.getCluster().getMetadataQuery()));
        NodeStats rightEstimate = BeamSqlRelUtils.getNodeStats((RelNode)((BeamCoGBKJoinRel)root).getRight(), (BeamRelMetadataQuery)((BeamRelMetadataQuery)root.getCluster().getMetadataQuery()));
        Assert.assertFalse((boolean)estimate.isUnknown());
        Assert.assertEquals((double)0.0, (double)estimate.getRate(), (double)0.01);
        Assert.assertNotEquals((double)0.0, (double)estimate.getRowCount(), (double)0.001);
        Assert.assertTrue((estimate.getRowCount() < leftEstimate.getRowCount() * rightEstimate.getRowCount() ? 1 : 0) != 0);
        Assert.assertNotEquals((double)0.0, (double)estimate.getWindow(), (double)0.001);
        Assert.assertTrue((estimate.getWindow() < leftEstimate.getWindow() * rightEstimate.getWindow() ? 1 : 0) != 0);
    }

    @Test
    public void testNodeStatsOfMoreConditions() {
        String sql1 = "SELECT *   FROM ORDER_DETAILS1 o1  JOIN ORDER_DETAILS2 o2  on  o1.order_id=o2.site_id ";
        String sql2 = "SELECT *   FROM ORDER_DETAILS1 o1  JOIN ORDER_DETAILS2 o2  on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        BeamRelNode root1 = env.parseQuery(sql1);
        while (!(root1 instanceof BeamCoGBKJoinRel)) {
            root1 = root1.getInput(0);
        }
        BeamRelNode root2 = env.parseQuery(sql2);
        while (!(root2 instanceof BeamCoGBKJoinRel)) {
            root2 = root2.getInput(0);
        }
        NodeStats estimate1 = BeamSqlRelUtils.getNodeStats((RelNode)root1, (BeamRelMetadataQuery)((BeamRelMetadataQuery)root1.getCluster().getMetadataQuery()));
        NodeStats estimate2 = BeamSqlRelUtils.getNodeStats((RelNode)root2, (BeamRelMetadataQuery)((BeamRelMetadataQuery)root1.getCluster().getMetadataQuery()));
        Assert.assertNotEquals((double)0.0, (double)estimate2.getRowCount(), (double)0.001);
        Assert.assertTrue((estimate2.getRowCount() < estimate1.getRowCount() ? 1 : 0) != 0);
    }

    @Test
    public void testLeftOuterJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 LEFT OUTER JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PCollection<Row> rows = BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.enableAbandonedNodeEnforcement(false);
        PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.builder().addField("order_id", Schema.FieldType.INT32).addField("site_id", Schema.FieldType.INT32).addField("price", Schema.FieldType.INT32).addNullableField("order_id0", Schema.FieldType.INT32).addNullableField("site_id0", Schema.FieldType.INT32).addNullableField("price0", Schema.FieldType.INT32).build()).addRows(1, 2, 3, null, null, null, 2, 3, 3, 1, 2, 3, 3, 4, 5, null, null, null).getRows());
        this.pipeline.run();
    }

    @Test
    public void testLeftOuterJoinWithEmptyTuplesOnRightSide() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 LEFT OUTER JOIN (SELECT * FROM ORDER_DETAILS2 WHERE FALSE) o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PCollection<Row> rows = BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.enableAbandonedNodeEnforcement(false);
        PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.builder().addField("order_id", Schema.FieldType.INT32).addField("site_id", Schema.FieldType.INT32).addField("price", Schema.FieldType.INT32).addNullableField("order_id0", Schema.FieldType.INT32).addNullableField("site_id0", Schema.FieldType.INT32).addNullableField("price0", Schema.FieldType.INT32).build()).addRows(1, 2, 3, null, null, null, 2, 3, 3, null, null, null, 3, 4, 5, null, null, null).getRows());
        this.pipeline.run();
    }

    @Test
    public void testInnerJoinWithEmptyTuplesOnRightSide() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 INNER JOIN (SELECT * FROM ORDER_DETAILS2 WHERE FALSE) o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PCollection<Row> rows = BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.enableAbandonedNodeEnforcement(false);
        PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.builder().addField("order_id", Schema.FieldType.INT32).addField("site_id", Schema.FieldType.INT32).addField("price", Schema.FieldType.INT32).addNullableField("order_id0", Schema.FieldType.INT32).addNullableField("site_id0", Schema.FieldType.INT32).addNullableField("price0", Schema.FieldType.INT32).build()).getRows());
        this.pipeline.run();
    }

    @Test
    public void testRightOuterJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 RIGHT OUTER JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PCollection<Row> rows = BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.builder().addNullableField("order_id", Schema.FieldType.INT32).addNullableField("site_id", Schema.FieldType.INT32).addNullableField("price", Schema.FieldType.INT32).addField("order_id0", Schema.FieldType.INT32).addField("site_id0", Schema.FieldType.INT32).addField("price0", Schema.FieldType.INT32).build()).addRows(2, 3, 3, 1, 2, 3, null, null, null, 2, 3, 3, null, null, null, 3, 4, 5).getRows());
        this.pipeline.run();
    }

    @Test
    public void testFullOuterJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 FULL OUTER JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PCollection<Row> rows = BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.builder().addNullableField("order_id", Schema.FieldType.INT32).addNullableField("site_id", Schema.FieldType.INT32).addNullableField("price", Schema.FieldType.INT32).addNullableField("order_id0", Schema.FieldType.INT32).addNullableField("site_id0", Schema.FieldType.INT32).addNullableField("price0", Schema.FieldType.INT32).build()).addRows(2, 3, 3, 1, 2, 3, 1, 2, 3, null, null, null, 3, 4, 5, null, null, null, null, null, null, 2, 3, 3, null, null, null, 3, 4, 5).getRows());
        this.pipeline.run();
    }

    @Test(expected=UnsupportedOperationException.class)
    public void testException_nonEqualJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id>o2.site_id";
        this.pipeline.enableAbandonedNodeEnforcement(false);
        BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }

    @Test
    public void testException_join_condition1() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id = o2.site_id OR o1.price = o2.site_id";
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(StringContains.containsString((String)"Operator OR"));
        BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }

    @Test
    public void testException_join_condition2() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id = o2.site_id AND o1.price > o2.site_id";
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(StringContains.containsString((String)"Non equi-join"));
        BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }

    @Test
    public void testException_join_condition3() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id + o2.site_id = 2";
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(StringContains.containsString((String)"column reference"));
        this.thrown.expectMessage(StringContains.containsString((String)"struct field access"));
        BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }

    @Test
    public void testException_join_condition4() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id + o2.site_id = 2 AND o1.price > o2.site_id";
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(StringContains.containsString((String)"column reference"));
        this.thrown.expectMessage(StringContains.containsString((String)"struct field access"));
        BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }

    @Test(expected=UnsupportedOperationException.class)
    public void testException_crossJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
        this.pipeline.enableAbandonedNodeEnforcement(false);
        BeamCoGBKJoinRelBoundedVsBoundedTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }
}

