package org.apache.apex.malhar.sql;

import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TimeZone;
import javax.validation.ConstraintViolationException;
import org.apache.apex.malhar.sql.table.CSVMessageFormat;
import org.apache.apex.malhar.sql.table.FileEndpoint;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/apex/malhar/sql/FileEndpointTest.class */
public class FileEndpointTest {
    private TimeZone defaultTZ;
    private static String outputFolder = "target/output/";

    @Rule
    public TestName testName = new TestName();

    /* loaded from: input_file:org/apache/apex/malhar/sql/FileEndpointTest$Application.class */
    public static class Application implements StreamingApplication {
        String model;

        public Application(String str) {
            this.model = str;
        }

        public void populateDAG(DAG dag, Configuration configuration) {
            SQLExecEnvironment.getEnvironment().withModel(this.model).executeSQL(dag, "SELECT STREAM ROWTIME, PRODUCT FROM ORDERS");
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/sql/FileEndpointTest$ApplicationSelectInsertWithAPI.class */
    public static class ApplicationSelectInsertWithAPI implements StreamingApplication {
        public void populateDAG(DAG dag, Configuration configuration) {
            SQLExecEnvironment.getEnvironment().registerTable("ORDERS", new FileEndpoint("src/test/resources/input.csv", new CSVMessageFormat("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}"))).registerTable("SALES", new FileEndpoint(FileEndpointTest.outputFolder, "out.tmp", new CSVMessageFormat("{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}},{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}},{\"name\":\"Product\",\"type\":\"String\"}]}"))).registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str").executeSQL(dag, "INSERT INTO SALES SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM ORDERS WHERE ID > 3 AND PRODUCT LIKE 'paint%'");
        }
    }

    public static String apex_concat_str(String str, String str2) {
        return str + str2;
    }

    @Before
    public void setUp() throws Exception {
        this.defaultTZ = TimeZone.getDefault();
        TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
        outputFolder += this.testName.getMethodName() + "/";
    }

    @After
    public void tearDown() throws Exception {
        TimeZone.setDefault(this.defaultTZ);
    }

    @Test
    public void testApplication() throws Exception {
        String readFileToString = FileUtils.readFileToString(new File("src/test/resources/model/model_file_csv.json"));
        PrintStream printStream = System.out;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        System.setOut(new PrintStream(byteArrayOutputStream));
        try {
            LocalMode newInstance = LocalMode.newInstance();
            newInstance.prepareDAG(new Application(readFileToString), new Configuration(false));
            LocalMode.Controller controller = newInstance.getController();
            controller.runAsync();
            waitTillStdoutIsPopulated(byteArrayOutputStream, 30000);
            controller.shutdown();
        } catch (ConstraintViolationException e) {
            Assert.fail("constraint violations: " + e.getConstraintViolations());
        } catch (Exception e2) {
            Assert.fail("Exception: " + e2);
        }
        System.setOut(printStream);
        Collection filter = Collections2.filter(Arrays.asList(byteArrayOutputStream.toString().split(System.lineSeparator())), Predicates.containsPattern("Delta Record:"));
        String[] strArr = (String[]) filter.toArray(new String[filter.size()]);
        Assert.assertEquals(6L, strArr.length);
        Assert.assertTrue(strArr[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1"));
        Assert.assertTrue(strArr[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2"));
        Assert.assertTrue(strArr[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3"));
        Assert.assertTrue(strArr[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4"));
        Assert.assertTrue(strArr[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5"));
        Assert.assertTrue(strArr[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v15, types: [java.util.Collection] */
    private boolean waitTillStdoutIsPopulated(ByteArrayOutputStream byteArrayOutputStream, int i) throws InterruptedException, IOException {
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList newArrayList = Lists.newArrayList();
        while (System.currentTimeMillis() - currentTimeMillis < i) {
            byteArrayOutputStream.flush();
            newArrayList = Collections2.filter(Arrays.asList(byteArrayOutputStream.toString().split(System.lineSeparator())), Predicates.containsPattern("Delta Record:"));
            if (newArrayList.size() != 0) {
                break;
            }
            Thread.sleep(500L);
        }
        return newArrayList.size() != 0;
    }

    @Test
    public void testApplicationSelectInsertWithAPI() throws Exception {
        try {
            LocalMode newInstance = LocalMode.newInstance();
            newInstance.prepareDAG(new ApplicationSelectInsertWithAPI(), new Configuration(false));
            LocalMode.Controller controller = newInstance.getController();
            controller.runAsync();
            Assert.assertTrue(waitTillFileIsPopulated(outputFolder, 40000));
            controller.shutdown();
        } catch (Exception e) {
            Assert.fail("constraint violations: " + e);
        }
        List readLines = FileUtils.readLines(new File(outputFolder + new File(outputFolder).list()[0]));
        Assert.assertTrue(Arrays.deepEquals((String[]) readLines.toArray(new String[readLines.size()]), new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4", "", "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5", ""}));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Removed duplicated region for block: B:21:0x00ca  */
    /* JADX WARN: Type inference failed for: r0v40, types: [java.util.List] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private boolean waitTillFileIsPopulated(java.lang.String r8, int r9) throws java.io.IOException, java.lang.InterruptedException {
        /*
            Method dump skipped, instructions count: 286
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.apex.malhar.sql.FileEndpointTest.waitTillFileIsPopulated(java.lang.String, int):boolean");
    }
}
