package org.apache.flink.connector.file.sink.writer;

import java.lang.invoke.SerializedLambda;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.class */
class FileSinkMigrationITCase {

    // JUnit 5 extension used to hand objects to job code via SharedReference
    // (see setup() and test(), which both call sharedObjects.add(...)).
    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
    // Operator uid assigned to the source in both job graphs built by this class.
    private static final String SOURCE_UID = "source";
    // Operator uid assigned to the sink in both job graphs built by this class.
    private static final String SINK_UID = "sink";
    // Equals the source parallelism configured in the job-graph builder methods.
    private static final int NUM_SOURCES = 4;
    // Equals the sink parallelism configured in the job-graph builder methods.
    private static final int NUM_SINKS = 3;
    // Upper bound (exclusive) of the integers each StatefulSource instance emits in its final run;
    // also the inclusive upper bound of the sequence used in test().
    private static final int NUM_RECORDS = 10000;
    // Equals the argument passed to ModuloBucketAssigner in the job-graph builder methods.
    private static final int NUM_BUCKETS = 4;
    // Latch shared with the StatefulSource instances; (re)created in setup() before each test.
    private SharedReference<CountDownLatch> finalCheckpointLatch;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase$StatefulSource.class */
    public static class StatefulSource extends RichParallelSourceFunction<Integer> implements CheckpointedFunction, CheckpointListener {
        private final boolean takingSavepointMode;
        private SharedReference<CountDownLatch> finalCheckpointLatch;
        private ListState<Integer> nextValueState;
        private int nextValue;
        private volatile boolean snapshottedAfterAllRecordsOutput;
        private volatile boolean isWaitingCheckpointComplete;
        private volatile boolean isCanceled;

        public StatefulSource(boolean z, SharedReference<CountDownLatch> sharedReference) {
            this.takingSavepointMode = z;
            this.finalCheckpointLatch = sharedReference;
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.nextValueState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("nextValue", Integer.class));
            if (this.nextValueState.get() == null || !((Iterable) this.nextValueState.get()).iterator().hasNext()) {
                return;
            }
            this.nextValue = ((Integer) ((Iterable) this.nextValueState.get()).iterator().next()).intValue();
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            if (!this.takingSavepointMode) {
                sendRecordsUntil(FileSinkMigrationITCase.NUM_RECORDS, 0, sourceContext);
                this.isWaitingCheckpointComplete = true;
                ((CountDownLatch) this.finalCheckpointLatch.get()).await();
            } else {
                sendRecordsUntil(3333, 0, sourceContext);
                sendRecordsUntil(5000, 100, sourceContext);
                while (true) {
                    Thread.sleep(5000L);
                }
            }
        }

        private void sendRecordsUntil(int i, int i2, SourceFunction.SourceContext<Integer> sourceContext) throws InterruptedException {
            while (!this.isCanceled && this.nextValue < i) {
                synchronized (sourceContext.getCheckpointLock()) {
                    int i3 = this.nextValue;
                    this.nextValue = i3 + 1;
                    sourceContext.collect(Integer.valueOf(i3));
                }
                if (i2 > 0) {
                    Thread.sleep(i2);
                }
            }
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.nextValueState.update(Collections.singletonList(Integer.valueOf(this.nextValue)));
            if (this.isWaitingCheckpointComplete) {
                this.snapshottedAfterAllRecordsOutput = true;
            }
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            if (this.isWaitingCheckpointComplete && this.snapshottedAfterAllRecordsOutput) {
                ((CountDownLatch) this.finalCheckpointLatch.get()).countDown();
            }
        }

        public void cancel() {
            this.isCanceled = true;
        }
    }

    /** Package-private no-arg constructor; performs no initialization of its own. */
    FileSinkMigrationITCase() {}

    /**
     * Creates a fresh shared latch before every test.
     *
     * <p>The count is {@code NUM_SOURCES * 2} (8 with the current constants). Each
     * {@link StatefulSource} instance counts the latch down from its checkpoint-complete callback
     * once it has emitted all records, so the latch opens after {@code 2 * NUM_SOURCES} such
     * notifications in total.
     */
    @BeforeEach
    void setup() {
        this.finalCheckpointLatch = this.sharedObjects.add(new CountDownLatch(NUM_SOURCES * 2));
    }

    /**
     * Runs a job with parallelism 100 whose map function adds every element of the sequence
     * {@code [0, NUM_RECORDS]} to a list shared through {@link SharedReference}, then verifies
     * that the list contains exactly that sequence (order-insensitive).
     */
    @Test
    void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SharedReference<List<Long>> collected = this.sharedObjects.add(new ArrayList<>());

        env.setParallelism(100);
        // applySync is used for every add because the backing ArrayList is shared by all
        // parallel map instances.
        env.fromSequence(0L, NUM_RECORDS)
                .map(value -> collected.applySync(values -> values.add(value)));
        env.execute();

        List<Long> expected =
                LongStream.rangeClosed(0L, NUM_RECORDS).boxed().collect(Collectors.toList());
        List<Long> actualSorted = collected.get().stream().sorted().collect(Collectors.toList());

        // fromSequence bounds are inclusive on both ends, hence NUM_RECORDS + 1 elements.
        Assertions.assertThat(collected.get()).hasSize(NUM_RECORDS + 1);
        Assertions.assertThat(actualSorted).isEqualTo(expected);
    }

    /**
     * Runs the {@code StreamingFileSink} job until a savepoint has been taken, restores the
     * {@code FileSink} job from that savepoint, runs it to completion and finally checks the
     * files in the output directory.
     *
     * @param outputDir temporary directory both jobs write to
     * @param savepointDir temporary directory the savepoint is written to
     */
    @Test
    void testMigration(@TempDir Path outputDir, @TempDir Path savepointDir) throws Exception {
        String outputPath = outputDir.toString();
        String savepointBasePath = savepointDir.toString();

        MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .withRandomPorts()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(4)
                        .build();

        JobGraph streamingFileSinkJobGraph = createStreamingFileSinkJobGraph(outputPath);
        String savepointPath =
                executeAndTakeSavepoint(cfg, streamingFileSinkJobGraph, savepointBasePath);

        JobGraph fileSinkJobGraph = createFileSinkJobGraph(outputPath);
        loadSavepointAndExecute(cfg, fileSinkJobGraph, savepointPath);

        IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(
                outputPath, NUM_RECORDS, NUM_BUCKETS, NUM_SOURCES);
    }

    /**
     * Builds the job graph of the first job: a {@link StatefulSource} in savepoint mode feeding a
     * row-format {@link StreamingFileSink} that rolls files on checkpoint.
     *
     * @param outputPath base directory the sink writes to
     * @return the job graph with checkpointing enabled every 500 ms in exactly-once mode
     */
    private JobGraph createStreamingFileSinkJobGraph(String outputPath) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);

        // Fully qualified because java.nio.file.Path is the Path imported by this file.
        StreamingFileSink<Integer> sink =
                StreamingFileSink.forRowFormat(
                                new org.apache.flink.core.fs.Path(outputPath),
                                new IntegerFileSinkTestDataUtils.IntEncoder())
                        .withBucketAssigner(
                                new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

        // Source and sink use the same uids as the job built by createFileSinkJobGraph.
        env.addSource(new StatefulSource(true, this.finalCheckpointLatch))
                .uid(SOURCE_UID)
                .setParallelism(NUM_SOURCES)
                .addSink(sink)
                .setParallelism(NUM_SINKS)
                .uid(SINK_UID);

        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Builds the job graph of the second job: a {@link StatefulSource} in non-savepoint mode
     * feeding a row-format {@link FileSink} that rolls files on checkpoint.
     *
     * @param outputPath base directory the sink writes to
     * @return the job graph with checkpointing enabled every 500 ms in exactly-once mode
     */
    private JobGraph createFileSinkJobGraph(String outputPath) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);

        // Fully qualified because java.nio.file.Path is the Path imported by this file.
        FileSink<Integer> sink =
                FileSink.forRowFormat(
                                new org.apache.flink.core.fs.Path(outputPath),
                                new IntegerFileSinkTestDataUtils.IntEncoder())
                        .withBucketAssigner(
                                new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

        // Source and sink use the same uids as the job built by createStreamingFileSinkJobGraph.
        env.addSource(new StatefulSource(false, this.finalCheckpointLatch))
                .uid(SOURCE_UID)
                .setParallelism(NUM_SOURCES)
                .sinkTo(sink)
                .setParallelism(NUM_SINKS)
                .uid(SINK_UID);

        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Starts a mini cluster, submits the job, waits until all of its tasks are running and then
     * triggers a savepoint in canonical format.
     *
     * <p>The cluster is closed via try-with-resources, so it is closed exactly once and an
     * exception thrown by {@code close()} is attached as suppressed to the primary failure
     * instead of replacing it.
     *
     * @param cfg configuration of the mini cluster to start
     * @param jobGraph job to submit
     * @param savepointBasePath directory passed to the savepoint trigger
     * @return the value the savepoint future completes with
     */
    private String executeAndTakeSavepoint(
            MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointBasePath)
            throws Exception {
        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            JobID jobId = miniCluster.submitJob(jobGraph).get().getJobID();
            CommonTestUtils.waitForAllTaskRunning(miniCluster, jobId, false);

            // NOTE(review): the boolean is presumably MiniCluster's "cancelJob" flag - confirm.
            return miniCluster
                    .triggerSavepoint(jobId, savepointBasePath, true, SavepointFormatType.CANONICAL)
                    .get();
        }
    }

    /**
     * Configures the job to restore from the given savepoint and runs it on a fresh mini cluster
     * until {@code executeJobBlocking} returns.
     *
     * <p>The cluster is closed via try-with-resources, so it is closed exactly once and an
     * exception thrown by {@code close()} is attached as suppressed to the primary failure
     * instead of replacing it.
     *
     * @param cfg configuration of the mini cluster to start
     * @param jobGraph job to execute; its savepoint restore settings are overwritten
     * @param savepointPath path of the savepoint to restore from
     */
    private void loadSavepointAndExecute(
            MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointPath)
            throws Exception {
        // NOTE(review): the boolean is presumably "allowNonRestoredState" - confirm.
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    // Recreates the serializable map lambda used in test() from its SerializedLambda form.
    // The method is marked synthetic, i.e. it is not hand-written source.
    // NOTE(review): this looks like decompiler output of the compiler-generated
    // $deserializeLambda$ hook; several constructs below are not valid Java as written
    // (int literal assigned to a boolean, switch over a boolean, lambda returned as Object).
    // Presumably the method should not be kept in hand-maintained source - confirm.
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Lambda selector: stays at its initial value unless the name matches below.
        boolean z = -1;
        // String switch rendered as hashCode() dispatch followed by an equals() check.
        switch (implMethodName.hashCode()) {
            case 848566258:
                if (implMethodName.equals("lambda$test$ee12da8$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                // Every descriptor of the serialized form must match the expected MapFunction
                // lambda of this class before it is re-created.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/testutils/junit/SharedReference;Ljava/lang/Long;)Ljava/lang/Boolean;")) {
                    // The single captured argument is the SharedReference used by the lambda.
                    SharedReference sharedReference = (SharedReference) serializedLambda.getCapturedArg(0);
                    return l -> {
                        return (Boolean) sharedReference.applySync(collection -> {
                            return Boolean.valueOf(collection.add(l));
                        });
                    };
                }
                break;
        }
        // Reached when the name or any descriptor does not match.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
