/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql;

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRelBoundedVsBoundedTest;
import org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class BeamSqlDslJoinTest {
    @Rule
    public final ExpectedException thrown = ExpectedException.none();
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    private static final Schema SOURCE_ROW_TYPE = Schema.builder().addNullableField("order_id", Schema.FieldType.INT32).addNullableField("site_id", Schema.FieldType.INT32).addNullableField("price", Schema.FieldType.INT32).build();
    private static final Schema RESULT_ROW_TYPE = Schema.builder().addNullableField("order_id", Schema.FieldType.INT32).addNullableField("site_id", Schema.FieldType.INT32).addNullableField("price", Schema.FieldType.INT32).addNullableField("order_id0", Schema.FieldType.INT32).addNullableField("site_id0", Schema.FieldType.INT32).addNullableField("price0", Schema.FieldType.INT32).build();

    @Test
    public void testInnerJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PAssert.that(this.queryFromOrderTables(sql)).containsInAnyOrder(TestUtils.RowsBuilder.of(RESULT_ROW_TYPE).addRows(2, 3, 3, 1, 2, 3).getRows());
        this.pipeline.run();
    }

    @Test
    public void testLeftOuterJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 LEFT OUTER JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PAssert.that(this.queryFromOrderTables(sql)).containsInAnyOrder(TestUtils.RowsBuilder.of(RESULT_ROW_TYPE).addRows(1, 2, 3, null, null, null, 2, 3, 3, 1, 2, 3, 3, 4, 5, null, null, null).getRows());
        this.pipeline.run();
    }

    @Test
    public void testRightOuterJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 RIGHT OUTER JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PAssert.that(this.queryFromOrderTables(sql)).containsInAnyOrder(TestUtils.RowsBuilder.of(RESULT_ROW_TYPE).addRows(2, 3, 3, 1, 2, 3, null, null, null, 2, 3, 3, null, null, null, 3, 4, 5).getRows());
        this.pipeline.run();
    }

    @Test
    public void testFullOuterJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 FULL OUTER JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PAssert.that(this.queryFromOrderTables(sql)).containsInAnyOrder(TestUtils.RowsBuilder.of(RESULT_ROW_TYPE).addRows(2, 3, 3, 1, 2, 3, 1, 2, 3, null, null, null, 3, 4, 5, null, null, null, null, null, null, 2, 3, 3, null, null, null, 3, 4, 5).getRows());
        this.pipeline.run();
    }

    @Test(expected=UnsupportedOperationException.class)
    public void testException_nonEqualJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id>o2.site_id";
        this.pipeline.enableAbandonedNodeEnforcement(false);
        this.queryFromOrderTables(sql);
        this.pipeline.run();
    }

    @Test(expected=UnsupportedOperationException.class)
    public void testException_crossJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
        this.pipeline.enableAbandonedNodeEnforcement(false);
        this.queryFromOrderTables(sql);
        this.pipeline.run();
    }

    @Test
    public void testJoinsUnboundedWithinWindowsWithDefaultTrigger() throws Exception {
        String sql = "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PCollection orders = (PCollection)this.ordersUnbounded().apply("window", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardSeconds((long)50L))));
        PCollectionTuple inputs = TestUtils.tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
        PAssert.that((PCollection)((PCollection)inputs.apply("sql", (PTransform)SqlTransform.query((String)sql)))).containsInAnyOrder(TestUtils.RowsBuilder.of(RESULT_ROW_TYPE).addRows(1, 2, 2, 2, 2, 1, 1, 4, 3, 3, 3, 1).getRows());
        this.pipeline.run();
    }

    @Test
    public void testRejectsUnboundedWithinWindowsWithEndOfWindowTrigger() throws Exception {
        String sql = "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PCollection orders = (PCollection)this.ordersUnbounded().apply("window", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardSeconds((long)50L))).triggering((Trigger)AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO).accumulatingFiredPanes());
        PCollectionTuple inputs = TestUtils.tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(Matchers.stringContainsInOrder(Arrays.asList("once per window", "default trigger")));
        inputs.apply("sql", (PTransform)SqlTransform.query((String)sql));
        this.pipeline.run();
    }

    @Test
    public void testRejectsGlobalWindowsWithDefaultTriggerInUnboundedInput() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PCollection<Row> orders = this.ordersUnbounded();
        PCollectionTuple inputs = TestUtils.tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(Matchers.stringContainsInOrder(Arrays.asList("once per window", "default trigger")));
        inputs.apply("sql", (PTransform)SqlTransform.query((String)sql));
        this.pipeline.run();
    }

    @Test
    public void testRejectsGlobalWindowsWithEndOfWindowTrigger() throws Exception {
        String sql = "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PCollection orders = (PCollection)this.ordersUnbounded().apply("window", (PTransform)Window.into((WindowFn)new GlobalWindows()).triggering((Trigger)AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO).accumulatingFiredPanes());
        PCollectionTuple inputs = TestUtils.tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(Matchers.stringContainsInOrder(Arrays.asList("once per window", "default trigger")));
        inputs.apply("sql", (PTransform)SqlTransform.query((String)sql));
        this.pipeline.run();
    }

    @Test
    public void testRejectsNonGlobalWindowsWithRepeatingTrigger() throws Exception {
        String sql = "SELECT o1.order_id, o1.price, o1.site_id, o2.order_id, o2.price, o2.site_id  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PCollection orders = (PCollection)this.ordersUnbounded().apply("window", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardSeconds((long)203L))).triggering((Trigger)Repeatedly.forever((Trigger)AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardMinutes((long)2L)).accumulatingFiredPanes());
        PCollectionTuple inputs = TestUtils.tuple("ORDER_DETAILS1", orders, "ORDER_DETAILS2", orders);
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(Matchers.stringContainsInOrder(Arrays.asList("once per window", "default trigger")));
        inputs.apply("sql", (PTransform)SqlTransform.query((String)sql));
        this.pipeline.run();
    }

    private PCollection<Row> ordersUnbounded() {
        DateTime ts = DateTimeUtils.parseTimestampWithoutTimeZone("2017-1-1 1:0:0");
        return TestUtils.rowsBuilderOf(Schema.builder().addInt32Field("order_id").addInt32Field("price").addInt32Field("site_id").addDateTimeField("timestamp").build()).addRows(1, 2, 2, ts.plusSeconds(0), 2, 2, 1, ts.plusSeconds(40), 1, 4, 3, ts.plusSeconds(60), 3, 2, 1, ts.plusSeconds(65), 3, 3, 1, ts.plusSeconds(70)).getPCollectionBuilder().withTimestampField("timestamp").inPipeline((Pipeline)this.pipeline).buildUnbounded();
    }

    private PCollection<Row> queryFromOrderTables(String sql) {
        return ((PCollection)TestUtils.tuple("ORDER_DETAILS1", BeamCoGBKJoinRelBoundedVsBoundedTest.ORDER_DETAILS1.buildIOReader(this.pipeline.begin()).setRowSchema(SOURCE_ROW_TYPE), "ORDER_DETAILS2", BeamCoGBKJoinRelBoundedVsBoundedTest.ORDER_DETAILS2.buildIOReader(this.pipeline.begin()).setRowSchema(SOURCE_ROW_TYPE)).apply("join", (PTransform)SqlTransform.query((String)sql))).setRowSchema(RESULT_ROW_TYPE);
    }
}

