/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql;

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BeamRecordCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.junit.Rule;
import org.junit.Test;

public class BeamSqlDslJoinTest {
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    private static final BeamRecordSqlType SOURCE_RECORD_TYPE = BeamRecordSqlType.create(Arrays.asList("order_id", "site_id", "price"), Arrays.asList(4, 4, 4));
    private static final BeamRecordCoder SOURCE_CODER = SOURCE_RECORD_TYPE.getRecordCoder();
    private static final BeamRecordSqlType RESULT_RECORD_TYPE = BeamRecordSqlType.create(Arrays.asList("order_id", "site_id", "price", "order_id0", "site_id0", "price0"), Arrays.asList(4, 4, 4, 4, 4, 4));
    private static final BeamRecordCoder RESULT_CODER = RESULT_RECORD_TYPE.getRecordCoder();

    @Test
    public void testInnerJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PAssert.that(this.queryFromOrderTables(sql)).containsInAnyOrder(TestUtils.RowsBuilder.of(RESULT_RECORD_TYPE).addRows(2, 3, 3, 1, 2, 3).getRows());
        this.pipeline.run();
    }

    @Test
    public void testLeftOuterJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 LEFT OUTER JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PAssert.that(this.queryFromOrderTables(sql)).containsInAnyOrder(TestUtils.RowsBuilder.of(RESULT_RECORD_TYPE).addRows(1, 2, 3, null, null, null, 2, 3, 3, 1, 2, 3, 3, 4, 5, null, null, null).getRows());
        this.pipeline.run();
    }

    @Test
    public void testRightOuterJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 RIGHT OUTER JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PAssert.that(this.queryFromOrderTables(sql)).containsInAnyOrder(TestUtils.RowsBuilder.of(RESULT_RECORD_TYPE).addRows(2, 3, 3, 1, 2, 3, null, null, null, 2, 3, 3, null, null, null, 3, 4, 5).getRows());
        this.pipeline.run();
    }

    @Test
    public void testFullOuterJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 FULL OUTER JOIN ORDER_DETAILS2 o2 on  o1.order_id=o2.site_id AND o2.price=o1.site_id";
        PAssert.that(this.queryFromOrderTables(sql)).containsInAnyOrder(TestUtils.RowsBuilder.of(RESULT_RECORD_TYPE).addRows(2, 3, 3, 1, 2, 3, 1, 2, 3, null, null, null, 3, 4, 5, null, null, null, null, null, null, 2, 3, 3, null, null, null, 3, 4, 5).getRows());
        this.pipeline.run();
    }

    @Test(expected=IllegalStateException.class)
    public void testException_nonEqualJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1 JOIN ORDER_DETAILS2 o2 on  o1.order_id>o2.site_id";
        this.pipeline.enableAbandonedNodeEnforcement(false);
        this.queryFromOrderTables(sql);
        this.pipeline.run();
    }

    @Test(expected=IllegalStateException.class)
    public void testException_crossJoin() throws Exception {
        String sql = "SELECT *  FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
        this.pipeline.enableAbandonedNodeEnforcement(false);
        this.queryFromOrderTables(sql);
        this.pipeline.run();
    }

    private PCollection<BeamRecord> queryFromOrderTables(String sql) {
        return ((PCollection)PCollectionTuple.of((TupleTag)new TupleTag("ORDER_DETAILS1"), (PCollection)BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1.buildIOReader((Pipeline)this.pipeline).setCoder((Coder)SOURCE_CODER)).and(new TupleTag("ORDER_DETAILS2"), BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2.buildIOReader((Pipeline)this.pipeline).setCoder((Coder)SOURCE_CODER)).apply("join", (PTransform)BeamSql.queryMulti((String)sql))).setCoder((Coder)RESULT_CODER);
    }
}

