/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

/**
 * Base harness for driving a single {@link StreamOperator} in a test.
 *
 * <p>The harness wires the operator to a {@link MockStreamTask}, a {@link TestProcessingTimeService}
 * and a {@link MockEnvironment}. Everything the operator emits (records, watermarks, latency
 * markers) is appended to {@link #getOutput()}; records emitted to a side output are appended to a
 * per-{@link OutputTag} queue available through {@link #getSideOutput(OutputTag)}.
 *
 * <p>Typical call order is {@link #setup()}, {@link #initializeState(OperatorSubtaskState)},
 * {@link #open()}, ..., {@link #close()}. {@link #open()} performs the earlier steps on demand if
 * they were skipped.
 *
 * @param <OUT> type of the elements emitted by the operator under test
 */
public class AbstractStreamOperatorTestHarness<OUT>
implements AutoCloseable {
    /** The operator under test. */
    protected final StreamOperator<OUT> operator;
    /** Everything emitted on the main output, in emission order. */
    protected final ConcurrentLinkedQueue<Object> outputList;
    /** Records emitted to side outputs, keyed by the tag they were emitted with. */
    protected final Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> sideOutputLists;
    protected final StreamConfig config;
    protected final ExecutionConfig executionConfig;
    protected final TestProcessingTimeService processingTimeService;
    protected final MockStreamTask mockTask;
    protected final TestTaskStateManager taskStateManager;
    final MockEnvironment environment;
    /** Holds the environment only if this harness created it and therefore must close it. */
    private final Optional<MockEnvironment> internalEnvironment;
    protected StreamTaskStateInitializer streamTaskStateInitializer;
    CloseableRegistry closableRegistry;
    protected StateBackend stateBackend = new MemoryStateBackend();
    private CheckpointStorage checkpointStorage = this.stateBackend.createCheckpointStorage(new JobID());
    private final Object checkpointLock;
    private static final OperatorStateRepartitioner operatorStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
    /** Whether {@link #setup(TypeSerializer)} has run since construction or the last {@link #close()}. */
    private boolean setupCalled = false;
    /** Whether {@link #initializeState(OperatorSubtaskState, OperatorSubtaskState)} has run. */
    private boolean initializeCalled = false;
    /** Set from the mock task's async-exception callback. */
    private volatile boolean wasFailedExternally = false;

    /**
     * Creates a harness with an internally managed environment and a fresh {@link OperatorID}.
     *
     * @param operator operator under test
     * @param maxParallelism max parallelism configured on the mock environment
     * @param parallelism parallelism configured on the mock environment
     * @param subtaskIndex subtask index configured on the mock environment
     */
    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator, int maxParallelism, int parallelism, int subtaskIndex) throws Exception {
        this(operator, maxParallelism, parallelism, subtaskIndex, new OperatorID());
    }

    /**
     * Creates a harness with an internally managed environment. The environment is closed by
     * {@link #close()}.
     *
     * @param operator operator under test
     * @param maxParallelism max parallelism configured on the mock environment
     * @param parallelism parallelism configured on the mock environment
     * @param subtaskIndex subtask index configured on the mock environment
     * @param operatorID operator id written into the {@link StreamConfig}
     */
    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator, int maxParallelism, int parallelism, int subtaskIndex, OperatorID operatorID) throws Exception {
        this(
            operator,
            new MockEnvironmentBuilder()
                .setTaskName("MockTask")
                .setMemorySize(3L * 1024L * 1024L)
                .setInputSplitProvider(new MockInputSplitProvider())
                .setBufferSize(1024)
                .setMaxParallelism(maxParallelism)
                .setParallelism(parallelism)
                .setSubtaskIndex(subtaskIndex)
                .build(),
            true,
            operatorID);
    }

    /**
     * Creates a harness on top of a caller-provided environment. The caller keeps ownership of
     * {@code env}; {@link #close()} does not close it.
     *
     * @param operator operator under test
     * @param env environment to run the operator in, must not be {@code null}
     */
    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator, MockEnvironment env) throws Exception {
        this(operator, env, false, new OperatorID());
    }

    private AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator, MockEnvironment env, boolean environmentIsInternal, OperatorID operatorID) throws Exception {
        // Validate before the first dereference so a null environment fails with a clear message.
        this.environment = Preconditions.checkNotNull(env, "The MockEnvironment must not be null.");
        this.operator = operator;
        this.outputList = new ConcurrentLinkedQueue<>();
        this.sideOutputLists = new HashMap<>();

        Configuration underlyingConfig = env.getTaskConfiguration();
        this.config = new StreamConfig(underlyingConfig);
        this.config.setCheckpointingEnabled(true);
        this.config.setOperatorID(operatorID);
        this.executionConfig = env.getExecutionConfig();
        this.closableRegistry = new CloseableRegistry();
        this.checkpointLock = new Object();

        this.taskStateManager = (TestTaskStateManager) env.getTaskStateManager();
        this.internalEnvironment = environmentIsInternal ? Optional.of(this.environment) : Optional.empty();

        this.processingTimeService = new TestProcessingTimeService();
        this.processingTimeService.setCurrentTime(0L);

        this.streamTaskStateInitializer = createStreamTaskStateManager(this.environment, this.stateBackend, this.processingTimeService);

        BiConsumer<String, Throwable> handleAsyncException = (message, t) -> this.wasFailedExternally = true;

        this.mockTask = new MockStreamTaskBuilder(env)
            .setCheckpointLock(this.checkpointLock)
            .setConfig(this.config)
            .setExecutionConfig(this.executionConfig)
            .setStreamTaskStateInitializer(this.streamTaskStateInitializer)
            .setClosableRegistry(this.closableRegistry)
            .setCheckpointStorage(this.checkpointStorage)
            .setProcessingTimeService(this.processingTimeService)
            .setHandleAsyncException(handleAsyncException)
            .build();
    }

    /**
     * Factory for the state initializer handed to the mock task. Subclasses may override it to
     * supply a different implementation. Called from the constructor and again from
     * {@link #setup(TypeSerializer)}.
     */
    protected StreamTaskStateInitializer createStreamTaskStateManager(Environment env, StateBackend stateBackend, ProcessingTimeService processingTimeService) {
        return new StreamTaskStateInitializerImpl(env, stateBackend, processingTimeService);
    }

    /**
     * Replaces the state backend and derives a new checkpoint storage from it.
     *
     * <p>The state initializer is rebuilt from the current backend in {@link #setup(TypeSerializer)},
     * so the new backend only takes effect for state initialization if this is called before setup.
     * The new checkpoint storage is used by {@link #snapshotWithLocalState(long, long)}.
     *
     * @param stateBackend the backend to use from now on
     * @throws RuntimeException wrapping the {@link IOException} if the checkpoint storage cannot be created
     */
    public void setStateBackend(StateBackend stateBackend) {
        this.stateBackend = stateBackend;
        try {
            this.checkpointStorage = stateBackend.createCheckpointStorage(new JobID());
        }
        catch (IOException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /** Returns the checkpoint lock of the mock task. */
    public Object getCheckpointLock() {
        return this.mockTask.getCheckpointLock();
    }

    /** Returns the environment the operator runs in. */
    public MockEnvironment getEnvironment() {
        return this.environment;
    }

    /** Returns the execution config taken from the environment. */
    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    /**
     * Returns the live queue of everything emitted on the main output: records, watermarks and
     * latency markers.
     */
    public ConcurrentLinkedQueue<Object> getOutput() {
        return this.outputList;
    }

    /**
     * Returns the live queue of records emitted to the given side output.
     *
     * @param tag side output tag
     * @return the queue for {@code tag}, or {@code null} if nothing has been emitted to it yet
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <X> ConcurrentLinkedQueue<StreamRecord<X>> getSideOutput(OutputTag<X> tag) {
        // Queues are stored as ConcurrentLinkedQueue<Object>; only StreamRecords are ever added to
        // side-output queues, so the raw cast is safe as long as the tag's type matches.
        return (ConcurrentLinkedQueue) this.sideOutputLists.get(tag);
    }

    /**
     * Returns a snapshot of the {@link StreamRecord}s currently on the main output, skipping
     * watermarks and any other non-record elements.
     */
    public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
        List<StreamRecord<? extends OUT>> resultElements = new ArrayList<>();
        for (Object element : this.getOutput()) {
            if (element instanceof StreamRecord) {
                @SuppressWarnings("unchecked")
                StreamRecord<OUT> castRecord = (StreamRecord<OUT>) element;
                resultElements.add(castRecord);
            }
        }
        return resultElements;
    }

    /** Sets up the operator without an explicit output serializer; see {@link #setup(TypeSerializer)}. */
    public void setup() {
        this.setup(null);
    }

    /**
     * Sets up the operator against the mock task. Does nothing if setup already ran and the
     * harness has not been closed since.
     *
     * @param outputSerializer serializer used to copy emitted records; if {@code null}, one is
     *     derived from the first emitted value
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setup(TypeSerializer<OUT> outputSerializer) {
        if (this.setupCalled) {
            return;
        }
        // Rebuild the initializer so that a backend installed via setStateBackend() is honoured.
        this.streamTaskStateInitializer = createStreamTaskStateManager(this.environment, this.stateBackend, this.processingTimeService);
        this.mockTask.setStreamTaskStateInitializer(this.streamTaskStateInitializer);
        if (this.operator instanceof SetupableStreamOperator) {
            ((SetupableStreamOperator) this.operator).setup((StreamTask) this.mockTask, this.config, (Output) new MockOutput(outputSerializer));
        }
        this.setupCalled = true;
    }

    /**
     * Initializes the operator from the given job-manager-owned state and no task-local state.
     *
     * @param operatorStateHandles state to restore, or {@code null} for empty state
     */
    public void initializeState(OperatorSubtaskState operatorStateHandles) throws Exception {
        this.initializeState(operatorStateHandles, null);
    }

    /**
     * Initializes the operator from a snapshot read via {@link OperatorSnapshotUtil#readStateHandle}.
     *
     * @param operatorStateSnapshotPath path of the snapshot to read
     */
    public void initializeState(String operatorStateSnapshotPath) throws Exception {
        this.initializeState(OperatorSnapshotUtil.readStateHandle(operatorStateSnapshotPath));
    }

    /** Initializes the operator without any restored state. */
    public void initializeEmptyState() throws Exception {
        this.initializeState((OperatorSubtaskState) null);
    }

    /**
     * Returns the part of {@code operatorStateHandles} that belongs to subtask {@code subtaskIndex}
     * after changing the parallelism from {@code oldParallelism} to {@code newParallelism}.
     *
     * <p>Keyed state is restricted to the key-group range that
     * {@link StateAssignmentOperation#createKeyGroupPartitions} assigns to the subtask. Operator
     * state is redistributed with {@link RoundRobinOperatorStateRepartitioner}.
     *
     * @param operatorStateHandles previous state, must not be {@code null}
     * @param numKeyGroups number of key groups used to compute the key-group partitions
     * @param oldParallelism parallelism the state was taken with
     * @param newParallelism parallelism to repartition for
     * @param subtaskIndex index of the subtask whose share is returned
     * @return the state share of the given subtask
     */
    public static OperatorSubtaskState repartitionOperatorState(OperatorSubtaskState operatorStateHandles, int numKeyGroups, int oldParallelism, int newParallelism, int subtaskIndex) {
        Preconditions.checkNotNull(operatorStateHandles, "the previous operatorStateHandles should not be null.");

        List<KeyGroupRange> keyGroupPartitions = StateAssignmentOperation.createKeyGroupPartitions(numKeyGroups, newParallelism);
        KeyGroupRange localKeyGroupRange = keyGroupPartitions.get(subtaskIndex);

        List<KeyedStateHandle> localManagedKeyGroupState = new ArrayList<>();
        StateAssignmentOperation.extractIntersectingState(operatorStateHandles.getManagedKeyedState(), localKeyGroupRange, localManagedKeyGroupState);

        List<KeyedStateHandle> localRawKeyGroupState = new ArrayList<>();
        StateAssignmentOperation.extractIntersectingState(operatorStateHandles.getRawKeyedState(), localKeyGroupRange, localRawKeyGroupState);

        Collection<OperatorStateHandle> localManagedOperatorState =
            repartitionOperatorStateHandles(operatorStateHandles.getManagedOperatorState(), oldParallelism, newParallelism, subtaskIndex);
        Collection<OperatorStateHandle> localRawOperatorState =
            repartitionOperatorStateHandles(operatorStateHandles.getRawOperatorState(), oldParallelism, newParallelism, subtaskIndex);

        return new OperatorSubtaskState(
            new StateObjectCollection<>(localManagedOperatorState),
            new StateObjectCollection<>(localRawOperatorState),
            new StateObjectCollection<>(localManagedKeyGroupState),
            new StateObjectCollection<>(localRawKeyGroupState));
    }

    /**
     * Repartitions one kind of operator state (managed or raw) and returns the share of the given
     * subtask; never returns {@code null}.
     */
    private static Collection<OperatorStateHandle> repartitionOperatorStateHandles(StateObjectCollection<OperatorStateHandle> previousState, int oldParallelism, int newParallelism, int subtaskIndex) {
        if (previousState.isEmpty()) {
            return Collections.emptyList();
        }
        // The repartitioner expects one list of handles per previous subtask; every handle is
        // treated as the state of its own subtask.
        List<List<OperatorStateHandle>> statePerSubtask =
            previousState.stream().map(Collections::singletonList).collect(Collectors.toList());
        List<OperatorStateHandle> localState =
            operatorStateRepartitioner.repartitionState(statePerSubtask, oldParallelism, newParallelism).get(subtaskIndex);
        return nullToEmptyCollection(localState);
    }

    /**
     * Hands the given state to the task state manager (as checkpoint 0) and initializes the
     * operator's state. Runs {@link #setup()} first if it has not been called.
     *
     * <p>Task-local state is only registered when job-manager state is also given.
     *
     * @param jmOperatorStateHandles job-manager-owned state, or {@code null} for empty state
     * @param tmOperatorStateHandles task-local state, or {@code null} for none
     * @throws IllegalStateException if the harness has already been initialized
     */
    public void initializeState(OperatorSubtaskState jmOperatorStateHandles, OperatorSubtaskState tmOperatorStateHandles) throws Exception {
        Preconditions.checkState(!this.initializeCalled, "TestHarness has already been initialized. Have you opened this harness before initializing it?");
        if (!this.setupCalled) {
            this.setup();
        }
        if (jmOperatorStateHandles != null) {
            TaskStateSnapshot jmTaskStateSnapshot = new TaskStateSnapshot();
            jmTaskStateSnapshot.putSubtaskStateByOperatorID(this.operator.getOperatorID(), jmOperatorStateHandles);
            this.taskStateManager.setReportedCheckpointId(0L);
            this.taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(Collections.singletonMap(0L, jmTaskStateSnapshot));
            if (tmOperatorStateHandles != null) {
                TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot();
                tmTaskStateSnapshot.putSubtaskStateByOperatorID(this.operator.getOperatorID(), tmOperatorStateHandles);
                this.taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId(Collections.singletonMap(0L, tmTaskStateSnapshot));
            }
        }
        this.operator.initializeState();
        this.initializeCalled = true;
    }

    private static <T> Collection<T> nullToEmptyCollection(Collection<T> collection) {
        return collection != null ? collection : Collections.emptyList();
    }

    /**
     * Merges several subtask states into one by concatenating their state handles.
     *
     * @param handles states to merge
     * @return {@code null} if no handle is given, the handle itself if exactly one is given,
     *     otherwise a new state containing all handles of all inputs in argument order
     */
    public static OperatorSubtaskState repackageState(OperatorSubtaskState ... handles) throws Exception {
        if (handles.length < 1) {
            return null;
        }
        if (handles.length == 1) {
            return handles[0];
        }
        List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length);
        List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length);
        List<KeyedStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
        List<KeyedStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
        for (OperatorSubtaskState handle : handles) {
            mergedManagedOperatorState.addAll(handle.getManagedOperatorState());
            mergedRawOperatorState.addAll(handle.getRawOperatorState());
            mergedManagedKeyedState.addAll(handle.getManagedKeyedState());
            mergedRawKeyedState.addAll(handle.getRawKeyedState());
        }
        return new OperatorSubtaskState(
            new StateObjectCollection<>(mergedManagedOperatorState),
            new StateObjectCollection<>(mergedRawOperatorState),
            new StateObjectCollection<>(mergedManagedKeyedState),
            new StateObjectCollection<>(mergedRawKeyedState));
    }

    /** Opens the operator, initializing it with empty state first if that has not happened yet. */
    public void open() throws Exception {
        if (!this.initializeCalled) {
            this.initializeEmptyState();
        }
        this.operator.open();
    }

    /**
     * Snapshots the operator and returns only the job-manager-owned part of the result.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp timestamp of the checkpoint
     */
    public OperatorSubtaskState snapshot(long checkpointId, long timestamp) throws Exception {
        return this.snapshotWithLocalState(checkpointId, timestamp).getJobManagerOwnedState();
    }

    /**
     * Snapshots the operator into the default location of the current checkpoint storage and
     * returns the finalized result.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp timestamp of the checkpoint
     */
    public OperatorSnapshotFinalizer snapshotWithLocalState(long checkpointId, long timestamp) throws Exception {
        OperatorSnapshotFutures operatorStateResult = this.operator.snapshotState(
            checkpointId,
            timestamp,
            CheckpointOptions.forCheckpointWithDefaultLocation(),
            this.checkpointStorage.resolveCheckpointStorageLocation(checkpointId, CheckpointStorageLocationReference.getDefault()));
        return new OperatorSnapshotFinalizer(operatorStateResult);
    }

    /** Forwards a checkpoint-complete notification to the operator. */
    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
        this.operator.notifyCheckpointComplete(checkpointId);
    }

    /**
     * Closes and disposes the operator, shuts down the processing time service and, if the
     * environment was created by this harness, closes the environment.
     *
     * <p>Every step is attempted even if an earlier one fails. The first failure is rethrown
     * after all steps ran; later failures are attached to it as suppressed exceptions.
     */
    @Override
    public void close() throws Exception {
        Throwable failure = null;
        failure = runCleanupStep(this.operator::close, failure);
        failure = runCleanupStep(this.operator::dispose, failure);
        failure = runCleanupStep(this.processingTimeService::shutdownService, failure);
        this.setupCalled = false;
        if (this.internalEnvironment.isPresent()) {
            MockEnvironment ownedEnvironment = this.internalEnvironment.get();
            failure = runCleanupStep(ownedEnvironment::close, failure);
        }

        if (failure instanceof Exception) {
            throw (Exception) failure;
        }
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        if (failure != null) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Runs one cleanup step and folds a failure into {@code previousFailure}.
     *
     * @return the first failure seen so far, or {@code null} if there is none
     */
    private static Throwable runCleanupStep(CleanupStep step, Throwable previousFailure) {
        try {
            step.run();
            return previousFailure;
        }
        catch (Throwable t) {
            if (previousFailure == null) {
                return t;
            }
            // addSuppressed rejects self-suppression.
            if (t != previousFailure) {
                previousFailure.addSuppressed(t);
            }
            return previousFailure;
        }
    }

    /** A cleanup action that may throw a checked exception. */
    @FunctionalInterface
    private interface CleanupStep {
        void run() throws Exception;
    }

    /** Sets the current time of the test processing time service. */
    public void setProcessingTime(long time) throws Exception {
        this.processingTimeService.setCurrentTime(time);
    }

    /** Returns the current time of the test processing time service. */
    public long getProcessingTime() {
        return this.processingTimeService.getCurrentProcessingTime();
    }

    /** Writes the time characteristic into the operator's {@link StreamConfig}. */
    public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        this.config.setTimeCharacteristic(timeCharacteristic);
    }

    /** Reads the time characteristic from the operator's {@link StreamConfig}. */
    public TimeCharacteristic getTimeCharacteristic() {
        return this.config.getTimeCharacteristic();
    }

    /** Returns whether the mock task's async-exception handler has been invoked. */
    public boolean wasFailedExternally() {
        return this.wasFailedExternally;
    }

    /**
     * Returns the number of registered processing-time timers.
     *
     * @throws UnsupportedOperationException if the operator is not an {@link AbstractStreamOperator}
     */
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        if (this.operator instanceof AbstractStreamOperator) {
            return ((AbstractStreamOperator<?>) this.operator).numProcessingTimeTimers();
        }
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the number of registered event-time timers.
     *
     * @throws UnsupportedOperationException if the operator is not an {@link AbstractStreamOperator}
     */
    @VisibleForTesting
    public int numEventTimeTimers() {
        if (this.operator instanceof AbstractStreamOperator) {
            return ((AbstractStreamOperator<?>) this.operator).numEventTimeTimers();
        }
        throw new UnsupportedOperationException();
    }

    /** Returns the stream status currently held by the mock task's status maintainer. */
    @VisibleForTesting
    public StreamStatus getStreamStatus() {
        return this.mockTask.getStreamStatusMaintainer().getStreamStatus();
    }

    /**
     * Output handed to the operator. It appends everything to the harness' queues; records are
     * copied with a serializer before being stored, so the stored record is a separate
     * {@link StreamRecord} instance from the one the operator emitted.
     */
    private class MockOutput
    implements Output<StreamRecord<OUT>> {
        /** Serializer for main-output records; derived from the first record if not provided. */
        private TypeSerializer<OUT> outputSerializer;

        MockOutput() {
            this(null);
        }

        MockOutput(TypeSerializer<OUT> outputSerializer) {
            this.outputSerializer = outputSerializer;
        }

        @Override
        public void emitWatermark(Watermark mark) {
            AbstractStreamOperatorTestHarness.this.outputList.add(mark);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            AbstractStreamOperatorTestHarness.this.outputList.add(latencyMarker);
        }

        @Override
        public void collect(StreamRecord<OUT> element) {
            if (this.outputSerializer == null) {
                this.outputSerializer = TypeExtractor.getForObject(element.getValue())
                    .createSerializer(AbstractStreamOperatorTestHarness.this.executionConfig);
            }
            OUT copiedValue = this.outputSerializer.copy(element.getValue());
            StreamRecord<OUT> copiedRecord = element.hasTimestamp()
                ? new StreamRecord<>(copiedValue, element.getTimestamp())
                : new StreamRecord<>(copiedValue);
            AbstractStreamOperatorTestHarness.this.outputList.add(copiedRecord);
        }

        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            // Different tags may carry different types, so the serializer is derived per record
            // instead of being cached across calls.
            TypeSerializer<X> sideOutputSerializer = TypeExtractor.getForObject(record.getValue())
                .createSerializer(AbstractStreamOperatorTestHarness.this.executionConfig);

            ConcurrentLinkedQueue<Object> sideOutputList = AbstractStreamOperatorTestHarness.this.sideOutputLists
                .computeIfAbsent(outputTag, tag -> new ConcurrentLinkedQueue<>());

            X copiedValue = sideOutputSerializer.copy(record.getValue());
            StreamRecord<X> copiedRecord = record.hasTimestamp()
                ? new StreamRecord<>(copiedValue, record.getTimestamp())
                : new StreamRecord<>(copiedValue);
            sideOutputList.add(copiedRecord);
        }

        @Override
        public void close() {
        }
    }
}

