package org.apache.flink.streaming.util;

import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.migration.util.MigrationInstantiationUtil;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.class */
public class AbstractStreamOperatorTestHarness<OUT> {
    /** Operator under test; the harness forwards setup/initialize/open/snapshot/close calls to it. */
    protected final StreamOperator<OUT> operator;
    /** Everything the operator emits through {@code MockOutput}: watermarks, latency markers and copied records. */
    protected final ConcurrentLinkedQueue<Object> outputList;
    /** Stream configuration wrapping the environment's task configuration; checkpointing is enabled in the constructor. */
    protected final StreamConfig config;
    /** Execution config taken from the environment passed to the constructor. */
    protected final ExecutionConfig executionConfig;
    /** Manually driven processing-time service, set to time 0 at construction and returned by the mock task. */
    protected final TestProcessingTimeService processingTimeService;
    /** Mockito mock of the containing task; its getters are stubbed in the constructor to return this harness's objects. */
    protected final StreamTask<?, ?> mockTask;
    /** Environment supplied to (or created by) the constructor. */
    final Environment environment;
    /** Registry returned by {@code mockTask.getCancelables()}; created operator state backends are registered here. */
    CloseableRegistry closableRegistry;
    /** Backend used for stream factories and operator state backends; a MemoryStateBackend unless replaced via setStateBackend. */
    protected AbstractStateBackend stateBackend;
    /** Lock object returned by {@code mockTask.getCheckpointLock()}. */
    private final Object checkpointLock;
    /** Repartitioner (round-robin instance) used by initializeState to pick this subtask's share of operator state. */
    private final OperatorStateRepartitioner operatorStateRepartitioner;
    /** True once setup(...) has run; reset to false by close(). */
    private boolean setupCalled;
    /** True once initializeState(...) has completed. */
    private boolean initializeCalled;
    /** Set to true when {@code handleAsyncException} is invoked on the mock task. */
    private volatile boolean wasFailedExternally;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness$MockOutput.class */
    public class MockOutput implements Output<StreamRecord<OUT>> {
        private TypeSerializer<OUT> outputSerializer;

        MockOutput(AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness) {
            this(null);
        }

        MockOutput(TypeSerializer<OUT> typeSerializer) {
            this.outputSerializer = typeSerializer;
        }

        public void emitWatermark(Watermark watermark) {
            AbstractStreamOperatorTestHarness.this.outputList.add(watermark);
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            AbstractStreamOperatorTestHarness.this.outputList.add(latencyMarker);
        }

        public void collect(StreamRecord<OUT> streamRecord) {
            if (this.outputSerializer == null) {
                this.outputSerializer = TypeExtractor.getForObject(streamRecord.getValue()).createSerializer(AbstractStreamOperatorTestHarness.this.executionConfig);
            }
            if (streamRecord.hasTimestamp()) {
                AbstractStreamOperatorTestHarness.this.outputList.add(new StreamRecord(this.outputSerializer.copy(streamRecord.getValue()), streamRecord.getTimestamp()));
            } else {
                AbstractStreamOperatorTestHarness.this.outputList.add(new StreamRecord(this.outputSerializer.copy(streamRecord.getValue())));
            }
        }

        public void close() {
        }
    }

    /**
     * Creates a harness backed by a freshly built {@link MockEnvironment}.
     *
     * <p>The three ints are passed straight through as the last three arguments of the
     * {@code MockEnvironment} constructor (presumably max parallelism, parallelism and subtask
     * index; confirm against {@code MockEnvironment}).
     *
     * @param streamOperator operator under test
     * @param i first trailing {@code MockEnvironment} argument
     * @param i2 second trailing {@code MockEnvironment} argument
     * @param i3 third trailing {@code MockEnvironment} argument
     * @throws Exception if harness construction fails
     */
    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> streamOperator, int i, int i2, int i3) throws Exception {
        this(
                streamOperator,
                i,
                i2,
                i3,
                new MockEnvironment(
                        "MockTask",
                        3L * 1024L * 1024L,
                        new MockInputSplitProvider(),
                        StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE,
                        new Configuration(),
                        new ExecutionConfig(),
                        i,
                        i2,
                        i3));
    }

    /**
     * Creates a harness around the given operator and environment.
     *
     * <p>A Mockito mock of {@link StreamTask} is created and stubbed so that the operator sees
     * this harness's configuration, checkpoint lock, closeable registry, processing-time service
     * and state backend.
     *
     * @param streamOperator operator under test
     * @param i not read by this constructor
     * @param i2 not read by this constructor
     * @param i3 not read by this constructor
     * @param environment environment exposed to the operator; must not be {@code null}
     * @throws NullPointerException if {@code environment} is {@code null}
     * @throws Exception declared for compatibility with callers
     */
    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> streamOperator, int i, int i2, int i3, final Environment environment) throws Exception {
        // Validate before the first dereference; otherwise the null check can never fire.
        this.environment = Preconditions.checkNotNull(environment);

        this.stateBackend = new MemoryStateBackend();
        this.operatorStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
        this.setupCalled = false;
        this.initializeCalled = false;
        this.wasFailedExternally = false;
        this.operator = streamOperator;
        this.outputList = new ConcurrentLinkedQueue<>();

        final Configuration taskConfiguration = environment.getTaskConfiguration();
        this.config = new StreamConfig(taskConfiguration);
        this.config.setCheckpointingEnabled(true);
        this.executionConfig = environment.getExecutionConfig();
        this.closableRegistry = new CloseableRegistry();
        this.checkpointLock = new Object();

        this.mockTask = Mockito.mock(StreamTask.class);
        this.processingTimeService = new TestProcessingTimeService();
        this.processingTimeService.setCurrentTime(0L);

        Mockito.when(this.mockTask.getName()).thenReturn("Mock Task");
        Mockito.when(this.mockTask.getCheckpointLock()).thenReturn(this.checkpointLock);
        Mockito.when(this.mockTask.getConfiguration()).thenReturn(this.config);
        Mockito.when(this.mockTask.getTaskConfiguration()).thenReturn(taskConfiguration);
        Mockito.when(this.mockTask.getEnvironment()).thenReturn(environment);
        Mockito.when(this.mockTask.getExecutionConfig()).thenReturn(this.executionConfig);
        Mockito.when(this.mockTask.getUserCodeClassLoader()).thenReturn(getClass().getClassLoader());
        Mockito.when(this.mockTask.getCancelables()).thenReturn(this.closableRegistry);

        // Record asynchronous failures reported to the task so tests can query them.
        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                AbstractStreamOperatorTestHarness.this.wasFailedExternally = true;
                return null;
            }
        }).when(this.mockTask).handleAsyncException(Matchers.any(String.class), Matchers.any(Throwable.class));

        try {
            // The stream factory is resolved lazily so a backend installed later via
            // setStateBackend(...) is the one actually used.
            Mockito.doAnswer(new Answer<CheckpointStreamFactory>() {
                @Override
                public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable {
                    final StreamOperator<?> requestingOperator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
                    return AbstractStreamOperatorTestHarness.this.stateBackend.createStreamFactory(
                            new JobID(), requestingOperator.getClass().getSimpleName());
                }
            }).when(this.mockTask).createCheckpointStreamFactory(Matchers.any(StreamOperator.class));

            Mockito.doAnswer(new Answer<OperatorStateBackend>() {
                @Override
                public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
                    final StreamOperator<?> requestingOperator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
                    @SuppressWarnings("unchecked")
                    final Collection<OperatorStateHandle> restoreHandles =
                            (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1];

                    final OperatorStateBackend backend = AbstractStreamOperatorTestHarness.this.stateBackend
                            .createOperatorStateBackend(environment, requestingOperator.getClass().getSimpleName());
                    // Register with the task's closeable registry, then restore if state was supplied.
                    AbstractStreamOperatorTestHarness.this.mockTask.getCancelables().registerClosable(backend);
                    if (restoreHandles != null) {
                        backend.restore(restoreHandles);
                    }
                    return backend;
                }
            }).when(this.mockTask).createOperatorStateBackend(Matchers.any(StreamOperator.class), Matchers.any(Collection.class));

            Mockito.doAnswer(new Answer<ProcessingTimeService>() {
                @Override
                public ProcessingTimeService answer(InvocationOnMock invocationOnMock) throws Throwable {
                    return AbstractStreamOperatorTestHarness.this.processingTimeService;
                }
            }).when(this.mockTask).getProcessingTimeService();
        } catch (Exception e) {
            // Single catch: the former nested try/catch wrapped an inner failure twice.
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Replaces the state backend used for stream factories and operator state backends.
     *
     * @param abstractStateBackend backend to use from now on
     */
    public void setStateBackend(AbstractStateBackend abstractStateBackend) {
        stateBackend = abstractStateBackend;
    }

    /**
     * Returns the checkpoint lock as reported by the mock task.
     *
     * @return the mock task's checkpoint lock
     */
    public Object getCheckpointLock() {
        final Object lock = mockTask.getCheckpointLock();
        return lock;
    }

    /**
     * Returns the environment as reported by the mock task.
     *
     * @return the mock task's environment
     */
    public Environment getEnvironment() {
        final Environment taskEnvironment = mockTask.getEnvironment();
        return taskEnvironment;
    }

    /**
     * Returns the execution config this harness was created with.
     *
     * @return the harness's execution config
     */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /**
     * Returns the live queue of everything emitted so far (records, watermarks, latency markers).
     *
     * @return the output queue itself, not a copy
     */
    public ConcurrentLinkedQueue<Object> getOutput() {
        return outputList;
    }

    /**
     * Collects the {@link StreamRecord} elements of the output queue, in emission order,
     * skipping everything else (such as watermarks and latency markers).
     *
     * @return a new list holding only the emitted stream records
     */
    public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
        List<StreamRecord<? extends OUT>> records = new LinkedList<>();
        for (Object emitted : getOutput()) {
            if (!(emitted instanceof StreamRecord)) {
                continue;
            }
            @SuppressWarnings("unchecked")
            StreamRecord<? extends OUT> record = (StreamRecord<? extends OUT>) emitted;
            records.add(record);
        }
        return records;
    }

    /**
     * Sets up the operator without an explicit output serializer; the output derives one from
     * the first emitted record.
     */
    public void setup() {
        setup((TypeSerializer<OUT>) null);
    }

    /**
     * Calls {@code setup} on the operator with the mock task, the harness config and a new
     * {@code MockOutput}, then marks the harness as set up.
     *
     * @param typeSerializer serializer used to copy emitted records, or {@code null}
     */
    public void setup(TypeSerializer<OUT> typeSerializer) {
        final Output<StreamRecord<OUT>> output = new MockOutput(typeSerializer);
        operator.setup(mockTask, config, output);
        setupCalled = true;
    }

    /**
     * Initializes the operator from a serialized legacy {@link StreamTaskState} snapshot file.
     *
     * <p>The file is deserialized with the system class loader, converted via
     * {@link SavepointV0Serializer} and passed to {@link #initializeState(OperatorStateHandles)}.
     * {@code setup()} is invoked first if it has not been called yet.
     *
     * @param str path of the file containing the serialized legacy task state
     * @throws Exception if reading, deserializing, converting or restoring the state fails
     */
    public void initializeStateFromLegacyCheckpoint(String str) throws Exception {
        final StreamTaskState legacyState;
        // try-with-resources: the stream is closed even when deserialization throws.
        try (FileInputStream in = new FileInputStream(str)) {
            legacyState = (StreamTaskState) MigrationInstantiationUtil.deserializeObject(in, ClassLoader.getSystemClassLoader());
        }

        if (!this.setupCalled) {
            setup();
        }

        StreamStateHandle legacyOperatorState = SavepointV0Serializer.convertOperatorAndFunctionState(legacyState);

        List<KeyGroupsStateHandle> managedKeyedState = new ArrayList<>();
        if (legacyState.getKvStates() != null) {
            managedKeyedState.add(SavepointV0Serializer.convertKeyedBackendState(
                    legacyState.getKvStates(),
                    this.environment.getTaskInfo().getIndexOfThisSubtask(),
                    0L));
        }

        // Explicit type witnesses keep the call compilable without target typing of arguments.
        initializeState(new OperatorStateHandles(
                0,
                legacyOperatorState,
                managedKeyedState,
                Collections.<KeyGroupsStateHandle>emptyList(),
                Collections.<OperatorStateHandle>emptyList(),
                Collections.<OperatorStateHandle>emptyList()));
    }

    /**
     * Initializes the operator's state, running {@code setup()} first if needed.
     *
     * <p>When handles are given, they are narrowed to this subtask before being passed on:
     * keyed state is filtered to the subtask's key-group range, and operator state is
     * repartitioned across the environment's parallelism with this subtask's share selected.
     * With {@code null}, the operator is initialized with {@code null}.
     *
     * @param operatorStateHandles state to restore from, or {@code null} for none
     * @throws Exception if setup or the operator's state initialization fails
     */
    public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
        if (!setupCalled) {
            setup();
        }

        if (operatorStateHandles == null) {
            operator.initializeState((OperatorStateHandles) null);
            initializeCalled = true;
            return;
        }

        final int keyGroupCount = getEnvironment().getTaskInfo().getNumberOfKeyGroups();
        final int parallelism = getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
        final int subtaskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
        final KeyGroupRange ownKeyGroups =
                StateAssignmentOperation.createKeyGroupPartitions(keyGroupCount, parallelism).get(subtaskIndex);

        final Collection<KeyGroupsStateHandle> allManagedKeyed = operatorStateHandles.getManagedKeyedState();
        final List<KeyGroupsStateHandle> ownManagedKeyed = allManagedKeyed == null
                ? null
                : StateAssignmentOperation.getKeyGroupsStateHandles(allManagedKeyed, ownKeyGroups);

        final Collection<KeyGroupsStateHandle> allRawKeyed = operatorStateHandles.getRawKeyedState();
        final List<KeyGroupsStateHandle> ownRawKeyed = allRawKeyed == null
                ? null
                : StateAssignmentOperation.getKeyGroupsStateHandles(allRawKeyed, ownKeyGroups);

        final Collection<OperatorStateHandle> ownManagedOperator =
                selectOperatorStateForSubtask(operatorStateHandles.getManagedOperatorState(), parallelism, subtaskIndex);
        final Collection<OperatorStateHandle> ownRawOperator =
                selectOperatorStateForSubtask(operatorStateHandles.getRawOperatorState(), parallelism, subtaskIndex);

        operator.initializeState(new OperatorStateHandles(
                0,
                operatorStateHandles.getLegacyOperatorState(),
                ownManagedKeyed,
                ownRawKeyed,
                ownManagedOperator,
                ownRawOperator));
        initializeCalled = true;
    }

    /** Repartitions the given operator state (treated as empty when null) and returns this subtask's share. */
    private Collection<OperatorStateHandle> selectOperatorStateForSubtask(
            Collection<OperatorStateHandle> allHandles, int parallelism, int subtaskIndex) {
        final List<OperatorStateHandle> handles = new ArrayList<>();
        if (allHandles != null) {
            handles.addAll(allHandles);
        }
        return operatorStateRepartitioner.repartitionState(handles, parallelism).get(subtaskIndex);
    }

    /**
     * Merges several snapshots into a single {@link OperatorStateHandles}.
     *
     * <p>No arguments yields {@code null}; a single argument is returned unchanged. Otherwise the
     * managed/raw keyed and operator state collections of all inputs are concatenated in
     * argument order; the merged result carries no legacy operator state.
     *
     * @param operatorStateHandlesArr snapshots to merge
     * @return the merged handles, the sole input, or {@code null} when no input was given
     * @throws Exception declared for compatibility with callers
     */
    public static OperatorStateHandles repackageState(OperatorStateHandles... operatorStateHandlesArr) throws Exception {
        final int snapshotCount = operatorStateHandlesArr.length;
        if (snapshotCount == 0) {
            return null;
        }
        if (snapshotCount == 1) {
            return operatorStateHandlesArr[0];
        }

        final List<OperatorStateHandle> mergedManagedOperator = new ArrayList<>(snapshotCount);
        final List<OperatorStateHandle> mergedRawOperator = new ArrayList<>(snapshotCount);
        final List<KeyGroupsStateHandle> mergedManagedKeyed = new ArrayList<>(snapshotCount);
        final List<KeyGroupsStateHandle> mergedRawKeyed = new ArrayList<>(snapshotCount);

        for (OperatorStateHandles snapshot : operatorStateHandlesArr) {
            final Collection<OperatorStateHandle> managedOperator = snapshot.getManagedOperatorState();
            final Collection<OperatorStateHandle> rawOperator = snapshot.getRawOperatorState();
            final Collection<KeyGroupsStateHandle> managedKeyed = snapshot.getManagedKeyedState();
            final Collection<KeyGroupsStateHandle> rawKeyed = snapshot.getRawKeyedState();

            appendIfPresent(mergedManagedOperator, managedOperator);
            appendIfPresent(mergedRawOperator, rawOperator);
            appendIfPresent(mergedManagedKeyed, managedKeyed);
            appendIfPresent(mergedRawKeyed, rawKeyed);
        }

        return new OperatorStateHandles(
                0,
                (StreamStateHandle) null,
                mergedManagedKeyed,
                mergedRawKeyed,
                mergedManagedOperator,
                mergedRawOperator);
    }

    /** Adds all elements of {@code source} to {@code target} unless {@code source} is null. */
    private static <T> void appendIfPresent(Collection<T> target, Collection<? extends T> source) {
        if (source != null) {
            target.addAll(source);
        }
    }

    /**
     * Opens the operator, first initializing it with no state if
     * {@link #initializeState(OperatorStateHandles)} has not been called yet.
     *
     * @throws Exception if state initialization or opening the operator fails
     */
    public void open() throws Exception {
        final boolean needsInitialization = !initializeCalled;
        if (needsInitialization) {
            initializeState(null);
        }
        operator.open();
    }

    /**
     * Takes a snapshot of the operator and waits for all four state futures to complete.
     *
     * <p>Each resulting handle is wrapped in a singleton list; a {@code null} handle leaves the
     * corresponding collection {@code null}. The result carries no legacy operator state.
     *
     * @param j first argument forwarded to the operator's {@code snapshotState}
     * @param j2 second argument forwarded to the operator's {@code snapshotState}
     * @return the collected state handles
     * @throws Exception if snapshotting or completing one of the futures fails
     */
    public OperatorStateHandles snapshot(long j, long j2) throws Exception {
        final OperatorSnapshotResult result = operator.snapshotState(j, j2);

        final KeyGroupsStateHandle managedKeyed = FutureUtil.runIfNotDoneAndGet(result.getKeyedStateManagedFuture());
        final KeyGroupsStateHandle rawKeyed = FutureUtil.runIfNotDoneAndGet(result.getKeyedStateRawFuture());
        final OperatorStateHandle managedOperator = FutureUtil.runIfNotDoneAndGet(result.getOperatorStateManagedFuture());
        final OperatorStateHandle rawOperator = FutureUtil.runIfNotDoneAndGet(result.getOperatorStateRawFuture());

        return new OperatorStateHandles(
                0,
                (StreamStateHandle) null,
                singletonListOrNull(managedKeyed),
                singletonListOrNull(rawKeyed),
                singletonListOrNull(managedOperator),
                singletonListOrNull(rawOperator));
    }

    /** Wraps a non-null handle in a singleton list; maps null to null. */
    private static <T> List<T> singletonListOrNull(T handle) {
        if (handle == null) {
            return null;
        }
        return Collections.singletonList(handle);
    }

    /**
     * Snapshots the operator through the legacy {@link StreamCheckpointedOperator} interface,
     * writing into a checkpoint stream obtained from the harness's state backend.
     *
     * @param j first argument forwarded to stream creation and to {@code snapshotState}
     * @param j2 second argument forwarded to stream creation and to {@code snapshotState}
     * @return handle to the written state
     * @throws RuntimeException if the operator is not a {@code StreamCheckpointedOperator}
     * @throws Exception if creating the stream or writing the snapshot fails
     */
    @Deprecated
    public StreamStateHandle snapshotLegacy(long j, long j2) throws Exception {
        // Check the operator type first so no output stream is opened (and leaked) on the failure path.
        if (!(this.operator instanceof StreamCheckpointedOperator)) {
            throw new RuntimeException("Operator is not StreamCheckpointedOperator");
        }
        CheckpointStreamFactory.CheckpointStateOutputStream outStream =
                this.stateBackend.createStreamFactory(new JobID(), "test_op").createCheckpointStateOutputStream(j, j2);
        ((StreamCheckpointedOperator) this.operator).snapshotState(outStream, j, j2);
        return outStream.closeAndGetHandle();
    }

    /**
     * Forwards a checkpoint-complete notification to the operator.
     *
     * @param j value passed through to the operator's {@code notifyOfCompletedCheckpoint}
     * @throws Exception if the operator throws
     */
    public void notifyOfCompletedCheckpoint(long j) throws Exception {
        operator.notifyOfCompletedCheckpoint(j);
    }

    /**
     * Restores the operator through the legacy {@link StreamCheckpointedOperator} interface
     * from the given state handle. The input stream is always closed afterwards.
     *
     * @param streamStateHandle handle whose input stream is passed to {@code restoreState}
     * @throws RuntimeException if the operator is not a {@code StreamCheckpointedOperator}
     * @throws Exception if opening the stream, restoring, or closing the stream fails
     */
    @Deprecated
    public void restore(StreamStateHandle streamStateHandle) throws Exception {
        if (!(this.operator instanceof StreamCheckpointedOperator)) {
            throw new RuntimeException("Operator is not StreamCheckpointedOperator");
        }
        // try-with-resources replaces the expanded close/addSuppressed boilerplate with
        // identical semantics: close on every path, suppressing close failures behind a
        // primary exception.
        try (FSDataInputStream in = streamStateHandle.openInputStream()) {
            ((StreamCheckpointedOperator) this.operator).restoreState(in);
        }
    }

    /**
     * Closes and disposes the operator, shuts down the processing-time service and marks the
     * harness as no longer set up.
     *
     * @throws Exception if closing or disposing the operator fails; later steps are then skipped
     */
    public void close() throws Exception {
        operator.close();
        operator.dispose();

        final TestProcessingTimeService timeService = processingTimeService;
        if (timeService != null) {
            timeService.shutdownService();
        }

        setupCalled = false;
    }

    /**
     * Sets the current time of the harness's test processing-time service.
     *
     * @param j new current time, passed to {@code setCurrentTime}
     * @throws Exception if the processing-time service throws
     */
    public void setProcessingTime(long j) throws Exception {
        processingTimeService.setCurrentTime(j);
    }

    /**
     * Returns the current time reported by the harness's test processing-time service.
     *
     * @return the current processing time
     */
    public long getProcessingTime() {
        final long now = processingTimeService.getCurrentProcessingTime();
        return now;
    }

    /**
     * Stores the time characteristic in the harness's stream config.
     *
     * @param timeCharacteristic characteristic to set
     */
    public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        config.setTimeCharacteristic(timeCharacteristic);
    }

    /**
     * Reads the time characteristic from the harness's stream config.
     *
     * @return the configured time characteristic
     */
    public TimeCharacteristic getTimeCharacteristic() {
        final TimeCharacteristic configured = config.getTimeCharacteristic();
        return configured;
    }

    /**
     * Tells whether {@code handleAsyncException} has been invoked on the mock task.
     *
     * @return {@code true} once an asynchronous exception was reported to the mock task
     */
    public boolean wasFailedExternally() {
        return wasFailedExternally;
    }

    /**
     * Returns the operator's processing-time timer count.
     *
     * @return the value of the operator's {@code numProcessingTimeTimers()}
     * @throws UnsupportedOperationException if the operator is not an {@link AbstractStreamOperator}
     */
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        if (this.operator instanceof AbstractStreamOperator) {
            // The timer count is only reachable through AbstractStreamOperator, so the
            // StreamOperator-typed field has to be cast.
            return ((AbstractStreamOperator<?>) this.operator).numProcessingTimeTimers();
        }
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the operator's event-time timer count.
     *
     * @return the value of the operator's {@code numEventTimeTimers()}
     * @throws UnsupportedOperationException if the operator is not an {@link AbstractStreamOperator}
     */
    @VisibleForTesting
    public int numEventTimeTimers() {
        if (this.operator instanceof AbstractStreamOperator) {
            // The timer count is only reachable through AbstractStreamOperator, so the
            // StreamOperator-typed field has to be cast.
            return ((AbstractStreamOperator<?>) this.operator).numEventTimeTimers();
        }
        throw new UnsupportedOperationException();
    }
}
