package org.apache.flink.test.recovery;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.class */
public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailureRecoveryTest {
    private static final int DATA_COUNT = 10000;

    /* loaded from: input_file:org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase$SleepyDurableGenerateSequence.class */
    /**
     * Parallel source that emits the values {@code k * parallelism + subtaskIndex} for
     * consecutive {@code k}, i.e. this subtask's share of the numbers below {@code end},
     * while tracking its progress in an operator state registered as "collected".
     *
     * <p>Before each element the source looks for a file named "proceed" in the coordination
     * directory. As long as the file is absent it sleeps {@code SLEEP_TIME} milliseconds per
     * element; once the file has been seen the check is skipped for the rest of the run.
     */
    public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> {
        private static final long SLEEP_TIME = 50;
        private final File coordinateDir;
        private final long end;

        /**
         * @param file directory in which the "proceed" marker file is looked up
         * @param j exclusive upper bound of the generated sequence (across all subtasks)
         */
        public SleepyDurableGenerateSequence(File file, long j) {
            this.coordinateDir = file;
            this.end = j;
        }

        /**
         * Emits this subtask's elements, starting at the position held by the "collected" state
         * (0 when the state is not present yet and has to be registered).
         *
         * @param collector receives the generated values
         * @throws Exception if sleeping is interrupted or a framework call fails
         */
        public void run(Collector<Long> collector) throws Exception {
            StreamingRuntimeContext context = getRuntimeContext();

            // Reuse the already registered progress state, or register a fresh one starting at 0.
            OperatorState collectedState;
            if (!context.containsState("collected")) {
                collectedState = new OperatorState(0L);
                context.registerState("collected", collectedState);
            } else {
                collectedState = context.getState("collected");
            }

            final long stepSize = context.getNumberOfParallelSubtasks();
            final long congruence = context.getIndexOfThisSubtask();

            // Subtasks whose index is below the remainder emit one extra element.
            long toCollect = this.end / stepSize;
            if (this.end % stepSize > congruence) {
                toCollect++;
            }

            final File proceedMarker = new File(this.coordinateDir, "proceed");
            boolean waitingForProceed = true;
            long collected = ((Long) collectedState.getState()).longValue();

            while (collected < toCollect) {
                if (waitingForProceed) {
                    if (!proceedMarker.exists()) {
                        // Marker not there yet: make slow progress only.
                        Thread.sleep(SLEEP_TIME);
                    } else {
                        // Marker seen once: never check again, run at full speed.
                        waitingForProceed = false;
                    }
                }
                collector.collect(Long.valueOf((collected * stepSize) + congruence));
                // The state holds the position that was just emitted, so a run that starts
                // from this state value emits that position again.
                collectedState.update(Long.valueOf(collected));
                collected++;
            }
        }

        /** Does nothing; the emit loop in {@link #run} is not signalled by this method. */
        public void cancel() {
        }
    }

    /**
     * Builds and executes the streaming job under test: a {@link SleepyDurableGenerateSequence}
     * source, a pass-through map that drops a "ready_&lt;subtask&gt;" marker file on its first
     * element, and a sink that appends every value as a text line to a per-task file in a fresh
     * temporary directory. After the job finishes, the written files are checked to contain
     * every number in {@code [0, DATA_COUNT)}.
     *
     * <p>The temporary output directory is removed afterwards, whether or not the job or the
     * verification succeeded.
     *
     * @param i port handed to {@code createRemoteEnvironment} for the "localhost" cluster
     * @param file coordination directory, passed to the source and used for the "ready_" markers
     * @throws Exception if the job, the verification, or the cleanup fails
     */
    @Override // org.apache.flink.test.recovery.AbstractProcessFailureRecoveryTest
    public void testProgram(int i, final File file) throws Exception {
        // Single source of truth for the job parallelism and the minimum number of output files.
        final int parallelism = 4;

        final File tempOutDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH), UUID.randomUUID().toString());
        Assert.assertTrue("Cannot create directory for temp output", tempOutDir.mkdirs());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", i, new String[0]);
        env.setParallelism(parallelism);
        env.setNumberOfExecutionRetries(1);
        env.enableCheckpointing(200L);

        env.addSource(new SleepyDurableGenerateSequence(file, DATA_COUNT)).map(new RichMapFunction<Long, Long>() {
            private boolean markerCreated = false;

            public Long map(Long l) throws Exception {
                // Signal once per mapper instance that data has started flowing through it.
                if (!this.markerCreated) {
                    AbstractProcessFailureRecoveryTest.touchFile(new File(file, "ready_" + getRuntimeContext().getIndexOfThisSubtask()));
                    this.markerCreated = true;
                }
                return l;
            }
        }).addSink(new RichSinkFunction<Long>() {
            private transient FileChannel writer;

            public void open(Configuration configuration) throws IOException {
                // A random suffix gives every opened sink instance its own file.
                this.writer = new RandomAccessFile(new File(tempOutDir, "task-" + getRuntimeContext().getIndexOfThisSubtask() + "-" + UUID.randomUUID().toString()), "rws").getChannel();
            }

            public void invoke(Long l) throws Exception {
                ByteBuffer buffer = ByteBuffer.wrap((l + "\n").getBytes(Charset.defaultCharset()));
                // A single write() call may consume only part of the buffer.
                while (buffer.hasRemaining()) {
                    this.writer.write(buffer);
                }
            }

            public void close() throws Exception {
                // Guard against open() having failed before the channel was created.
                if (this.writer != null) {
                    this.writer.close();
                }
            }
        });

        try {
            env.execute();
            fileBatchHasEveryNumberLower(parallelism, DATA_COUNT, tempOutDir);
        } finally {
            // Always remove the temporary output, on success and on failure alike.
            if (tempOutDir.exists()) {
                FileUtils.deleteDirectory(tempOutDir);
            }
        }
    }

    /**
     * Asserts that the files in a directory together contain every integer in {@code [0, i2)}.
     *
     * <p>Each file is read line by line and every line is parsed as an integer; duplicates and
     * additional numbers are tolerated, only missing numbers fail the check.
     *
     * @param i minimum number of files expected in the directory
     * @param i2 exclusive upper bound of the numbers that must be present
     * @param file directory whose files are inspected
     * @throws IOException if a file cannot be read
     * @throws NumberFormatException if a line is not a valid integer
     */
    private static void fileBatchHasEveryNumberLower(int i, int i2, File file) throws IOException {
        HashSet<Integer> seen = new HashSet<Integer>(i2);

        File[] outputFiles = file.listFiles();
        Assert.assertNotNull(outputFiles);
        Assert.assertTrue("Not enough output files", outputFiles.length >= i);

        for (File outputFile : outputFiles) {
            Assert.assertTrue("Output file does not exist", outputFile.exists());
            BufferedReader reader = new BufferedReader(new FileReader(outputFile));
            try {
                String line;
                // readLine() returns null at end of file, which terminates the loop.
                while ((line = reader.readLine()) != null) {
                    seen.add(Integer.valueOf(Integer.parseInt(line)));
                }
            } finally {
                // Close the reader even when a line fails to parse.
                reader.close();
            }
        }

        for (int expected = 0; expected < i2; expected++) {
            if (!seen.contains(Integer.valueOf(expected))) {
                Assert.fail("Missing number: " + expected);
            }
        }
    }
}
