package org.apache.flink.streaming.runtime.operators.sink;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/BatchCommitterOperatorTest.class */
/**
 * Tests for the operator produced by {@link BatchCommitterOperatorFactory}, driven through a
 * {@link OneInputStreamOperatorTestHarness} with {@code String} inputs and outputs.
 */
public class BatchCommitterOperatorTest extends TestLogger {

    /**
     * Building and initializing the harness for a sink whose committer is {@code null} must fail
     * with an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void throwExceptionWithoutCommitter() throws Exception {
        final OneInputStreamOperatorTestHarness<String, String> testHarness =
                createTestHarness(null);

        testHarness.initializeEmptyState();
    }

    /**
     * Running one element through an operator backed by {@link TestSink.AlwaysRetryCommitter}
     * must fail with an {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void doNotSupportRetry() throws Exception {
        final OneInputStreamOperatorTestHarness<String, String> testHarness =
                createTestHarness(new TestSink.AlwaysRetryCommitter());

        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.processElement(new StreamRecord<>("those"));
        testHarness.endInput();
        testHarness.close();
    }

    /**
     * After the input has ended and the operator is closed, every processed element must have
     * been handed to the committer and must also appear in the harness output (order is not
     * checked).
     */
    @Test
    public void commit() throws Exception {
        final TestSink.DefaultCommitter committer = new TestSink.DefaultCommitter();
        final OneInputStreamOperatorTestHarness<String, String> testHarness =
                createTestHarness(committer);
        final List<String> expectedCommittedData = Arrays.asList("youth", "laugh", "nothing");

        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.processElements(toStreamRecords(expectedCommittedData));
        testHarness.endInput();
        testHarness.close();

        MatcherAssert.assertThat(
                committer.getCommittedData(),
                Matchers.containsInAnyOrder(expectedCommittedData.toArray()));
        MatcherAssert.assertThat(
                testHarness.getOutput(),
                Matchers.containsInAnyOrder(toStreamRecords(expectedCommittedData).toArray()));
    }

    /** Closing the harness must leave the committer reporting itself as closed. */
    @Test
    public void close() throws Exception {
        final TestSink.DefaultCommitter committer = new TestSink.DefaultCommitter();
        final OneInputStreamOperatorTestHarness<String, String> testHarness =
                createTestHarness(committer);

        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.close();

        MatcherAssert.assertThat(committer.isClosed(), Matchers.is(true));
    }

    /**
     * Wraps each value in a {@link StreamRecord}, preserving the order of {@code values}.
     *
     * @param values the raw element values
     * @return a new list holding one record per value
     */
    private static List<StreamRecord<String>> toStreamRecords(List<String> values) {
        return values.stream().map(StreamRecord<String>::new).collect(Collectors.toList());
    }

    /**
     * Creates a test harness around a {@link BatchCommitterOperatorFactory} for a {@link TestSink}
     * that uses the given committer and {@link TestSink.StringCommittableSerializer}.
     *
     * @param committer the committer to install in the sink; the tests also pass {@code null}
     * @return the harness, not yet initialized or opened
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<String, String> createTestHarness(
            Committer<String> committer) throws Exception {
        return new OneInputStreamOperatorTestHarness<>(
                new BatchCommitterOperatorFactory<>(
                        TestSink.newBuilder()
                                .setCommitter(committer)
                                .setCommittableSerializer(
                                        TestSink.StringCommittableSerializer.INSTANCE)
                                .build()),
                StringSerializer.INSTANCE);
    }
}
