package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.util.AbstractTestBase;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;

@Deprecated
/* loaded from: input_file:org/apache/flink/test/streaming/runtime/SinkV2DeprecatedITCase.class */
public class SinkV2DeprecatedITCase extends AbstractTestBase {
    static final List<Integer> SOURCE_DATA = Arrays.asList(895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970, 714, 795, 288, 422);
    static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = SOURCE_DATA.size() * 2;
    static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE = (List) SOURCE_DATA.stream().flatMap(num -> {
        return Collections.nCopies(2, Tuple3.of(num, (Object) null, Long.MIN_VALUE).toString()).stream();
    }).collect(Collectors.toList());
    static final List<String> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE = (List) SOURCE_DATA.stream().map(num -> {
        return Tuple3.of(num, (Object) null, Long.MIN_VALUE).toString();
    }).collect(Collectors.toList());
    static final Queue<Committer.CommitRequest<String>> COMMIT_QUEUE = new ConcurrentLinkedQueue();
    static final BooleanSupplier COMMIT_QUEUE_RECEIVE_ALL_DATA = (BooleanSupplier) ((Serializable) () -> {
        return COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;
    });

    @Before
    public void init() {
        COMMIT_QUEUE.clear();
    }

    @Test
    public void writerAndCommitterExecuteInStreamingMode() throws Exception {
        StreamExecutionEnvironment buildStreamEnv = buildStreamEnv();
        buildStreamEnv.addSource(new FiniteTestSource(COMMIT_QUEUE_RECEIVE_ALL_DATA, SOURCE_DATA), IntegerTypeInfo.INT_TYPE_INFO).sinkTo(TestSinkV2.newBuilder().setDefaultCommitter((Supplier) ((Serializable) () -> {
            return COMMIT_QUEUE;
        })).build());
        buildStreamEnv.execute();
        MatcherAssert.assertThat(COMMIT_QUEUE.stream().map((v0) -> {
            return v0.getCommittable();
        }).collect(Collectors.toList()), Matchers.containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
    }

    @Test
    public void writerAndCommitterExecuteInBatchMode() throws Exception {
        StreamExecutionEnvironment buildBatchEnv = buildBatchEnv();
        buildBatchEnv.fromData(SOURCE_DATA).sinkTo(TestSinkV2.newBuilder().setDefaultCommitter((Supplier) ((Serializable) () -> {
            return COMMIT_QUEUE;
        })).build());
        buildBatchEnv.execute();
        MatcherAssert.assertThat(COMMIT_QUEUE.stream().map((v0) -> {
            return v0.getCommittable();
        }).collect(Collectors.toList()), Matchers.containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
    }

    private StreamExecutionEnvironment buildStreamEnv() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.enableCheckpointing(100L);
        return executionEnvironment;
    }

    private StreamExecutionEnvironment buildBatchEnv() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        return executionEnvironment;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1376259775:
                if (implMethodName.equals("lambda$writerAndCommitterExecuteInBatchMode$46f190a3$1")) {
                    z = 2;
                    break;
                }
                break;
            case 181681474:
                if (implMethodName.equals("lambda$static$6e9ae060$1")) {
                    z = true;
                    break;
                }
                break;
            case 1183610777:
                if (implMethodName.equals("lambda$writerAndCommitterExecuteInStreamingMode$46f190a3$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Supplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkV2DeprecatedITCase") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Queue;")) {
                    return () -> {
                        return COMMIT_QUEUE;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/BooleanSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("getAsBoolean") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Z") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkV2DeprecatedITCase") && serializedLambda.getImplMethodSignature().equals("()Z")) {
                    return () -> {
                        return COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Supplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/SinkV2DeprecatedITCase") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/Queue;")) {
                    return () -> {
                        return COMMIT_QUEUE;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
