package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.sql.Types;
import java.util.Date;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsUnboundedTest.class */
/**
 * Tests SQL joins whose two inputs are both derived from an unbounded mocked table.
 *
 * <p>Every query joins two sub-queries over the same {@code ORDER_DETAILS} table, each of which
 * aggregates {@code sum(site_id)} per key inside a tumbling window on {@code order_time}.
 */
public class BeamJoinRelUnboundedVsUnboundedTest extends BaseRelTest {

    @Rule
    public final TestPipeline pipeline = TestPipeline.create();

    private static final BeamSqlEnv BEAM_SQL_ENV = new BeamSqlEnv();

    /** Size of the tumbling window used by the queries (matches {@code INTERVAL '1' HOUR}). */
    private static final Duration WINDOW_SIZE = Duration.standardHours(1);

    /** Event time of the rows belonging to the first window: 1 ms after the epoch. */
    public static final Date FIRST_DATE = new Date(1);

    /** Event time exactly one {@link #WINDOW_SIZE} after {@link #FIRST_DATE} (3600001 ms). */
    public static final Date SECOND_DATE =
            new Date(FIRST_DATE.getTime() + WINDOW_SIZE.getMillis());

    /**
     * Registers the {@code ORDER_DETAILS} table shared by all tests.
     *
     * <p>The table has three integer columns and a timestamp column; column index 3
     * ({@code order_time}) is declared as the timestamp column.
     */
    @BeforeClass
    public static void prepare() {
        BEAM_SQL_ENV.registerTable(
                "ORDER_DETAILS",
                MockedUnboundedTable
                        .of(
                                Types.INTEGER, "order_id",
                                Types.INTEGER, "site_id",
                                Types.INTEGER, "price",
                                Types.TIMESTAMP, "order_time")
                        .timestampColumnIndex(3)
                        // NOTE(review): the leading Duration of each addRows call is presumably
                        // the point in time at which that batch is delivered - confirm against
                        // MockedUnboundedTable.
                        .addRows(
                                Duration.ZERO,
                                1, 1, 1, FIRST_DATE,
                                1, 2, 6, FIRST_DATE)
                        .addRows(
                                WINDOW_SIZE.plus(Duration.standardMinutes(1L)),
                                2, 2, 7, SECOND_DATE,
                                2, 3, 8, SECOND_DATE,
                                // Stamped with FIRST_DATE although delivered in a later batch.
                                // The expected sums in the tests do not include this row, so it
                                // is presumably discarded as late data.
                                1, 3, 3, FIRST_DATE)
                        .addRows(
                                WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1L)),
                                // Stamped with SECOND_DATE but delivered a batch later; likewise
                                // absent from the expected sums.
                                2, 3, 3, SECOND_DATE));
    }

    /** Inner join of two identical per-{@code order_id} hourly aggregations. */
    @Test
    public void testInnerJoin() throws Exception {
        String sql = "SELECT * FROM "
                + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
                + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
                + " JOIN "
                + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
                + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
                + " on "
                + " o1.order_id=o2.order_id";

        PCollection<BeamRecord> rows = compilePipeline(sql, this.pipeline, BEAM_SQL_ENV);
        PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
                .containsInAnyOrder(
                        TestUtils.RowsBuilder
                                .of(
                                        Types.INTEGER, "order_id",
                                        Types.INTEGER, "sum_site_id",
                                        Types.INTEGER, "order_id0",
                                        Types.INTEGER, "sum_site_id0")
                                .addRows(
                                        1, 3, 1, 3,
                                        2, 5, 2, 5)
                                .getStringRows());
        this.pipeline.run();
    }

    /**
     * Left outer join: the left side is keyed by {@code site_id} (aliased to {@code order_id}),
     * the right side by {@code order_id}; unmatched left rows are expected with null right columns.
     */
    @Test
    public void testLeftOuterJoin() throws Exception {
        String sql = "SELECT * FROM "
                + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
                + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
                + " LEFT OUTER JOIN "
                + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
                + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
                + " on "
                + " o1.order_id=o2.order_id";

        PCollection<BeamRecord> rows = compilePipeline(sql, this.pipeline, BEAM_SQL_ENV);
        PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
                .containsInAnyOrder(
                        TestUtils.RowsBuilder
                                .of(
                                        Types.INTEGER, "order_id",
                                        Types.INTEGER, "sum_site_id",
                                        Types.INTEGER, "order_id0",
                                        Types.INTEGER, "sum_site_id0")
                                .addRows(
                                        1, 1, 1, 3,
                                        2, 2, null, null,
                                        2, 2, 2, 5,
                                        3, 3, null, null)
                                .getStringRows());
        this.pipeline.run();
    }

    /**
     * Right outer join: mirror image of {@link #testLeftOuterJoin()}, with the
     * {@code site_id}-keyed aggregation on the right; unmatched right rows are expected with
     * null left columns.
     */
    @Test
    public void testRightOuterJoin() throws Exception {
        String sql = "SELECT * FROM "
                + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
                + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
                + " RIGHT OUTER JOIN "
                + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
                + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
                + " on "
                + " o1.order_id=o2.order_id";

        PCollection<BeamRecord> rows = compilePipeline(sql, this.pipeline, BEAM_SQL_ENV);
        PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
                .containsInAnyOrder(
                        TestUtils.RowsBuilder
                                .of(
                                        Types.INTEGER, "order_id",
                                        Types.INTEGER, "sum_site_id",
                                        Types.INTEGER, "order_id0",
                                        Types.INTEGER, "sum_site_id0")
                                .addRows(
                                        1, 3, 1, 1,
                                        null, null, 2, 2,
                                        2, 5, 2, 2,
                                        null, null, 3, 3)
                                .getStringRows());
        this.pipeline.run();
    }

    /**
     * Full outer join: the left side is keyed by {@code price} (aliased to {@code order_id1}),
     * the right side by {@code order_id}; unmatched rows from either side are expected with
     * nulls in the other side's columns.
     */
    @Test
    public void testFullOuterJoin() throws Exception {
        String sql = "SELECT * FROM "
                + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
                + "          GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
                + " FULL OUTER JOIN "
                + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
                + "          GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
                + " on "
                + " o1.order_id1=o2.order_id";

        PCollection<BeamRecord> rows = compilePipeline(sql, this.pipeline, BEAM_SQL_ENV);

        // NOTE(review): debugging sink only - its output is not asserted on. Looks like a
        // leftover; consider removing it together with its import.
        rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));

        PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
                .containsInAnyOrder(
                        TestUtils.RowsBuilder
                                .of(
                                        Types.INTEGER, "order_id1",
                                        Types.INTEGER, "sum_site_id",
                                        Types.INTEGER, "order_id",
                                        Types.INTEGER, "sum_site_id0")
                                .addRows(
                                        1, 1, 1, 3,
                                        6, 2, null, null,
                                        7, 2, null, null,
                                        8, 3, null, null,
                                        null, null, 2, 5)
                                .getStringRows());
        this.pipeline.run();
    }

    /**
     * Joining a two-hour tumbling aggregation with a one-hour one must be rejected with an
     * {@link IllegalArgumentException}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testWindowsMismatch() throws Exception {
        // The pipeline is expected to fail before completing normally, so switch off the
        // TestPipeline enforcement that would otherwise complain about abandoned nodes.
        this.pipeline.enableAbandonedNodeEnforcement(false);

        String sql = "SELECT * FROM "
                + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
                + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 "
                + " LEFT OUTER JOIN "
                + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
                + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
                + " on "
                + " o1.order_id=o2.order_id";

        compilePipeline(sql, this.pipeline, BEAM_SQL_ENV);
        this.pipeline.run();
    }
}
