package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterOperatorTest.class */
/**
 * Tests for the committer operator produced by {@link StreamingCommitterOperatorFactory}, driven
 * through a {@link OneInputStreamOperatorTestHarness}.
 *
 * <p>Every test builds its harness via one of the {@code createTestHarness} overloads, which wire a
 * {@link TestSink} with the given committer and committable serializer.
 */
public class StreamingCommitterOperatorTest extends TestLogger {

    /**
     * Building and opening a harness whose sink has a committer but a {@code null} committable
     * serializer must fail with {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void throwExceptionWithoutSerializer() throws Exception {
        final OneInputStreamOperatorTestHarness<String, String> harness =
                createTestHarness(new TestSink.DefaultCommitter(), null);
        harness.initializeEmptyState();
        harness.open();
    }

    /**
     * Building and opening a harness whose sink has a committable serializer but a {@code null}
     * committer must fail with {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void throwExceptionWithoutCommitter() throws Exception {
        final OneInputStreamOperatorTestHarness<String, String> harness =
                createTestHarness(null, TestSink.StringCommittableSerializer.INSTANCE);
        harness.initializeEmptyState();
        harness.open();
    }

    /**
     * Running a full process / snapshot / checkpoint-complete cycle with a
     * {@code TestSink.AlwaysRetryCommitter} must fail with {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void doNotSupportRetry() throws Exception {
        final List<String> input = Arrays.asList("lazy", "leaf");

        final OneInputStreamOperatorTestHarness<String, String> harness =
                createTestHarness(new TestSink.AlwaysRetryCommitter());
        harness.initializeEmptyState();
        harness.open();

        harness.processElements(toStreamRecords(input));
        harness.snapshot(1L, 1L);
        harness.notifyOfCompletedCheckpoint(1L);
        harness.close();
    }

    /** Closing the harness must leave the committer reporting itself as closed. */
    @Test
    public void closeCommitter() throws Exception {
        final TestSink.DefaultCommitter committer = new TestSink.DefaultCommitter();

        final OneInputStreamOperatorTestHarness<String, String> harness =
                createTestHarness(committer);
        harness.initializeEmptyState();
        harness.open();
        harness.close();

        MatcherAssert.assertThat(committer.isClosed(), Matchers.is(true));
    }

    /**
     * Builds two subtask states from two separate inputs, repackages and repartitions them into a
     * single state, restores a fresh harness from it, and checks that after one completed
     * checkpoint every element of both inputs has been emitted and committed (in any order).
     */
    @Test
    public void restoredFromMergedState() throws Exception {
        final List<String> firstInput = Arrays.asList("today", "whom");
        final OperatorSubtaskState firstState =
                TestHarnessUtil.buildSubtaskState(createTestHarness(), firstInput);

        final List<String> secondInput = Arrays.asList("future", "evil", "how");
        final OperatorSubtaskState secondState =
                TestHarnessUtil.buildSubtaskState(createTestHarness(), secondInput);

        final TestSink.DefaultCommitter committer = new TestSink.DefaultCommitter();
        final OneInputStreamOperatorTestHarness<String, String> harness =
                createTestHarness(committer);

        final OperatorSubtaskState mergedState =
                OneInputStreamOperatorTestHarness.repackageState(firstState, secondState);
        // NOTE(review): the numeric arguments are kept exactly as in the original call; confirm
        // their meaning against the harness API before changing them.
        harness.initializeState(
                OneInputStreamOperatorTestHarness.repartitionOperatorState(
                        mergedState, 2, 2, 1, 0));
        harness.open();

        final List<String> expected = new ArrayList<>();
        expected.addAll(firstInput);
        expected.addAll(secondInput);

        harness.snapshot(1L, 1L);
        harness.notifyOfCompletedCheckpoint(1L);
        harness.close();

        MatcherAssert.assertThat(
                harness.getOutput(),
                Matchers.containsInAnyOrder(toStreamRecords(expected).toArray()));
        MatcherAssert.assertThat(
                committer.getCommittedData(),
                Matchers.containsInAnyOrder(expected.toArray()));
    }

    /**
     * Processes three batches with a snapshot after each, then completes checkpoint 1 followed
     * directly by checkpoint 3 (checkpoint 2 is never notified). All three batches must be emitted
     * and committed, in input order.
     */
    @Test
    public void commitMultipleStagesTogether() throws Exception {
        final TestSink.DefaultCommitter committer = new TestSink.DefaultCommitter();

        final List<String> firstBatch = Arrays.asList("cautious", "nature");
        final List<String> secondBatch = Arrays.asList("count", "over");
        final List<String> thirdBatch = Arrays.asList("lawyer", "grammar");

        final List<String> expected = new ArrayList<>();
        expected.addAll(firstBatch);
        expected.addAll(secondBatch);
        expected.addAll(thirdBatch);

        final OneInputStreamOperatorTestHarness<String, String> harness =
                createTestHarness(committer);
        harness.initializeEmptyState();
        harness.open();

        harness.processElements(toStreamRecords(firstBatch));
        harness.snapshot(1L, 1L);
        harness.processElements(toStreamRecords(secondBatch));
        harness.snapshot(2L, 2L);
        harness.processElements(toStreamRecords(thirdBatch));
        harness.snapshot(3L, 3L);

        harness.notifyOfCompletedCheckpoint(1L);
        harness.notifyOfCompletedCheckpoint(3L);
        harness.close();

        MatcherAssert.assertThat(
                harness.getOutput().toArray(),
                Matchers.equalTo(toStreamRecords(expected).toArray()));
        MatcherAssert.assertThat(
                committer.getCommittedData().toArray(),
                Matchers.equalTo(expected.toArray()));
    }

    /**
     * Wraps each value in a {@link StreamRecord} created with the single-argument constructor,
     * preserving the order of {@code values}.
     *
     * @param values the raw elements to wrap
     * @return a new list holding one record per input value
     */
    private static List<StreamRecord<String>> toStreamRecords(List<String> values) {
        return values.stream().map(StreamRecord::new).collect(Collectors.toList());
    }

    /**
     * Creates a harness backed by a fresh {@code TestSink.DefaultCommitter} and the string
     * committable serializer.
     */
    private OneInputStreamOperatorTestHarness<String, String> createTestHarness() throws Exception {
        return createTestHarness(
                new TestSink.DefaultCommitter(), TestSink.StringCommittableSerializer.INSTANCE);
    }

    /**
     * Creates a harness backed by the given committer and the string committable serializer.
     *
     * @param committer the committer handed to the sink builder
     */
    private OneInputStreamOperatorTestHarness<String, String> createTestHarness(
            Committer<String> committer) throws Exception {
        return createTestHarness(committer, TestSink.StringCommittableSerializer.INSTANCE);
    }

    /**
     * Creates a harness around a {@link StreamingCommitterOperatorFactory} whose sink is built from
     * the given committer and committable serializer; either argument may be {@code null}, which
     * the negative tests in this class rely on.
     *
     * @param committer the committer handed to the sink builder
     * @param simpleVersionedSerializer the committable serializer handed to the sink builder
     */
    // The raw casts are kept from the original call: the generic signatures of the factory and of
    // TestSink are declared outside this file, so the warning is suppressed only on this method.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private OneInputStreamOperatorTestHarness<String, String> createTestHarness(
            Committer<String> committer,
            SimpleVersionedSerializer<String> simpleVersionedSerializer) throws Exception {
        return new OneInputStreamOperatorTestHarness<>((OneInputStreamOperatorFactory) new StreamingCommitterOperatorFactory(TestSink.newBuilder().setCommitter(committer).setCommittableSerializer(simpleVersionedSerializer).build()), (TypeSerializer) StringSerializer.INSTANCE);
    }
}
