package com.datatorrent.contrib.avro;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.helper.TestPortContext;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import javax.validation.ConstraintViolationException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.python.google.common.collect.Lists;

/* loaded from: input_file:com/datatorrent/contrib/avro/AvroFileInputOperatorTest.class */
public class AvroFileInputOperatorTest {
    // JSON definition of the "Order" Avro record used to build and write the test input.
    private static final String AVRO_SCHEMA = "{\"namespace\":\"abc\",\"type\":\"record\",\"doc\":\"Order schema\",\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\",\"type\": \"long\"},{\"name\":\"customerId\",\"type\": \"int\"},{\"name\":\"total\",\"type\": \"double\"},{\"name\":\"customerName\",\"type\": \"string\"}]}";
    // Staging paths: files are first written here and then moved into the per-test directory.
    private static final String FILENAME = "/tmp/simpleorder.avro";
    private static final String OTHER_FILE = "/tmp/simpleorder2.avro";
    // Staging path for the deliberately non-Avro file used by the invalid-format test.
    private static final String ERROR_FILE = "/tmp/errorFile.avro";
    // Sinks the tests attach to the operator's ports to collect what it emits.
    CollectorTestSink<Object> output = new CollectorTestSink<>();
    CollectorTestSink<Object> completedFilesPort = new CollectorTestSink<>();
    CollectorTestSink<Object> errorRecordsPort = new CollectorTestSink<>();
    // Operator under test; a fresh instance exists per test because JUnit instantiates the class per test method.
    AvroFileInputOperator avroFileInput = new AvroFileInputOperator();
    // Records produced by createAvroInput(int) and consumed by writeAvroFile(File); null until the former is called.
    private List<GenericRecord> recordList = null;

    // Supplies the per-test working directory and the operator/port contexts, and cleans up afterwards.
    @Rule
    public TestMeta testMeta = new TestMeta();

    /* loaded from: input_file:com/datatorrent/contrib/avro/AvroFileInputOperatorTest$AvroReaderApplication.class */
    /**
     * Minimal streaming application that wires an {@link AvroFileInputOperator}
     * straight into a {@link ConsoleOutputOperator}.
     *
     * <p>The input operator must be supplied through
     * {@link #setAvroFileInputOperator(AvroFileInputOperator)} before the DAG is populated.
     */
    public static class AvroReaderApplication implements StreamingApplication {
        AvroFileInputOperator avroFileInputOperator;

        /** Stores the input operator that {@link #populateDAG} will add to the DAG. */
        public void setAvroFileInputOperator(AvroFileInputOperator avroFileInputOperator) {
            this.avroFileInputOperator = avroFileInputOperator;
        }

        /** @return the input operator previously supplied through the setter, or {@code null} if none was set */
        public AvroFileInputOperator getAvroFileInput() {
            return this.avroFileInputOperator;
        }

        /**
         * Adds the reader and a console sink to {@code dag} and connects them with a
         * container-local stream named "pojo".
         *
         * @param dag           the DAG to populate
         * @param configuration not used by this application
         */
        public void populateDAG(DAG dag, Configuration configuration) {
            // The reader is registered first, then the console operator, matching the stream direction.
            AvroFileInputOperator reader = dag.addOperator("avroInputOperator", getAvroFileInput());
            ConsoleOutputOperator console = dag.addOperator("GenericRecordOp", new ConsoleOutputOperator());
            dag.addStream("pojo", reader.output, console.input)
                .setLocality(DAG.Locality.CONTAINER_LOCAL);
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/avro/AvroFileInputOperatorTest$AvroToPojoApplication.class */
    /**
     * Streaming application that chains an {@link AvroFileInputOperator}, an
     * {@link AvroToPojo} converter and a {@link ConsoleOutputOperator}.
     *
     * <p>Both the input operator and the converter must be supplied through their
     * setters before the DAG is populated.
     */
    public static class AvroToPojoApplication implements StreamingApplication {
        AvroFileInputOperator avroFileInputOperator;
        AvroToPojo avroToPojo;

        /** Stores the input operator that {@link #populateDAG} will add to the DAG. */
        public void setAvroFileInputOperator(AvroFileInputOperator avroFileInputOperator) {
            this.avroFileInputOperator = avroFileInputOperator;
        }

        /** @return the input operator previously supplied through the setter, or {@code null} if none was set */
        public AvroFileInputOperator getAvroFileInput() {
            return this.avroFileInputOperator;
        }

        /** Stores the converter that {@link #populateDAG} will add to the DAG. */
        public void setAvroToPojo(AvroToPojo avroToPojo) {
            this.avroToPojo = avroToPojo;
        }

        /** @return the converter previously supplied through the setter, or {@code null} if none was set */
        public AvroToPojo getAvroToPojo() {
            return this.avroToPojo;
        }

        /**
         * Registers the three operators, declares {@link SimpleOrder} as the tuple class of the
         * converter's output port, and connects reader -> converter -> console.
         *
         * @param dag           the DAG to populate
         * @param configuration not used by this application
         */
        public void populateDAG(DAG dag, Configuration configuration) {
            AvroFileInputOperator reader = dag.addOperator("avroInputOperator", getAvroFileInput());
            AvroToPojo converter = dag.addOperator("AvroToPojo", getAvroToPojo());
            ConsoleOutputOperator console = dag.addOperator("GenericRecordOp", new ConsoleOutputOperator());

            // Set the POJO type on the converter's output port.
            dag.getMeta(converter)
                .getMeta(converter.output)
                .getAttributes()
                .put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class);

            dag.addStream("GenericRecords", reader.output, converter.data)
                .setLocality(DAG.Locality.THREAD_LOCAL);
            dag.addStream("POJO", converter.output, console.input)
                .setLocality(DAG.Locality.CONTAINER_LOCAL);
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/avro/AvroFileInputOperatorTest$SimpleOrder.class */
    /**
     * Plain bean mirroring the fields of the "Order" Avro record; used as the target
     * tuple class for Avro-to-POJO conversion in these tests.
     */
    public static class SimpleOrder {
        private Integer customerId;
        private Long orderId;
        private Double total;
        private String customerName;

        /** Creates an order with every field left {@code null}. */
        public SimpleOrder() {
        }

        /**
         * Creates a fully populated order by delegating to the setters.
         *
         * @param i   customer id
         * @param j   order id
         * @param d   order total
         * @param str customer name
         */
        public SimpleOrder(int i, long j, double d, String str) {
            // Autoboxing yields the same boxed values as the explicit valueOf calls.
            setCustomerId(i);
            setOrderId(j);
            setTotal(d);
            setCustomerName(str);
        }

        /** @return the customer id, or {@code null} if unset */
        public Integer getCustomerId() {
            return this.customerId;
        }

        /** @param num the customer id to store */
        public void setCustomerId(Integer num) {
            this.customerId = num;
        }

        /** @return the order id, or {@code null} if unset */
        public Long getOrderId() {
            return this.orderId;
        }

        /** @param l the order id to store */
        public void setOrderId(Long l) {
            this.orderId = l;
        }

        /** @return the order total, or {@code null} if unset */
        public Double getTotal() {
            return this.total;
        }

        /** @param d the order total to store */
        public void setTotal(Double d) {
            this.total = d;
        }

        /** @return the customer name, or {@code null} if unset */
        public String getCustomerName() {
            return this.customerName;
        }

        /** @param str the customer name to store */
        public void setCustomerName(String str) {
            this.customerName = str;
        }

        /** Renders all four fields; unset fields appear as the text {@code null}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("SimpleOrder [customerId=").append(this.customerId);
            text.append(", orderId=").append(this.orderId);
            text.append(", total=").append(this.total);
            text.append(", customerName=").append(this.customerName);
            text.append("]");
            return text.toString();
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/avro/AvroFileInputOperatorTest$TestMeta.class */
    /**
     * JUnit rule that prepares a per-test working directory name plus operator and port
     * contexts before each test, and removes the class-level output directory afterwards.
     */
    public static class TestMeta extends TestWatcher {
        /** Relative path {@code target/<test class>/<test method>}; set in {@link #starting}. */
        public String dir = null;
        Context.OperatorContext context;
        Context.PortContext portContext;

        /**
         * Computes {@link #dir} for the running test and builds the two contexts:
         * an operator context (id 1) whose application path is {@link #dir}, and a port
         * context whose tuple class is {@link SimpleOrder}.
         */
        protected void starting(Description description) {
            String className = description.getClassName();
            String methodName = description.getMethodName();
            this.dir = "target/" + className + "/" + methodName;

            Attribute.AttributeMap.DefaultAttributeMap operatorAttributes =
                new Attribute.AttributeMap.DefaultAttributeMap();
            operatorAttributes.put(Context.DAGContext.APPLICATION_PATH, this.dir);
            this.context = new OperatorContextTestHelper.TestIdOperatorContext(1, operatorAttributes);

            Attribute.AttributeMap.DefaultAttributeMap portAttributes =
                new Attribute.AttributeMap.DefaultAttributeMap();
            portAttributes.put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class);
            this.portContext = new TestPortContext(portAttributes);
        }

        /**
         * Deletes {@code target/<test class>} (the parent of every per-method directory).
         *
         * @throws RuntimeException wrapping the {@link IOException} if deletion fails
         */
        protected void finished(Description description) {
            File classOutputDir = new File("target/" + description.getClassName());
            try {
                FileUtils.deleteDirectory(classOutputDir);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Writes one Avro file with seven records into the test directory, drives the operator
     * through a single window with two {@code emitTuples()} calls, and verifies the record
     * count, the emitted tuples, the absence of error tuples and one completed file.
     */
    @Test
    public void testSingleFileAvroReads() throws Exception {
        final int cnt = 7;
        final AvroFileInputOperator operator = this.avroFileInput;

        // Start from a clean per-test directory.
        FileContext localFs = FileContext.getLocalFSFileContext();
        localFs.delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);

        createAvroInput(cnt);
        writeAvroFile(new File(FILENAME));

        operator.output.setSink(output);
        operator.completedFilesPort.setSink(completedFilesPort);
        operator.errorRecordsPort.setSink(errorRecordsPort);
        operator.setDirectory(testMeta.dir);
        operator.setup(testMeta.context);

        operator.beginWindow(0L);
        operator.emitTuples();
        operator.emitTuples();
        // The record count is checked before endWindow(), as in the original sequence.
        Assert.assertEquals("Record count", cnt, operator.recordCount);
        operator.endWindow();

        Assert.assertEquals("number tuples", cnt, output.collectedTuples.size());
        Assert.assertEquals("Error tuples", 0L, errorRecordsPort.collectedTuples.size());
        Assert.assertEquals("Completed File", 1L, completedFilesPort.collectedTuples.size());

        operator.teardown();
    }

    /**
     * Writes the same seven records into two Avro files and checks that the operator has
     * emitted one file's worth of tuples after two {@code emitTuples()} calls and both
     * files' worth after a third, with no error tuples and two completed files.
     */
    @Test
    public void testMultipleFileAvroReads() throws Exception {
        final int cnt = 7;
        final AvroFileInputOperator operator = this.avroFileInput;

        // Start from a clean per-test directory.
        FileContext localFs = FileContext.getLocalFSFileContext();
        localFs.delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);

        createAvroInput(cnt);
        writeAvroFile(new File(FILENAME));
        writeAvroFile(new File(OTHER_FILE));

        operator.output.setSink(output);
        operator.completedFilesPort.setSink(completedFilesPort);
        operator.errorRecordsPort.setSink(errorRecordsPort);
        operator.setDirectory(testMeta.dir);
        operator.setup(testMeta.context);

        // NOTE(review): window 0 is never closed with endWindow() before window 1 begins;
        // the call sequence is kept exactly as it was - confirm whether that is intended.
        operator.beginWindow(0L);
        operator.emitTuples();
        operator.beginWindow(1L);
        operator.emitTuples();

        Assert.assertEquals("number tuples after window 0", cnt, output.collectedTuples.size());

        operator.emitTuples();
        operator.endWindow();

        Assert.assertEquals("Error tuples", 0L, errorRecordsPort.collectedTuples.size());
        Assert.assertEquals("number tuples after window 1", 2 * cnt, output.collectedTuples.size());
        Assert.assertEquals("Completed File", 2L, completedFilesPort.collectedTuples.size());

        operator.teardown();
    }

    /**
     * Places a plain-text (non-Avro) file in the test directory and verifies that the
     * operator emits no tuples on its output port after one window.
     */
    @Test
    public void testInvalidFormatFailure() throws Exception {
        final int cnt = 7;
        final AvroFileInputOperator operator = this.avroFileInput;

        // Start from a clean per-test directory.
        FileContext localFs = FileContext.getLocalFSFileContext();
        localFs.delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);

        writeErrorFile(cnt, new File(ERROR_FILE));

        // Only the output port gets a sink here; the other ports are left unconnected.
        operator.output.setSink(output);
        operator.setDirectory(testMeta.dir);
        operator.setup(testMeta.context);

        operator.beginWindow(0L);
        operator.emitTuples();
        operator.emitTuples();
        operator.endWindow();

        // NOTE(review): the message mentions "window 1" although only window 0 is run.
        Assert.assertEquals("number tuples after window 1", 0L, output.collectedTuples.size());

        operator.teardown();
    }

    /**
     * Rebuilds {@link #recordList} with {@code i} generic "Order" records.
     *
     * <p>Records are generated for {@code n = i, i-1, ..., 1} (in that order) with
     * {@code orderId = n}, {@code customerId = 2n}, {@code total = 1.5n} and
     * {@code customerName = "*n*"}. A non-positive {@code i} yields an empty list.
     *
     * @param i number of records to generate
     */
    private void createAvroInput(int i) {
        // A plain ArrayList replaces the Guava copy shaded inside Jython, so this fixture
        // no longer needs Jython on the classpath.
        this.recordList = new ArrayList<>();
        // Parse the schema once; all records share the same Schema instance.
        Schema schema = new Schema.Parser().parse(AVRO_SCHEMA);
        for (int n = i; n > 0; n--) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("orderId", Long.valueOf(n));
            record.put("customerId", Integer.valueOf(n * 2));
            record.put("total", Double.valueOf(n * 1.5d));
            record.put("customerName", "*" + n + "*");
            this.recordList.add(record);
        }
    }

    /**
     * Writes a small plain-text file (five lines, {@code f0l0} .. {@code f0l4}) that is not
     * a valid Avro container, then moves it into the per-test directory.
     *
     * @param i    currently unused; kept so existing call sites stay unchanged
     * @param file staging location to write to before the move
     * @throws IOException if writing or moving the file fails
     */
    private void writeErrorFile(int i, File file) throws IOException {
        // An ordered list keeps the file content deterministic from run to run
        // (the previous HashSet left the line order unspecified).
        List<String> lines = new ArrayList<>();
        for (int line = 0; line < 5; line++) {
            lines.add("f0l" + line);
        }
        FileUtils.write(file, StringUtils.join(lines, '\n'));
        // "true" creates the destination directory if it does not exist yet.
        FileUtils.moveFileToDirectory(new File(file.getAbsolutePath()), new File(this.testMeta.dir), true);
    }

    /**
     * Serializes {@link #recordList} into an Avro container file at {@code file} and then
     * moves that file into the per-test directory.
     *
     * <p>{@link #createAvroInput(int)} must have been called first so that
     * {@link #recordList} is populated.
     *
     * @param file staging location to write to before the move
     * @throws IOException if writing or moving the file fails
     */
    private void writeAvroFile(File file) throws IOException {
        Schema schema = new Schema.Parser().parse(AVRO_SCHEMA);
        // try-with-resources closes the writer even when create() or append() throws,
        // so a failing test does not leak the open file handle.
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.create(schema, file);
            for (GenericRecord record : this.recordList) {
                writer.append(record);
            }
        }
        // The writer is closed before the move so the data is on disk.
        // "true" creates the destination directory if it does not exist yet.
        FileUtils.moveFileToDirectory(new File(file.getAbsolutePath()), new File(this.testMeta.dir), true);
    }

    /**
     * Smoke test: runs {@link AvroReaderApplication} in local mode for up to ten seconds
     * against two Avro files (seven and five records). Fails only if the DAG reports
     * constraint violations; other exceptions propagate.
     */
    @Test
    public void testApplication() throws IOException, Exception {
        final int cnt = 7;
        try {
            // Start from a clean per-test directory.
            FileContext localFs = FileContext.getLocalFSFileContext();
            localFs.delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);

            createAvroInput(cnt);
            writeAvroFile(new File(FILENAME));
            createAvroInput(cnt - 2);
            writeAvroFile(new File(OTHER_FILE));

            avroFileInput.setDirectory(testMeta.dir);

            LocalMode localMode = LocalMode.newInstance();
            Configuration conf = new Configuration(false);

            AvroReaderApplication application = new AvroReaderApplication();
            application.setAvroFileInputOperator(avroFileInput);
            localMode.prepareDAG(application, conf);

            LocalMode.Controller controller = localMode.getController();
            controller.run(10000L);
        } catch (ConstraintViolationException e) {
            Assert.fail("constraint violations: " + e.getConstraintViolations());
        }
    }

    /**
     * Smoke test: runs {@link AvroToPojoApplication} (reader -> Avro-to-POJO -> console) in
     * local mode for up to ten seconds against two Avro files (seven and five records).
     * Fails only if the DAG reports constraint violations; other exceptions propagate.
     */
    @Test
    public void testApplicationWithPojoConversion() throws IOException, Exception {
        final int cnt = 7;
        try {
            // Start from a clean per-test directory.
            FileContext localFs = FileContext.getLocalFSFileContext();
            localFs.delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);

            createAvroInput(cnt);
            writeAvroFile(new File(FILENAME));
            createAvroInput(cnt - 2);
            writeAvroFile(new File(OTHER_FILE));

            avroFileInput.setDirectory(testMeta.dir);

            AvroToPojo converter = new AvroToPojo();
            converter.setPojoClass(SimpleOrder.class);

            LocalMode localMode = LocalMode.newInstance();
            Configuration conf = new Configuration(false);

            AvroToPojoApplication application = new AvroToPojoApplication();
            application.setAvroFileInputOperator(avroFileInput);
            application.setAvroToPojo(converter);
            localMode.prepareDAG(application, conf);

            LocalMode.Controller controller = localMode.getController();
            controller.run(10000L);
        } catch (ConstraintViolationException e) {
            Assert.fail("constraint violations: " + e.getConstraintViolations());
        }
    }
}
