/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSideInputJoinRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.RelNode;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

public class BeamSideInputJoinRelTest
extends BaseRelTest {
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    public static final DateTime FIRST_DATE = new DateTime(1L);
    public static final DateTime SECOND_DATE = new DateTime(3600001L);
    public static final DateTime THIRD_DATE = new DateTime(7200002L);
    private static final Duration WINDOW_SIZE = Duration.standardHours((long)1L);

    @BeforeClass
    public static void prepare() {
        BeamSideInputJoinRelTest.registerUnboundedTable();
        BeamSideInputJoinRelTest.registerTable("ORDER_DETAILS1", (BeamSqlTable)TestBoundedTable.of((Object[])new Object[]{Schema.FieldType.INT32, "order_id", Schema.FieldType.STRING, "buyer"}).addRows(new Object[]{1, "james", 2, "bond"}));
    }

    public static void registerUnboundedTable() {
        BeamSideInputJoinRelTest.registerTable("ORDER_DETAILS", (BeamSqlTable)TestUnboundedTable.of((Object[])new Object[]{Schema.FieldType.INT32, "order_id", Schema.FieldType.INT32, "site_id", Schema.FieldType.INT32, "price", Schema.FieldType.DATETIME, "order_time"}).timestampColumnIndex(3).addRows(Duration.ZERO, new Object[]{1, 1, 1, FIRST_DATE, 1, 2, 2, FIRST_DATE}).addRows(WINDOW_SIZE.plus((ReadableDuration)Duration.standardSeconds((long)1L)), new Object[]{2, 2, 3, SECOND_DATE, 2, 3, 3, SECOND_DATE, 1, 2, 3, FIRST_DATE}).addRows(WINDOW_SIZE.plus((ReadableDuration)WINDOW_SIZE).plus((ReadableDuration)Duration.standardSeconds((long)1L)), new Object[]{3, 3, 3, THIRD_DATE, 2, 2, 3, SECOND_DATE}));
    }

    @Test
    public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception {
        String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM (select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS           GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  JOIN  ORDER_DETAILS1 o2  on  o1.order_id=o2.order_id";
        PCollection<Row> rows = BeamSideInputJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that((PCollection)((PCollection)rows.apply((PTransform)ParDo.of((DoFn)new TestUtils.BeamSqlRow2StringDoFn())))).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.FieldType.INT32, "order_id", Schema.FieldType.INT32, "sum_site_id", Schema.FieldType.STRING, "buyer").addRows(1, 3, "james", 2, 5, "bond").getStringRows());
        this.pipeline.run();
    }

    @Test
    public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception {
        String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM  ORDER_DETAILS1 o2  JOIN (select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS           GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  on  o1.order_id=o2.order_id";
        PCollection<Row> rows = BeamSideInputJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that((PCollection)((PCollection)rows.apply((PTransform)ParDo.of((DoFn)new TestUtils.BeamSqlRow2StringDoFn())))).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.FieldType.INT32, "order_id", Schema.FieldType.INT32, "sum_site_id", Schema.FieldType.STRING, "buyer").addRows(1, 3, "james", 2, 5, "bond").getStringRows());
        this.pipeline.run();
    }

    @Test
    public void testNodeStatsEstimation() {
        String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM (select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS           GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  JOIN  ORDER_DETAILS1 o2  on  o1.order_id=o2.order_id";
        BeamRelNode root = env.parseQuery(sql);
        while (!(root instanceof BeamSideInputJoinRel)) {
            root = root.getInput(0);
        }
        NodeStats estimate = BeamSqlRelUtils.getNodeStats((RelNode)root, (BeamRelMetadataQuery)((BeamRelMetadataQuery)root.getCluster().getMetadataQuery()));
        NodeStats leftEstimate = BeamSqlRelUtils.getNodeStats((RelNode)((BeamSideInputJoinRel)root).getLeft(), (BeamRelMetadataQuery)((BeamRelMetadataQuery)root.getCluster().getMetadataQuery()));
        NodeStats rightEstimate = BeamSqlRelUtils.getNodeStats((RelNode)((BeamSideInputJoinRel)root).getRight(), (BeamRelMetadataQuery)((BeamRelMetadataQuery)root.getCluster().getMetadataQuery()));
        Assert.assertFalse((boolean)estimate.isUnknown());
        Assert.assertEquals((double)0.0, (double)estimate.getRowCount(), (double)0.01);
        Assert.assertNotEquals((double)0.0, (double)estimate.getRate(), (double)0.001);
        Assert.assertTrue((estimate.getRate() < leftEstimate.getRowCount() * rightEstimate.getWindow() + rightEstimate.getRowCount() * leftEstimate.getWindow() ? 1 : 0) != 0);
        Assert.assertNotEquals((double)0.0, (double)estimate.getWindow(), (double)0.001);
        Assert.assertTrue((estimate.getWindow() < leftEstimate.getWindow() * rightEstimate.getWindow() ? 1 : 0) != 0);
    }

    @Test
    public void testLeftOuterJoin() throws Exception {
        String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM (select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS           GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  LEFT OUTER JOIN  ORDER_DETAILS1 o2  on  o1.order_id=o2.order_id";
        PCollection<Row> rows = BeamSideInputJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        rows.apply((PTransform)ParDo.of((DoFn)new BeamSqlOutputToConsoleFn("helloworld")));
        PAssert.that((PCollection)((PCollection)rows.apply((PTransform)ParDo.of((DoFn)new TestUtils.BeamSqlRow2StringDoFn())))).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.builder().addField("order_id", Schema.FieldType.INT32).addField("sum_site_id", Schema.FieldType.INT32).addNullableField("buyer", Schema.FieldType.STRING).build()).addRows(1, 3, "james", 2, 5, "bond", 3, 3, null).getStringRows());
        this.pipeline.run();
    }

    @Test(expected=UnsupportedOperationException.class)
    public void testLeftOuterJoinError() throws Exception {
        String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM  ORDER_DETAILS1 o2  LEFT OUTER JOIN (select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS           GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  on  o1.order_id=o2.order_id";
        this.pipeline.enableAbandonedNodeEnforcement(false);
        BeamSideInputJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }

    @Test
    public void testRightOuterJoin() throws Exception {
        String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM  ORDER_DETAILS1 o2  RIGHT OUTER JOIN (select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS           GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  on  o1.order_id=o2.order_id";
        PCollection<Row> rows = BeamSideInputJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that((PCollection)((PCollection)rows.apply((PTransform)ParDo.of((DoFn)new TestUtils.BeamSqlRow2StringDoFn())))).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.builder().addField("order_id", Schema.FieldType.INT32).addField("sum_site_id", Schema.FieldType.INT32).addNullableField("buyer", Schema.FieldType.STRING).build()).addRows(1, 3, "james", 2, 5, "bond", 3, 3, null).getStringRows());
        this.pipeline.run();
    }

    @Test(expected=UnsupportedOperationException.class)
    public void testRightOuterJoinError() throws Exception {
        String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM (select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS           GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  RIGHT OUTER JOIN  ORDER_DETAILS1 o2  on  o1.order_id=o2.order_id";
        this.pipeline.enableAbandonedNodeEnforcement(false);
        BeamSideInputJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }

    @Test(expected=UnsupportedOperationException.class)
    public void testFullOuterJoinError() throws Exception {
        String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM  ORDER_DETAILS1 o2  FULL OUTER JOIN (select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS           GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1  on  o1.order_id=o2.order_id";
        this.pipeline.enableAbandonedNodeEnforcement(false);
        BeamSideInputJoinRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        this.pipeline.run();
    }
}

