package org.apache.flink.streaming.runtime.operators.sink;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperatorTest.class */
public class StatefulSinkWriterOperatorTest extends SinkWriterOperatorTestBase {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperatorTest$DummySinkOperator.class */
    static class DummySinkOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state";
        static final ListStateDescriptor<byte[]> SINK_STATE_DESC = new ListStateDescriptor<>(DUMMY_SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE);
        ListState<String> sinkState;

        DummySinkOperator() {
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            super.initializeState(stateInitializationContext);
            this.sinkState = new SimpleVersionedListState(stateInitializationContext.getOperatorStateStore().getListState(SINK_STATE_DESC), TestSink.StringCommittableSerializer.INSTANCE);
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            this.sinkState.add(streamRecord.getValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperatorTest$SnapshottingBufferingSinkWriter.class */
    public static class SnapshottingBufferingSinkWriter extends SinkWriterOperatorTestBase.BufferingSinkWriter {
        SnapshottingBufferingSinkWriter() {
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSink.DefaultSinkWriter
        public List<String> snapshotState() {
            return this.elements;
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSink.DefaultSinkWriter
        void restoredFrom(List<String> list) {
            this.elements = list;
        }
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    protected AbstractSinkWriterOperatorFactory createWriterOperator(TestSink testSink) {
        return new StatefulSinkWriterOperatorFactory(testSink);
    }

    @Test
    public void stateIsRestored() throws Exception {
        OneInputStreamOperatorTestHarness<Integer, String> createTestHarness = createTestHarness(TestSink.newBuilder().setWriter(new SnapshottingBufferingSinkWriter()).withWriterState().build());
        createTestHarness.open();
        createTestHarness.processWatermark(0L);
        createTestHarness.processElement(1, 1L);
        createTestHarness.processElement(2, 2L);
        createTestHarness.prepareSnapshotPreBarrier(1L);
        OperatorSubtaskState snapshot = createTestHarness.snapshot(1L, 1L);
        Assert.assertThat(createTestHarness.getOutput(), Matchers.contains(new Object[]{new Watermark(0L)}));
        createTestHarness.close();
        OneInputStreamOperatorTestHarness<Integer, String> createTestHarness2 = createTestHarness(TestSink.newBuilder().setWriter(new SnapshottingBufferingSinkWriter()).withWriterState().build());
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.endInput();
        Assert.assertThat(createTestHarness2.getOutput(), Matchers.contains(new Object[]{new StreamRecord(Tuple3.of(1, 1L, 0L).toString()), new StreamRecord(Tuple3.of(2, 2L, 0L).toString())}));
    }

    @Test
    public void loadPreviousSinkState() throws Exception {
        List asList = Arrays.asList("bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", "cost", "prompt");
        OperatorSubtaskState buildSubtaskState = TestHarnessUtil.buildSubtaskState(new OneInputStreamOperatorTestHarness((OneInputStreamOperator) new DummySinkOperator(), (TypeSerializer) StringSerializer.INSTANCE), asList);
        OneInputStreamOperatorTestHarness<Integer, String> createCompatibleSinkOperator = createCompatibleSinkOperator();
        List list = (List) asList.stream().map((v1) -> {
            return new StreamRecord(v1);
        }).collect(Collectors.toList());
        list.add(new StreamRecord(Tuple3.of(1, 1, Long.MIN_VALUE).toString()));
        createCompatibleSinkOperator.initializeState(buildSubtaskState);
        createCompatibleSinkOperator.open();
        createCompatibleSinkOperator.processElement(1, 1L);
        createCompatibleSinkOperator.endInput();
        OperatorSubtaskState snapshot = createCompatibleSinkOperator.snapshot(1L, 1L);
        createCompatibleSinkOperator.close();
        Assert.assertThat(createCompatibleSinkOperator.getOutput(), Matchers.containsInAnyOrder(list.toArray()));
        OneInputStreamOperatorTestHarness<Integer, String> createCompatibleSinkOperator2 = createCompatibleSinkOperator();
        List asList2 = Arrays.asList(new StreamRecord(Tuple3.of(2, 2, Long.MIN_VALUE).toString()), new StreamRecord(Tuple3.of(3, 3, Long.MIN_VALUE).toString()));
        createCompatibleSinkOperator2.initializeState(snapshot);
        createCompatibleSinkOperator2.open();
        createCompatibleSinkOperator2.processElement(2, 2L);
        createCompatibleSinkOperator2.processElement(3, 3L);
        createCompatibleSinkOperator2.endInput();
        Assert.assertThat(createCompatibleSinkOperator2.getOutput(), Matchers.containsInAnyOrder(asList2.toArray()));
    }

    private OneInputStreamOperatorTestHarness<Integer, String> createCompatibleSinkOperator() throws Exception {
        return new OneInputStreamOperatorTestHarness<>((OneInputStreamOperatorFactory) new StatefulSinkWriterOperatorFactory(TestSink.newBuilder().setWriter(new SnapshottingBufferingSinkWriter()).withWriterState().build(), "dummy_sink_state"), (TypeSerializer) IntSerializer.INSTANCE);
    }
}
