package org.apache.apex.malhar.sql;

import com.datatorrent.contrib.formatter.CsvFormatter;
import com.datatorrent.contrib.parser.CsvParser;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
import org.apache.apex.malhar.sql.table.CSVMessageFormat;
import org.apache.apex.malhar.sql.table.FileEndpoint;
import org.apache.apex.malhar.sql.table.KafkaEndpoint;
import org.apache.apex.malhar.sql.table.StreamEndpoint;
import org.apache.commons.io.FileUtils;
import org.junit.Test;

/* loaded from: input_file:org/apache/apex/malhar/sql/SerDeTest.class */
public class SerDeTest {
    @Test
    public void testSQLWithApexFactory() throws IOException, ClassNotFoundException {
        String readFileToString = FileUtils.readFileToString(new File("src/test/resources/model/model_file_csv.json"));
        LogicalPlan logicalPlan = new LogicalPlan();
        SQLExecEnvironment.getEnvironment().withModel(readFileToString).executeSQL(logicalPlan, "SELECT STREAM ROWTIME, PRODUCT FROM ORDERS");
        logicalPlan.validate();
    }

    @Test
    public void testSQLWithAPI() throws ClassNotFoundException, IOException {
        LogicalPlan logicalPlan = new LogicalPlan();
        SQLExecEnvironment.getEnvironment().registerTable("ORDERS", new FileEndpoint("dummyFilePath", new CSVMessageFormat("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}"))).executeSQL(logicalPlan, "SELECT STREAM FLOOR(ROWTIME TO HOUR), SUBSTRING(PRODUCT, 0, 5) FROM ORDERS WHERE id > 3");
        logicalPlan.validate();
    }

    @Test
    public void testSQLSelectInsertWithAPI() throws IOException, ClassNotFoundException {
        LogicalPlan logicalPlan = new LogicalPlan();
        SQLExecEnvironment.getEnvironment().registerTable("ORDERS", new FileEndpoint("dummyFilePathInput", new CSVMessageFormat("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}"))).registerTable("SALES", new FileEndpoint("dummyFilePathOutput", "out.tmp", new CSVMessageFormat("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}},{\"name\":\"Product\",\"type\":\"String\"}]}"))).executeSQL(logicalPlan, "INSERT INTO SALES SELECT STREAM FLOOR(ROWTIME TO HOUR), SUBSTRING(PRODUCT, 0, 5) FROM ORDERS WHERE id > 3");
        logicalPlan.validate();
    }

    @Test
    public void testJoin() throws IOException, ClassNotFoundException {
        LogicalPlan logicalPlan = new LogicalPlan();
        SQLExecEnvironment.getEnvironment().registerTable("ORDERS", new KafkaEndpoint("localhost:9092", "testdata0", new CSVMessageFormat("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}"))).registerTable("CATEGORY", new KafkaEndpoint("localhost:9092", "testdata1", new CSVMessageFormat("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Category\",\"type\":\"String\"}]}"))).registerTable("SALES", new KafkaEndpoint("localhost:9092", "testresult", new CSVMessageFormat("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}},{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"Category\",\"type\":\"String\"}]}"))).registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str").executeSQL(logicalPlan, "INSERT INTO SALES SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY FROM ORDERS AS A JOIN CATEGORY AS B ON A.id = B.id WHERE A.id > 3 AND A.PRODUCT LIKE 'paint%'");
        logicalPlan.validate();
    }

    @Test
    public void testJoinFilter() throws IOException, ClassNotFoundException {
        LogicalPlan logicalPlan = new LogicalPlan();
        SQLExecEnvironment.getEnvironment().registerTable("ORDERS", new KafkaEndpoint("localhost:9092", "testdata0", new CSVMessageFormat("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}"))).registerTable("CATEGORY", new KafkaEndpoint("localhost:9092", "testdata1", new CSVMessageFormat("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Category\",\"type\":\"String\"}]}"))).registerTable("SALES", new KafkaEndpoint("localhost:9092", "testresult", new CSVMessageFormat("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}},{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"Category\",\"type\":\"String\"}]}"))).registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str").executeSQL(logicalPlan, "INSERT INTO SALES SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY FROM ORDERS AS A JOIN CATEGORY AS B ON A.id = B.id AND A.id > 3WHERE A.PRODUCT LIKE 'paint%'");
        logicalPlan.validate();
    }

    @Test
    public void testPortEndpoint() throws IOException, ClassNotFoundException {
        LogicalPlan logicalPlan = new LogicalPlan();
        KafkaSinglePortInputOperator addOperator = logicalPlan.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
        addOperator.setTopics("testdata0");
        addOperator.setInitialOffset("EARLIEST");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        addOperator.setConsumerProps(properties);
        addOperator.setClusters("localhost:9092");
        CsvParser addOperator2 = logicalPlan.addOperator("CSVParser", CsvParser.class);
        addOperator2.setSchema("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}");
        logicalPlan.addStream("KafkaToCSV", addOperator.outputPort, addOperator2.in);
        CsvFormatter addOperator3 = logicalPlan.addOperator("CSVFormatter", CsvFormatter.class);
        addOperator3.setSchema("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}},{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}},{\"name\":\"Product\",\"type\":\"String\"}]}");
        KafkaSinglePortOutputOperator addOperator4 = logicalPlan.addOperator("KafkaOutput", KafkaSinglePortOutputOperator.class);
        addOperator4.setTopic("testresult");
        Properties properties2 = new Properties();
        properties2.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties2.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties2.put("bootstrap.servers", "localhost:9092");
        addOperator4.setProperties(properties2);
        logicalPlan.addStream("CSVToKafka", addOperator3.out, addOperator4.inputPort);
        SQLExecEnvironment.getEnvironment().registerTable("ORDERS", new StreamEndpoint(addOperator2.out, InputPOJO.class)).registerTable("SALES", new StreamEndpoint(addOperator3.in, OutputPOJO.class)).registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str").executeSQL(logicalPlan, "INSERT INTO SALES SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM ORDERS WHERE ID > 3 AND PRODUCT LIKE 'paint%'");
        logicalPlan.validate();
    }
}
