/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Tests for the operator produced by {@link CommitterOperatorFactory}, driven through a
 * {@link OneInputStreamOperatorTestHarness}.
 *
 * <p>Every harness is managed with try-with-resources so that it is closed even when an
 * assertion fails part-way through a test.
 */
class CommitterOperatorTest {

    /**
     * Feeds one summary and one committable, completes the checkpoint and verifies that the
     * committable is committed. Output is only expected when the sink was built with a global
     * committer ({@code withPostCommitTopology == true}); otherwise the output must stay empty.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
        final ForwardingCommitter committer = new ForwardingCommitter();
        final Sink<Integer> sink =
                withPostCommitTopology
                        ? TestSink.newBuilder()
                                .setCommitter(committer)
                                .setDefaultGlobalCommitter()
                                .setCommittableSerializer(
                                        TestSink.StringCommittableSerializer.INSTANCE)
                                .build()
                                .asV2()
                        : TestSink.newBuilder()
                                .setCommitter(committer)
                                .setCommittableSerializer(
                                        TestSink.StringCommittableSerializer.INSTANCE)
                                .build()
                                .asV2();

        try (OneInputStreamOperatorTestHarness<
                        CommittableMessage<String>, CommittableMessage<String>>
                testHarness = createTestHarnessForSink(sink, false, true)) {
            testHarness.open();

            final CommittableSummary<String> committableSummary =
                    new CommittableSummary<>(1, 1, 1L, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));
            final CommittableWithLineage<String> committableWithLineage =
                    new CommittableWithLineage<>("1", 1L, 1);
            testHarness.processElement(new StreamRecord<>(committableWithLineage));

            testHarness.notifyOfCompletedCheckpoint(1L);

            Assertions.assertThat(committer.getSuccessfulCommits()).isEqualTo(1);
            if (withPostCommitTopology) {
                final List<StreamElement> output =
                        SinkTestUtil.fromOutput(testHarness.getOutput());
                SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0)))
                        .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
                        .hasOverallCommittables(committableSummary.getNumberOfCommittables())
                        .hasPendingCommittables(0);
                SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1)))
                        .isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage, 0));
            } else {
                Assertions.assertThat(testHarness.getOutput()).isEmpty();
            }
        }
    }

    /**
     * Verifies that nothing is committed or emitted when the checkpoint completes after only the
     * first committable has been received, and that both committables are committed and emitted
     * once the second one has arrived and processing time has been advanced.
     */
    @Test
    void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws Exception {
        final ForwardingCommitter committer = new ForwardingCommitter();
        try (OneInputStreamOperatorTestHarness<
                        CommittableMessage<String>, CommittableMessage<String>>
                testHarness = createTestHarness(committer, false, true)) {
            testHarness.open();
            testHarness.setProcessingTime(0L);

            final CommittableSummary<String> committableSummary =
                    new CommittableSummary<>(1, 1, 1L, 2, 2, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));
            final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1);
            testHarness.processElement(new StreamRecord<>(first));

            // Checkpoint completes while the second committable is still missing.
            testHarness.notifyOfCompletedCheckpoint(1L);
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            Assertions.assertThat(committer.getSuccessfulCommits()).isEqualTo(0);

            final CommittableWithLineage<String> second = new CommittableWithLineage<>("2", 1L, 1);
            testHarness.processElement(new StreamRecord<>(second));

            // Advancing processing time is what makes the pending commit happen in this test.
            testHarness.getProcessingTimeService().setCurrentTime(2000L);

            final List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(3);
            Assertions.assertThat(committer.getSuccessfulCommits()).isEqualTo(2);
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0)))
                    .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
                    .hasOverallCommittables(committableSummary.getNumberOfCommittables())
                    .hasPendingCommittables(0);
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1)))
                    .isEqualTo(copyCommittableWithDifferentOrigin(first, 0));
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(2)))
                    .isEqualTo(copyCommittableWithDifferentOrigin(second, 0));
        }
    }

    /**
     * Verifies that a committable arriving after its checkpoint has already completed is
     * committed and emitted as soon as it is processed.
     */
    @Test
    void testImmediatelyCommitLateCommittables() throws Exception {
        final ForwardingCommitter committer = new ForwardingCommitter();
        try (OneInputStreamOperatorTestHarness<
                        CommittableMessage<String>, CommittableMessage<String>>
                testHarness = createTestHarness(committer, false, true)) {
            testHarness.open();

            final CommittableSummary<String> committableSummary =
                    new CommittableSummary<>(1, 1, 1L, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));

            // Only the summary has been seen so far, so completing the checkpoint emits nothing.
            testHarness.notifyOfCompletedCheckpoint(1L);
            Assertions.assertThat(testHarness.getOutput()).isEmpty();

            final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1);
            testHarness.processElement(new StreamRecord<>(first));

            final List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(2);
            Assertions.assertThat(committer.getSuccessfulCommits()).isEqualTo(1);
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0)))
                    .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
                    .hasOverallCommittables(committableSummary.getNumberOfCommittables())
                    .hasPendingCommittables(0);
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1)))
                    .isEqualTo(copyCommittableWithDifferentOrigin(first, 0));
        }
    }

    /**
     * Sends two summaries and two committables without a checkpoint id, ends the input and
     * verifies that one summary plus both committables are emitted. With checkpointing enabled
     * ({@code isBatchMode == false}) the output must stay empty until the checkpoint completes.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception {
        final ForwardingCommitter committer = new ForwardingCommitter();
        try (OneInputStreamOperatorTestHarness<
                        CommittableMessage<String>, CommittableMessage<String>>
                testHarness = createTestHarness(committer, isBatchMode, !isBatchMode)) {
            testHarness.open();

            final CommittableSummary<String> committableSummary =
                    new CommittableSummary<>(1, 2, null, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));
            final CommittableSummary<String> committableSummary2 =
                    new CommittableSummary<>(2, 2, null, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary2));

            final CommittableWithLineage<String> first =
                    new CommittableWithLineage<>("1", null, 1);
            testHarness.processElement(new StreamRecord<>(first));
            final CommittableWithLineage<String> second =
                    new CommittableWithLineage<>("1", null, 2);
            testHarness.processElement(new StreamRecord<>(second));

            testHarness.endInput();
            if (!isBatchMode) {
                Assertions.assertThat(testHarness.getOutput()).hasSize(0);
                testHarness.notifyOfCompletedCheckpoint(1L);
            }

            final List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(3);
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0)))
                    .hasFailedCommittables(0)
                    .hasOverallCommittables(2)
                    .hasPendingCommittables(0);
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1)))
                    .isEqualTo(copyCommittableWithDifferentOrigin(first, 0));
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(2)))
                    .isEqualTo(copyCommittableWithDifferentOrigin(second, 0));
        }
    }

    /**
     * Snapshots an operator whose committer ({@link TestSink.RetryOnceCommitter}) has not emitted
     * anything after the checkpoint notification, then restores a new operator from that snapshot
     * and verifies that the committable is committed and emitted when the restored operator is
     * opened.
     */
    @Test
    void testStateRestore() throws Exception {
        final CommittableSummary<String> committableSummary =
                new CommittableSummary<>(1, 1, 0L, 1, 1, 0);
        final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 0L, 1);

        OperatorSubtaskState snapshot;
        try (OneInputStreamOperatorTestHarness<
                        CommittableMessage<String>, CommittableMessage<String>>
                testHarness = createTestHarness(new TestSink.RetryOnceCommitter(), false, true)) {
            testHarness.open();
            testHarness.processElement(new StreamRecord<>(committableSummary));
            testHarness.processElement(new StreamRecord<>(first));

            snapshot = testHarness.snapshot(0L, 2L);

            // With the retry-once committer the first notification must not produce output.
            testHarness.notifyOfCompletedCheckpoint(0L);
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
        }

        // The first harness is closed before the restored one is created.
        final ForwardingCommitter committer = new ForwardingCommitter();
        try (OneInputStreamOperatorTestHarness<
                        CommittableMessage<String>, CommittableMessage<String>>
                restored = createTestHarness(committer, false, true)) {
            restored.initializeState(snapshot);
            restored.open();

            final List<StreamElement> output = SinkTestUtil.fromOutput(restored.getOutput());
            Assertions.assertThat(output).hasSize(2);
            Assertions.assertThat(committer.getSuccessfulCommits()).isEqualTo(1);
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0)))
                    .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
                    .hasOverallCommittables(committableSummary.getNumberOfCommittables())
                    .hasPendingCommittables(0);
            // The emitted committable is expected to carry checkpoint id 1 and subtask id 0.
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1)))
                    .isEqualTo(new CommittableWithLineage<>(first.getCommittable(), 1L, 0));
        }
    }

    /**
     * Verifies end-of-input handling in streaming mode, with and without checkpointing: the
     * committable must be emitted exactly once, and later checkpoint notifications or
     * {@code endInput()} calls must not add further output.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
        final Sink<Integer> sink =
                TestSink.newBuilder()
                        .setDefaultCommitter()
                        .setDefaultGlobalCommitter()
                        .setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
                        .build()
                        .asV2();

        // try-with-resources: the harness was previously never closed in this test.
        try (OneInputStreamOperatorTestHarness<
                        CommittableMessage<String>, CommittableMessage<String>>
                testHarness = createTestHarnessForSink(sink, false, isCheckpointingEnabled)) {
            testHarness.open();

            final CommittableSummary<String> committableSummary =
                    new CommittableSummary<>(1, 1, 1L, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));
            final CommittableWithLineage<String> committableWithLineage =
                    new CommittableWithLineage<>("1", 1L, 1);
            testHarness.processElement(new StreamRecord<>(committableWithLineage));

            testHarness.endInput();
            if (isCheckpointingEnabled) {
                testHarness.notifyOfCompletedCheckpoint(1L);
            }

            final List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
            Assertions.assertThat(output).hasSize(2);
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0)))
                    .hasCheckpointId(1L)
                    .hasPendingCommittables(0)
                    .hasOverallCommittables(1)
                    .hasFailedCommittables(0);
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1)))
                    .isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage, 0));

            // Further notifications / end-of-input signals must leave the output unchanged.
            testHarness.notifyOfCompletedCheckpoint(2L);
            testHarness.endInput();
            Assertions.assertThat(testHarness.getOutput()).hasSize(2);
        }
    }

    /**
     * Creates a copy of {@code committable} that keeps its payload and checkpoint id (or
     * {@code null} when no checkpoint id is present) but uses the given subtask id.
     *
     * @param committable the committable to copy
     * @param subtaskId the subtask id the copy should carry
     * @return a new committable with the replaced subtask id
     */
    CommittableWithLineage<?> copyCommittableWithDifferentOrigin(
            CommittableWithLineage<?> committable, int subtaskId) {
        final Long checkpointId =
                committable.getCheckpointId().isPresent()
                        ? Long.valueOf(committable.getCheckpointId().getAsLong())
                        : null;
        return new CommittableWithLineage<>(committable.getCommittable(), checkpointId, subtaskId);
    }

    /**
     * Builds a harness around a {@link TestSink} that uses the given committer, the default global
     * committer and the string committable serializer.
     *
     * @param committer committer the sink should use
     * @param isBatchMode forwarded to {@link CommitterOperatorFactory}
     * @param isCheckpointingEnabled forwarded to {@link CommitterOperatorFactory}
     * @return an unopened test harness; the caller is responsible for closing it
     */
    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
            createTestHarness(
                    Committer<String> committer,
                    boolean isBatchMode,
                    boolean isCheckpointingEnabled)
                    throws Exception {
        final Sink<Integer> sink =
                TestSink.newBuilder()
                        .setCommitter(committer)
                        .setDefaultGlobalCommitter()
                        .setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
                        .build()
                        .asV2();
        return createTestHarnessForSink(sink, isBatchMode, isCheckpointingEnabled);
    }

    /**
     * Wraps the given sink in a {@link CommitterOperatorFactory} and returns a harness for it.
     *
     * @param sink sink to test; it is cast to {@link TwoPhaseCommittingSink}
     * @param isBatchMode forwarded to {@link CommitterOperatorFactory}
     * @param isCheckpointingEnabled forwarded to {@link CommitterOperatorFactory}
     * @return an unopened test harness; the caller is responsible for closing it
     */
    // The cast cannot be checked at runtime; every sink built in this class uses the String
    // committable serializer, so String is the committable type used throughout.
    @SuppressWarnings("unchecked")
    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>
            createTestHarnessForSink(
                    Sink<Integer> sink, boolean isBatchMode, boolean isCheckpointingEnabled)
                    throws Exception {
        return new OneInputStreamOperatorTestHarness<>(
                new CommitterOperatorFactory<>(
                        (TwoPhaseCommittingSink<?, String>) sink,
                        isBatchMode,
                        isCheckpointingEnabled));
    }

    /**
     * Committer that only counts how many committables were passed to {@link #commit(List)}.
     */
    private static class ForwardingCommitter extends TestSink.DefaultCommitter {
        /** Total number of committables received by {@link #commit(List)} so far. */
        private int successfulCommits = 0;

        /**
         * Adds the size of {@code committables} to the counter.
         *
         * @param committables the committables to commit
         * @return always an empty list (presumably "nothing to retry" - confirm against the
         *     {@code Committer#commit} contract)
         */
        @Override
        public List<String> commit(List<String> committables) {
            this.successfulCommits += committables.size();
            return Collections.emptyList();
        }

        /** Intentionally a no-op override. */
        @Override
        public void close() throws Exception {
        }

        /**
         * @return the number of committables counted so far
         */
        public int getSuccessfulCommits() {
            return this.successfulCommits;
        }
    }
}

