/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

public class ContinuousFileProcessingRescalingTest {
    /**
     * Simulates scaling the reader down from two parallel instances to a single one.
     *
     * <p>Two readers each receive one split and block part-way through it (at record index 5 and
     * 15 respectively). Snapshots of both are merged and used to initialise a third reader running
     * with parallelism 1. After everything is released, the merged reader's output must match the
     * records the two original readers emitted after their output queues were cleared.
     */
    @Test
    public void testReaderScalingDown() throws Exception {
        // Shared by both original readers: each stays blocked until this latch fires.
        OneShotLatch resumeOriginals = new OneShotLatch();

        // First reader (subtask 0 of 2): signals firstBlocked once it reaches record index 5.
        OneShotLatch firstBlocked = new OneShotLatch();
        BlockingFileInputFormat firstFormat =
                new BlockingFileInputFormat(firstBlocked, resumeOriginals, new Path("test"), 20, 5);
        FileInputSplit[] splits = firstFormat.createInputSplits(2);
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> firstHarness =
                getTestHarness(firstFormat, 2, 0);
        firstHarness.open();
        firstHarness.processElement(new StreamRecord<>(getTimestampedSplit(0L, splits[0])));
        if (!firstBlocked.isTriggered()) {
            firstBlocked.await();
        }

        // Second reader (subtask 1 of 2): signals secondBlocked once it reaches record index 15.
        OneShotLatch secondBlocked = new OneShotLatch();
        BlockingFileInputFormat secondFormat =
                new BlockingFileInputFormat(secondBlocked, resumeOriginals, new Path("test"), 20, 15);
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> secondHarness =
                getTestHarness(secondFormat, 2, 1);
        secondHarness.open();
        secondHarness.processElement(new StreamRecord<>(getTimestampedSplit(0L, splits[1])));
        if (!secondBlocked.isTriggered()) {
            secondBlocked.await();
        }

        // Drop what was produced so far; only records emitted from here on are compared.
        firstHarness.getOutput().clear();
        secondHarness.getOutput().clear();

        // Snapshot both blocked readers (second first, as the merged state lists it first).
        OperatorSubtaskState secondSnapshot = secondHarness.snapshot(0L, 0L);
        OperatorSubtaskState firstSnapshot = firstHarness.snapshot(0L, 0L);
        OperatorSubtaskState mergedState =
                AbstractStreamOperatorTestHarness.repackageState(secondSnapshot, firstSnapshot);

        // Third reader (the only subtask) restored from the merged state. Its two latches are
        // named after the constructor slot they occupy; both are released right after open().
        OneShotLatch mergedTriggerSlot = new OneShotLatch();
        OneShotLatch mergedWaitingSlot = new OneShotLatch();
        BlockingFileInputFormat mergedFormat =
                new BlockingFileInputFormat(mergedTriggerSlot, mergedWaitingSlot, new Path("test"), 20, 5);
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> mergedHarness =
                getTestHarness(mergedFormat, 1, 0);
        mergedHarness.initializeState(mergedState);
        mergedHarness.open();

        // Release every reader so all of them can run to completion.
        mergedTriggerSlot.trigger();
        mergedWaitingSlot.trigger();
        resumeOriginals.trigger();

        // close() is invoked while holding the respective checkpoint lock.
        synchronized (firstHarness.getCheckpointLock()) {
            firstHarness.close();
        }
        synchronized (secondHarness.getCheckpointLock()) {
            secondHarness.close();
        }
        synchronized (mergedHarness.getCheckpointLock()) {
            mergedHarness.close();
        }

        // Expected: what the two original readers emitted after the clear (watermarks removed).
        Queue<Object> expected = new ArrayDeque<>();
        putElementsInQ(expected, firstHarness.getOutput());
        putElementsInQ(expected, secondHarness.getOutput());

        // Actual: what the single restored reader emitted (watermarks removed).
        Queue<Object> actual = new ArrayDeque<>();
        putElementsInQ(actual, mergedHarness.getOutput());

        Assert.assertEquals(20, actual.size());
        Assert.assertArrayEquals(expected.toArray(), actual.toArray());
    }

    /**
     * Simulates scaling the reader up from a single instance to two parallel instances.
     *
     * <p>One reader receives both splits and blocks at record index 5 of the split it is reading.
     * Its snapshot is then handed to two new readers (subtasks 0 and 1 of 2). After everything is
     * released, the combined output of the two new readers must match what the original reader
     * emitted after its output queue was cleared.
     */
    @Test
    public void testReaderScalingUp() throws Exception {
        // Original reader (the only subtask): signals originalBlocked at record index 5 and then
        // waits for resumeOriginal.
        OneShotLatch resumeOriginal = new OneShotLatch();
        OneShotLatch originalBlocked = new OneShotLatch();
        BlockingFileInputFormat originalFormat =
                new BlockingFileInputFormat(originalBlocked, resumeOriginal, new Path("test"), 20, 5);
        FileInputSplit[] splits = originalFormat.createInputSplits(2);
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> originalHarness =
                getTestHarness(originalFormat, 1, 0);
        originalHarness.open();
        originalHarness.processElement(new StreamRecord<>(getTimestampedSplit(0L, splits[0])));
        originalHarness.processElement(new StreamRecord<>(getTimestampedSplit(1L, splits[1])));
        if (!originalBlocked.isTriggered()) {
            originalBlocked.await();
        }

        // Snapshot while blocked, discard the output so far, then let the original continue.
        OperatorSubtaskState snapshot = originalHarness.snapshot(0L, 0L);
        originalHarness.getOutput().clear();
        resumeOriginal.trigger();

        // Both scaled-up readers share one pair of latches.
        OneShotLatch scaledTrigger = new OneShotLatch();
        OneShotLatch scaledWaiting = new OneShotLatch();

        // Scaled-up reader for subtask 0 of 2, restored from the snapshot.
        BlockingFileInputFormat lowerFormat =
                new BlockingFileInputFormat(scaledTrigger, scaledWaiting, new Path("test"), 20, 15);
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> lowerHarness =
                getTestHarness(lowerFormat, 2, 0);
        lowerHarness.setup();
        lowerHarness.initializeState(snapshot);
        lowerHarness.open();

        // Scaled-up reader for subtask 1 of 2, restored from the same snapshot.
        BlockingFileInputFormat upperFormat =
                new BlockingFileInputFormat(scaledTrigger, scaledWaiting, new Path("test"), 20, 15);
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> upperHarness =
                getTestHarness(upperFormat, 2, 1);
        upperHarness.setup();
        upperHarness.initializeState(snapshot);
        upperHarness.open();

        // Release the scaled-up readers so they can run to completion.
        scaledTrigger.trigger();
        scaledWaiting.trigger();

        // close() is invoked while holding the respective checkpoint lock.
        synchronized (originalHarness.getCheckpointLock()) {
            originalHarness.close();
        }
        synchronized (lowerHarness.getCheckpointLock()) {
            lowerHarness.close();
        }
        synchronized (upperHarness.getCheckpointLock()) {
            upperHarness.close();
        }

        // Expected: what the original reader emitted after the clear (watermarks removed).
        Queue<Object> expected = new ArrayDeque<>();
        putElementsInQ(expected, originalHarness.getOutput());

        // Actual: output of subtask 0 followed by output of subtask 1 (watermarks removed).
        Queue<Object> actual = new ArrayDeque<>();
        putElementsInQ(actual, lowerHarness.getOutput());
        putElementsInQ(actual, upperHarness.getOutput());

        Assert.assertEquals(35, actual.size());
        Assert.assertArrayEquals(expected.toArray(), actual.toArray());
    }

    /**
     * Appends every element of {@code partial} that is not a {@link Watermark} to {@code res},
     * preserving the iteration order of {@code partial}.
     *
     * @param res queue receiving the non-watermark elements
     * @param partial queue to read from; it is iterated, not drained
     */
    private void putElementsInQ(Queue<Object> res, Queue<Object> partial) {
        for (Object element : partial) {
            if (!(element instanceof Watermark)) {
                res.add(element);
            }
        }
    }

    /**
     * Builds a test harness around a {@link ContinuousFileReaderOperator} that reads with the
     * given format.
     *
     * <p>The harness is created with the fixed value 10 as its second constructor argument and is
     * switched to event time before being returned. It is neither set up nor opened here.
     *
     * @param format input format the reader operator is constructed with
     * @param noOfTasks value passed as the harness' third constructor argument
     * @param taksIdx value passed as the harness' fourth constructor argument
     * @return the configured, not yet opened harness
     */
    private OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> getTestHarness(BlockingFileInputFormat format, int noOfTasks, int taksIdx) throws Exception {
        ContinuousFileReaderOperator<String> readerOperator = new ContinuousFileReaderOperator<>(format);
        readerOperator.setOutputType(TypeExtractor.getInputFormatTypes(format), new ExecutionConfig());

        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness =
                new OneInputStreamOperatorTestHarness<>(readerOperator, 10, noOfTasks, taksIdx);
        harness.setTimeCharacteristic(TimeCharacteristic.EventTime);
        return harness;
    }

    /**
     * Wraps a {@link FileInputSplit} into a {@link TimestampedFileInputSplit} that carries the
     * given modification time and otherwise copies the split's number, path, start, length and
     * host names.
     *
     * @param modTime modification time to attach to the split
     * @param split split to copy; checked with {@code Preconditions.checkNotNull}
     * @return the timestamped copy of {@code split}
     */
    private TimestampedFileInputSplit getTimestampedSplit(long modTime, FileInputSplit split) {
        Preconditions.checkNotNull(split);

        int splitNumber = split.getSplitNumber();
        Path path = split.getPath();
        long start = split.getStart();
        long length = split.getLength();
        String[] hostnames = split.getHostnames();
        return new TimestampedFileInputSplit(modTime, splitNumber, path, start, length, hostnames);
    }

    /**
     * Checkpointable {@link FileInputFormat} used by the rescaling tests. It generates its records
     * in memory ({@code "<splitNumber>: test line <index>"}) and can be paused at a configurable
     * record index so that a test can take a snapshot at a known position.
     *
     * <p>When the current record index equals {@code elementsBeforeCheckpoint}, the format
     * triggers {@code triggerLatch} and then blocks until {@code waitingLatch} is triggered.
     * The checkpointed state is the index of the next record to produce.
     */
    private static class BlockingFileInputFormat
            extends FileInputFormat<String>
            implements CheckpointableInputFormat<FileInputSplit, Integer> {
        /** Triggered by the format when the pause position is reached. */
        private final OneShotLatch triggerLatch;
        /** Awaited by the format at the pause position; triggered from outside to resume. */
        private final OneShotLatch waitingLatch;
        /** Record index at which the format pauses. */
        private final int elementsBeforeCheckpoint;
        /** Number of records per split; reading ends when the index reaches this value. */
        private final int linesPerSplit;
        /** Split currently being read; set by {@link #open} or {@link #reopen}. */
        private FileInputSplit split;
        /** Index of the next record to produce within the current split. */
        private int state;

        /**
         * @param triggerLatch latch this format triggers once the pause position is reached
         * @param waitingLatch latch this format waits on at the pause position
         * @param filePath path handed to the {@link FileInputFormat} super constructor
         * @param sizeOfSplit number of records per split
         * @param elementsBeforeCheckpoint record index at which the format pauses
         */
        BlockingFileInputFormat(OneShotLatch triggerLatch, OneShotLatch waitingLatch, Path filePath, int sizeOfSplit, int elementsBeforeCheckpoint) {
            super(filePath);
            this.triggerLatch = triggerLatch;
            this.waitingLatch = waitingLatch;
            this.elementsBeforeCheckpoint = elementsBeforeCheckpoint;
            this.linesPerSplit = sizeOfSplit;
            this.state = 0;
        }

        /**
         * Creates exactly {@code minNumSplits} synthetic splits over the first configured path.
         * Split {@code i} starts at {@code i * linesPerSplit + 1} and has length
         * {@code linesPerSplit}; no host names are assigned.
         */
        public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
            Path path = this.getFilePaths()[0];
            FileInputSplit[] splits = new FileInputSplit[minNumSplits];
            for (int i = 0; i < minNumSplits; ++i) {
                // Widen before multiplying so the start offset cannot overflow int arithmetic.
                long start = (long) i * this.linesPerSplit + 1L;
                splits[i] = new FileInputSplit(i, path, start, (long) this.linesPerSplit, null);
            }
            return splits;
        }

        /** Starts reading {@code fileSplit} from record index 0. */
        public void open(FileInputSplit fileSplit) throws IOException {
            this.split = fileSplit;
            this.state = 0;
        }

        /** Resumes reading {@code split} from the previously checkpointed record index. */
        public void reopen(FileInputSplit split, Integer state) throws IOException {
            this.split = split;
            this.state = state;
        }

        /** Returns the index of the next record to produce, used as the checkpointed state. */
        public Integer getCurrentState() throws IOException {
            return this.state;
        }

        /**
         * Reports whether all {@code linesPerSplit} records of the current split were produced.
         *
         * <p>Side effect: at the pause position this triggers {@code triggerLatch} and blocks
         * until {@code waitingLatch} is triggered. If the waiting thread is interrupted, the
         * wait is abandoned and the thread's interrupt status is restored so that the code
         * driving this format can still observe the interruption.
         */
        public boolean reachedEnd() throws IOException {
            if (this.state == this.elementsBeforeCheckpoint) {
                this.triggerLatch.trigger();
                if (!this.waitingLatch.isTriggered()) {
                    try {
                        this.waitingLatch.await();
                    } catch (InterruptedException e) {
                        // Do not swallow the interruption: re-set the flag and fall through.
                        Thread.currentThread().interrupt();
                    }
                }
            }
            return this.state == this.linesPerSplit;
        }

        /**
         * Produces the next synthetic record and advances the record index, or returns
         * {@code null} once the split is exhausted. Calls {@link #reachedEnd()} and may
         * therefore block at the pause position. The {@code reuse} argument is ignored.
         */
        public String nextRecord(String reuse) throws IOException {
            if (this.reachedEnd()) {
                return null;
            }
            return this.split.getSplitNumber() + ": test line " + this.state++;
        }
    }
}

