package org.apache.flink.test.state;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.operators.TimerSerializer;
import org.apache.flink.test.state.BackendSwitchSpecs;
import org.apache.flink.util.InstantiationUtil;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/state/SavepointStateBackendSwitchTestBase.class */
public abstract class SavepointStateBackendSwitchTestBase {
    private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 1);
    private static final int NUM_KEY_GROUPS = KEY_GROUP_RANGE.getNumberOfKeyGroups();

    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();
    private final BackendSwitchSpecs.BackendSwitchSpec fromBackend;
    private final BackendSwitchSpecs.BackendSwitchSpec toBackend;

    /* JADX INFO: Access modifiers changed from: protected */
    public SavepointStateBackendSwitchTestBase(BackendSwitchSpecs.BackendSwitchSpec backendSwitchSpec, BackendSwitchSpecs.BackendSwitchSpec backendSwitchSpec2) {
        this.fromBackend = backendSwitchSpec;
        this.toBackend = backendSwitchSpec2;
    }

    @Test
    public void switchStateBackend() throws Exception {
        Throwable th;
        BufferedInputStream bufferedInputStream;
        Throwable th2;
        File newFile = tempFolder.newFile();
        MapStateDescriptor<Long, Long> mapStateDescriptor = new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
        mapStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueStateDescriptor<Long> valueStateDescriptor = new ValueStateDescriptor<>("my-value-state", Long.class);
        valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<>("my-list-state", Long.class);
        listStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        CheckpointableKeyedStateBackend<String> createBackend = this.fromBackend.createBackend(KEY_GROUP_RANGE, NUM_KEY_GROUPS, Collections.emptyList());
        Throwable th3 = null;
        try {
            try {
                takeSavepoint(createBackend, newFile, mapStateDescriptor, valueStateDescriptor, listStateDescriptor, 1, 2, 3, 4);
                if (createBackend != null) {
                    if (0 != 0) {
                        try {
                            createBackend.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    } else {
                        createBackend.close();
                    }
                }
                bufferedInputStream = new BufferedInputStream(new FileInputStream(newFile));
                th2 = null;
            } catch (Throwable th5) {
                th3 = th5;
                throw th5;
            }
            try {
                try {
                    SnapshotResult snapshotResult = (SnapshotResult) InstantiationUtil.deserializeObject(bufferedInputStream, Thread.currentThread().getContextClassLoader());
                    if (bufferedInputStream != null) {
                        if (0 != 0) {
                            try {
                                bufferedInputStream.close();
                            } catch (Throwable th6) {
                                th2.addSuppressed(th6);
                            }
                        } else {
                            bufferedInputStream.close();
                        }
                    }
                    createBackend = this.toBackend.createBackend(KEY_GROUP_RANGE, NUM_KEY_GROUPS, StateObjectCollection.singleton(snapshotResult.getJobManagerOwnedSnapshot()));
                    th = null;
                } catch (Throwable th7) {
                    th2 = th7;
                    throw th7;
                }
                try {
                    try {
                        verifyRestoredState(mapStateDescriptor, valueStateDescriptor, listStateDescriptor, 1, 2, 3, 4, createBackend);
                        if (createBackend != null) {
                            if (0 == 0) {
                                createBackend.close();
                                return;
                            }
                            try {
                                createBackend.close();
                            } catch (Throwable th8) {
                                th.addSuppressed(th8);
                            }
                        }
                    } catch (Throwable th9) {
                        th = th9;
                        throw th9;
                    }
                } finally {
                }
            } catch (Throwable th10) {
                if (bufferedInputStream != null) {
                    if (th2 != null) {
                        try {
                            bufferedInputStream.close();
                        } catch (Throwable th11) {
                            th2.addSuppressed(th11);
                        }
                    } else {
                        bufferedInputStream.close();
                    }
                }
                throw th10;
            }
        } finally {
        }
    }

    private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> internalMapState) throws Exception {
        int i = 0;
        Iterator it = internalMapState.iterator();
        while (it.hasNext()) {
            i++;
            it.next();
        }
        return i;
    }

    private void takeSavepoint(CheckpointableKeyedStateBackend<String> checkpointableKeyedStateBackend, File file, MapStateDescriptor<Long, Long> mapStateDescriptor, ValueStateDescriptor<Long> valueStateDescriptor, ListStateDescriptor<Long> listStateDescriptor, Integer num, Integer num2, Integer num3, Integer num4) throws Exception {
        InternalMapState createInternalState = checkpointableKeyedStateBackend.createInternalState(IntSerializer.INSTANCE, mapStateDescriptor);
        InternalValueState createInternalState2 = checkpointableKeyedStateBackend.createInternalState(IntSerializer.INSTANCE, valueStateDescriptor);
        InternalListState createInternalState3 = checkpointableKeyedStateBackend.createInternalState(IntSerializer.INSTANCE, listStateDescriptor);
        checkpointableKeyedStateBackend.setCurrentKey("abc");
        createInternalState.setCurrentNamespace(num);
        createInternalState.put(33L, 33L);
        createInternalState.put(55L, 55L);
        createInternalState.setCurrentNamespace(num2);
        createInternalState.put(22L, 22L);
        createInternalState.put(11L, 11L);
        createInternalState3.setCurrentNamespace(num2);
        createInternalState3.add(4L);
        createInternalState3.add(5L);
        createInternalState3.add(6L);
        createInternalState.setCurrentNamespace(num3);
        createInternalState.put(44L, 44L);
        checkpointableKeyedStateBackend.setCurrentKey("mno");
        createInternalState.setCurrentNamespace(num3);
        createInternalState.put(11L, 11L);
        createInternalState.put(22L, 22L);
        createInternalState.put(33L, 33L);
        createInternalState.put(44L, 44L);
        createInternalState.put(55L, 55L);
        createInternalState2.setCurrentNamespace(num3);
        createInternalState2.update(1239L);
        createInternalState3.setCurrentNamespace(num3);
        createInternalState3.add(1L);
        createInternalState3.add(2L);
        createInternalState3.add(3L);
        createInternalState.setCurrentNamespace(num4);
        createInternalState.put(1L, 1L);
        Iterator it = createInternalState.iterator();
        while (it.hasNext()) {
            it.next();
            it.remove();
        }
        KeyGroupedInternalPriorityQueue create = checkpointableKeyedStateBackend.create("event-time", new TimerSerializer(checkpointableKeyedStateBackend.getKeySerializer(), IntSerializer.INSTANCE));
        create.add(new TimerHeapInternalTimer(1234L, "mno", num3));
        create.add(new TimerHeapInternalTimer(2345L, "mno", num2));
        create.add(new TimerHeapInternalTimer(3456L, "mno", num3));
        RunnableFuture snapshot = StreamOperatorStateHandler.prepareCanonicalSavepoint(checkpointableKeyedStateBackend, new CloseableRegistry()).snapshot(0L, 0L, new MemCheckpointStreamFactory(4194304), new CheckpointOptions(SavepointType.savepoint(SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault()));
        snapshot.run();
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(file));
        Throwable th = null;
        try {
            try {
                InstantiationUtil.serializeObject(bufferedOutputStream, snapshot.get());
                if (bufferedOutputStream != null) {
                    if (0 == 0) {
                        bufferedOutputStream.close();
                        return;
                    }
                    try {
                        bufferedOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (bufferedOutputStream != null) {
                if (th != null) {
                    try {
                        bufferedOutputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    bufferedOutputStream.close();
                }
            }
            throw th4;
        }
    }

    private void verifyRestoredState(MapStateDescriptor<Long, Long> mapStateDescriptor, ValueStateDescriptor<Long> valueStateDescriptor, ListStateDescriptor<Long> listStateDescriptor, Integer num, Integer num2, Integer num3, Integer num4, CheckpointableKeyedStateBackend<String> checkpointableKeyedStateBackend) throws Exception {
        InternalMapState createInternalState = checkpointableKeyedStateBackend.createInternalState(IntSerializer.INSTANCE, mapStateDescriptor);
        InternalValueState createInternalState2 = checkpointableKeyedStateBackend.createInternalState(IntSerializer.INSTANCE, valueStateDescriptor);
        InternalListState createInternalState3 = checkpointableKeyedStateBackend.createInternalState(IntSerializer.INSTANCE, listStateDescriptor);
        checkpointableKeyedStateBackend.setCurrentKey("abc");
        createInternalState.setCurrentNamespace(num);
        Assert.assertEquals(33L, ((Long) createInternalState.get(33L)).longValue());
        Assert.assertEquals(55L, ((Long) createInternalState.get(55L)).longValue());
        Assert.assertEquals(2L, getStateSize(createInternalState));
        createInternalState.setCurrentNamespace(num2);
        Assert.assertEquals(22L, ((Long) createInternalState.get(22L)).longValue());
        Assert.assertEquals(11L, ((Long) createInternalState.get(11L)).longValue());
        Assert.assertEquals(2L, getStateSize(createInternalState));
        createInternalState3.setCurrentNamespace(num2);
        Assert.assertThat(createInternalState3.get(), Matchers.contains(new Long[]{4L, 5L, 6L}));
        createInternalState.setCurrentNamespace(num3);
        Assert.assertEquals(44L, ((Long) createInternalState.get(44L)).longValue());
        Assert.assertEquals(1L, getStateSize(createInternalState));
        checkpointableKeyedStateBackend.setCurrentKey("mno");
        createInternalState.setCurrentNamespace(num3);
        Assert.assertEquals(11L, ((Long) createInternalState.get(11L)).longValue());
        Assert.assertEquals(22L, ((Long) createInternalState.get(22L)).longValue());
        Assert.assertEquals(33L, ((Long) createInternalState.get(33L)).longValue());
        Assert.assertEquals(44L, ((Long) createInternalState.get(44L)).longValue());
        Assert.assertEquals(55L, ((Long) createInternalState.get(55L)).longValue());
        Assert.assertEquals(5L, getStateSize(createInternalState));
        createInternalState2.setCurrentNamespace(num3);
        Assert.assertEquals(1239L, ((Long) createInternalState2.value()).longValue());
        createInternalState3.setCurrentNamespace(num3);
        Assert.assertThat(createInternalState3.get(), Matchers.contains(new Long[]{1L, 2L, 3L}));
        createInternalState.setCurrentNamespace(num4);
        Assert.assertThat(Boolean.valueOf(createInternalState.isEmpty()), CoreMatchers.is(true));
        KeyGroupedInternalPriorityQueue create = checkpointableKeyedStateBackend.create("event-time", new TimerSerializer(checkpointableKeyedStateBackend.getKeySerializer(), IntSerializer.INSTANCE));
        Assert.assertThat(Integer.valueOf(create.size()), CoreMatchers.equalTo(3));
        Assert.assertThat(create.poll(), CoreMatchers.equalTo(new TimerHeapInternalTimer(1234L, "mno", num3)));
        Assert.assertThat(create.poll(), CoreMatchers.equalTo(new TimerHeapInternalTimer(2345L, "mno", num2)));
        Assert.assertThat(create.poll(), CoreMatchers.equalTo(new TimerHeapInternalTimer(3456L, "mno", num3)));
    }
}
