package org.apache.flink.streaming.util;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Migration;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.class */
/**
 * Test harness for a {@link OneInputStreamOperator} that works on keyed state.
 *
 * <p>On construction the harness registers the key selector and key serializer in the
 * operator config and stubs {@code mockTask.createKeyedStateBackend(...)} so that the keyed
 * backend handed to the operator is created from the harness' {@code stateBackend}, remembered
 * in this harness (so it can be snapshotted and inspected) and restored from whatever keyed
 * state was previously supplied via {@link #restore(StreamStateHandle)} or
 * {@link #initializeState(OperatorStateHandles)}.
 *
 * @param <K> type of the key extracted by the key selector
 * @param <IN> input element type of the operator
 * @param <OUT> output element type of the operator
 */
public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> extends OneInputStreamOperatorTestHarness<IN, OUT> {

    /** Backend most recently created through the mocked task; {@code null} until the operator requests one. */
    private AbstractKeyedStateBackend<?> keyedStateBackend;

    /** Keyed state handed to the next created backend's {@code restore} call; may be {@code null}. */
    private List<KeyedStateHandle> restoredKeyedState;

    /**
     * Creates a harness for the given operator and key selector.
     *
     * @param oneInputStreamOperator operator under test
     * @param keySelector selector registered as state partitioner 0; it is closure-cleaned first
     * @param typeInformation type information used to create the state key serializer
     * @param i forwarded unchanged to the super constructor (presumably the max parallelism — confirm against the superclass)
     * @param i2 forwarded unchanged to the super constructor (presumably the number of subtasks — confirm against the superclass)
     * @param i3 forwarded unchanged to the super constructor (presumably the subtask index — confirm against the superclass)
     * @throws Exception if the super constructor fails
     */
    public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> oneInputStreamOperator, KeySelector<IN, K> keySelector, TypeInformation<K> typeInformation, int i, int i2, int i3) throws Exception {
        super(oneInputStreamOperator, i, i2, i3);
        ClosureCleaner.clean(keySelector, false);
        this.config.setStatePartitioner(0, keySelector);
        this.config.setStateKeySerializer(typeInformation.createSerializer(this.executionConfig));
        setupMockTaskCreateKeyedBackend();
    }

    /**
     * Creates a harness that passes {@code 1, 1, 0} for the three numeric arguments of the
     * six-argument constructor.
     *
     * @param oneInputStreamOperator operator under test
     * @param keySelector selector registered as state partitioner 0
     * @param typeInformation type information used to create the state key serializer
     * @throws Exception if the delegated constructor fails
     */
    public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> oneInputStreamOperator, KeySelector<IN, K> keySelector, TypeInformation<K> typeInformation) throws Exception {
        this(oneInputStreamOperator, keySelector, typeInformation, 1, 1, 0);
    }

    /**
     * Stubs {@code mockTask.createKeyedStateBackend(...)} so that every call disposes the
     * previously created backend (if any), creates a fresh one from {@code stateBackend},
     * restores it from {@link #restoredKeyedState} and returns it.
     */
    private void setupMockTaskCreateKeyedBackend() {
        try {
            Mockito.doAnswer(new Answer<KeyedStateBackend>() {
                @Override
                public KeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
                    // Argument positions follow the stubbed call below: serializer, int, key-group range.
                    final Object[] arguments = invocationOnMock.getArguments();
                    final TypeSerializer keySerializer = (TypeSerializer) arguments[0];
                    final int numberOfKeyGroups = (Integer) arguments[1];
                    final KeyGroupRange keyGroupRange = (KeyGroupRange) arguments[2];

                    // Only one backend is tracked at a time; release the old one before replacing it.
                    if (keyedStateBackend != null) {
                        keyedStateBackend.dispose();
                    }

                    keyedStateBackend = stateBackend.createKeyedStateBackend(
                            mockTask.getEnvironment(),
                            new JobID(),
                            "test_op",
                            keySerializer,
                            numberOfKeyGroups,
                            keyGroupRange,
                            mockTask.getEnvironment().getTaskKvStateRegistry());

                    // restoredKeyedState may be null here; it is passed through as-is.
                    keyedStateBackend.restore(restoredKeyedState);

                    return keyedStateBackend;
                }
            }).when(this.mockTask).createKeyedStateBackend(Matchers.any(TypeSerializer.class), Mockito.anyInt(), Matchers.any(KeyGroupRange.class));
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Writes a legacy snapshot into a stream created by a fresh {@link MemoryStateBackend}.
     *
     * <p>Stream layout: the operator's own state (only if the operator is a
     * {@link StreamCheckpointedOperator}), then a marker byte. The marker is {@code 1} followed
     * by the Java-serialized result of the keyed backend snapshot when a keyed backend exists,
     * and {@code 0} otherwise.
     *
     * @param j first argument passed to the stream factory, the operator and the backend snapshot (presumably the checkpoint id — confirm)
     * @param j2 second argument passed to the same calls (presumably the checkpoint timestamp — confirm)
     * @return the handle obtained by closing the output stream
     * @throws Exception if snapshotting or writing fails
     */
    @Override
    public StreamStateHandle snapshotLegacy(long j, long j2) throws Exception {
        CheckpointStreamFactory streamFactory = new MemoryStateBackend().createStreamFactory(new JobID(), "test_op");
        CheckpointStreamFactory.CheckpointStateOutputStream outStream = streamFactory.createCheckpointStateOutputStream(j, j2);

        if (this.operator instanceof StreamCheckpointedOperator) {
            ((StreamCheckpointedOperator) this.operator).snapshotState(outStream, j, j2);
        }

        if (this.keyedStateBackend != null) {
            RunnableFuture<?> keyedSnapshot = this.keyedStateBackend.snapshot(j, j2, streamFactory, CheckpointOptions.forFullCheckpoint());
            // The future is only a recipe until somebody runs it; get() below blocks until it completes.
            if (!keyedSnapshot.isDone()) {
                new Thread(keyedSnapshot).start();
            }
            outStream.write(1);
            ObjectOutputStream objectOut = new ObjectOutputStream(outStream);
            objectOut.writeObject(keyedSnapshot.get());
            objectOut.flush();
        } else {
            outStream.write(0);
        }
        return outStream.closeAndGetHandle();
    }

    /**
     * Reads back a snapshot in the layout produced by {@link #snapshotLegacy(long, long)}.
     *
     * <p>The operator's own state is restored first (only if the operator is a
     * {@link StreamCheckpointedOperator}). If the following marker byte is {@code 1}, the
     * serialized {@link KeyedStateHandle} is read and stored as the keyed state to restore the
     * next created backend from. The input stream is always closed.
     *
     * @param streamStateHandle handle to read the snapshot from
     * @throws Exception if reading or restoring fails
     */
    @Override
    public void restore(StreamStateHandle streamStateHandle) throws Exception {
        try (FSDataInputStream inStream = streamStateHandle.openInputStream()) {
            if (this.operator instanceof StreamCheckpointedOperator) {
                ((StreamCheckpointedOperator) this.operator).restoreState(inStream);
            }

            final byte keyedStatePresent = (byte) inStream.read();
            if (keyedStatePresent == 1) {
                ObjectInputStream objectIn = new ObjectInputStream(inStream);
                this.restoredKeyedState = Collections.singletonList((KeyedStateHandle) objectIn.readObject());
            }
        }
    }

    /** Returns {@code true} if at least one of the given handles is an instance of {@link Migration}. */
    private static boolean hasMigrationHandles(Collection<KeyedStateHandle> collection) {
        for (KeyedStateHandle handle : collection) {
            if (handle instanceof Migration) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the number of state entries reported by the keyed backend.
     *
     * @return the value of {@code numStateEntries()} on the heap backend
     * @throws UnsupportedOperationException if the current backend is not a {@link HeapKeyedStateBackend}
     *         (this includes the case where no backend has been created yet)
     */
    public int numKeyedStateEntries() {
        if (this.keyedStateBackend instanceof HeapKeyedStateBackend) {
            return ((HeapKeyedStateBackend<?>) this.keyedStateBackend).numStateEntries();
        }
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the number of state entries reported by the keyed backend for the given argument.
     *
     * @param n value passed to {@code numStateEntries(...)} (presumably a state namespace — confirm against the backend)
     * @param <N> type of {@code n}
     * @return the value of {@code numStateEntries(n)} on the heap backend
     * @throws UnsupportedOperationException if the current backend is not a {@link HeapKeyedStateBackend}
     *         (this includes the case where no backend has been created yet)
     */
    public <N> int numKeyedStateEntries(N n) {
        if (this.keyedStateBackend instanceof HeapKeyedStateBackend) {
            return ((HeapKeyedStateBackend<?>) this.keyedStateBackend).numStateEntries(n);
        }
        throw new UnsupportedOperationException();
    }

    /**
     * Selects the keyed state to restore from the given handles and then delegates to the
     * superclass.
     *
     * <p>When {@code operatorStateHandles} is {@code null}, the previously stored keyed state is
     * left untouched. Otherwise it is reset and, if managed keyed state is present, replaced by
     * either a plain copy of all handles (when any of them is a {@link Migration} handle) or by
     * the handles selected for this subtask's key-group range.
     *
     * @param operatorStateHandles state to initialize from; may be {@code null}
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
        if (operatorStateHandles != null) {
            int numKeyGroups = getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks();
            int numSubtasks = getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
            List<KeyGroupRange> keyGroupPartitions = StateAssignmentOperation.createKeyGroupPartitions(numKeyGroups, numSubtasks);
            KeyGroupRange localKeyGroupRange = keyGroupPartitions.get(getEnvironment().getTaskInfo().getIndexOfThisSubtask());

            this.restoredKeyedState = null;
            Collection<KeyedStateHandle> managedKeyedState = operatorStateHandles.getManagedKeyedState();
            if (managedKeyedState != null) {
                if (hasMigrationHandles(managedKeyedState)) {
                    // Migration handles are passed on unfiltered instead of being cut down to the local key-group range.
                    this.restoredKeyedState = new ArrayList<>(managedKeyedState);
                } else {
                    this.restoredKeyedState = StateAssignmentOperation.getKeyedStateHandles(managedKeyedState, localKeyGroupRange);
                }
            }
        }
        super.initializeState(operatorStateHandles);
    }
}
