package com.datatorrent.lib.io.block;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.block.AbstractBlockReader;
import com.datatorrent.lib.io.block.AbstractFSBlockReader;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/block/FSLineReaderTest.class */
public class FSLineReaderTest {

    @Rule
    public TestMeta testMeta = new TestMeta();
    private static final Logger LOG = LoggerFactory.getLogger(FSLineReaderTest.class);

    /* loaded from: input_file:com/datatorrent/lib/io/block/FSLineReaderTest$BlockReader.class */
    public static final class BlockReader extends AbstractFSBlockReader.AbstractFSLineReader<String> {
        private final Pattern datePattern = Pattern.compile("\\d{2}?/\\d{2}?/\\d{4}?");

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: convertToRecord, reason: merged with bridge method [inline-methods] */
        public String m35convertToRecord(byte[] bArr) {
            String str = new String(bArr);
            String[] split = str.split(",");
            if (split.length <= 0 || !this.datePattern.matcher(split[0]).find()) {
                return null;
            }
            return str;
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/block/FSLineReaderTest$TestMeta.class */
    public class TestMeta extends TestWatcher {
        String dataFilePath;
        File dataFile;
        Context.OperatorContext readerContext;
        AbstractFSBlockReader<String> blockReader;
        CollectorTestSink<Object> blockMetadataSink;
        CollectorTestSink<Object> messageSink;
        List<String[]> messages = Lists.newArrayList();
        String appId;

        public TestMeta() {
        }

        protected void starting(Description description) {
            this.dataFilePath = "src/test/resources/reader_test_data.csv";
            this.dataFile = new File(this.dataFilePath);
            this.appId = Long.toHexString(System.currentTimeMillis());
            this.blockReader = FSLineReaderTest.this.getBlockReader();
            Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
            defaultAttributeMap.put(DAG.APPLICATION_ID, this.appId);
            defaultAttributeMap.put(Context.OperatorContext.SPIN_MILLIS, 10);
            this.readerContext = new OperatorContextTestHelper.TestIdOperatorContext(1, defaultAttributeMap);
            this.blockReader.setup(this.readerContext);
            this.messageSink = new CollectorTestSink<>();
            this.blockReader.messages.setSink(this.messageSink);
            this.blockMetadataSink = new CollectorTestSink<>();
            this.blockReader.blocksMetadataOutput.setSink(this.blockMetadataSink);
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(this.dataFile.getAbsolutePath())));
                while (true) {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        bufferedReader.close();
                        return;
                    }
                    this.messages.add(readLine.split(","));
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        protected void finished(Description description) {
            this.blockReader.teardown();
        }
    }

    AbstractFSBlockReader<String> getBlockReader() {
        return new BlockReader();
    }

    @Test
    public void test() {
        BlockMetadata.FileBlockMetadata fileBlockMetadata = new BlockMetadata.FileBlockMetadata(this.testMeta.dataFile.getAbsolutePath(), 0L, 0L, this.testMeta.dataFile.length(), true, -1L);
        this.testMeta.blockReader.beginWindow(1L);
        this.testMeta.blockReader.blocksMetadataInput.process(fileBlockMetadata);
        this.testMeta.blockReader.endWindow();
        List list = this.testMeta.messageSink.collectedTuples;
        Assert.assertEquals("No of records", this.testMeta.messages.size(), list.size());
        for (int i = 0; i < list.size(); i++) {
            Assert.assertTrue("line " + i, Arrays.equals(((String) ((AbstractBlockReader.ReaderRecord) list.get(i)).getRecord()).split(","), this.testMeta.messages.get(i)));
        }
    }

    @Test
    public void testMultipleBlocks() {
        int length = (int) ((this.testMeta.dataFile.length() / 1000) + (this.testMeta.dataFile.length() % 1000 == 0 ? 0 : 1));
        this.testMeta.blockReader.beginWindow(1L);
        int i = 0;
        while (i < length) {
            this.testMeta.blockReader.blocksMetadataInput.process(new BlockMetadata.FileBlockMetadata(this.testMeta.dataFile.getAbsolutePath(), i, i * 1000, i == length - 1 ? this.testMeta.dataFile.length() : (i + 1) * 1000, i == length - 1, i - 1));
            i++;
        }
        this.testMeta.blockReader.endWindow();
        List list = this.testMeta.messageSink.collectedTuples;
        Assert.assertEquals("No of records", this.testMeta.messages.size(), list.size());
        for (int i2 = 0; i2 < list.size(); i2++) {
            Assert.assertTrue("line " + i2, Arrays.equals(((String) ((AbstractBlockReader.ReaderRecord) list.get(i2)).getRecord()).split(","), this.testMeta.messages.get(i2)));
        }
    }

    @Test
    public void testNonConsecutiveBlocks() {
        int length = (int) ((this.testMeta.dataFile.length() / 1000) + (this.testMeta.dataFile.length() % 1000 == 0 ? 0 : 1));
        this.testMeta.blockReader.beginWindow(1L);
        int i = 0;
        while (i < length) {
            this.testMeta.blockReader.blocksMetadataInput.process(new BlockMetadata.FileBlockMetadata(this.testMeta.dataFile.getAbsolutePath(), i, i * 1000, i == length - 1 ? this.testMeta.dataFile.length() : (i + 1) * 1000, i == length - 1, -1L));
            i++;
        }
        this.testMeta.blockReader.endWindow();
        List list = this.testMeta.messageSink.collectedTuples;
        Assert.assertEquals("No of records", this.testMeta.messages.size(), list.size());
        for (int i2 = 0; i2 < list.size(); i2++) {
            Assert.assertTrue("line " + i2, Arrays.equals(((String) ((AbstractBlockReader.ReaderRecord) list.get(i2)).getRecord()).split(","), this.testMeta.messages.get(i2)));
        }
    }
}
