/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.connector.datagen.source.TestDataGenerators;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Integration tests that run small jobs feeding {@link #SOURCE_DATA} into a {@link TestSink} (or,
 * for {@link #duplicateEndInput()}, a {@link TestSinkV2}) and then check what the committers
 * recorded.
 *
 * <p>Committed values are collected in the static queues {@link #COMMIT_QUEUE} and
 * {@link #GLOBAL_COMMIT_QUEUE}, which are cleared before every test.
 *
 * <p>NOTE(review): collecting results in static queues assumes the job runs inside the same JVM
 * as the test (presumably a mini cluster supplied by {@link AbstractTestBase}) — confirm.
 */
class SinkITCase extends AbstractTestBase {

    /** Records fed into every job; note that {@code 148} occurs twice. */
    static final List<Integer> SOURCE_DATA =
            Arrays.asList(
                    895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 630, 682, 765, 434, 970,
                    714, 795, 288, 422);

    /**
     * Number of committed records the streaming tests wait for: twice the input size.
     *
     * <p>NOTE(review): presumably the latch source emits the data twice — confirm against
     * {@code TestDataGenerators.fromDataWithSnapshotsLatch}.
     */
    static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = SOURCE_DATA.size() * 2;

    /** Expected committer content in streaming mode: two copies of every rendered record. */
    static final String[] EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
            SOURCE_DATA.stream()
                    .flatMap(x -> Collections.nCopies(2, toExpectedRecord(x)).stream())
                    .toArray(String[]::new);

    /** Expected committer content in batch mode: one rendered record per input element. */
    static final String[] EXPECTED_COMMITTED_DATA_IN_BATCH_MODE =
            SOURCE_DATA.stream().map(SinkITCase::toExpectedRecord).toArray(String[]::new);

    /**
     * Expected global-committer content in streaming mode after splitting on {@code +}: two
     * copies of every rendered record.
     */
    static final String[] EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE =
            SOURCE_DATA.stream()
                    .flatMap(x -> Collections.nCopies(2, toExpectedRecord(x)).stream())
                    .toArray(String[]::new);

    /**
     * Expected single global-committer entry in batch mode: all rendered records, sorted by
     * natural string order and joined with {@code +}.
     */
    static final String EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE =
            SOURCE_DATA.stream()
                    .map(SinkITCase::toExpectedRecord)
                    .sorted()
                    .collect(Collectors.joining("+"));

    /** Queue handed to the default committer of the sinks under test. */
    static final Queue<String> COMMIT_QUEUE = new ConcurrentLinkedQueue<>();

    /** Queue handed to the global committer of the sinks under test. */
    static final Queue<String> GLOBAL_COMMIT_QUEUE = new ConcurrentLinkedQueue<>();

    /** True once {@link #COMMIT_QUEUE} holds exactly the expected number of records. */
    static final BooleanSupplier COMMIT_QUEUE_RECEIVE_ALL_DATA =
            (BooleanSupplier & Serializable)
                    () -> COMMIT_QUEUE.size() == STREAMING_SOURCE_SEND_ELEMENTS_NUM;

    /**
     * True once the split content of {@link #GLOBAL_COMMIT_QUEUE} holds exactly the expected
     * number of records.
     */
    static final BooleanSupplier GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA =
            (BooleanSupplier & Serializable)
                    () ->
                            getSplitGlobalCommittedData().size()
                                    == STREAMING_SOURCE_SEND_ELEMENTS_NUM;

    /** True once both queue conditions above hold. */
    static final BooleanSupplier BOTH_QUEUE_RECEIVE_ALL_DATA =
            (BooleanSupplier & Serializable)
                    () ->
                            COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean()
                                    && GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean();

    /** Holder for objects that {@link #duplicateEndInput()} shares with its committer. */
    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();

    /** Empties both result queues so that every test starts from a clean state. */
    @BeforeEach
    public void init() {
        COMMIT_QUEUE.clear();
        GLOBAL_COMMIT_QUEUE.clear();
    }

    /** Streaming job with both a default and a global committer; checks both queues. */
    @Test
    void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws Exception {
        final StreamExecutionEnvironment env = buildStreamEnv();
        final DataStreamSource<Integer> stream = latchedSource(env, BOTH_QUEUE_RECEIVE_ALL_DATA);

        stream.sinkTo(
                TestSink.newBuilder()
                        .setDefaultCommitter(
                                (Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
                        .setGlobalCommitter(
                                (Supplier<Queue<String>> & Serializable) () -> GLOBAL_COMMIT_QUEUE)
                        .build());

        executeAndVerifyStreamGraph(env);

        // Drop one "end of input" entry, if present, so it does not take part in the comparison.
        // NOTE(review): presumably written by the global committer at end of input — confirm.
        GLOBAL_COMMIT_QUEUE.remove("end of input");

        Assertions.assertThat(COMMIT_QUEUE)
                .containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
        Assertions.assertThat(getSplitGlobalCommittedData())
                .containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE);
    }

    /** Batch job with both a default and a global committer; checks both queues. */
    @Test
    void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws Exception {
        final StreamExecutionEnvironment env = buildBatchEnv();

        env.fromData(SOURCE_DATA)
                .sinkTo(
                        TestSink.newBuilder()
                                .setDefaultCommitter(
                                        (Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
                                .setGlobalCommitter(
                                        (Supplier<Queue<String>> & Serializable)
                                                () -> GLOBAL_COMMIT_QUEUE)
                                .build());

        executeAndVerifyStreamGraph(env);

        Assertions.assertThat(COMMIT_QUEUE)
                .containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE);
        Assertions.assertThat(GLOBAL_COMMIT_QUEUE)
                .containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE);
    }

    /** Streaming job with only a default committer; checks {@link #COMMIT_QUEUE}. */
    @Test
    void writerAndCommitterExecuteInStreamingMode() throws Exception {
        final StreamExecutionEnvironment env = buildStreamEnv();
        final DataStreamSource<Integer> stream = latchedSource(env, COMMIT_QUEUE_RECEIVE_ALL_DATA);

        stream.sinkTo(
                TestSink.newBuilder()
                        .setDefaultCommitter(
                                (Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
                        .build());

        executeAndVerifyStreamGraph(env);

        Assertions.assertThat(COMMIT_QUEUE)
                .containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
    }

    /**
     * Runs a one-element job whose committer throws on its first commit, and verifies that the
     * job still reaches {@link JobStatus#FINISHED}, that the failure was triggered, and that
     * something was committed afterwards.
     */
    @Test
    void duplicateEndInput() throws Exception {
        // The same budget is used for tolerable checkpoint failures and for restart attempts.
        // NOTE(review): one failure is injected, so 10 looks like a safety margin — confirm.
        final int maxAttempts = 10;
        final Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.TOLERABLE_FAILURE_NUMBER, maxAttempts);
        conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, maxAttempts);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(100L);

        final AtomicBoolean failedOnce = new AtomicBoolean(false);
        final List<String> committed = new ArrayList<>();
        final FailingOnceCommitter committer =
                new FailingOnceCommitter(
                        sharedObjects.add(failedOnce), sharedObjects.add(committed));

        env.fromData("bounded")
                .sinkTo(TestSinkV2.<String>newBuilder().setCommitter(committer).build());

        final JobClient jobClient = env.executeAsync();
        // Block until the job result is available.
        jobClient.getJobExecutionResult().get();

        Assertions.assertThat(jobClient.getJobStatus().get()).isEqualTo(JobStatus.FINISHED);
        // The committer must have thrown exactly its one injected failure ...
        Assertions.assertThat(failedOnce).isTrue();
        // ... and a later commit call must have recorded at least one committable.
        Assertions.assertThat(committed).isNotEmpty();
    }

    /** Batch job with only a default committer; checks {@link #COMMIT_QUEUE}. */
    @Test
    void writerAndCommitterExecuteInBatchMode() throws Exception {
        final StreamExecutionEnvironment env = buildBatchEnv();

        env.fromData(SOURCE_DATA)
                .sinkTo(
                        TestSink.newBuilder()
                                .setDefaultCommitter(
                                        (Supplier<Queue<String>> & Serializable) () -> COMMIT_QUEUE)
                                .build());

        executeAndVerifyStreamGraph(env);

        Assertions.assertThat(COMMIT_QUEUE)
                .containsExactlyInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE);
    }

    /** Streaming job with only a global committer; checks {@link #GLOBAL_COMMIT_QUEUE}. */
    @Test
    void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
        final StreamExecutionEnvironment env = buildStreamEnv();
        final DataStreamSource<Integer> stream =
                latchedSource(env, GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA);

        stream.sinkTo(
                TestSink.newBuilder()
                        .setGlobalCommitter(
                                (Supplier<Queue<String>> & Serializable) () -> GLOBAL_COMMIT_QUEUE)
                        .build());

        executeAndVerifyStreamGraph(env);

        // Drop one "end of input" entry, if present, so it does not take part in the comparison.
        // NOTE(review): presumably written by the global committer at end of input — confirm.
        GLOBAL_COMMIT_QUEUE.remove("end of input");

        Assertions.assertThat(getSplitGlobalCommittedData())
                .containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_STREAMING_MODE);
    }

    /** Batch job with only a global committer; expects one joined entry in the global queue. */
    @Test
    void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
        final StreamExecutionEnvironment env = buildBatchEnv();

        env.fromData(SOURCE_DATA)
                .sinkTo(
                        TestSink.newBuilder()
                                .setGlobalCommitter(
                                        (Supplier<Queue<String>> & Serializable)
                                                () -> GLOBAL_COMMIT_QUEUE)
                                .build());

        executeAndVerifyStreamGraph(env);

        Assertions.assertThat(GLOBAL_COMMIT_QUEUE)
                .containsExactlyInAnyOrder(EXPECTED_GLOBAL_COMMITTED_DATA_IN_BATCH_MODE);
    }

    /**
     * Builds the stream graph of {@code env}, checks the sink edges via
     * {@link #assertNoUnalignedCheckpointInSink(StreamGraph)} and then executes that graph.
     *
     * @param env environment whose pipeline has been fully defined
     * @throws Exception if graph generation or job execution fails
     */
    private void executeAndVerifyStreamGraph(StreamExecutionEnvironment env) throws Exception {
        final StreamGraph streamGraph = env.getStreamGraph();
        assertNoUnalignedCheckpointInSink(streamGraph);
        env.execute(streamGraph);
    }

    /**
     * Asserts that the out edges of all stream nodes whose operator name contains {@code "Sink"}
     * do not support unaligned checkpoints, and that at least one such edge exists.
     *
     * @param streamGraph graph to inspect
     */
    private void assertNoUnalignedCheckpointInSink(StreamGraph streamGraph) {
        Assertions.assertThat(streamGraph.getStreamNodes())
                .filteredOn(t -> t.getOperatorName().contains("Sink"))
                .flatMap(StreamNode::getOutEdges)
                .allMatch(e -> !e.supportsUnalignedCheckpoints())
                .isNotEmpty();
    }

    /**
     * Returns the content of {@link #GLOBAL_COMMIT_QUEUE} with every entry split on {@code +}.
     *
     * @return a new list holding all fragments in queue iteration order
     */
    private static List<String> getSplitGlobalCommittedData() {
        return GLOBAL_COMMIT_QUEUE.stream()
                .flatMap(x -> Arrays.stream(x.split("\\+")))
                .collect(Collectors.toList());
    }

    /**
     * Creates an environment in {@link RuntimeExecutionMode#STREAMING} mode with checkpointing
     * enabled at an interval of 100 (milliseconds, per {@code enableCheckpointing(long)}).
     */
    private StreamExecutionEnvironment buildStreamEnv() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.enableCheckpointing(100L);
        return env;
    }

    /** Creates an environment in {@link RuntimeExecutionMode#BATCH} mode. */
    private StreamExecutionEnvironment buildBatchEnv() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        return env;
    }

    /**
     * Adds the latch-controlled test source over {@link #SOURCE_DATA} to {@code env}.
     *
     * @param env environment to attach the source to
     * @param latch condition handed to {@code TestDataGenerators.fromDataWithSnapshotsLatch}
     * @return the resulting source stream, named {@code "Test Source"}, without watermarks
     */
    private static DataStreamSource<Integer> latchedSource(
            StreamExecutionEnvironment env, BooleanSupplier latch) {
        return env.fromSource(
                TestDataGenerators.fromDataWithSnapshotsLatch(SOURCE_DATA, Types.INT, latch),
                WatermarkStrategy.noWatermarks(),
                "Test Source");
    }

    /**
     * Renders a source value as the string form of {@code Tuple3.of(value, null, Long.MIN_VALUE)},
     * which is the shape the expected-data constants compare against.
     */
    private static String toExpectedRecord(Integer value) {
        return Tuple3.of(value, null, Long.MIN_VALUE).toString();
    }

    /**
     * Committer that throws on the first {@link #commit(Collection)} call seen through the shared
     * {@code failedOnce} flag and, on every later call, appends the committables to the shared
     * {@code committed} list.
     */
    static class FailingOnceCommitter extends TestSinkV2.DefaultCommitter {
        private final SharedReference<AtomicBoolean> failedOnce;
        private final SharedReference<List<String>> committed;

        /**
         * @param failedOnce shared flag, flipped to {@code true} by the one failing commit
         * @param committed shared list receiving the committables of successful commits
         */
        public FailingOnceCommitter(
                SharedReference<AtomicBoolean> failedOnce,
                SharedReference<List<String>> committed) {
            this.failedOnce = failedOnce;
            this.committed = committed;
        }

        /**
         * Fails once, then records.
         *
         * @param committables commit requests whose committables are recorded on success
         * @throws RuntimeException on the call that flips {@code failedOnce} from false to true
         */
        @Override
        public void commit(Collection<Committer.CommitRequest<String>> committables) {
            // compareAndSet flips the flag atomically, so only one invocation can take this path.
            if (failedOnce.get().compareAndSet(false, true)) {
                throw new RuntimeException("Fail to commit");
            }
            for (Committer.CommitRequest<String> committable : committables) {
                committed.get().add(committable.getCommittable());
            }
        }
    }
}

