package org.apache.flink.streaming.siddhi;

import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
import org.apache.flink.streaming.siddhi.source.Event;
import org.apache.flink.streaming.siddhi.source.RandomEventSource;
import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
import org.apache.flink.streaming.siddhi.source.RandomWordSource;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/streaming/siddhi/SiddhiCEPITCase.class */
public class SiddhiCEPITCase extends AbstractTestBase implements Serializable {

    @Rule
    public transient TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testSimpleWriteAndRead() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource fromElements = executionEnvironment.fromElements(new Event[]{Event.of(1, "start", 1.0d), Event.of(2, "middle", 2.0d), Event.of(3, "end", 3.0d), Event.of(4, "start", 4.0d), Event.of(5, "middle", 5.0d), Event.of(6, "end", 6.0d)});
        fromElements.transform("transformer", TypeInformation.of(Event.class), new StreamMap(new MapFunction<Event, Event>() { // from class: org.apache.flink.streaming.siddhi.SiddhiCEPITCase.1
            public Event map(Event event) throws Exception {
                return event;
            }
        })).writeAsText(this.tempFolder.newFile().toURI().toString());
        executionEnvironment.execute();
        Assert.assertEquals(6L, getLineCount(r0));
    }

    @Test
    public void testSimplePojoStreamAndReturnPojo() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream returns = SiddhiCEP.define("inputStream", executionEnvironment.fromElements(new Event[]{Event.of(1, "start", 1.0d), Event.of(2, "middle", 2.0d), Event.of(3, "end", 3.0d), Event.of(4, "start", 4.0d), Event.of(5, "middle", 5.0d), Event.of(6, "end", 6.0d)}), new String[]{"id", "name", "price"}).cql("from inputStream insert into  outputStream").returns("outputStream", Event.class);
        this.tempFolder.newFile().toURI().toString();
        returns.print();
        executionEnvironment.execute();
    }

    @Test
    public void testUnboundedPojoSourceAndReturnTuple() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SiddhiCEP.define("inputStream", executionEnvironment.addSource(new RandomEventSource(5)), new String[]{"id", "name", "price", "timestamp"}).cql("from inputStream select timestamp, id, name, price insert into  outputStream").returns("outputStream").map(new MapFunction<Tuple4<Long, Integer, String, Double>, Integer>() { // from class: org.apache.flink.streaming.siddhi.SiddhiCEPITCase.2
            public Integer map(Tuple4<Long, Integer, String, Double> tuple4) throws Exception {
                return (Integer) tuple4.f1;
            }
        }).writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(5L, getLineCount(r0));
    }

    @Test
    public void testUnboundedTupleSourceAndReturnTuple() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SiddhiCEP.define("inputStream", executionEnvironment.addSource(new RandomTupleSource(5).closeDelay(1500L)).keyBy(new int[]{1}), new String[]{"id", "name", "price", "timestamp"}).cql("from inputStream select timestamp, id, name, price insert into  outputStream").returns("outputStream").writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(5L, getLineCount(r0));
    }

    @Test
    public void testUnboundedPrimitiveTypeSourceAndReturnTuple() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SiddhiCEP.define("wordStream", executionEnvironment.addSource(new RandomWordSource(5).closeDelay(1500L)), new String[]{"words"}).cql("from wordStream select words insert into  outputStream").returns("outputStream").writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(5L, getLineCount(r0));
    }

    @Test(expected = InvalidTypesException.class)
    public void testUnboundedPojoSourceButReturnInvalidTupleType() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SiddhiCEP.define("inputStream", executionEnvironment.addSource(new RandomEventSource(5).closeDelay(1500L)), new String[]{"id", "name", "price", "timestamp"}).cql("from inputStream select timestamp, id, name, price insert into  outputStream").returns("outputStream").map(new MapFunction<Tuple5<Long, Integer, String, Double, Long>, Long>() { // from class: org.apache.flink.streaming.siddhi.SiddhiCEPITCase.3
            public Long map(Tuple5<Long, Integer, String, Double, Long> tuple5) throws Exception {
                return (Long) tuple5.f0;
            }
        }).writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(5L, getLineCount(r0));
        executionEnvironment.execute();
    }

    @Test
    public void testUnboundedPojoStreamAndReturnMap() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        SiddhiCEP.define("inputStream", executionEnvironment.addSource(new RandomEventSource(5)), new String[]{"id", "name", "price", "timestamp"}).cql("from inputStream select timestamp, id, name, price insert into  outputStream").returnAsMap("outputStream").writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(5L, getLineCount(r0));
    }

    @Test
    public void testUnboundedPojoStreamAndReturnPojo() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource addSource = executionEnvironment.addSource(new RandomEventSource(5));
        addSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() { // from class: org.apache.flink.streaming.siddhi.SiddhiCEPITCase.4
            public long extractAscendingTimestamp(Event event) {
                return event.getTimestamp();
            }
        });
        SiddhiCEP.define("inputStream", addSource, new String[]{"id", "name", "price", "timestamp"}).cql("from inputStream select timestamp, id, name, price insert into  outputStream").returns("outputStream", Event.class).writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(5L, getLineCount(r0));
    }

    @Test
    public void testMultipleUnboundedPojoStreamSimpleUnion() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource addSource = executionEnvironment.addSource(new RandomEventSource(2), "input1");
        SiddhiCEP.define("inputStream1", addSource, new String[]{"id", "name", "price", "timestamp"}).union("inputStream2", executionEnvironment.addSource(new RandomEventSource(2), "input2"), new String[]{"id", "name", "price", "timestamp"}).union("inputStream3", executionEnvironment.addSource(new RandomEventSource(2), "input2"), new String[]{"id", "name", "price", "timestamp"}).cql("from inputStream1 select timestamp, id, name, price insert into outputStream;from inputStream2 select timestamp, id, name, price insert into outputStream;from inputStream3 select timestamp, id, name, price insert into outputStream;").returns("outputStream", Event.class).writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(6L, getLineCount(r0));
    }

    @Test
    public void testMultipleUnboundedPojoStreamUnionAndJoinWithWindow() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SiddhiCEP.define("inputStream1", executionEnvironment.addSource(new RandomEventSource(5), "input1").keyBy(new String[]{"id"}), new String[]{"id", "name", "price", "timestamp"}).union("inputStream2", executionEnvironment.addSource(new RandomEventSource(5), "input2").keyBy(new String[]{"id"}), new String[]{"id", "name", "price", "timestamp"}).cql("from inputStream1#window.length(5) as s1 join inputStream2#window.time(500) as s2 on s1.id == s2.id select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 insert into JoinStream;").returnAsMap("JoinStream").writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(5L, getLineCount(r0));
    }

    @Test
    public void testUnboundedPojoStreamSimplePatternMatch() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream returnAsMap = SiddhiCEP.define("inputStream1", executionEnvironment.addSource(new RandomEventSource(5).closeDelay(1500L), "input1").keyBy(new String[]{"name"}), new String[]{"id", "name", "price", "timestamp"}).union("inputStream2", executionEnvironment.addSource(new RandomEventSource(5).closeDelay(1500L), "input2").keyBy(new String[]{"name"}), new String[]{"id", "name", "price", "timestamp"}).cql("from every s1 = inputStream1[id == 2]  -> s2 = inputStream2[id == 3] select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 insert into outputStream").returnAsMap("outputStream");
        String uri = this.tempFolder.newFile().toURI().toString();
        returnAsMap.writeAsText(uri, FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(1L, getLineCount(uri));
        compareResultsByLinesInMemory("{id_1=2, name_1=test_event, id_2=3, name_2=test_event}", uri);
    }

    @Test
    public void testUnboundedPojoStreamSimpleSequences() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource addSource = executionEnvironment.addSource(new RandomEventSource(5).closeDelay(1500L), "input1");
        SiddhiCEP.define("inputStream1", addSource.keyBy(new String[]{"name"}), new String[]{"id", "name", "price", "timestamp"}).union("inputStream2", addSource.keyBy(new String[]{"name"}), new String[]{"id", "name", "price", "timestamp"}).cql("from every s1 = inputStream1[id == 2]+ , s2 = inputStream2[id == 3]? within 1000 second select s1[0].name as n1, s2.name as n2 insert into outputStream").returnAsMap("outputStream").writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(1L, getLineCount(r0));
    }

    private static int getLineCount(String str) throws IOException {
        LinkedList linkedList = new LinkedList();
        readAllResultLines(linkedList, str);
        return linkedList.size();
    }

    @Test
    public void testCustomizeSiddhiFunctionExtension() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource addSource = executionEnvironment.addSource(new RandomEventSource(5));
        SiddhiCEP siddhiEnvironment = SiddhiCEP.getSiddhiEnvironment(executionEnvironment);
        siddhiEnvironment.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
        siddhiEnvironment.from("inputStream", addSource, new String[]{"id", "name", "price", "timestamp"}).cql("from inputStream select timestamp, id, name, custom:plus(price,price) as doubled_price insert into  outputStream").returnAsMap("outputStream").writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(5L, getLineCount(r0));
    }

    @Test
    public void testRegisterStreamAndExtensionWithSiddhiCEPEnvironment() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource addSource = executionEnvironment.addSource(new RandomEventSource(5), "input1");
        DataStreamSource addSource2 = executionEnvironment.addSource(new RandomEventSource(5), "input2");
        SiddhiCEP siddhiEnvironment = SiddhiCEP.getSiddhiEnvironment(executionEnvironment);
        siddhiEnvironment.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
        siddhiEnvironment.registerStream("inputStream1", addSource.keyBy(new String[]{"id"}), new String[]{"id", "name", "price", "timestamp"});
        siddhiEnvironment.registerStream("inputStream2", addSource2.keyBy(new String[]{"id"}), new String[]{"id", "name", "price", "timestamp"});
        siddhiEnvironment.from("inputStream1").union(new String[]{"inputStream2"}).cql("from inputStream1#window.length(5) as s1 join inputStream2#window.time(500) as s2 on s1.id == s2.id select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 insert into JoinStream;").returns("JoinStream").writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
        Assert.assertEquals(5L, getLineCount(r0));
    }

    @Test(expected = UndefinedStreamException.class)
    public void testTriggerUndefinedStreamException() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource addSource = executionEnvironment.addSource(new RandomEventSource(5), "input1");
        SiddhiCEP siddhiEnvironment = SiddhiCEP.getSiddhiEnvironment(executionEnvironment);
        siddhiEnvironment.registerStream("inputStream1", addSource.keyBy(new String[]{"id"}), new String[]{"id", "name", "price", "timestamp"});
        siddhiEnvironment.from("inputStream1").union(new String[]{"inputStream2"}).cql("from inputStream1#window.length(5) as s1 join inputStream2#window.time(500) as s2 on s1.id == s2.id select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 insert into JoinStream;").returnAsMap("JoinStream").writeAsText(this.tempFolder.newFile().toURI().toString(), FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
    }
}
