package org.apache.flink.state.api;

import java.lang.invoke.SerializedLambda;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/state/api/SavepointDeepCopyTest.class */
public class SavepointDeepCopyTest extends AbstractTestBase {
    private static final String TEXT = "The quick brown fox jumps over the lazy dog";
    private final StateBackend backend;
    private static final MemorySize FILE_STATE_SIZE_THRESHOLD = new MemorySize(1);
    private static final String RANDOM_VALUE = RandomStringUtils.randomAlphanumeric(120);

    /* loaded from: input_file:org/apache/flink/state/api/SavepointDeepCopyTest$ReadFunction.class */
    static class ReadFunction extends KeyedStateReaderFunction<String, Tuple2<String, String>> {
        private ValueState<Tuple2<String, String>> state;

        ReadFunction() {
        }

        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(new ValueStateDescriptor("state", Types.TUPLE(new TypeInformation[]{Types.STRING, Types.STRING})));
        }

        public void readKey(String str, KeyedStateReaderFunction.Context context, Collector<Tuple2<String, String>> collector) throws Exception {
            collector.collect(this.state.value());
        }

        public /* bridge */ /* synthetic */ void readKey(Object obj, KeyedStateReaderFunction.Context context, Collector collector) throws Exception {
            readKey((String) obj, context, (Collector<Tuple2<String, String>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/SavepointDeepCopyTest$WordMapBootstrapper.class */
    static class WordMapBootstrapper extends KeyedStateBootstrapFunction<String, String> {
        private ValueState<Tuple2<String, String>> state;

        WordMapBootstrapper() {
        }

        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(new ValueStateDescriptor("state", Types.TUPLE(new TypeInformation[]{Types.STRING, Types.STRING})));
        }

        public void processElement(String str, KeyedStateBootstrapFunction<String, String>.Context context) throws Exception {
            if (this.state.value() == null) {
                this.state.update(new Tuple2(str, SavepointDeepCopyTest.RANDOM_VALUE));
            }
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedStateBootstrapFunction.Context context) throws Exception {
            processElement((String) obj, (KeyedStateBootstrapFunction<String, String>.Context) context);
        }
    }

    public SavepointDeepCopyTest(StateBackend stateBackend) throws Exception {
        this.backend = stateBackend;
    }

    @Parameterized.Parameters(name = "State Backend: {0}")
    public static Collection<StateBackend> data() {
        return Arrays.asList(new FsStateBackend(new Path("file:///tmp").toUri()), new RocksDBStateBackend(new FsStateBackend(new Path("file:///tmp").toUri())), new HashMapStateBackend(), new EmbeddedRocksDBStateBackend());
    }

    @Test
    public void testSavepointDeepCopy() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        BootstrapTransformation transform = OperatorTransformation.bootstrapWith(executionEnvironment.fromElements(TEXT.split(" "))).keyBy(str -> {
            return str;
        }).transform(new WordMapBootstrapper());
        String path = createAndRegisterTempFile(new AbstractID().toHexString()).getPath();
        Savepoint.create(this.backend, 128).withConfiguration(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, FILE_STATE_SIZE_THRESHOLD).withOperator("Operator1", transform).write(path);
        executionEnvironment.execute("bootstrap savepoint1");
        Assert.assertTrue("Failed to bootstrap savepoint1 with additional state files", Files.list(Paths.get(path, new String[0])).count() > 1);
        Set set = (Set) Files.list(Paths.get(path, new String[0])).map(path2 -> {
            return path2.getFileName().toString();
        }).collect(Collectors.toSet());
        String path3 = createAndRegisterTempFile(new AbstractID().toHexString()).getPath();
        Savepoint.load(executionEnvironment, path, this.backend).withConfiguration(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, FILE_STATE_SIZE_THRESHOLD).withOperator("Operator2", transform).write(path3);
        executionEnvironment.execute("create savepoint2");
        Assert.assertTrue("Failed to create savepoint2 from savepoint1 with additional state files", Files.list(Paths.get(path3, new String[0])).count() > 1);
        Assert.assertThat("At least one state file in savepoint1 are not in savepoint2", set, Matchers.everyItem(Matchers.isIn((Set) Files.list(Paths.get(path3, new String[0])).map(path4 -> {
            return path4.getFileName().toString();
        }).collect(Collectors.toSet()))));
        Assert.assertEquals("Unexpected number of keys in the state of Operator1", Arrays.stream(TEXT.split(" ")).distinct().count(), Savepoint.load(executionEnvironment, path3, this.backend).readKeyedState("Operator1", new ReadFunction()).count());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 112578127:
                if (implMethodName.equals("lambda$testSavepointDeepCopy$c179e206$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/state/api/SavepointDeepCopyTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                    return str -> {
                        return str;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
