package org.apache.apex.malhar.contrib.parser;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.helper.TestPortContext;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
import com.datatorrent.lib.testbench.CollectorTestSink;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import javax.validation.ConstraintViolationException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.python.google.common.collect.Lists;

/* loaded from: input_file:org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest.class */
public class StreamingJsonParserTest {
    /**
     * Field mapping handed to {@code setFieldMappingString} by {@code testReads}: comma-separated
     * entries, each made of three colon-separated parts.
     * NOTE(review): the parts look like "JSON key : POJO field : type" - confirm against StreamingJsonParser.
     */
    public static final String fieldInfoInitMap = "id:id:INTEGER,name:name:STRING,gpa:gpa:DOUBLE";
    /**
     * Mapping used by {@code testNestedReads}; in addition to id/name/gpa it lists the four fields
     * that the generated records place inside the nested "address" object.
     */
    public static final String nestedFieldInfoMap = "id:id:INTEGER,name:name:STRING,gpa:gpa:DOUBLE,streetAddress:streetAddress:STRING,city:city:STRING,state:state:STRING,postalCode:postalCode:STRING";
    /**
     * Mapping used by {@code testInvalidKeyMapping}; its first entry starts with "Field1", which is
     * not a key of the generated records, and that test expects the resulting id to be null.
     */
    public static final String invalidFieldInfoMap = "Field1:id:INTEGER,name:name:STRING,gpa:gpa:DOUBLE";
    /** Absolute path of the file written by {@code testApplicationWithPojoConversion}. */
    private static final String FILENAME = "/tmp/streaming.json";
    /** Sink attached to the parser's {@code output} port in {@code TestMeta.starting}. */
    CollectorTestSink<Object> outputSink = new CollectorTestSink<>();
    /** Sink attached to the parser's {@code err} port in {@code TestMeta.starting}. */
    CollectorTestSink<Object> errorSink = new CollectorTestSink<>();
    /** Operator under test; one instance per test-class instance. */
    StreamingJsonParser jsonParser = new StreamingJsonParser();
    /** JSON records for the current test; (re)built by {@code createReaderInput}. */
    private List<String> recordList = null;

    /** Per-test fixture: prepares the port context and sinks before a test and cleans up afterwards. */
    @Rule
    public TestMeta testMeta = new TestMeta();

    /* loaded from: input_file:org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest$FileInputOperator.class */
    public static class FileInputOperator extends AbstractFileInputOperator<String> {
        public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<>();
        protected transient BufferedReader br;

        protected InputStream openFile(Path path) throws IOException {
            InputStream openFile = super.openFile(path);
            this.br = new BufferedReader(new InputStreamReader(openFile));
            return openFile;
        }

        protected void closeFile(InputStream inputStream) throws IOException {
            super.closeFile(inputStream);
            this.br.close();
            this.br = null;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: readEntity, reason: merged with bridge method [inline-methods] */
        public String m122readEntity() throws IOException {
            return this.br.readLine();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void emit(String str) {
            this.output.emit(str.getBytes());
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest$JsonStreamingParserApp.class */
    public static class JsonStreamingParserApp implements StreamingApplication {
        StreamingJsonParser parser;
        FileInputOperator fileInput;

        public FileInputOperator getFileInput() {
            return this.fileInput;
        }

        public void setFileInput(FileInputOperator fileInputOperator) {
            this.fileInput = fileInputOperator;
        }

        public StreamingJsonParser getParser() {
            return this.parser;
        }

        public void setParser(StreamingJsonParser streamingJsonParser) {
            this.parser = streamingJsonParser;
        }

        public void populateDAG(DAG dag, Configuration configuration) {
            FileInputOperator addOperator = dag.addOperator("fileInput", getFileInput());
            StreamingJsonParser addOperator2 = dag.addOperator("parser", getParser());
            dag.getMeta(addOperator2).getMeta(addOperator2.output).getAttributes().put(Context.PortContext.TUPLE_CLASS, Person.class);
            ConsoleOutputOperator addOperator3 = dag.addOperator("output", new ConsoleOutputOperator());
            dag.addStream("Input", addOperator.output, addOperator2.in).setLocality(DAG.Locality.CONTAINER_LOCAL);
            dag.addStream("pojo", addOperator2.output, addOperator3.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest$Person.class */
    /**
     * Plain bean used as the tuple class of the parser's output port in these tests.
     * Every property starts out {@code null} and has a conventional getter/setter pair.
     */
    public static class Person {
        private Integer id;
        private String name;
        private Double gpa;
        private String streetAddress;
        private String city;
        private String postalCode;
        private String state;

        /** @return the id, or {@code null} if never set */
        public Integer getId() {
            return id;
        }

        /** @param num the id to store */
        public void setId(Integer num) {
            id = num;
        }

        /** @return the name, or {@code null} if never set */
        public String getName() {
            return name;
        }

        /** @param str the name to store */
        public void setName(String str) {
            name = str;
        }

        /** @return the grade point average, or {@code null} if never set */
        public Double getGpa() {
            return gpa;
        }

        /** @param d the grade point average to store */
        public void setGpa(Double d) {
            gpa = d;
        }

        /** @return the street address, or {@code null} if never set */
        public String getStreetAddress() {
            return streetAddress;
        }

        /** @param str the street address to store */
        public void setStreetAddress(String str) {
            streetAddress = str;
        }

        /** @return the city, or {@code null} if never set */
        public String getCity() {
            return city;
        }

        /** @param str the city to store */
        public void setCity(String str) {
            city = str;
        }

        /** @return the postal code, or {@code null} if never set */
        public String getPostalCode() {
            return postalCode;
        }

        /** @param str the postal code to store */
        public void setPostalCode(String str) {
            postalCode = str;
        }

        /** @return the state, or {@code null} if never set */
        public String getState() {
            return state;
        }

        /** @param str the state to store */
        public void setState(String str) {
            state = str;
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/contrib/parser/StreamingJsonParserTest$TestMeta.class */
    public class TestMeta extends TestWatcher {
        Context.OperatorContext context;
        Context.PortContext portContext;
        public String dir = null;

        public TestMeta() {
        }

        protected void starting(Description description) {
            this.dir = "target/" + description.getClassName() + "/" + description.getMethodName();
            Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
            defaultAttributeMap.put(Context.PortContext.TUPLE_CLASS, Person.class);
            this.portContext = new TestPortContext(defaultAttributeMap);
            super.starting(description);
            StreamingJsonParserTest.this.jsonParser.output.setup(StreamingJsonParserTest.this.testMeta.portContext);
            StreamingJsonParserTest.this.jsonParser.output.setSink(StreamingJsonParserTest.this.outputSink);
            StreamingJsonParserTest.this.jsonParser.err.setSink(StreamingJsonParserTest.this.errorSink);
        }

        protected void finished(Description description) {
            try {
                FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
                StreamingJsonParserTest.this.jsonParser.teardown();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Feeds five generated records through the parser with the basic field mapping and checks
     * that five tuples arrive on the output sink, the first of which is named "name-5".
     */
    @Test
    public void testReads() throws Exception {
        createReaderInput(5);
        jsonParser.setFieldMappingString(fieldInfoInitMap);
        jsonParser.setup(testMeta.context);
        jsonParser.output.setup(testMeta.portContext);

        jsonParser.beginWindow(0L);
        for (String record : recordList) {
            jsonParser.in.process(record.getBytes());
        }
        jsonParser.endWindow();

        Assert.assertEquals("Number of tuples", 5, outputSink.collectedTuples.size());
        Person first = (Person) outputSink.collectedTuples.get(0);
        Assert.assertEquals("Name is", "name-5", first.getName());
        jsonParser.teardown();
    }

    /**
     * Feeds four generated records through the parser with the mapping that also names the
     * fields of the nested "address" object, and checks that four tuples arrive on the output
     * sink, the first of which is named "name-4".
     */
    @Test
    public void testNestedReads() throws Exception {
        createReaderInput(4);
        this.jsonParser.setFieldMappingString(nestedFieldInfoMap);
        this.jsonParser.setup(this.testMeta.context);
        this.jsonParser.output.setup(this.testMeta.portContext);
        this.jsonParser.beginWindow(0L);
        for (String record : this.recordList) {
            this.jsonParser.in.process(record.getBytes());
        }
        this.jsonParser.endWindow();
        // Check the count before touching element 0 so that an empty sink is reported as an
        // assertion failure with a message rather than as an IndexOutOfBoundsException.
        Assert.assertEquals("Number of tuples", 4, this.outputSink.collectedTuples.size());
        Person person = (Person) this.outputSink.collectedTuples.get(0);
        Assert.assertEquals("Name is", "name-4", person.getName());
        this.jsonParser.teardown();
    }

    /**
     * Feeds six generated records through the parser with a {@code null} field mapping string
     * and checks that six tuples arrive on the output sink, the first of which is named "name-6".
     * NOTE(review): the method name suggests the parser falls back to reflection on the tuple
     * class when no mapping is given - confirm against StreamingJsonParser.
     */
    @Test
    public void testReadsWithReflection() throws Exception {
        createReaderInput(6);
        this.jsonParser.setFieldMappingString((String) null);
        this.jsonParser.setup(this.testMeta.context);
        this.jsonParser.output.setup(this.testMeta.portContext);
        this.jsonParser.beginWindow(0L);
        for (String record : this.recordList) {
            this.jsonParser.in.process(record.getBytes());
        }
        this.jsonParser.endWindow();
        // Check the count before touching element 0 so that an empty sink is reported as an
        // assertion failure with a message rather than as an IndexOutOfBoundsException.
        Assert.assertEquals("Number of tuples", 6, this.outputSink.collectedTuples.size());
        Person person = (Person) this.outputSink.collectedTuples.get(0);
        Assert.assertEquals("Name is", "name-6", person.getName());
        this.jsonParser.teardown();
    }

    /**
     * Feeds six generated records through the parser with a mapping whose first entry names a
     * key ("Field1") that the records do not contain. Expects six tuples on the output sink
     * whose first element has a {@code null} id but the correctly mapped name "name-6".
     */
    @Test
    public void testInvalidKeyMapping() throws Exception {
        createReaderInput(6);
        this.jsonParser.setFieldMappingString(invalidFieldInfoMap);
        this.jsonParser.setup(this.testMeta.context);
        this.jsonParser.output.setup(this.testMeta.portContext);
        this.jsonParser.beginWindow(0L);
        for (String record : this.recordList) {
            this.jsonParser.in.process(record.getBytes());
        }
        this.jsonParser.endWindow();
        // Check the count before touching element 0 so that an empty sink is reported as an
        // assertion failure with a message rather than as an IndexOutOfBoundsException.
        Assert.assertEquals("Number of tuples", 6, this.outputSink.collectedTuples.size());
        Person person = (Person) this.outputSink.collectedTuples.get(0);
        Assert.assertNull("Id is", person.getId());
        Assert.assertEquals("Name is", "name-6", person.getName());
        this.jsonParser.teardown();
    }

    /**
     * Replaces {@code recordList} with {@code i} generated JSON records.
     *
     * <p>Records are produced for the values {@code i, i-1, ..., 1} in that order, so the first
     * record carries the name {@code "name-" + i}. Each record holds an integer id, a name, four
     * "Elective-k" strings, a floating point gpa (twice the id) and a fixed nested "address"
     * object. A non-positive {@code i} yields an empty list.
     *
     * @param i number of records to generate
     */
    private void createReaderInput(int i) {
        List<String> records = new ArrayList<>();
        for (int n = i; n > 0; n--) {
            StringBuilder sb = new StringBuilder();
            sb.append("{\"id\":").append(n).append(',');
            sb.append("\"name\":\"name-").append(n).append("\",");
            // Elective-k carries the value "elective-<n * (k + 1)>".
            for (int k = 0; k < 4; k++) {
                sb.append("\"Elective-").append(k).append("\":\"elective-").append(n * (k + 1)).append("\",");
            }
            sb.append("\"gpa\":").append(n * 2.0d).append(',');
            sb.append("\"address\":{\"streetAddress\": \"21 2nd Street\",\"city\": \"New York\",\"state\": \"NY\",\"postalCode\": \"10021\"}");
            sb.append('}');
            records.add(sb.toString());
        }
        this.recordList = records;
    }

    /**
     * Writes every entry of {@code recordList} to the given file, creating it if necessary and
     * overwriting any previous content.
     *
     * <p>NOTE(review): the records are written back to back with no separator, so the file
     * consists of a single line - confirm that this is what the reader side expects.
     *
     * @param file destination file
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be written,
     *                          so that the calling test fails instead of running without input
     */
    private void writeJsonInputFile(File file) {
        // try-with-resources closes (and flushes) the writer even when a write fails.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(file.getAbsoluteFile()))) {
            for (String record : this.recordList) {
                writer.write(record);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Runs the file-reader -> parser -> console application in local mode for ten seconds and
     * fails if DAG validation reports constraint violations.
     *
     * <p>NOTE(review): the generated input is written to {@code FILENAME}, while the file input
     * operator scans {@code testMeta.dir}; as written the operator does not appear to see the
     * generated file - confirm whether the input should be written inside the scanned directory.
     */
    @Test
    public void testApplicationWithPojoConversion() throws Exception {
        File inputFile = new File(FILENAME);
        try {
            // Start from a clean per-test directory.
            FileContext.getLocalFSFileContext().delete(new Path(new File(this.testMeta.dir).getAbsolutePath()), true);
            createReaderInput(7);
            writeJsonInputFile(inputFile);

            FileInputOperator fileInputOperator = new FileInputOperator();
            fileInputOperator.setDirectory(this.testMeta.dir);

            JsonStreamingParserApp app = new JsonStreamingParserApp();
            app.setParser(this.jsonParser);
            app.setFileInput(fileInputOperator);

            LocalMode localMode = LocalMode.newInstance();
            Configuration configuration = new Configuration(false);
            localMode.prepareDAG(app, configuration);
            localMode.getController().run(10000L);
        } catch (ConstraintViolationException e) {
            Assert.fail("constraint violations: " + e.getConstraintViolations());
        } finally {
            // Best-effort cleanup so the test does not leave its input file behind in /tmp.
            inputFile.delete();
        }
    }
}
