/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.legacy.io.TextInputFormat;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.AssumptionViolatedException;
import org.junit.Before;

public class ContinuousFileProcessingCheckpointITCase
extends StreamFaultToleranceTestBase {
    private static final int NO_OF_FILES = 5;
    private static final int LINES_PER_FILE = 150;
    private static final long INTERVAL = 100L;
    private static File baseDir;
    private static FileSystem localFs;
    private static String localFsURI;
    private FileCreator fc;
    private static Map<Integer, Set<String>> actualCollectedContent;

    @Before
    public void createHDFS() throws IOException {
        if (this.failoverStrategy.equals((Object)StreamFaultToleranceTestBase.FailoverStrategy.RestartPipelinedRegionFailoverStrategy)) {
            throw new AssumptionViolatedException("ignored ContinuousFileProcessingCheckpointITCase when using RestartPipelinedRegionStrategy");
        }
        baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile();
        FileUtil.fullyDelete((File)baseDir);
        Configuration hdConf = new Configuration();
        localFsURI = "file:///" + baseDir + "/";
        localFs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
    }

    @After
    public void destroyHDFS() {
        if (baseDir != null) {
            FileUtil.fullyDelete((File)baseDir);
        }
    }

    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        env.enableCheckpointing(10L);
        this.fc = new FileCreator();
        this.fc.start();
        TextInputFormat format = new TextInputFormat(new Path(localFsURI));
        format.setFilesFilter(FilePathFilter.createDefaultFilter());
        DataStreamSource inputStream = env.readFile((FileInputFormat)format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100L);
        TestingSinkFunction sink = new TestingSinkFunction();
        inputStream.flatMap((FlatMapFunction)new FlatMapFunction<String, String>(){

            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect((Object)value);
            }
        }).addSink((SinkFunction)sink).setParallelism(1);
    }

    @Override
    public void postSubmit() throws Exception {
        this.fc.join();
        Map<Integer, Set<String>> collected = actualCollectedContent;
        Assert.assertEquals((long)collected.size(), (long)this.fc.getFileContent().size());
        for (Integer fileIdx : this.fc.getFileContent().keySet()) {
            Assert.assertTrue((boolean)collected.keySet().contains(fileIdx));
            ArrayList cntnt = new ArrayList(collected.get(fileIdx));
            Collections.sort(cntnt, new Comparator<String>(){

                @Override
                public int compare(String o1, String o2) {
                    return ContinuousFileProcessingCheckpointITCase.this.getLineNo(o1) - ContinuousFileProcessingCheckpointITCase.this.getLineNo(o2);
                }
            });
            StringBuilder cntntStr = new StringBuilder();
            for (String line : cntnt) {
                cntntStr.append(line);
            }
            Assert.assertEquals((Object)this.fc.getFileContent().get(fileIdx), (Object)cntntStr.toString());
        }
        collected.clear();
        actualCollectedContent.clear();
        this.fc.clean();
    }

    private int getLineNo(String line) {
        String[] tkns = line.split("\\s");
        return Integer.parseInt(tkns[tkns.length - 1]);
    }

    private Tuple2<org.apache.hadoop.fs.Path, String> fillWithData(String base, String fileName, int fileIdx, String sampleLine) throws IOException, InterruptedException {
        assert (localFs != null);
        org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
        FSDataOutputStream stream = localFs.create(tmp);
        StringBuilder str = new StringBuilder();
        for (int i = 0; i < 150; ++i) {
            String line = fileIdx + ": " + sampleLine + " " + i + "\n";
            str.append(line);
            stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
        }
        stream.close();
        return new Tuple2((Object)tmp, (Object)str.toString());
    }

    static {
        actualCollectedContent = new HashMap<Integer, Set<String>>();
    }

    private class FileCreator
    extends Thread {
        private final Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<org.apache.hadoop.fs.Path>();
        private final Map<Integer, String> fileContents = new HashMap<Integer, String>();
        private long lastCreatedModTime = Long.MIN_VALUE;

        private FileCreator() {
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 5; ++i) {
                    Tuple2<org.apache.hadoop.fs.Path, String> tmpFile;
                    long modTime;
                    do {
                        Thread.sleep(50L);
                        tmpFile = ContinuousFileProcessingCheckpointITCase.this.fillWithData(localFsURI, "file", i, "This is test line.");
                        modTime = localFs.getFileStatus((org.apache.hadoop.fs.Path)tmpFile.f0).getModificationTime();
                        if (modTime > this.lastCreatedModTime) continue;
                        localFs.delete((org.apache.hadoop.fs.Path)tmpFile.f0, false);
                    } while (modTime <= this.lastCreatedModTime);
                    this.lastCreatedModTime = modTime;
                    org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(localFsURI + "/file" + i);
                    localFs.rename((org.apache.hadoop.fs.Path)tmpFile.f0, file);
                    Assert.assertTrue((boolean)localFs.exists(file));
                    this.filesCreated.add(file);
                    this.fileContents.put(i, (String)tmpFile.f1);
                }
            }
            catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }

        void clean() throws IOException {
            assert (localFs != null);
            for (org.apache.hadoop.fs.Path path : this.filesCreated) {
                localFs.delete(path, false);
            }
            this.fileContents.clear();
        }

        Map<Integer, String> getFileContent() {
            return this.fileContents;
        }
    }

    private static class TestingSinkFunction
    extends RichSinkFunction<String>
    implements ListCheckpointed<Tuple2<Long, Map<Integer, Set<String>>>>,
    CheckpointListener {
        private boolean hasRestoredAfterFailure = false;
        private volatile int successfulCheckpoints = 0;
        private long elementsToFailure;
        private long elementCounter = 0L;
        private Map<Integer, Set<String>> actualContent = new HashMap<Integer, Set<String>>();

        TestingSinkFunction() {
        }

        public void open(OpenContext openContext) throws Exception {
            Assert.assertEquals((long)1L, (long)this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks());
            long failurePosMin = 60L;
            long failurePosMax = 105L;
            this.elementsToFailure = new Random().nextLong() % (failurePosMax - failurePosMin) + failurePosMin;
        }

        public void invoke(String value) throws Exception {
            int fileIdx = this.getFileIdx(value);
            Set<String> content = this.actualContent.get(fileIdx);
            if (content == null) {
                content = new HashSet<String>();
                this.actualContent.put(fileIdx, content);
            }
            if (!content.add(value + "\n")) {
                Assert.fail((String)("Duplicate line: " + value));
                System.exit(0);
            }
            ++this.elementCounter;
            if (this.elementCounter >= 750L) {
                actualCollectedContent = this.actualContent;
                throw new SuppressRestartsException((Throwable)new SuccessException());
            }
            if (!this.hasRestoredAfterFailure && this.successfulCheckpoints < 2) {
                Thread.sleep(5L);
            }
            if (!this.hasRestoredAfterFailure && this.successfulCheckpoints >= 2 && this.elementCounter >= this.elementsToFailure) {
                throw new Exception("Task Failure @ elem: " + this.elementCounter + " / " + this.elementsToFailure);
            }
        }

        public List<Tuple2<Long, Map<Integer, Set<String>>>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            Tuple2 state = new Tuple2((Object)this.elementCounter, this.actualContent);
            return Collections.singletonList(state);
        }

        public void restoreState(List<Tuple2<Long, Map<Integer, Set<String>>>> state) throws Exception {
            Tuple2<Long, Map<Integer, Set<String>>> s = state.get(0);
            this.elementCounter = (Long)s.f0;
            this.actualContent = (Map)s.f1;
            this.hasRestoredAfterFailure = this.elementCounter != 0L;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            ++this.successfulCheckpoints;
        }

        public void notifyCheckpointAborted(long checkpointId) {
        }

        private int getFileIdx(String line) {
            String[] tkns = line.split(":");
            return Integer.parseInt(tkns[0]);
        }
    }
}

