/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.util.CollectionUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link SourceOperator}.
 *
 * <p>Each test drives a {@link TestingSourceOperator} wired to a {@link MockSourceReader} and a
 * {@link MockOperatorEventGateway}. The state context produced by {@link #getStateContext()} is
 * pre-populated with a single serialized {@link #MOCK_SPLIT}, so tests that call
 * {@code initializeState(...)} followed by {@code open()} expect that split to show up in the
 * reader's assigned splits.
 */
public class SourceOperatorTest {
    /** Subtask index handed to the operator under test and expected in its registration event. */
    private static final int SUBTASK_INDEX = 1;

    /** The split that is written into the operator state before every state-based test. */
    private static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10);

    private MockSourceReader mockSourceReader;
    private MockOperatorEventGateway mockGateway;
    private SourceOperator<Integer, MockSourceSplit> operator;

    /**
     * Creates a fresh reader, gateway and operator, then sets the operator up against a mock
     * stream task and initializes it with a memory state backend.
     */
    @Before
    public void setup() throws Exception {
        mockSourceReader = new MockSourceReader();
        mockGateway = new MockOperatorEventGateway();
        operator = new TestingSourceOperator<>(mockSourceReader, mockGateway, SUBTASK_INDEX);

        Environment env = getTestingEnvironment();
        operator.setup(
                new SourceOperatorStreamTask<Integer>(env),
                new MockStreamConfig(new Configuration(), 1),
                new MockOutput<>(new ArrayList<>()));
        operator.initializeState(
                new StreamTaskStateInitializerImpl(env, new MemoryStateBackend()));
    }

    /** Closes and disposes the operator and verifies that the reader ended up closed. */
    @After
    public void cleanUp() throws Exception {
        operator.close();
        operator.dispose();
        Assert.assertTrue(mockSourceReader.isClosed());
    }

    /** Initializing the operator must leave the splits list state available in the store. */
    @Test
    public void testInitializeState() throws Exception {
        StateInitializationContext stateContext = getStateContext();
        operator.initializeState(stateContext);

        Assert.assertNotNull(
                stateContext.getOperatorStateStore().getListState(SourceOperator.SPLITS_STATE_DESC));
    }

    /**
     * Opening the operator must hand the split from state to the reader, start the reader, and
     * send exactly one {@link ReaderRegistrationEvent} carrying {@link #SUBTASK_INDEX}.
     */
    @Test
    public void testOpen() throws Exception {
        operator.initializeState(getStateContext());
        operator.open();

        // The split written into state by getStateContext() is expected on the reader.
        Assert.assertEquals(
                Collections.singletonList(MOCK_SPLIT), mockSourceReader.getAssignedSplits());
        Assert.assertTrue(mockSourceReader.isStarted());

        Assert.assertEquals(1, mockGateway.getEventsSent().size());
        OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0);
        Assert.assertTrue(operatorEvent instanceof ReaderRegistrationEvent);
        Assert.assertEquals(
                SUBTASK_INDEX, ((ReaderRegistrationEvent) operatorEvent).subtaskId());
    }

    /** An {@link AddSplitEvent} must append its split after the one coming from state. */
    @Test
    public void testHandleAddSplitsEvent() throws Exception {
        operator.initializeState(getStateContext());
        operator.open();

        MockSourceSplit newSplit = new MockSourceSplit(2);
        operator.handleOperatorEvent(
                new AddSplitEvent<>(
                        Collections.singletonList(newSplit), new MockSourceSplitSerializer()));

        Assert.assertEquals(
                Arrays.asList(MOCK_SPLIT, newSplit), mockSourceReader.getAssignedSplits());
    }

    /** A wrapped {@link SourceEvent} must be forwarded to the reader unchanged. */
    @Test
    public void testHandleAddSourceEvent() throws Exception {
        operator.initializeState(getStateContext());
        operator.open();

        SourceEvent event = new SourceEvent() {};
        operator.handleOperatorEvent(new SourceEventWrapper(event));

        Assert.assertEquals(
                Collections.singletonList(event), mockSourceReader.getReceivedSourceEvents());
    }

    /** A snapshot must record both the split from state and the split added afterwards. */
    @Test
    public void testSnapshotState() throws Exception {
        StateInitializationContext stateContext = getStateContext();
        operator.initializeState(stateContext);
        operator.open();

        MockSourceSplit newSplit = new MockSourceSplit(2);
        operator.handleOperatorEvent(
                new AddSplitEvent<>(
                        Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
        operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));

        List<MockSourceSplit> splitsInState =
                CollectionUtil.iterableToList(operator.getReaderState().get());
        Assert.assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), splitsInState);
    }

    /** A checkpoint-complete notification must reach the reader with the same checkpoint id. */
    @Test
    public void testNotifyCheckpointComplete() throws Exception {
        StateInitializationContext stateContext = getStateContext();
        operator.initializeState(stateContext);
        operator.open();

        operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));
        operator.notifyCheckpointComplete(100L);

        // The explicit unboxing cast keeps the call on assertEquals(long, long); without it the
        // (long, Long) argument pair is ambiguous between the long and Object overloads.
        Assert.assertEquals(100L, (long) mockSourceReader.getCompletedCheckpoints().get(0));
    }

    /** A checkpoint-aborted notification must reach the reader with the same checkpoint id. */
    @Test
    public void testNotifyCheckpointAborted() throws Exception {
        StateInitializationContext stateContext = getStateContext();
        operator.initializeState(stateContext);
        operator.open();

        operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));
        operator.notifyCheckpointAborted(100L);

        // See testNotifyCheckpointComplete for why the unboxing cast is required.
        Assert.assertEquals(100L, (long) mockSourceReader.getAbortedCheckpoints().get(0));
    }

    /**
     * Calling {@code close()} followed by {@code dispose()} must close the reader exactly once.
     *
     * <p>Note that {@link #cleanUp()} invokes {@code close()} and {@code dispose()} again after
     * this test body; the count is asserted here, before that second round.
     */
    @Test
    public void testDisposeAfterCloseOnlyClosesReaderOnce() throws Exception {
        operator.initializeState(getStateContext());
        operator.open();

        operator.close();
        operator.dispose();

        Assert.assertEquals(1, mockSourceReader.getTimesClosed());
    }

    /**
     * Builds a state initialization context whose splits list state already contains the
     * versioned, serialized form of {@link #MOCK_SPLIT}.
     *
     * @return a context backed by a fresh operator state store
     * @throws Exception if serializing the split or creating/updating the state store fails
     */
    private StateInitializationContext getStateContext() throws Exception {
        byte[] serializedSplitWithVersion =
                SimpleVersionedSerialization.writeVersionAndSerialize(
                        new MockSourceSplitSerializer(), MOCK_SPLIT);

        OperatorStateStore operatorStateStore = createOperatorStateStore();
        StateInitializationContext stateContext =
                new StateInitializationContextImpl(false, operatorStateStore, null, null, null);

        stateContext
                .getOperatorStateStore()
                .getListState(SourceOperator.SPLITS_STATE_DESC)
                .update(Collections.singletonList(serializedSplitWithVersion));
        return stateContext;
    }

    /**
     * Creates an operator state store from a {@link MemoryStateBackend} with no restored state
     * handles.
     *
     * @return the newly created operator state store
     * @throws Exception if the backend fails to create the store
     */
    private OperatorStateStore createOperatorStateStore() throws Exception {
        MockEnvironment env = new MockEnvironmentBuilder().build();
        MemoryStateBackend stateBackend = new MemoryStateBackend();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        return stateBackend.createOperatorStateBackend(
                env, "test-operator", Collections.emptyList(), cancelStreamRegistry);
    }

    /**
     * Creates the mock task environment used to set up the operator in {@link #setup()}.
     *
     * @return a {@link StreamMockEnvironment} backed by a {@link TestTaskStateManager}
     */
    private Environment getTestingEnvironment() {
        return new StreamMockEnvironment(
                new Configuration(),
                new Configuration(),
                new ExecutionConfig(),
                1L,
                new MockInputSplitProvider(),
                1,
                new TestTaskStateManager());
    }
}

