/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.impl.rel;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

public class BeamMatchRelTest {
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();

    @Test
    public void matchLogicalPlanTest() {
        Schema schemaType = Schema.builder().addInt32Field("id").addStringField("name").addInt32Field("proctime").build();
        BaseRelTest.registerTable("TestTable", (BeamSqlTable)TestBoundedTable.of((Schema)schemaType).addRows(new Object[]{1, "a", 1, 1, "b", 2, 1, "c", 3}));
        String sql = "SELECT * FROM TestTable MATCH_RECOGNIZE (PARTITION BY id ORDER BY proctime ALL ROWS PER MATCH PATTERN (A B C) DEFINE A AS name = 'a', B AS name = 'b', C AS name = 'c' ) AS T";
        PCollection<Row> result = BaseRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that(result).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.FieldType.INT32, "id", Schema.FieldType.STRING, "name", Schema.FieldType.INT32, "proctime").addRows(1, "a", 1, 1, "b", 2, 1, "c", 3).getRows());
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void matchQuantifierTest() {
        Schema schemaType = Schema.builder().addInt32Field("id").addStringField("name").addInt32Field("proctime").build();
        BaseRelTest.registerTable("TestTable", (BeamSqlTable)TestBoundedTable.of((Schema)schemaType).addRows(new Object[]{1, "a", 1, 1, "a", 2, 1, "b", 3, 1, "c", 4}));
        String sql = "SELECT * FROM TestTable MATCH_RECOGNIZE (PARTITION BY id ORDER BY proctime ALL ROWS PER MATCH PATTERN (A+ B C) DEFINE A AS name = 'a', B AS name = 'b', C AS name = 'c' ) AS T";
        PCollection<Row> result = BaseRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that(result).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.FieldType.INT32, "id", Schema.FieldType.STRING, "name", Schema.FieldType.INT32, "proctime").addRows(1, "a", 1, 1, "a", 2, 1, "b", 3, 1, "c", 4).getRows());
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void matchMeasuresTest() {
        Schema schemaType = Schema.builder().addInt32Field("id").addStringField("name").addInt32Field("proctime").build();
        BaseRelTest.registerTable("TestTable", (BeamSqlTable)TestBoundedTable.of((Schema)schemaType).addRows(new Object[]{1, "a", 1, 1, "a", 2, 1, "b", 3, 1, "c", 4, 1, "b", 8, 1, "a", 7, 1, "c", 9, 2, "a", 6, 2, "b", 10, 2, "c", 11, 5, "a", 0}));
        String sql = "SELECT * FROM TestTable MATCH_RECOGNIZE (PARTITION BY id ORDER BY proctime MEASURES LAST (A.proctime) AS atime, B.proctime AS btime, C.proctime AS ctime PATTERN (A+ B C) DEFINE A AS name = 'a', B AS name = 'b', C AS name = 'c' ) AS T WHERE T.id > 0";
        PCollection<Row> result = BaseRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that(result).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.FieldType.INT32, "id", Schema.FieldType.INT32, "T.atime", Schema.FieldType.INT32, "T.btime", Schema.FieldType.INT32, "T.ctime").addRows(1, 2, 3, 4, 1, 7, 8, 9, 2, 6, 10, 11).getRows());
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void matchPrevFunctionTest() {
        Schema schemaType = Schema.builder().addInt32Field("transTime").addInt32Field("price").build();
        BaseRelTest.registerTable("TestTable", (BeamSqlTable)TestBoundedTable.of((Schema)schemaType).addRows(new Object[]{3, 1, 1, 3, 2, 2, 4, 5, 5, 6}));
        String sql = "SELECT * FROM TestTable MATCH_RECOGNIZE (ORDER BY transTime MEASURES LAST (A.price) AS beforePrice, FIRST (B.price) AS afterPrice PATTERN (A+ B+) DEFINE A AS price < PREV(A.price), B AS price > PREV(B.price) ) AS T ";
        PCollection<Row> result = BaseRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that(result).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.FieldType.INT32, "beforePrice", Schema.FieldType.INT32, "afterPrice").addRows(1, 5).getRows());
        this.pipeline.run().waitUntilFinish();
    }

    @Ignore(value="NFA has not been fully implemented for now.")
    @Test
    public void matchNFATest() {
        Schema schemaType = Schema.builder().addStringField("Symbol").addDateTimeField("TradeDay").addInt32Field("Price").build();
        BaseRelTest.registerTable("Ticker", (BeamSqlTable)TestBoundedTable.of((Schema)schemaType).addRows(new Object[]{"a", "2020-07-01", 32, "a", "2020-06-01", 34, "a", "2020-07-02", 31, "a", "2020-08-30", 30, "a", "2020-08-31", 35, "a", "2020-10-01", 28, "a", "2020-10-15", 30, "a", "2020-11-01", 22, "a", "2020-11-08", 29, "a", "2020-12-10", 30, "b", "2020-12-01", 22, "c", "2020-05-16", 27, "c", "2020-09-14", 26, "c", "2020-10-13", 30}));
        String sql = "SELECT M.Symbol, M.Matchno, M.Startp, M.Bottomp, M.Endp, M.AvgpFROM Ticker MATCH_RECOGNIZE (PARTITION BY Symbol ORDER BY Tradeday MEASURES MATCH_NUMBER() AS Matchno, A.price AS Startp, LAST (B.Price) AS Bottomp, LAST (C.Price) AS ENDp, AVG (U.Price) AS Avgp AFTER MATCH SKIP PAST LAST ROW PATTERN (A B+ C+) SUBSET U = (A, B, C) DEFINE B AS B.Price < PREV (B.Price), C AS C.Price > PREV (C.Price) ) AS T";
        PCollection<Row> result = BaseRelTest.compilePipeline(sql, (Pipeline)this.pipeline);
        PAssert.that(result).containsInAnyOrder(TestUtils.RowsBuilder.of(Schema.FieldType.INT32, "id", Schema.FieldType.STRING, "name", Schema.FieldType.INT32, "proctime").addRows(1, "a", 1, 1, "b", 2, 1, "c", 3).getRows());
        this.pipeline.run().waitUntilFinish();
    }
}

