package org.apache.flink.connector.file.sink;

import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Test case that builds a {@link RuntimeExecutionMode#STREAMING} job which feeds integers from a
 * parallel test source into the file sink created by {@code createFileSink}.
 *
 * <p>The sources of one test run share a {@link CountDownLatch} that is registered in a static map
 * under a per-test random id. A source that has emitted all of its records blocks on that latch;
 * the latch is counted down from the sources' checkpoint-completion callbacks.
 */
@RunWith(Parameterized.class)
public class StreamingExecutionFileSinkITCase extends FileSinkITBase {

    // NOTE(review): the base class may already declare equivalent constants; they are kept
    // private here so this class stays self-contained. Confirm and de-duplicate if so.

    /** Parallelism of the test source operator. */
    private static final int SOURCE_PARALLELISM = 4;

    /** Parallelism of the file sink operator. */
    private static final int SINK_PARALLELISM = 3;

    /** Upper bound (exclusive) of the values emitted by every source subtask. */
    private static final int RECORDS_PER_SOURCE = 10000;

    /** Checkpoint interval of the job, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 10L;

    /** Delay before the single permitted restart in the failover scenario, in milliseconds. */
    private static final long RESTART_DELAY_MS = 100L;

    /**
     * Initial count of the shared latch (8 with the current parallelism). Presumably this stands
     * for two qualifying checkpoint notifications per source subtask -- TODO confirm.
     */
    private static final int REQUIRED_CHECKPOINT_NOTIFICATIONS = SOURCE_PARALLELISM * 2;

    /**
     * Latches of the currently running tests, keyed by latch id. The map is static so that the
     * source instances can look their latch up by id instead of holding a reference to it.
     */
    private static final Map<String, CountDownLatch> LATCH_MAP = new ConcurrentHashMap<>();

    /** Key of this test's latch in {@link #LATCH_MAP}; assigned in {@link #setup()}. */
    private String latchId;

    /**
     * Parallel, checkpointed source that emits consecutive integers and coordinates its shutdown
     * (or its deliberate failure) with completed checkpoints.
     */
    private static class StreamingExecutionTestSource extends RichParallelSourceFunction<Integer>
            implements CheckpointListener, CheckpointedFunction {

        /** Fraction of {@code numberOfRecords} that is emitted before the failure is raised. */
        private static final double FAILOVER_RATIO = 0.4d;

        /** Sleep time between two polls while waiting, in milliseconds. */
        private static final long POLL_INTERVAL_MS = 50L;

        private final String latchId;
        private final int numberOfRecords;
        private final boolean isFailoverScenario;

        /** Operator state holding the next value to emit. */
        private ListState<Integer> nextValueState;

        /** Next value to emit; stays at 0 unless a value is restored from state. */
        private int nextValue;

        private volatile boolean isCanceled;
        private volatile boolean snapshottedAfterAllRecordsOutput;
        private volatile boolean isWaitingCheckpointComplete;
        private volatile boolean hasCompletedCheckpoint;

        /**
         * @param latchId key of the shared latch in {@code LATCH_MAP}
         * @param numberOfRecords upper bound (exclusive) of the values to emit
         * @param isFailoverScenario whether the first execution attempt should end in a failure
         */
        public StreamingExecutionTestSource(
                String latchId, int numberOfRecords, boolean isFailoverScenario) {
            this.latchId = latchId;
            this.numberOfRecords = numberOfRecords;
            this.isFailoverScenario = isFailoverScenario;
        }

        /**
         * Registers the {@code "nextValue"} list state and, if it contains an element, resumes
         * emitting from the first stored value.
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            nextValueState =
                    context.getOperatorStateStore()
                            .getListState(new ListStateDescriptor<>("nextValue", Integer.class));

            final Iterable<Integer> restored = nextValueState.get();
            if (restored != null && restored.iterator().hasNext()) {
                nextValue = restored.iterator().next();
            }
        }

        /**
         * Emits the records.
         *
         * <p>On the first attempt of a failover scenario only a part of the records is emitted in
         * two steps with a completed checkpoint in between; afterwards subtask 0 throws and all
         * other subtasks idle. In every other case all records are emitted and the source then
         * blocks on the shared latch.
         */
        @Override
        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            if (isFailoverScenario && getRuntimeContext().getAttemptNumber() == 0) {
                // Emit the first half of the pre-failure share of records.
                sendRecordsUntil((int) (numberOfRecords * FAILOVER_RATIO * 0.5d), sourceContext);

                // Hold back until at least one checkpoint completion has been reported.
                while (!hasCompletedCheckpoint) {
                    Thread.sleep(POLL_INTERVAL_MS);
                }

                // Emit the rest of the pre-failure share.
                sendRecordsUntil((int) (numberOfRecords * FAILOVER_RATIO), sourceContext);

                // Only subtask 0 raises the failure.
                if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                    throw new RuntimeException("Designated Exception");
                }

                // The remaining subtasks never return normally from here: this loop only ends
                // when Thread.sleep throws (e.g. because the thread gets interrupted).
                while (true) {
                    Thread.sleep(POLL_INTERVAL_MS);
                }
            } else {
                sendRecordsUntil(numberOfRecords, sourceContext);

                // From now on snapshots and checkpoint notifications count towards the latch.
                isWaitingCheckpointComplete = true;
                LATCH_MAP.get(latchId).await();
            }
        }

        /**
         * Emits consecutive values, each under the checkpoint lock, until {@code nextValue}
         * reaches {@code targetNumber} or the source is canceled.
         */
        private void sendRecordsUntil(
                int targetNumber, SourceFunction.SourceContext<Integer> sourceContext) {
            while (!isCanceled && nextValue < targetNumber) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(nextValue++);
                }
            }
        }

        /**
         * Stores the next value to emit and remembers whether this snapshot was taken after the
         * source started waiting on the latch.
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            nextValueState.update(Collections.singletonList(nextValue));

            if (isWaitingCheckpointComplete) {
                snapshottedAfterAllRecordsOutput = true;
            }
        }

        /**
         * Counts the shared latch down once the source is waiting on it and a snapshot has been
         * taken since then; always records that a checkpoint has completed.
         */
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (isWaitingCheckpointComplete && snapshottedAfterAllRecordsOutput) {
                LATCH_MAP.get(latchId).countDown();
            }

            hasCompletedCheckpoint = true;
        }

        /** Makes {@code sendRecordsUntil} stop emitting. */
        @Override
        public void cancel() {
            isCanceled = true;
        }
    }

    /** Registers a fresh latch under a new random id before every test. */
    @Before
    public void setup() {
        latchId = UUID.randomUUID().toString();
        LATCH_MAP.put(latchId, new CountDownLatch(REQUIRED_CHECKPOINT_NOTIFICATIONS));
    }

    /** Removes this test's latch from the static map again. */
    @After
    public void teardown() {
        LATCH_MAP.remove(latchId);
    }

    /**
     * Builds the job: test source, then file sink, in streaming mode with exactly-once
     * checkpointing. With {@code triggerFailover} set, one restart after a fixed delay is
     * allowed; otherwise restarts are disabled.
     *
     * @param path value handed on to {@code createFileSink} (presumably the output location --
     *     confirm against the base class)
     * @return the job graph of the assembled pipeline
     */
    @Override
    protected JobGraph createJobGraph(String path) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
        env.configure(config, getClass().getClassLoader());

        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);

        if (triggerFailover) {
            env.setRestartStrategy(
                    RestartStrategies.fixedDelayRestart(1, Time.milliseconds(RESTART_DELAY_MS)));
        } else {
            env.setRestartStrategy(RestartStrategies.noRestart());
        }

        env.addSource(
                        new StreamingExecutionTestSource(
                                latchId, RECORDS_PER_SOURCE, triggerFailover))
                .setParallelism(SOURCE_PARALLELISM)
                .sinkTo(createFileSink(path))
                .setParallelism(SINK_PARALLELISM);

        return env.getStreamGraph().getJobGraph();
    }
}
