package org.apache.apex.malhar.lib.fs;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/lib/fs/FSRecordReaderTest.class */
/**
 * Tests for {@link FSRecordReaderModule}.
 *
 * <p>Before each test two small text files are written to a per-test input directory. Each test
 * then builds a DAG that connects the module's {@code records} port to a validator operator and
 * runs it through {@link LocalMode}.
 */
public class FSRecordReaderTest {
    private static final Logger LOG = LoggerFactory.getLogger(FSRecordReaderTest.class);

    private static final String FILE_1 = "file1.txt";
    private static final String FILE_2 = "file2.txt";
    private static final String FILE_1_DATA = "1234\n567890\nabcde\nfgh\ni\njklmop";
    private static final String FILE_2_DATA = "qr\nstuvw\nxyz\n";

    /** Common prefix of the module property keys set by the tests. */
    private static final String MODULE_PROP_PREFIX = "dt.operator.HDFSRecordReaderModule.prop.";

    /** Upper bound on how long a test waits for records before giving up and asserting. */
    private static final long RECORD_WAIT_TIMEOUT_MILLIS = 60000L;

    /** Pause between two checks of the record sink while waiting. */
    private static final long POLL_INTERVAL_MILLIS = 500L;

    private String inputDir;
    static String outputDir;

    @Rule
    public TestMeta testMeta = new TestMeta();

    /** Application that reads the input in {@code delimited_record} mode. */
    private static class DelimitedApplication implements StreamingApplication {
        private DelimitedApplication() {
        }

        public void populateDAG(DAG dag, Configuration configuration) {
            FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class);
            recordReader.setMode("delimited_record");
            DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator());
            dag.addStream("records", recordReader.records, validator.data);
        }
    }

    /**
     * Operator that stores every received record, decoded as UTF-8, in a static set so that the
     * test method can inspect it.
     */
    public static class DelimitedValidator extends BaseOperator {
        // Written from process() and read by the test method after runAsync(); wrapped in a
        // synchronized set because a plain HashSet is not safe for that kind of sharing.
        static Set<String> records = Collections.synchronizedSet(new HashSet<String>());

        public final transient DefaultInputPort<byte[]> data = new DefaultInputPort<byte[]>() {
            public void process(byte[] bArr) {
                DelimitedValidator.records.add(new String(bArr, StandardCharsets.UTF_8));
            }
        };

        /**
         * @return the live set of records received so far; callers that iterate over it must
         *         synchronize on the returned set
         */
        public static Set<String> getRecords() {
            return records;
        }
    }

    /** Application that reads the input in {@code FIXED_WIDTH_RECORD} mode. */
    private static class FixedWidthApplication implements StreamingApplication {
        private FixedWidthApplication() {
        }

        public void populateDAG(DAG dag, Configuration configuration) {
            FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class);
            recordReader.setMode("FIXED_WIDTH_RECORD");
            FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator());
            dag.addStream("records", recordReader.records, validator.data);
        }
    }

    /**
     * Operator that stores every received record, decoded as UTF-8, and compares the collected
     * set with the expected 8-byte chunks of the two input files in {@link #teardown()}.
     */
    public static class FixedWidthValidator extends BaseOperator {
        Set<String> records = new HashSet<String>();

        public final transient DefaultInputPort<byte[]> data = new DefaultInputPort<byte[]>() {
            public void process(byte[] bArr) {
                FixedWidthValidator.this.records.add(new String(bArr, StandardCharsets.UTF_8));
            }
        };

        /**
         * Asserts that exactly the expected fixed-width records were received.
         *
         * <p>NOTE(review): this assertion runs on whichever thread invokes {@code teardown()}.
         * If that is not the JUnit thread, a mismatch may not fail the test method - confirm
         * against the engine and consider asserting from the test method instead.
         */
        public void teardown() {
            Set<String> expected = new HashSet<String>(
                    Arrays.asList("1234\n567", "890\nabcd", "e\nfgh\ni\n", "jklmop", "qr\nstuvw", "\nxyz\n"));
            Assert.assertEquals(expected, this.records);
        }
    }

    /** Rule that derives a per-test base directory under {@code target/}. */
    public static class TestMeta extends TestWatcher {
        public String baseDirectory;

        protected void starting(Description description) {
            this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
        }
    }

    /**
     * Writes the two input files for the current test and clears the shared record sink so that
     * records from an earlier run in the same JVM cannot leak into this one.
     */
    @Before
    public void setup() throws Exception {
        DelimitedValidator.records.clear();
        this.inputDir = this.testMeta.baseDirectory + File.separator + "input";
        // Explicit encoding so the bytes on disk do not depend on the platform default charset.
        FileUtils.writeStringToFile(new File(this.inputDir, FILE_1), FILE_1_DATA, "UTF-8");
        FileUtils.writeStringToFile(new File(this.inputDir, FILE_2), FILE_2_DATA, "UTF-8");
    }

    /**
     * Runs the delimited application and expects one record per newline-separated line of both
     * input files.
     */
    @Test
    public void testDelimitedRecords() throws Exception {
        Set<String> expected = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n")));
        expected.addAll(Arrays.asList(FILE_2_DATA.split("\n")));

        LocalMode.Controller controller = startAsync(new DelimitedApplication(), newModuleConfiguration());
        try {
            awaitRecordCount(DelimitedValidator.records, expected.size());
        } finally {
            // Always stop the application, even if the wait was interrupted.
            controller.shutdown();
        }
        Assert.assertEquals(expected, snapshot(DelimitedValidator.records));
    }

    /**
     * Runs the fixed-width application with a record length of 8 for one second. The received
     * records are checked in {@link FixedWidthValidator#teardown()}.
     */
    @Test
    public void testFixedWidthRecords() throws Exception {
        Configuration configuration = newModuleConfiguration();
        configuration.set(MODULE_PROP_PREFIX + "recordLength", "8");

        LocalMode.Controller controller = startAsync(new FixedWidthApplication(), configuration);
        try {
            LOG.debug("Waiting for app to finish");
            Thread.sleep(1000L);
        } finally {
            controller.shutdown();
        }
    }

    /**
     * Launches the fixed-width application without setting {@code recordLength} and expects an
     * {@link IllegalArgumentException}. Which call raises it is not visible in this file.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testMissingRecordLength() throws Exception {
        LocalMode.Controller controller = startAsync(new FixedWidthApplication(), newModuleConfiguration());
        try {
            LOG.debug("Waiting for app to finish");
            Thread.sleep(1000L);
        } finally {
            controller.shutdown();
        }
    }

    /** Builds the configuration shared by all tests: input directory, block threshold, scan interval. */
    private Configuration newModuleConfiguration() {
        Configuration configuration = new Configuration(false);
        configuration.set(MODULE_PROP_PREFIX + "files", this.inputDir);
        configuration.set(MODULE_PROP_PREFIX + "blocksThreshold", "1");
        configuration.set(MODULE_PROP_PREFIX + "scanIntervalMillis", "10000");
        return configuration;
    }

    /** Prepares the DAG for {@code application}, starts it asynchronously and returns its controller. */
    private static LocalMode.Controller startAsync(StreamingApplication application, Configuration configuration)
            throws Exception {
        LocalMode localMode = LocalMode.newInstance();
        localMode.prepareDAG(application, configuration);
        LocalMode.Controller controller = localMode.getController();
        controller.setHeartbeatMonitoringEnabled(true);
        controller.runAsync();
        return controller;
    }

    /**
     * Polls {@code sink} until it holds at least {@code expectedCount} entries or
     * {@link #RECORD_WAIT_TIMEOUT_MILLIS} has elapsed. Returns normally in both cases so that the
     * caller's assertion reports what was actually received.
     */
    private static void awaitRecordCount(Set<String> sink, int expectedCount) throws InterruptedException {
        long deadline = System.currentTimeMillis() + RECORD_WAIT_TIMEOUT_MILLIS;
        while (sink.size() < expectedCount && System.currentTimeMillis() < deadline) {
            LOG.debug("Waiting for app to finish");
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /** Copies {@code sink} while holding its lock, as required for iterating a synchronized set. */
    private static Set<String> snapshot(Set<String> sink) {
        synchronized (sink) {
            return new HashSet<String>(sink);
        }
    }
}
