package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.connector.datagen.source.TestDataGenerators;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/SinkITCase.class */
class SinkITCase extends AbstractTestBase {
    static final List<Integer> SOURCE_DATA = Arrays.asList(895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970, 714, 795, 288, 422);
    static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = SOURCE_DATA.size() * 2;
    static final String[] EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE = (String[]) SOURCE_DATA.stream().flatMap(num -> {
        return Collections.nCopies(2, Tuple3.of(num, (Object) null, Long.MIN_VALUE).toString()).stream();
    }).toArray(i -> {
        return new String[i];
    });
    static final String[] EXPECTED_COMMITTED_DATA_IN_BATCH_MODE = (String[]) SOURCE_DATA.stream().map(num -> {
        return Tuple3.of(num, (Object) null, Long.MIN_VALUE).toString();
    }).toArray(i -> {
        return new String[i];
    });
    static final String[] EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE = (String[]) SOURCE_DATA.stream().flatMap(num -> {
        return Collections.nCopies(2, Tuple3.of(num, (Object) null, Long.MIN_VALUE).toString()).stream();
    }).toArray(i -> {
        return new String[i];
    });
    static final String EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE = (String) SOURCE_DATA.stream().map(num -> {
        return Tuple3.of(num, (Object) null, Long.MIN_VALUE).toString();
    }).sorted().collect(Collectors.joining("+"));
    static final Queue<String> COMMIT_QUEUE = new ConcurrentLinkedQueue();
    static final Queue<String> GLOBAL_COMMIT_QUEUE = new ConcurrentLinkedQueue();
    static final BooleanSupplier COMMIT_QUEUE_RECEIVE_ALL_DATA = (BooleanSupplier) ((Serializable) () -> {
        return COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;
    });
    static final BooleanSupplier GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA = (BooleanSupplier) ((Serializable) () -> {
        return getSplitGlobalCommittedData().size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;
    });
    static final BooleanSupplier BOTH_QUEUE_RECEIVE_ALL_DATA = (BooleanSupplier) ((Serializable) () -> {
        return COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean() && GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean();
    });

    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/SinkITCase$FailingOnceCommitter.class */
    static class FailingOnceCommitter extends TestSinkV2.DefaultCommitter {
        private final SharedReference<AtomicBoolean> failedOnce;
        private final SharedReference<List<String>> committed;

        public FailingOnceCommitter(SharedReference<AtomicBoolean> sharedReference, SharedReference<List<String>> sharedReference2) {
            this.failedOnce = sharedReference;
            this.committed = sharedReference2;
        }

        public void commit(Collection<Committer.CommitRequest<String>> collection) {
            if (((AtomicBoolean) this.failedOnce.get()).compareAndSet(false, true)) {
                throw new RuntimeException("Fail to commit");
            }
            Iterator<Committer.CommitRequest<String>> it = collection.iterator();
            while (it.hasNext()) {
                ((List) this.committed.get()).add(it.next().getCommittable());
            }
        }
    }

    SinkITCase() {
    }

    @BeforeEach
    public void init() {
        COMMIT_QUEUE.clear();
        GLOBAL_COMMIT_QUEUE.clear();
    }

    @Test
    void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception {
        StreamExecutionEnvironment buildStreamEnv = buildStreamEnv();
        buildStreamEnv.fromSource(TestDataGenerators.fromDataWithSnapshotsLatch(SOURCE_DATA, Types.INT, BOTH_QUEUE_RECEIVE_ALL_DATA), WatermarkStrategy.noWatermarks(), "Test Source").sinkTo(TestSink.newBuilder().setDefaultCommitter((Supplier) ((Serializable) () -> {
            return COMMIT_QUEUE;
        })).setGlobalCommitter((Supplier) ((Serializable) () -> {
            return GLOBAL_COMMIT_QUEUE;
        })).build());
        executeAndVerifyStreamGraph(buildStreamEnv);
        GLOBAL_COMMIT_QUEUE.remove("end of input");
        Assertions.assertThat(COMMIT_QUEUE).containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
        Assertions.assertThat(getSplitGlobalCommittedData()).containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE);
    }

    @Test
    void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {
        StreamExecutionEnvironment buildBatchEnv = buildBatchEnv();
        buildBatchEnv.fromData(SOURCE_DATA).sinkTo(TestSink.newBuilder().setDefaultCommitter((Supplier) ((Serializable) () -> {
            return COMMIT_QUEUE;
        })).setGlobalCommitter((Supplier) ((Serializable) () -> {
            return GLOBAL_COMMIT_QUEUE;
        })).build());
        executeAndVerifyStreamGraph(buildBatchEnv);
        Assertions.assertThat(COMMIT_QUEUE).containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE);
        Assertions.assertThat(GLOBAL_COMMIT_QUEUE).containsExactlyInAnyOrder(new String[]{EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE});
    }

    @Test
    void writerAndCommitterExecuteInStreamingMode() throws Exception {
        StreamExecutionEnvironment buildStreamEnv = buildStreamEnv();
        buildStreamEnv.fromSource(TestDataGenerators.fromDataWithSnapshotsLatch(SOURCE_DATA, Types.INT, COMMIT_QUEUE_RECEIVE_ALL_DATA), WatermarkStrategy.noWatermarks(), "Test Source").sinkTo(TestSink.newBuilder().setDefaultCommitter((Supplier) ((Serializable) () -> {
            return COMMIT_QUEUE;
        })).build());
        executeAndVerifyStreamGraph(buildStreamEnv);
        Assertions.assertThat(COMMIT_QUEUE).containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
    }

    @Test
    void duplicateEndInput() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.TOLERABLE_FAILURE_NUMBER, 10);
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 10);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        executionEnvironment.enableCheckpointing(100L);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        ArrayList arrayList = new ArrayList();
        executionEnvironment.fromData(new Object[]{"bounded"}).sinkTo(TestSinkV2.newBuilder().setCommitter(new FailingOnceCommitter(this.sharedObjects.add(atomicBoolean), this.sharedObjects.add(arrayList))).build());
        JobClient executeAsync = executionEnvironment.executeAsync();
        executeAsync.getJobExecutionResult().get();
        Assertions.assertThat((Comparable) executeAsync.getJobStatus().get()).isEqualTo(JobStatus.FINISHED);
        Assertions.assertThat(atomicBoolean).isTrue();
        Assertions.assertThat(arrayList).isNotEmpty();
    }

    @Test
    void writerAndCommitterExecuteInBatchMode() throws Exception {
        StreamExecutionEnvironment buildBatchEnv = buildBatchEnv();
        buildBatchEnv.fromData(SOURCE_DATA).sinkTo(TestSink.newBuilder().setDefaultCommitter((Supplier) ((Serializable) () -> {
            return COMMIT_QUEUE;
        })).build());
        executeAndVerifyStreamGraph(buildBatchEnv);
        Assertions.assertThat(COMMIT_QUEUE).containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE);
    }

    @Test
    void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
        StreamExecutionEnvironment buildStreamEnv = buildStreamEnv();
        buildStreamEnv.fromSource(TestDataGenerators.fromDataWithSnapshotsLatch(SOURCE_DATA, Types.INT, GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA), WatermarkStrategy.noWatermarks(), "Test Source").sinkTo(TestSink.newBuilder().setGlobalCommitter((Supplier) ((Serializable) () -> {
            return GLOBAL_COMMIT_QUEUE;
        })).build());
        executeAndVerifyStreamGraph(buildStreamEnv);
        GLOBAL_COMMIT_QUEUE.remove("end of input");
        Assertions.assertThat(getSplitGlobalCommittedData()).containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE);
    }

    @Test
    void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
        StreamExecutionEnvironment buildBatchEnv = buildBatchEnv();
        buildBatchEnv.fromData(SOURCE_DATA).sinkTo(TestSink.newBuilder().setGlobalCommitter((Supplier) ((Serializable) () -> {
            return GLOBAL_COMMIT_QUEUE;
        })).build());
        executeAndVerifyStreamGraph(buildBatchEnv);
        Assertions.assertThat(GLOBAL_COMMIT_QUEUE).containsExactlyInAnyOrder(new String[]{EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE});
    }

    private void executeAndVerifyStreamGraph(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph();
        assertNoUnalignedCheckpointInSink(streamGraph);
        streamExecutionEnvironment.execute(streamGraph);
    }

    private void assertNoUnalignedCheckpointInSink(StreamGraph streamGraph) {
        Assertions.assertThat(streamGraph.getStreamNodes()).filteredOn(streamNode -> {
            return streamNode.getOperatorName().contains("Sink");
        }).flatMap((v0) -> {
            return v0.getOutEdges();
        }).allMatch(streamEdge -> {
            return !streamEdge.supportsUnalignedCheckpoints();
        }).isNotEmpty();
    }

    private static List<String> getSplitGlobalCommittedData() {
        return (List) GLOBAL_COMMIT_QUEUE.stream().flatMap(str -> {
            return Arrays.stream(str.split("\\+"));
        }).collect(Collectors.toList());
    }

    private StreamExecutionEnvironment buildStreamEnv() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.enableCheckpointing(100L);
        return executionEnvironment;
    }

    private StreamExecutionEnvironment buildBatchEnv() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        return executionEnvironment;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2071743100:
                if (implMethodName.equals("lambda$writerAndGlobalCommitterExecuteInBatchMode$46f190a3$1")) {
                    z = 4;
                    break;
                }
                break;
            case -1387896228:
                if (implMethodName.equals("lambda$writerAndGlobalCommitterExecuteInStreamingMode$46f190a3$1")) {
                    z = 8;
                    break;
                }
                break;
            case -1376259775:
                if (implMethodName.equals("lambda$writerAndCommitterExecuteInBatchMode$46f190a3$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1140154991:
                if (implMethodName.equals("lambda$writerAndCommitterAndGlobalCommitterExecuteInBatchMode$46f190a3$1")) {
                    z = 9;
                    break;
                }
                break;
            case -1140154990:
                if (implMethodName.equals("lambda$writerAndCommitterAndGlobalCommitterExecuteInBatchMode$46f190a3$2")) {
                    z = 5;
                    break;
                }
                break;
            case -754435186:
                if (implMethodName.equals("lambda$static$1b1029a2$1")) {
                    z = 2;
                    break;
                }
                break;
            case -202749033:
                if (implMethodName.equals("lambda$static$9b259bea$1")) {
                    z = 10;
                    break;
                }
                break;
            case 181681474:
                if (implMethodName.equals("lambda$static$6e9ae060$1")) {
                    z = true;
                    break;
                }
                break;
            case 1183610777:
                if (implMethodName.equals("lambda$writerAndCommitterExecuteInStreamingMode$46f190a3$1")) {
                    z = false;
                    break;
                }
                break;
            case 2010151913:
                if (implMethodName.equals("lambda$writerAndCommitterAndGlobalCommitterExecuteInStreamingMode$46f190a3$1")) {
                    z = 6;
                    break;
                }
                break;
            case 2010151914:
                if (implMethodName.equals("lambda$writerAndCommitterAndGlobalCommitterExecuteInStreamingMode$46f190a3$2")) {
                    z = 7;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Supplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkITCase") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Queue;")) {
                    return () -> {
                        return COMMIT_QUEUE;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/BooleanSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("getAsBoolean") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Z") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkITCase") && serializedLambda.getImplMethodSignature().equals("()Z")) {
                    return () -> {
                        return COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/BooleanSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("getAsBoolean") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Z") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkITCase") && serializedLambda.getImplMethodSignature().equals("()Z")) {
                    return () -> {
                        return getSplitGlobalCommittedData().size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Supplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkITCase") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Queue;")) {
                    return () -> {
                        return COMMIT_QUEUE;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Supplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkITCase") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Queue;")) {
                    return () -> {
                        return GLOBAL_COMMIT_QUEUE;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Supplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkITCase") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Queue;")) {
                    return () -> {
                        return GLOBAL_COMMIT_QUEUE;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Supplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkITCase") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Queue;")) {
                    return () -> {
                        return COMMIT_QUEUE;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Supplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkITCase") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Queue;")) {
                    return () -> {
                        return GLOBAL_COMMIT_QUEUE;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Supplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkITCase") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Queue;")) {
                    return () -> {
                        return GLOBAL_COMMIT_QUEUE;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Supplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkITCase") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Queue;")) {
                    return () -> {
                        return COMMIT_QUEUE;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/BooleanSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("getAsBoolean") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Z") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkITCase") && serializedLambda.getImplMethodSignature().equals("()Z")) {
                    return () -> {
                        return COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean() && GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
