package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.fs.FilterStreamCodec;
import com.datatorrent.lib.io.fs.FilterStreamProvider;
import com.datatorrent.lib.testbench.RandomWordGenerator;
import com.datatorrent.lib.util.TestUtils;
import com.datatorrent.netlet.util.DTThrowable;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.io.LimitInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.GZIPInputStream;
import javax.annotation.Nonnull;
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.CipherOutputStream;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;
import javax.validation.ConstraintViolationException;
import org.apache.apex.malhar.lib.join.POJOPartitionJoinOperatorTest;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.class */
public class AbstractFileOutputOperatorTest {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileOutputOperatorTest.class);
    private static final String SINGLE_FILE = "single.txt";
    private static final String EVEN_FILE = "even.txt";
    private static final String ODD_FILE = "odd.txt";

    @Rule
    public FSTestWatcher testMeta = new FSTestWatcher();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest$CounterFilterOutputStream.class */
    public static class CounterFilterOutputStream extends FilterOutputStream {
        long counter;
        int refCount;

        public CounterFilterOutputStream(OutputStream outputStream) {
            super(outputStream);
        }

        @Override // java.io.FilterOutputStream, java.io.OutputStream
        public void write(int i) throws IOException {
            this.refCount++;
            super.write(i);
            this.refCount--;
            if (this.refCount == 0) {
                this.counter++;
            }
        }

        @Override // java.io.FilterOutputStream, java.io.OutputStream
        public void write(@Nonnull byte[] bArr) throws IOException {
            this.refCount++;
            super.write(bArr);
            this.refCount--;
            if (this.refCount == 0) {
                this.counter += bArr.length;
            }
        }

        @Override // java.io.FilterOutputStream, java.io.OutputStream
        public void write(@Nonnull byte[] bArr, int i, int i2) throws IOException {
            this.refCount++;
            super.write(bArr, i, i2);
            this.refCount--;
            if (this.refCount == 0) {
                this.counter += i2;
            }
        }

        public long getCounter() {
            return this.counter;
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest$CounterFilterStreamContext.class */
    private static class CounterFilterStreamContext implements FilterStreamContext<CounterFilterOutputStream> {
        private CounterFilterOutputStream counterStream;

        private CounterFilterStreamContext() {
        }

        public void init(OutputStream outputStream) {
            this.counterStream = new CounterFilterOutputStream(outputStream);
        }

        public boolean isDoInit() {
            return this.counterStream == null;
        }

        /* renamed from: getFilterStream, reason: merged with bridge method [inline-methods] */
        public CounterFilterOutputStream m39getFilterStream() {
            return this.counterStream;
        }

        public void finalizeContext() throws IOException {
        }

        public long getCounter() {
            if (isDoInit()) {
                return 0L;
            }
            return this.counterStream.getCounter();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest$EvenOddHDFSExactlyOnceWriter.class */
    public static class EvenOddHDFSExactlyOnceWriter extends AbstractFileOutputOperator<Integer> {
        private EvenOddHDFSExactlyOnceWriter() {
        }

        protected FileSystem getFSInstance() throws IOException {
            return FileSystem.getLocal(new Configuration()).getRaw();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public String getFileName(Integer num) {
            return num.intValue() % 2 == 0 ? AbstractFileOutputOperatorTest.EVEN_FILE : AbstractFileOutputOperatorTest.ODD_FILE;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public byte[] getBytesForTuple(Integer num) {
            return (num.toString() + "\n").getBytes();
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest$FSTestWatcher.class */
    public static class FSTestWatcher extends TestUtils.TestInfo {
        public boolean writeToTmp = false;
        public OperatorContextTestHelper.TestIdOperatorContext testOperatorContext;

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.datatorrent.lib.util.TestUtils.TestInfo
        public void starting(Description description) {
            super.starting(description);
            TestUtils.deleteTargetTestClassFolder(description);
            try {
                FileUtils.forceMkdir(new File(getDir()));
                Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
                defaultAttributeMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 60);
                defaultAttributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
                this.testOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(0, defaultAttributeMap);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void finished(Description description) {
            super.finished(description);
            TestUtils.deleteTargetTestClassFolder(description);
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest$SingleHDFSByteExactlyOnceWriter.class */
    private static class SingleHDFSByteExactlyOnceWriter extends AbstractFileOutputOperator<byte[]> {
        SingleHDFSByteExactlyOnceWriter() {
        }

        protected FileSystem getFSInstance() throws IOException {
            return FileSystem.getLocal(new Configuration()).getRaw();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public String getFileName(byte[] bArr) {
            return AbstractFileOutputOperatorTest.SINGLE_FILE;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public byte[] getBytesForTuple(byte[] bArr) {
            return bArr;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest$SingleHDFSExactlyOnceWriter.class */
    public static class SingleHDFSExactlyOnceWriter extends AbstractFileOutputOperator<Integer> {
        private SingleHDFSExactlyOnceWriter() {
        }

        protected FileSystem getFSInstance() throws IOException {
            return FileSystem.getLocal(new Configuration()).getRaw();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public String getFileName(Integer num) {
            return AbstractFileOutputOperatorTest.SINGLE_FILE;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public byte[] getBytesForTuple(Integer num) {
            return (num.toString() + "\n").getBytes();
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest$ValidationTestApp.class */
    private static class ValidationTestApp implements StreamingApplication {
        private final File testDir;
        private final Long maxLength;
        private final AbstractFileOutputOperator<byte[]> fsWriter;

        ValidationTestApp(File file, Long l, AbstractFileOutputOperator<byte[]> abstractFileOutputOperator) {
            this.testDir = file;
            this.maxLength = l;
            this.fsWriter = abstractFileOutputOperator;
        }

        public void populateDAG(DAG dag, Configuration configuration) {
            RandomWordGenerator randomWordGenerator = new RandomWordGenerator();
            randomWordGenerator.setTuplesPerWindow(2);
            dag.addOperator("random", randomWordGenerator);
            if (this.maxLength != null) {
                this.fsWriter.setMaxLength(this.maxLength.longValue());
            }
            this.fsWriter.setFilePath(this.testDir.getPath());
            dag.addOperator("fswriter", this.fsWriter);
            dag.addStream("fswriterstream", randomWordGenerator.output, this.fsWriter.input);
        }
    }

    private void populateFile(String str, String str2) throws IOException {
        File file = new File(this.testMeta.getDir() + "/" + str);
        file.createNewFile();
        FileWriter fileWriter = new FileWriter(file);
        fileWriter.write(str2);
        fileWriter.close();
    }

    public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator abstractFileOutputOperator, long j) {
        if (j >= -1) {
            abstractFileOutputOperator.beforeCheckpoint(j);
        }
        Kryo kryo = new Kryo();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);
        kryo.writeObject(output, abstractFileOutputOperator);
        output.close();
        Input input = new Input(byteArrayOutputStream.toByteArray());
        AbstractFileOutputOperator abstractFileOutputOperator2 = (AbstractFileOutputOperator) kryo.readObject(input, abstractFileOutputOperator.getClass());
        input.close();
        return abstractFileOutputOperator2;
    }

    public static void restoreCheckPoint(AbstractFileOutputOperator abstractFileOutputOperator, AbstractFileOutputOperator abstractFileOutputOperator2) {
        abstractFileOutputOperator2.counts = abstractFileOutputOperator.counts;
        abstractFileOutputOperator2.endOffsets = abstractFileOutputOperator.endOffsets;
        abstractFileOutputOperator2.openPart = abstractFileOutputOperator.openPart;
        abstractFileOutputOperator2.filePath = abstractFileOutputOperator.filePath;
        abstractFileOutputOperator2.maxOpenFiles = abstractFileOutputOperator.maxOpenFiles;
        abstractFileOutputOperator2.replication = abstractFileOutputOperator.replication;
        abstractFileOutputOperator2.totalBytesWritten = abstractFileOutputOperator.totalBytesWritten;
        abstractFileOutputOperator2.maxLength = abstractFileOutputOperator.maxLength;
        abstractFileOutputOperator2.rollingFile = abstractFileOutputOperator.rollingFile;
        abstractFileOutputOperator2.getFileNameToTmpName().putAll(abstractFileOutputOperator.getFileNameToTmpName());
        abstractFileOutputOperator2.getFinalizedFiles().putAll(abstractFileOutputOperator.getFinalizedFiles());
    }

    public static void checkOutput(int i, String str, String str2) {
        if (i >= 0) {
            str = str + "." + i;
        }
        String str3 = null;
        try {
            str3 = FileUtils.readFileToString(new File(str));
        } catch (IOException e) {
            DTThrowable.rethrow(e);
        }
        Assert.assertEquals("Single file " + i + " output contents", str2, str3);
    }

    @Test
    public void testSingleFileCompletedWrite() {
        testSingleFileCompletedWriteHelper(new SingleHDFSExactlyOnceWriter());
        checkOutput(-1, this.testMeta.getDir() + File.separator + SINGLE_FILE, "0\n1\n2\n3\n");
    }

    @Test
    public void testSingleFileCompletedWriteTmp() {
        this.testMeta.writeToTmp = true;
        testSingleFileCompletedWrite();
    }

    @Test
    public void testSingleFileCompletedWriteOverwriteInitial() throws IOException {
        populateFile(SINGLE_FILE, "0\n1\n2\n");
        testSingleFileCompletedWriteHelper(new SingleHDFSExactlyOnceWriter());
        checkOutput(-1, this.testMeta.getDir() + File.separator + SINGLE_FILE, "0\n1\n2\n3\n");
    }

    @Test
    public void testSingleFileCompletedWriteOverwriteInitialTmp() throws IOException {
        this.testMeta.writeToTmp = true;
        testSingleFileCompletedWriteOverwriteInitial();
    }

    private void testSingleFileCompletedWriteHelper(SingleHDFSExactlyOnceWriter singleHDFSExactlyOnceWriter) {
        singleHDFSExactlyOnceWriter.setFilePath(this.testMeta.getDir());
        singleHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        singleHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        singleHDFSExactlyOnceWriter.beginWindow(0L);
        singleHDFSExactlyOnceWriter.input.put(0);
        singleHDFSExactlyOnceWriter.input.put(1);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.beginWindow(1L);
        singleHDFSExactlyOnceWriter.input.put(2);
        singleHDFSExactlyOnceWriter.input.put(3);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.requestFinalize(SINGLE_FILE);
        singleHDFSExactlyOnceWriter.committed(1L);
        singleHDFSExactlyOnceWriter.teardown();
    }

    @Test
    public void testSingleFileFailedWrite() {
        testSingleFileFailedWriteHelper(new SingleHDFSExactlyOnceWriter());
        checkOutput(-1, this.testMeta.getDir() + File.separator + SINGLE_FILE, "0\n1\n4\n5\n6\n7\n");
    }

    @Test
    public void testSingleFileFailedWriteTmp() {
        this.testMeta.writeToTmp = true;
        testSingleFileFailedWrite();
    }

    @Test
    public void testSingleFileFailedWriteOverwriteInitial() throws IOException {
        SingleHDFSExactlyOnceWriter singleHDFSExactlyOnceWriter = new SingleHDFSExactlyOnceWriter();
        populateFile(SINGLE_FILE, "0\n1\n2\n");
        testSingleFileFailedWriteHelper(singleHDFSExactlyOnceWriter);
        checkOutput(-1, this.testMeta.getDir() + File.separator + SINGLE_FILE, "0\n1\n4\n5\n6\n7\n");
    }

    @Test
    public void testSingleFileFailedWriteOverwriteInitiaTmp() throws IOException {
        this.testMeta.writeToTmp = true;
        testSingleFileFailedWriteOverwriteInitial();
    }

    private void testSingleFileFailedWriteHelper(SingleHDFSExactlyOnceWriter singleHDFSExactlyOnceWriter) {
        singleHDFSExactlyOnceWriter.setFilePath(new File(this.testMeta.getDir()).getAbsolutePath());
        singleHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        singleHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        singleHDFSExactlyOnceWriter.beginWindow(0L);
        singleHDFSExactlyOnceWriter.input.put(0);
        singleHDFSExactlyOnceWriter.requestFinalize(SINGLE_FILE);
        singleHDFSExactlyOnceWriter.input.put(1);
        singleHDFSExactlyOnceWriter.endWindow();
        AbstractFileOutputOperator checkpoint = checkpoint(singleHDFSExactlyOnceWriter, -1L);
        singleHDFSExactlyOnceWriter.beginWindow(1L);
        singleHDFSExactlyOnceWriter.input.put(2);
        singleHDFSExactlyOnceWriter.teardown();
        restoreCheckPoint(checkpoint, singleHDFSExactlyOnceWriter);
        singleHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        singleHDFSExactlyOnceWriter.beginWindow(1L);
        singleHDFSExactlyOnceWriter.input.put(4);
        singleHDFSExactlyOnceWriter.requestFinalize(SINGLE_FILE);
        singleHDFSExactlyOnceWriter.input.put(5);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.beginWindow(2L);
        singleHDFSExactlyOnceWriter.input.put(6);
        singleHDFSExactlyOnceWriter.input.put(7);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.committed(2L);
        singleHDFSExactlyOnceWriter.teardown();
    }

    @Test
    public void testMultiFileCompletedWrite() {
        testMultiFileCompletedWriteHelper(new EvenOddHDFSExactlyOnceWriter());
        checkOutput(-1, this.testMeta.getDir() + File.separator + EVEN_FILE, "0\n2\n4\n6\n");
        checkOutput(-1, this.testMeta.getDir() + File.separator + ODD_FILE, "1\n3\n5\n7\n");
    }

    @Test
    public void testMultiFileCompletedWriteTmp() {
        this.testMeta.writeToTmp = true;
        testMultiFileCompletedWrite();
    }

    @Test
    public void testMultiFileCompletedWriteCache1() {
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        evenOddHDFSExactlyOnceWriter.setMaxOpenFiles(1);
        testMultiFileCompletedWriteHelper(evenOddHDFSExactlyOnceWriter);
        checkOutput(-1, this.testMeta.getDir() + File.separator + EVEN_FILE, "0\n2\n4\n6\n");
        checkOutput(-1, this.testMeta.getDir() + File.separator + ODD_FILE, "1\n3\n5\n7\n");
    }

    @Test
    public void testMultiFileCompletedWriteCache1Tmp() {
        this.testMeta.writeToTmp = true;
        testMultiFileCompletedWriteCache1();
    }

    @Test
    public void testMultiFileCompletedWriteOverwriteInitial() throws IOException {
        populateFile(EVEN_FILE, "0\n2\n");
        populateFile(ODD_FILE, "1\n3\n");
        testMultiFileCompletedWriteHelper(new EvenOddHDFSExactlyOnceWriter());
        checkOutput(-1, this.testMeta.getDir() + File.separator + EVEN_FILE, "0\n2\n4\n6\n");
        checkOutput(-1, this.testMeta.getDir() + File.separator + ODD_FILE, "1\n3\n5\n7\n");
    }

    @Test
    public void testMultiFileCompletedWriteOverwriteInitialTmp() throws IOException {
        this.testMeta.writeToTmp = true;
        testMultiFileCompletedWriteOverwriteInitial();
    }

    @Test
    public void testMultiFileCompletedWriteOverwriteCache1Initial() throws IOException {
        populateFile(EVEN_FILE, "0\n2\n");
        populateFile(ODD_FILE, "1\n3\n");
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        evenOddHDFSExactlyOnceWriter.setMaxOpenFiles(1);
        testMultiFileCompletedWriteHelperCache1(evenOddHDFSExactlyOnceWriter);
        checkOutput(-1, this.testMeta.getDir() + File.separator + EVEN_FILE, "0\n2\n4\n6\n");
        checkOutput(-1, this.testMeta.getDir() + File.separator + ODD_FILE, "1\n3\n5\n7\n");
    }

    @Test
    public void testMultiFileCompletedWriteOverwriteCache1InitialTmp() throws IOException {
        this.testMeta.writeToTmp = true;
        testMultiFileCompletedWriteOverwriteCache1Initial();
    }

    private void testMultiFileCompletedWriteHelperCache1(EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter) {
        evenOddHDFSExactlyOnceWriter.setFilePath(new File(this.testMeta.getDir()).getAbsolutePath());
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(0L);
        evenOddHDFSExactlyOnceWriter.input.put(0);
        evenOddHDFSExactlyOnceWriter.input.put(1);
        evenOddHDFSExactlyOnceWriter.input.put(2);
        evenOddHDFSExactlyOnceWriter.input.put(3);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.put(4);
        evenOddHDFSExactlyOnceWriter.input.put(5);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.requestFinalize(EVEN_FILE);
        evenOddHDFSExactlyOnceWriter.requestFinalize(ODD_FILE);
        evenOddHDFSExactlyOnceWriter.beforeCheckpoint(1L);
        evenOddHDFSExactlyOnceWriter.committed(1L);
    }

    private void testMultiFileCompletedWriteHelper(EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter) {
        evenOddHDFSExactlyOnceWriter.setFilePath(new File(this.testMeta.getDir()).getAbsolutePath());
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(0L);
        evenOddHDFSExactlyOnceWriter.input.put(0);
        evenOddHDFSExactlyOnceWriter.input.put(1);
        evenOddHDFSExactlyOnceWriter.input.put(2);
        evenOddHDFSExactlyOnceWriter.input.put(3);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.put(4);
        evenOddHDFSExactlyOnceWriter.input.put(5);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.requestFinalize(ODD_FILE);
        evenOddHDFSExactlyOnceWriter.requestFinalize(EVEN_FILE);
        evenOddHDFSExactlyOnceWriter.beforeCheckpoint(1L);
        evenOddHDFSExactlyOnceWriter.committed(1L);
    }

    @Test
    public void testMultiFileFailedWrite() {
        testMultiFileFailedWriteHelper(new EvenOddHDFSExactlyOnceWriter());
        checkOutput(-1, this.testMeta.getDir() + File.separator + EVEN_FILE, "0\n2\n6\n8\n");
        checkOutput(-1, this.testMeta.getDir() + File.separator + ODD_FILE, "1\n3\n7\n9\n");
    }

    @Test
    public void testMultiFileFailedWriteTmp() {
        this.testMeta.writeToTmp = true;
        testMultiFileFailedWrite();
    }

    @Test
    public void testMultiFileFailedWriteCache1() {
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        evenOddHDFSExactlyOnceWriter.setMaxOpenFiles(1);
        testMultiFileFailedWriteHelper(evenOddHDFSExactlyOnceWriter);
        checkOutput(-1, this.testMeta.getDir() + File.separator + EVEN_FILE, "0\n2\n6\n8\n");
        checkOutput(-1, this.testMeta.getDir() + File.separator + ODD_FILE, "1\n3\n7\n9\n");
    }

    @Test
    public void testMultiFileFailedWriteCache1Tmp() {
        this.testMeta.writeToTmp = true;
        testMultiFileFailedWriteCache1();
    }

    private void testMultiFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter) {
        evenOddHDFSExactlyOnceWriter.setFilePath(new File(this.testMeta.getDir()).getAbsolutePath());
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(0L);
        evenOddHDFSExactlyOnceWriter.input.put(0);
        evenOddHDFSExactlyOnceWriter.input.put(1);
        evenOddHDFSExactlyOnceWriter.input.put(2);
        evenOddHDFSExactlyOnceWriter.input.put(3);
        evenOddHDFSExactlyOnceWriter.requestFinalize(EVEN_FILE);
        evenOddHDFSExactlyOnceWriter.endWindow();
        AbstractFileOutputOperator checkpoint = checkpoint(evenOddHDFSExactlyOnceWriter, -1L);
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.put(4);
        evenOddHDFSExactlyOnceWriter.input.put(5);
        evenOddHDFSExactlyOnceWriter.requestFinalize(ODD_FILE);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.teardown();
        restoreCheckPoint(checkpoint, evenOddHDFSExactlyOnceWriter);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(2L);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.input.put(8);
        evenOddHDFSExactlyOnceWriter.input.put(9);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.beforeCheckpoint(2L);
        evenOddHDFSExactlyOnceWriter.committed(2L);
    }

    @Test
    public void testSingleRollingFileCompletedWrite() {
        testSingleRollingFileCompletedWriteHelper(new SingleHDFSExactlyOnceWriter());
        String str = this.testMeta.getDir() + File.separator + SINGLE_FILE;
        checkOutput(0, str, "0\n1\n2\n");
        checkOutput(1, str, "3\n4\n5\n");
    }

    @Test
    public void testSingleRollingFileCompletedWriteTmp() {
        this.testMeta.writeToTmp = true;
        testSingleRollingFileCompletedWrite();
    }

    @Test
    public void testSingleRollingFileCompletedWriteOverwriteInitial() throws IOException {
        populateFile("single.txt.0", "0\n1\n2\n");
        populateFile("single.txt.1", "0\n1\n2\n");
        populateFile("single.txt.2", "0\n1\n2\n");
        testSingleRollingFileCompletedWriteHelper(new SingleHDFSExactlyOnceWriter());
        String str = this.testMeta.getDir() + File.separator + SINGLE_FILE;
        checkOutput(0, str, "0\n1\n2\n");
        checkOutput(1, str, "3\n4\n5\n");
    }

    @Test
    public void testSingleRollingFileCompletedWriteOverwriteInitialTmp() throws IOException {
        this.testMeta.writeToTmp = true;
        testSingleRollingFileCompletedWriteOverwriteInitial();
    }

    private void testSingleRollingFileCompletedWriteHelper(SingleHDFSExactlyOnceWriter singleHDFSExactlyOnceWriter) {
        singleHDFSExactlyOnceWriter.setFilePath(this.testMeta.getDir());
        singleHDFSExactlyOnceWriter.setMaxLength(4L);
        singleHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        singleHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        singleHDFSExactlyOnceWriter.beginWindow(0L);
        singleHDFSExactlyOnceWriter.input.put(0);
        singleHDFSExactlyOnceWriter.input.put(1);
        singleHDFSExactlyOnceWriter.input.put(2);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.beginWindow(1L);
        singleHDFSExactlyOnceWriter.input.put(3);
        singleHDFSExactlyOnceWriter.input.put(4);
        singleHDFSExactlyOnceWriter.input.put(5);
        singleHDFSExactlyOnceWriter.input.put(6);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.committed(1L);
        singleHDFSExactlyOnceWriter.teardown();
    }

    @Test
    public void testSingleRollingFileFailedWrite() {
        testSingleRollingFileFailedWriteHelper(new SingleHDFSExactlyOnceWriter());
        String str = this.testMeta.getDir() + File.separator + SINGLE_FILE;
        checkOutput(0, str, "0\n1\n2\n");
        checkOutput(1, str, "3\n4\n5\n");
        checkOutput(2, str, "6\n7\n8\n");
    }

    @Test
    public void testSingleRollingFileFailedWriteTmp() {
        this.testMeta.writeToTmp = true;
        testSingleRollingFileFailedWrite();
    }

    private void testSingleRollingFileFailedWriteHelper(SingleHDFSExactlyOnceWriter singleHDFSExactlyOnceWriter) {
        singleHDFSExactlyOnceWriter.setMaxLength(4L);
        singleHDFSExactlyOnceWriter.setFilePath(this.testMeta.getDir());
        singleHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        singleHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        singleHDFSExactlyOnceWriter.beginWindow(0L);
        singleHDFSExactlyOnceWriter.input.put(0);
        singleHDFSExactlyOnceWriter.input.put(1);
        singleHDFSExactlyOnceWriter.input.put(2);
        singleHDFSExactlyOnceWriter.endWindow();
        AbstractFileOutputOperator checkpoint = checkpoint(singleHDFSExactlyOnceWriter, -1L);
        singleHDFSExactlyOnceWriter.beginWindow(1L);
        singleHDFSExactlyOnceWriter.input.put(3);
        singleHDFSExactlyOnceWriter.input.put(4);
        singleHDFSExactlyOnceWriter.teardown();
        restoreCheckPoint(checkpoint, singleHDFSExactlyOnceWriter);
        singleHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        singleHDFSExactlyOnceWriter.beginWindow(1L);
        singleHDFSExactlyOnceWriter.input.put(3);
        singleHDFSExactlyOnceWriter.input.put(4);
        singleHDFSExactlyOnceWriter.input.put(5);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.beginWindow(2L);
        singleHDFSExactlyOnceWriter.input.put(6);
        singleHDFSExactlyOnceWriter.input.put(7);
        singleHDFSExactlyOnceWriter.input.put(8);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.committed(2L);
        singleHDFSExactlyOnceWriter.teardown();
    }

    @Test
    public void testSingleRollingFileFailedWrite1() {
        SingleHDFSExactlyOnceWriter singleHDFSExactlyOnceWriter = new SingleHDFSExactlyOnceWriter();
        singleHDFSExactlyOnceWriter.setFilePath(this.testMeta.getDir());
        singleHDFSExactlyOnceWriter.setMaxLength(4L);
        singleHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        singleHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        singleHDFSExactlyOnceWriter.beginWindow(0L);
        singleHDFSExactlyOnceWriter.input.put(0);
        singleHDFSExactlyOnceWriter.input.put(1);
        singleHDFSExactlyOnceWriter.input.put(2);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.beginWindow(1L);
        singleHDFSExactlyOnceWriter.input.put(3);
        singleHDFSExactlyOnceWriter.input.put(4);
        singleHDFSExactlyOnceWriter.endWindow();
        AbstractFileOutputOperator checkpoint = checkpoint(singleHDFSExactlyOnceWriter, -1L);
        AbstractFileOutputOperator checkpoint2 = checkpoint(singleHDFSExactlyOnceWriter, -1L);
        LOG.debug("Checkpoint endOffsets={}", checkpoint.endOffsets);
        singleHDFSExactlyOnceWriter.beginWindow(2L);
        singleHDFSExactlyOnceWriter.input.put(5);
        singleHDFSExactlyOnceWriter.teardown();
        restoreCheckPoint(checkpoint, singleHDFSExactlyOnceWriter);
        LOG.debug("Checkpoint endOffsets={}", checkpoint.endOffsets);
        singleHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        singleHDFSExactlyOnceWriter.beginWindow(2L);
        singleHDFSExactlyOnceWriter.input.put(5);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.beginWindow(3L);
        singleHDFSExactlyOnceWriter.input.put(6);
        singleHDFSExactlyOnceWriter.input.put(7);
        singleHDFSExactlyOnceWriter.input.put(8);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.teardown();
        restoreCheckPoint(checkpoint2, singleHDFSExactlyOnceWriter);
        singleHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        singleHDFSExactlyOnceWriter.committed(2L);
        String str = this.testMeta.getDir() + File.separator + SINGLE_FILE;
        checkOutput(0, str, "0\n1\n2\n");
        checkOutput(1, str, "3\n4\n");
    }

    @Test
    public void testSingleRollingFileFailedWrite1Tmp() {
        this.testMeta.writeToTmp = true;
        testSingleRollingFileFailedWrite1();
    }

    @Test
    public void testMultiRollingFileCompletedWrite() {
        testMultiRollingFileCompletedWriteHelper(new EvenOddHDFSExactlyOnceWriter());
    }

    @Test
    public void testMultiRollingFileCompletedWriteTmp() {
        this.testMeta.writeToTmp = true;
        testMultiRollingFileCompletedWrite();
    }

    @Test
    public void testMultiRollingFileCompletedWriteCache1() {
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        evenOddHDFSExactlyOnceWriter.setMaxOpenFiles(1);
        testMultiRollingFileCompletedWriteHelper(evenOddHDFSExactlyOnceWriter);
    }

    @Test
    public void testMultiRollingFileCompletedWriteCache1Tmp() {
        this.testMeta.writeToTmp = true;
        testMultiRollingFileCompletedWriteCache1();
    }

    @Test
    public void testMultiRollingFileCompletedWriteOverwrite() {
        testMultiRollingFileCompletedWriteHelper(new EvenOddHDFSExactlyOnceWriter());
    }

    @Test
    public void testMultiRollingFileCompletedWriteOverwriteTmp() {
        this.testMeta.writeToTmp = true;
        testMultiRollingFileCompletedWriteOverwrite();
    }

    @Test
    public void testMultiRollingFileCompletedWriteOverwriteCache1() {
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        evenOddHDFSExactlyOnceWriter.setMaxOpenFiles(1);
        testMultiRollingFileCompletedWriteHelperCache1(evenOddHDFSExactlyOnceWriter);
    }

    @Test
    public void testMultiRollingFileCompletedWriteOverwriteCache1Tmp() {
        this.testMeta.writeToTmp = true;
        testMultiRollingFileCompletedWriteOverwriteCache1();
    }

    private void testMultiRollingFileCompletedWriteHelperCache1(EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter) {
        evenOddHDFSExactlyOnceWriter.setMaxLength(4L);
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        evenOddHDFSExactlyOnceWriter.setFilePath(new File(this.testMeta.getDir()).getAbsolutePath());
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(0L);
        evenOddHDFSExactlyOnceWriter.input.put(0);
        evenOddHDFSExactlyOnceWriter.input.put(1);
        evenOddHDFSExactlyOnceWriter.input.put(2);
        evenOddHDFSExactlyOnceWriter.input.put(3);
        evenOddHDFSExactlyOnceWriter.input.put(4);
        evenOddHDFSExactlyOnceWriter.input.put(5);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.input.put(8);
        evenOddHDFSExactlyOnceWriter.input.put(9);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.committed(1L);
        String str = this.testMeta.getDir() + File.separator + EVEN_FILE;
        checkOutput(0, str, "0\n2\n4\n");
        checkOutput(1, str, "6\n8\n6\n");
        String str2 = this.testMeta.getDir() + File.separator + ODD_FILE;
        checkOutput(0, str2, "1\n3\n5\n");
        checkOutput(1, str2, "7\n9\n7\n");
    }

    private void testMultiRollingFileCompletedWriteHelper(EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter) {
        evenOddHDFSExactlyOnceWriter.setMaxLength(4L);
        evenOddHDFSExactlyOnceWriter.setFilePath(new File(this.testMeta.getDir()).getAbsolutePath());
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(0L);
        evenOddHDFSExactlyOnceWriter.input.put(0);
        evenOddHDFSExactlyOnceWriter.input.put(1);
        evenOddHDFSExactlyOnceWriter.input.put(2);
        evenOddHDFSExactlyOnceWriter.input.put(3);
        evenOddHDFSExactlyOnceWriter.input.put(4);
        evenOddHDFSExactlyOnceWriter.input.put(5);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.input.put(8);
        evenOddHDFSExactlyOnceWriter.input.put(9);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.committed(1L);
        String str = this.testMeta.getDir() + File.separator + EVEN_FILE;
        checkOutput(0, str, "0\n2\n4\n");
        checkOutput(1, str, "6\n8\n6\n");
        String str2 = this.testMeta.getDir() + File.separator + ODD_FILE;
        checkOutput(0, str2, "1\n3\n5\n");
        checkOutput(1, str2, "7\n9\n7\n");
    }

    @Test
    public void testMultiRollingFileFailedWrite() {
        testMultiRollingFileFailedWriteHelperHelper(new EvenOddHDFSExactlyOnceWriter());
    }

    @Test
    public void testMultiRollingFileFailedWriteTmp() {
        this.testMeta.writeToTmp = true;
        testMultiRollingFileFailedWrite();
    }

    @Test
    public void testMultiRollingFileFailedWriteCache1() {
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        evenOddHDFSExactlyOnceWriter.setMaxOpenFiles(1);
        testMultiRollingFileFailedWriteHelperHelper(evenOddHDFSExactlyOnceWriter);
    }

    @Test
    public void testMultiRollingFileFailedWriteCache1Tmp() {
        this.testMeta.writeToTmp = true;
        testMultiRollingFileFailedWriteCache1();
    }

    private void testMultiRollingFileFailedWriteHelperHelper(EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter) {
        testMultiRollingFileFailedWriteHelper(evenOddHDFSExactlyOnceWriter);
        String str = this.testMeta.getDir() + File.separator + EVEN_FILE;
        checkOutput(0, str, "0\n2\n4\n");
        checkOutput(1, str, "6\n8\n6\n");
        String str2 = this.testMeta.getDir() + File.separator + ODD_FILE;
        checkOutput(0, str2, "1\n3\n5\n");
        checkOutput(1, str2, "7\n9\n7\n");
    }

    private void testMultiRollingFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter) {
        evenOddHDFSExactlyOnceWriter.setFilePath(new File(this.testMeta.getDir()).getAbsolutePath());
        evenOddHDFSExactlyOnceWriter.setMaxLength(4L);
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(0L);
        evenOddHDFSExactlyOnceWriter.input.put(0);
        evenOddHDFSExactlyOnceWriter.input.put(1);
        evenOddHDFSExactlyOnceWriter.endWindow();
        AbstractFileOutputOperator checkpoint = checkpoint(evenOddHDFSExactlyOnceWriter, -1L);
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.put(2);
        evenOddHDFSExactlyOnceWriter.input.put(3);
        evenOddHDFSExactlyOnceWriter.teardown();
        restoreCheckPoint(checkpoint, evenOddHDFSExactlyOnceWriter);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.put(2);
        evenOddHDFSExactlyOnceWriter.input.put(3);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.beginWindow(2L);
        evenOddHDFSExactlyOnceWriter.input.put(4);
        evenOddHDFSExactlyOnceWriter.input.put(5);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.beginWindow(3L);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.input.put(8);
        evenOddHDFSExactlyOnceWriter.input.put(9);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.committed(3L);
    }

    @Test
    public void testMultiRollingFileFailedWriteOverwrite() throws IOException {
        testMultiRollingFileFailedWriteOverwriteHelper(new EvenOddHDFSExactlyOnceWriter());
    }

    @Test
    public void testMultiRollingFileFailedWriteOverwriteTmp() throws IOException {
        this.testMeta.writeToTmp = true;
        testMultiRollingFileFailedWriteOverwrite();
    }

    @Test
    public void testMultiRollingFileFailedWriteOverwriteCache1() throws IOException {
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        evenOddHDFSExactlyOnceWriter.setMaxOpenFiles(1);
        String str = this.testMeta.getDir() + File.separator + EVEN_FILE;
        String str2 = this.testMeta.getDir() + File.separator + ODD_FILE;
        populateFile("even.txt.0", "0\n2\n4\n");
        populateFile("odd.txt.0", "1\n3\n5\n");
        testMultiRollingFileFailedWriteOverwriteHelperCache1(evenOddHDFSExactlyOnceWriter);
        checkOutput(0, str, "0\n4\n6\n");
        checkOutput(1, str, "8\n6\n10\n");
        checkOutput(0, str2, "1\n5\n7\n");
        checkOutput(1, str2, "9\n7\n11\n");
    }

    @Test
    public void testMultiRollingFileFailedWriteOverwriteCache1Tmp() throws IOException {
        this.testMeta.writeToTmp = true;
        testMultiRollingFileFailedWriteOverwriteCache1();
    }

    private void testMultiRollingFileFailedWriteOverwriteHelperCache1(EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter) {
        evenOddHDFSExactlyOnceWriter.setFilePath(new File(this.testMeta.getDir()).getAbsolutePath());
        evenOddHDFSExactlyOnceWriter.setMaxLength(4L);
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(0L);
        evenOddHDFSExactlyOnceWriter.input.put(0);
        evenOddHDFSExactlyOnceWriter.input.put(1);
        evenOddHDFSExactlyOnceWriter.endWindow();
        AbstractFileOutputOperator checkpoint = checkpoint(evenOddHDFSExactlyOnceWriter, -1L);
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.put(2);
        evenOddHDFSExactlyOnceWriter.input.put(3);
        evenOddHDFSExactlyOnceWriter.teardown();
        restoreCheckPoint(checkpoint, evenOddHDFSExactlyOnceWriter);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.put(4);
        evenOddHDFSExactlyOnceWriter.input.put(5);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.beginWindow(2L);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.input.put(8);
        evenOddHDFSExactlyOnceWriter.input.put(9);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.input.put(10);
        evenOddHDFSExactlyOnceWriter.input.put(11);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.committed(2L);
    }

    private void testMultiRollingFileFailedWriteOverwriteHelper(EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter) throws IOException {
        String str = this.testMeta.getDir() + File.separator + EVEN_FILE;
        String str2 = this.testMeta.getDir() + File.separator + ODD_FILE;
        populateFile("even.txt.0", "0\n2\n4\n");
        populateFile("odd.txt.0", "1\n3\n5\n");
        evenOddHDFSExactlyOnceWriter.setFilePath(new File(this.testMeta.getDir()).getAbsolutePath());
        evenOddHDFSExactlyOnceWriter.setMaxLength(4L);
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(0L);
        evenOddHDFSExactlyOnceWriter.input.process(0);
        evenOddHDFSExactlyOnceWriter.input.process(1);
        evenOddHDFSExactlyOnceWriter.endWindow();
        AbstractFileOutputOperator checkpoint = checkpoint(evenOddHDFSExactlyOnceWriter, -1L);
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.process(2);
        evenOddHDFSExactlyOnceWriter.input.process(3);
        evenOddHDFSExactlyOnceWriter.teardown();
        restoreCheckPoint(checkpoint, evenOddHDFSExactlyOnceWriter);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.process(4);
        evenOddHDFSExactlyOnceWriter.input.process(5);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.beginWindow(2L);
        evenOddHDFSExactlyOnceWriter.input.process(6);
        evenOddHDFSExactlyOnceWriter.input.process(7);
        evenOddHDFSExactlyOnceWriter.input.process(8);
        evenOddHDFSExactlyOnceWriter.input.process(9);
        evenOddHDFSExactlyOnceWriter.input.process(6);
        evenOddHDFSExactlyOnceWriter.input.process(7);
        evenOddHDFSExactlyOnceWriter.input.process(10);
        evenOddHDFSExactlyOnceWriter.input.process(11);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.committed(2L);
        checkOutput(0, str, "0\n4\n6\n");
        checkOutput(1, str, "8\n6\n10\n");
        checkOutput(0, str2, "1\n5\n7\n");
        checkOutput(1, str2, "9\n7\n11\n");
    }

    @Test
    public void singleFileMultiRollingFailure() {
        SingleHDFSExactlyOnceWriter singleHDFSExactlyOnceWriter = new SingleHDFSExactlyOnceWriter();
        singleHDFSExactlyOnceWriter.setFilePath(new File(this.testMeta.getDir()).getAbsolutePath());
        singleHDFSExactlyOnceWriter.setMaxLength(4L);
        singleFileMultiRollingFailureHelper(singleHDFSExactlyOnceWriter);
        String str = this.testMeta.getDir() + File.separator + SINGLE_FILE;
        checkOutput(0, str, "0\n1\n2\n");
        checkOutput(1, str, "3\n4\n0\n");
        checkOutput(2, str, "1\n2\n3\n");
        checkOutput(3, str, "4\n5\n6\n");
    }

    @Test
    public void singleFileMultiRollingFailureTmp() {
        this.testMeta.writeToTmp = true;
        singleFileMultiRollingFailure();
    }

    private void singleFileMultiRollingFailureHelper(SingleHDFSExactlyOnceWriter singleHDFSExactlyOnceWriter) {
        singleHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        singleHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        singleHDFSExactlyOnceWriter.beginWindow(0L);
        singleHDFSExactlyOnceWriter.input.put(0);
        singleHDFSExactlyOnceWriter.input.put(1);
        singleHDFSExactlyOnceWriter.input.put(2);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.beginWindow(1L);
        singleHDFSExactlyOnceWriter.input.put(3);
        singleHDFSExactlyOnceWriter.input.put(4);
        AbstractFileOutputOperator checkpoint = checkpoint(singleHDFSExactlyOnceWriter, -1L);
        singleHDFSExactlyOnceWriter.input.put(3);
        singleHDFSExactlyOnceWriter.input.put(4);
        singleHDFSExactlyOnceWriter.input.put(5);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.beginWindow(2L);
        singleHDFSExactlyOnceWriter.input.put(6);
        singleHDFSExactlyOnceWriter.input.put(7);
        singleHDFSExactlyOnceWriter.input.put(8);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.teardown();
        restoreCheckPoint(checkpoint, singleHDFSExactlyOnceWriter);
        singleHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        singleHDFSExactlyOnceWriter.beginWindow(1L);
        singleHDFSExactlyOnceWriter.input.put(0);
        singleHDFSExactlyOnceWriter.input.put(1);
        singleHDFSExactlyOnceWriter.input.put(2);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.beginWindow(2L);
        singleHDFSExactlyOnceWriter.input.put(3);
        singleHDFSExactlyOnceWriter.input.put(4);
        singleHDFSExactlyOnceWriter.input.put(5);
        singleHDFSExactlyOnceWriter.input.put(6);
        singleHDFSExactlyOnceWriter.endWindow();
        singleHDFSExactlyOnceWriter.committed(2L);
    }

    @Test
    public void validateNothingWrongTest() {
        LocalMode.runApp(new ValidationTestApp(new File(this.testMeta.getDir()), null, new SingleHDFSByteExactlyOnceWriter()), 1);
    }

    @Test
    public void validateNegativeMaxLengthTest() {
        boolean z = false;
        try {
            LocalMode.runApp(new ValidationTestApp(new File(this.testMeta.getDir()), -1L, new SingleHDFSByteExactlyOnceWriter()), 1);
        } catch (RuntimeException e) {
            if (e.getCause() instanceof ConstraintViolationException) {
                z = true;
            }
        }
        Assert.assertEquals("Max length validation not thrown with -1 max length", true, Boolean.valueOf(z));
    }

    @Test
    public void testPeriodicRotation() {
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        File file = new File(this.testMeta.getDir());
        evenOddHDFSExactlyOnceWriter.setFilePath(this.testMeta.getDir());
        evenOddHDFSExactlyOnceWriter.setRotationWindows(30);
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(this.testMeta.writeToTmp);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        for (int i = 0; i < 30; i++) {
            evenOddHDFSExactlyOnceWriter.beginWindow(i);
            for (int i2 = 0; i2 < i; i2++) {
                evenOddHDFSExactlyOnceWriter.input.put(Integer.valueOf((2 * i2) + 1));
            }
            evenOddHDFSExactlyOnceWriter.endWindow();
        }
        evenOddHDFSExactlyOnceWriter.committed(29L);
        TreeSet treeSet = new TreeSet();
        treeSet.add("odd.txt.0");
        Collection<File> listFiles = FileUtils.listFiles(file, (String[]) null, false);
        Assert.assertEquals("Number of part files", 1L, listFiles.size());
        Assert.assertEquals("Part file names", treeSet, getFileNames(listFiles));
        for (int i3 = 30; i3 < 120; i3++) {
            evenOddHDFSExactlyOnceWriter.beginWindow(i3);
            for (int i4 = 0; i4 < i3; i4++) {
                evenOddHDFSExactlyOnceWriter.input.put(Integer.valueOf(i4));
            }
            evenOddHDFSExactlyOnceWriter.endWindow();
        }
        evenOddHDFSExactlyOnceWriter.committed(119L);
        Collection<File> listFiles2 = FileUtils.listFiles(file, (String[]) null, false);
        Assert.assertEquals("Number of part files", 7L, listFiles2.size());
        for (int i5 = 0; i5 < 3; i5++) {
            treeSet.add("even.txt." + i5);
        }
        for (int i6 = 1; i6 < 4; i6++) {
            treeSet.add("odd.txt." + i6);
        }
        Assert.assertEquals("Part file names", treeSet, getFileNames(listFiles2));
        for (int i7 = 120; i7 < 180; i7++) {
            evenOddHDFSExactlyOnceWriter.beginWindow(i7);
            for (int i8 = 0; i8 < i7; i8++) {
                evenOddHDFSExactlyOnceWriter.input.put(Integer.valueOf(i8 * 2));
            }
            evenOddHDFSExactlyOnceWriter.endWindow();
        }
        evenOddHDFSExactlyOnceWriter.committed(179L);
        Collection<File> listFiles3 = FileUtils.listFiles(file, (String[]) null, false);
        Assert.assertEquals("Number of part files", 9L, listFiles3.size());
        for (int i9 = 3; i9 < 5; i9++) {
            treeSet.add("even.txt." + i9);
        }
        Assert.assertEquals("Part file names", treeSet, getFileNames(listFiles3));
        evenOddHDFSExactlyOnceWriter.teardown();
    }

    @Test
    public void testPeriodicRotationTmp() {
        this.testMeta.writeToTmp = true;
        testPeriodicRotation();
    }

    @Test
    public void testPeriodicRotationWithEviction() throws InterruptedException {
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        File file = new File(this.testMeta.getDir());
        evenOddHDFSExactlyOnceWriter.setFilePath(this.testMeta.getDir());
        evenOddHDFSExactlyOnceWriter.setRotationWindows(30);
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(true);
        evenOddHDFSExactlyOnceWriter.setExpireStreamAfterAccessMillis(1L);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        for (int i = 0; i < 30; i++) {
            evenOddHDFSExactlyOnceWriter.beginWindow(i);
            if (i == 0) {
                evenOddHDFSExactlyOnceWriter.input.put(Integer.valueOf(i));
            }
            Thread.sleep(100L);
            evenOddHDFSExactlyOnceWriter.endWindow();
        }
        evenOddHDFSExactlyOnceWriter.committed(29L);
        TreeSet treeSet = new TreeSet();
        treeSet.add("even.txt.0");
        Collection<File> listFiles = FileUtils.listFiles(file, (String[]) null, false);
        Assert.assertEquals("Number of part files", 1L, listFiles.size());
        Assert.assertEquals("Part file names", treeSet, getFileNames(listFiles));
        for (int i2 = 30; i2 < 120; i2++) {
            evenOddHDFSExactlyOnceWriter.beginWindow(i2);
            evenOddHDFSExactlyOnceWriter.endWindow();
        }
        evenOddHDFSExactlyOnceWriter.committed(119L);
        Collection<File> listFiles2 = FileUtils.listFiles(file, (String[]) null, false);
        Assert.assertEquals("Number of part files", 1L, listFiles2.size());
        Assert.assertEquals("Part file names", treeSet, getFileNames(listFiles2));
    }

    @Test
    public void testCompression() throws IOException {
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        evenOddHDFSExactlyOnceWriter.setFilterStreamProvider(new FilterStreamCodec.GZipFilterStreamProvider());
        File file = new File(this.testMeta.getDir(), EVEN_FILE);
        File file2 = new File(this.testMeta.getDir(), ODD_FILE);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        evenOddHDFSExactlyOnceWriter.setFilePath(this.testMeta.getDir());
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(false);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        for (int i = 0; i < 10; i++) {
            evenOddHDFSExactlyOnceWriter.beginWindow(i);
            for (int i2 = 0; i2 < 1000; i2++) {
                evenOddHDFSExactlyOnceWriter.input.put(Integer.valueOf(i));
            }
            evenOddHDFSExactlyOnceWriter.endWindow();
            if (i % 2 == 1) {
                evenOddHDFSExactlyOnceWriter.beforeCheckpoint(i);
                arrayList.add(Long.valueOf(file.length()));
                arrayList2.add(Long.valueOf(file2.length()));
            }
        }
        evenOddHDFSExactlyOnceWriter.teardown();
        checkCompressedFile(file, arrayList, 0, 5, POJOPartitionJoinOperatorTest.TOTAL_TUPLES_PROCESS, null, null);
        checkCompressedFile(file2, arrayList2, 1, 5, POJOPartitionJoinOperatorTest.TOTAL_TUPLES_PROCESS, null, null);
    }

    @Test
    public void testRecoveryOfOpenFiles() {
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        evenOddHDFSExactlyOnceWriter.setMaxLength(4L);
        evenOddHDFSExactlyOnceWriter.setFilePath(new File(this.testMeta.getDir()).getAbsolutePath());
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(true);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.beginWindow(0L);
        evenOddHDFSExactlyOnceWriter.input.put(0);
        evenOddHDFSExactlyOnceWriter.input.put(1);
        evenOddHDFSExactlyOnceWriter.input.put(2);
        evenOddHDFSExactlyOnceWriter.input.put(3);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.beforeCheckpoint(0L);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        evenOddHDFSExactlyOnceWriter.input.put(4);
        evenOddHDFSExactlyOnceWriter.input.put(5);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.beginWindow(1L);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.input.put(8);
        evenOddHDFSExactlyOnceWriter.input.put(9);
        evenOddHDFSExactlyOnceWriter.input.put(6);
        evenOddHDFSExactlyOnceWriter.input.put(7);
        evenOddHDFSExactlyOnceWriter.endWindow();
        evenOddHDFSExactlyOnceWriter.committed(1L);
        String str = this.testMeta.getDir() + File.separator + EVEN_FILE;
        checkOutput(0, str, "0\n2\n4\n");
        String str2 = this.testMeta.getDir() + File.separator + ODD_FILE;
        checkOutput(0, str2, "1\n3\n5\n");
        checkOutput(1, str, "6\n8\n6\n");
        checkOutput(1, str2, "7\n9\n7\n");
    }

    private void checkCompressedFile(File file, List<Long> list, int i, int i2, int i3, SecretKey secretKey, byte[] bArr) throws IOException {
        InputStream inputStream = null;
        GZIPInputStream gZIPInputStream = null;
        BufferedReader bufferedReader = null;
        Cipher cipher = null;
        if (secretKey != null) {
            try {
                cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
                cipher.init(2, secretKey, new IvParameterSpec(bArr));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        int i4 = 0;
        try {
            try {
                InputStream fileInputStream = new FileInputStream(file);
                InputStream inputStream2 = fileInputStream;
                if (secretKey != null) {
                    try {
                        inputStream2 = new CipherInputStream(fileInputStream, cipher);
                    } catch (Exception e2) {
                        throw new RuntimeException(e2);
                    }
                }
                long j = 0;
                Iterator<Long> it = list.iterator();
                while (it.hasNext()) {
                    long longValue = it.next().longValue();
                    if (longValue != 0) {
                        gZIPInputStream = new GZIPInputStream(new LimitInputStream(inputStream2, longValue - j));
                        bufferedReader = new BufferedReader(new InputStreamReader(gZIPInputStream));
                        String str = "" + (i + (i4 * 2));
                        int i5 = 0;
                        while (true) {
                            String readLine = bufferedReader.readLine();
                            if (readLine == null) {
                                break;
                            }
                            Assert.assertEquals("File line", str, readLine);
                            i5++;
                            if (i5 % i3 == 0) {
                                i4++;
                                str = "" + (i + (i4 * 2));
                            }
                        }
                        j = longValue;
                    }
                }
                if (bufferedReader != null) {
                    bufferedReader.close();
                } else if (gZIPInputStream != null) {
                    gZIPInputStream.close();
                } else if (inputStream2 != null) {
                    inputStream2.close();
                }
            } catch (Exception e3) {
                e3.printStackTrace();
                if (0 != 0) {
                    bufferedReader.close();
                } else if (0 != 0) {
                    gZIPInputStream.close();
                } else if (0 != 0) {
                    inputStream.close();
                }
            }
            Assert.assertEquals("Total", i2, i4);
        } catch (Throwable th) {
            if (0 != 0) {
                bufferedReader.close();
            } else if (0 != 0) {
                gZIPInputStream.close();
            } else if (0 != 0) {
                inputStream.close();
            }
            throw th;
        }
    }

    @Test
    public void testChainFilters() throws NoSuchAlgorithmException, IOException {
        EvenOddHDFSExactlyOnceWriter evenOddHDFSExactlyOnceWriter = new EvenOddHDFSExactlyOnceWriter();
        KeyGenerator keyGenerator = KeyGenerator.getInstance("AES");
        keyGenerator.init(128);
        final SecretKey generateKey = keyGenerator.generateKey();
        byte[] bytes = "TestParam16bytes".getBytes();
        final IvParameterSpec ivParameterSpec = new IvParameterSpec(bytes);
        FilterStreamProvider.FilterChainStreamProvider filterChainStreamProvider = new FilterStreamProvider.FilterChainStreamProvider();
        filterChainStreamProvider.addStreamProvider(new FilterStreamCodec.GZipFilterStreamProvider());
        final CounterFilterStreamContext counterFilterStreamContext = new CounterFilterStreamContext();
        final CounterFilterStreamContext counterFilterStreamContext2 = new CounterFilterStreamContext();
        filterChainStreamProvider.addStreamProvider(new FilterStreamProvider.SimpleFilterReusableStreamProvider<CounterFilterOutputStream, OutputStream>() { // from class: com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.1
            protected FilterStreamContext<CounterFilterOutputStream> createFilterStreamContext(OutputStream outputStream) throws IOException {
                if (counterFilterStreamContext.isDoInit()) {
                    counterFilterStreamContext.init(outputStream);
                    return counterFilterStreamContext;
                }
                counterFilterStreamContext2.init(outputStream);
                return counterFilterStreamContext2;
            }
        });
        filterChainStreamProvider.addStreamProvider(new FilterStreamProvider.SimpleFilterReusableStreamProvider<CipherOutputStream, OutputStream>() { // from class: com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.2
            protected FilterStreamContext<CipherOutputStream> createFilterStreamContext(OutputStream outputStream) throws IOException {
                try {
                    Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
                    cipher.init(1, generateKey, ivParameterSpec);
                    return new FilterStreamCodec.CipherFilterStreamContext(outputStream, cipher);
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }
        });
        evenOddHDFSExactlyOnceWriter.setFilterStreamProvider(filterChainStreamProvider);
        File file = new File(this.testMeta.getDir(), EVEN_FILE);
        File file2 = new File(this.testMeta.getDir(), ODD_FILE);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        evenOddHDFSExactlyOnceWriter.setFilePath(this.testMeta.getDir());
        evenOddHDFSExactlyOnceWriter.setAlwaysWriteToTmp(false);
        evenOddHDFSExactlyOnceWriter.setup(this.testMeta.testOperatorContext);
        for (int i = 0; i < 10; i++) {
            evenOddHDFSExactlyOnceWriter.beginWindow(i);
            for (int i2 = 0; i2 < 1000; i2++) {
                evenOddHDFSExactlyOnceWriter.input.put(Integer.valueOf(i));
            }
            evenOddHDFSExactlyOnceWriter.endWindow();
            if (i % 2 == 1) {
                evenOddHDFSExactlyOnceWriter.beforeCheckpoint(i);
                arrayList.add(Long.valueOf(counterFilterStreamContext.getCounter()));
                arrayList2.add(Long.valueOf(counterFilterStreamContext2.getCounter()));
            }
        }
        evenOddHDFSExactlyOnceWriter.teardown();
        checkCompressedFile(file, arrayList, 0, 5, POJOPartitionJoinOperatorTest.TOTAL_TUPLES_PROCESS, generateKey, bytes);
        checkCompressedFile(file2, arrayList2, 1, 5, POJOPartitionJoinOperatorTest.TOTAL_TUPLES_PROCESS, generateKey, bytes);
    }

    private Set<String> getFileNames(Collection<File> collection) {
        TreeSet treeSet = new TreeSet();
        Iterator<File> it = collection.iterator();
        while (it.hasNext()) {
            treeSet.add(it.next().getName());
        }
        return treeSet;
    }
}
