package org.apache.flink.streaming.api.operators;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.util.CollectionUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperatorTest.class */
/**
 * Unit tests for {@link SourceOperator}.
 *
 * <p>Every test runs against a fresh {@link SourceOperatorTestContext}, from which the operator
 * under test, its {@link MockSourceReader} and its {@link MockOperatorEventGateway} are obtained.
 */
public class SourceOperatorTest {

    /** Fixture that supplies the operator and its mocks; created per test in {@link #setup()}. */
    @Nullable
    private SourceOperatorTestContext context;

    /** Operator under test, obtained from {@link #context}. */
    @Nullable
    private SourceOperator<Integer, MockSourceSplit> operator;

    /** Source reader obtained from {@link #context}; used to observe what the operator did. */
    @Nullable
    private MockSourceReader mockSourceReader;

    /** Event gateway obtained from {@link #context}; records the events the operator sent. */
    @Nullable
    private MockOperatorEventGateway mockGateway;

    /** Creates a fresh test context and caches the operator, reader and gateway it provides. */
    @Before
    public void setup() throws Exception {
        context = new SourceOperatorTestContext();
        operator = context.getOperator();
        mockSourceReader = context.getSourceReader();
        mockGateway = context.getGateway();
    }

    /** Closes the test context (if one was created) and drops all per-test references. */
    @After
    public void tearDown() throws Exception {
        // JUnit runs @After even when @Before threw; avoid piling an NPE on top of that failure.
        if (context != null) {
            context.close();
        }
        context = null;
        operator = null;
        mockSourceReader = null;
        mockGateway = null;
    }

    /** After {@code initializeState}, the splits list state must be obtainable from the store. */
    @Test
    public void testInitializeState() throws Exception {
        StateInitializationContext stateContext = context.createStateContext();
        operator.initializeState(stateContext);
        Assert.assertNotNull(
                stateContext.getOperatorStateStore().getListState(SourceOperator.SPLITS_STATE_DESC));
    }

    /**
     * After {@code open}, the reader must hold exactly {@code MOCK_SPLIT}, must be started, and
     * exactly one {@link ReaderRegistrationEvent} for subtask 1 must have been sent.
     */
    @Test
    public void testOpen() throws Exception {
        operator.initializeState(context.createStateContext());
        operator.open();

        Assert.assertEquals(
                Collections.singletonList(SourceOperatorTestContext.MOCK_SPLIT),
                mockSourceReader.getAssignedSplits());
        Assert.assertTrue(mockSourceReader.isStarted());

        Assert.assertEquals(1, mockGateway.getEventsSent().size());
        // Keep the event in a local so that the type check and the subtask check inspect the
        // same object.
        OperatorEvent firstEvent = mockGateway.getEventsSent().get(0);
        Assert.assertTrue(firstEvent instanceof ReaderRegistrationEvent);
        ReaderRegistrationEvent registration = (ReaderRegistrationEvent) firstEvent;
        Assert.assertEquals(1, registration.subtaskId());
    }

    /**
     * Checks the stop sequence: the operator is unavailable while nothing can be emitted, becomes
     * available once {@code stop(DRAIN)} is requested, reports {@code END_OF_DATA} on the next
     * emit, and the future returned by {@code stop} is done only after {@code finish()}.
     */
    @Test
    public void testStop() throws Exception {
        operator.initializeState(context.createStateContext());
        operator.open();
        Assert.assertEquals(
                Collections.singletonList(SourceOperatorTestContext.MOCK_SPLIT),
                mockSourceReader.getAssignedSplits());

        CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>();
        Assert.assertEquals(DataInputStatus.NOTHING_AVAILABLE, operator.emitNext(dataOutput));
        Assert.assertFalse(operator.isAvailable());

        CompletableFuture<?> stopFuture = operator.stop(StopMode.DRAIN);
        Assert.assertTrue(operator.isAvailable());
        Assert.assertFalse(stopFuture.isDone());

        Assert.assertEquals(DataInputStatus.END_OF_DATA, operator.emitNext(dataOutput));
        operator.finish();
        Assert.assertTrue(stopFuture.isDone());
    }

    /** An {@link AddSplitEvent} must append its split to the reader's assigned splits. */
    @Test
    public void testHandleAddSplitsEvent() throws Exception {
        operator.initializeState(context.createStateContext());
        operator.open();

        MockSourceSplit newSplit = new MockSourceSplit(2);
        operator.handleOperatorEvent(
                new AddSplitEvent(
                        Collections.singletonList(newSplit), new MockSourceSplitSerializer()));

        Assert.assertEquals(
                Arrays.asList(SourceOperatorTestContext.MOCK_SPLIT, newSplit),
                mockSourceReader.getAssignedSplits());
    }

    /** A {@link SourceEventWrapper} must deliver the wrapped {@link SourceEvent} to the reader. */
    @Test
    public void testHandleAddSourceEvent() throws Exception {
        operator.initializeState(context.createStateContext());
        operator.open();

        SourceEvent event = new SourceEvent() {};
        operator.handleOperatorEvent(new SourceEventWrapper(event));

        Assert.assertEquals(
                Collections.singletonList(event), mockSourceReader.getReceivedSourceEvents());
    }

    /**
     * After a snapshot, the operator's reader state must contain {@code MOCK_SPLIT} followed by
     * the split that was added through an {@link AddSplitEvent}.
     */
    @Test
    public void testSnapshotState() throws Exception {
        operator.initializeState(context.createStateContext());
        operator.open();

        MockSourceSplit newSplit = new MockSourceSplit(2);
        operator.handleOperatorEvent(
                new AddSplitEvent(
                        Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
        operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));

        Assert.assertEquals(
                Arrays.asList(SourceOperatorTestContext.MOCK_SPLIT, newSplit),
                CollectionUtil.iterableToList(operator.getReaderState().get()));
    }

    /** {@code notifyCheckpointComplete} must be recorded by the reader with the checkpoint id. */
    @Test
    public void testNotifyCheckpointComplete() throws Exception {
        operator.initializeState(context.createStateContext());
        operator.open();
        operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));

        operator.notifyCheckpointComplete(100L);

        Assert.assertEquals(100L, mockSourceReader.getCompletedCheckpoints().get(0).longValue());
    }

    /** {@code notifyCheckpointAborted} must be recorded by the reader with the checkpoint id. */
    @Test
    public void testNotifyCheckpointAborted() throws Exception {
        operator.initializeState(context.createStateContext());
        operator.open();
        operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));

        operator.notifyCheckpointAborted(100L);

        Assert.assertEquals(100L, mockSourceReader.getAbortedCheckpoints().get(0).longValue());
    }
}
