/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.NewSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized test that writes a savepoint, derives a second savepoint from it, and checks
 * that the second savepoint directory contains every file name found in the first one and that
 * the keyed state of the original operator can still be read from the second savepoint.
 *
 * <p>The test runs once per state backend returned by {@link #data()}.
 */
@RunWith(value = Parameterized.class)
public class SavepointDeepCopyTest extends AbstractTestBase {
    private static final String TEXT = "The quick brown fox jumps over the lazy dog";
    private static final String RANDOM_VALUE = RandomStringUtils.randomAlphanumeric(120);

    // NOTE(review): presumably chosen so that the 120-character values exceed the backend's
    // in-line threshold and are written as separate state files -- confirm against FsStateBackend.
    private static final int FILE_STATE_SIZE_THRESHOLD = 1;

    /** Max parallelism passed to {@code Savepoint.create} for the first savepoint. */
    private static final int MAX_PARALLELISM = 128;

    private final StateBackend backend;

    /**
     * Creates the test instance for one state backend.
     *
     * @param backend the state backend used to write and load the savepoints
     * @throws Exception declared for compatibility with the existing constructor signature
     */
    public SavepointDeepCopyTest(StateBackend backend) throws Exception {
        this.backend = backend;
    }

    /**
     * Supplies the state backends the test is run against: a file system backend and a RocksDB
     * backend wrapping a file system backend, both using {@link #FILE_STATE_SIZE_THRESHOLD}.
     *
     * @return the backends to parameterize the test with
     */
    @Parameterized.Parameters(name = "State Backend: {0}")
    public static Collection<StateBackend> data() {
        return Arrays.asList(
                new FsStateBackend(new Path("file:///tmp").toUri(), FILE_STATE_SIZE_THRESHOLD),
                new RocksDBStateBackend(
                        (AbstractStateBackend)
                                new FsStateBackend(
                                        new Path("file:///tmp").toUri(),
                                        FILE_STATE_SIZE_THRESHOLD)));
    }

    /**
     * Bootstraps savepoint1 with "Operator1", creates savepoint2 from it by adding "Operator2",
     * then asserts that every file name of savepoint1 also exists in savepoint2 and that
     * "Operator1" in savepoint2 holds one key per distinct word of {@link #TEXT}.
     *
     * @throws Exception if a job fails or a savepoint directory cannot be listed
     */
    @Test
    public void testSavepointDeepCopy() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> words = env.fromElements(TEXT.split(" "));

        BootstrapTransformation<String> transformation =
                OperatorTransformation.bootstrapWith(words)
                        .keyBy(word -> word)
                        .transform(new WordMapBootstrapper());

        // Write the first savepoint from the bootstrap transformation.
        File savepointDir1 = this.createAndRegisterTempFile(new AbstractID().toHexString());
        String savepointPath1 = savepointDir1.getPath();
        Savepoint.create(this.backend, MAX_PARALLELISM)
                .withOperator("Operator1", transformation)
                .write(savepointPath1);
        env.execute("bootstrap savepoint1");

        // File names are unique within a directory, so the set size equals the entry count.
        Set<String> stateFiles1 = listFileNames(savepointPath1);
        Assert.assertTrue(
                "Failed to bootstrap savepoint1 with additional state files",
                stateFiles1.size() > 1);

        // Derive the second savepoint from the first one by adding another operator.
        File savepointDir2 = this.createAndRegisterTempFile(new AbstractID().toHexString());
        String savepointPath2 = savepointDir2.getPath();
        ExistingSavepoint savepoint2 = Savepoint.load(env, savepointPath1, this.backend);
        savepoint2.withOperator("Operator2", transformation).write(savepointPath2);
        env.execute("create savepoint2");

        Set<String> stateFiles2 = listFileNames(savepointPath2);
        Assert.assertTrue(
                "Failed to create savepoint2 from savepoint1 with additional state files",
                stateFiles2.size() > 1);

        Assert.assertThat(
                "At least one state file in savepoint1 are not in savepoint2",
                stateFiles1,
                Matchers.everyItem(Matchers.isIn(stateFiles2)));

        // The state written for Operator1 must be readable from the second savepoint.
        long actualKeyNum =
                Savepoint.load(env, savepointPath2, this.backend)
                        .readKeyedState("Operator1", new ReadFunction())
                        .count();
        long expectedKeyNum = Arrays.stream(TEXT.split(" ")).distinct().count();
        Assert.assertEquals(
                "Unexpected number of keys in the state of Operator1",
                expectedKeyNum,
                actualKeyNum);
    }

    /**
     * Returns the names of the entries directly inside {@code directory}.
     *
     * <p>The stream returned by {@code Files.list} holds an open directory handle, so it is
     * closed here with try-with-resources.
     *
     * @param directory path of the directory to list
     * @return the file names of the directory's entries
     * @throws IOException if the directory cannot be opened
     */
    private static Set<String> listFileNames(String directory) throws IOException {
        try (Stream<java.nio.file.Path> entries = Files.list(Paths.get(directory))) {
            return entries.map(entry -> entry.getFileName().toString())
                    .collect(Collectors.toSet());
        }
    }

    /** Reads the value state named "state" and emits its current value for every key. */
    static class ReadFunction extends KeyedStateReaderFunction<String, Tuple2<String, String>> {
        private ValueState<Tuple2<String, String>> state;

        ReadFunction() {
        }

        /** Registers the "state" value state with the same descriptor the bootstrapper uses. */
        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Tuple2<String, String>> stateDescriptor =
                    new ValueStateDescriptor<>("state", Types.TUPLE(Types.STRING, Types.STRING));
            this.state = this.getRuntimeContext().getState(stateDescriptor);
        }

        /** Emits the stored tuple of the key currently being read. */
        @Override
        public void readKey(
                String key,
                KeyedStateReaderFunction.Context ctx,
                Collector<Tuple2<String, String>> out)
                throws Exception {
            out.collect(this.state.value());
        }
    }

    /** Stores, per word key, a tuple of the word and the shared random value. */
    static class WordMapBootstrapper extends KeyedStateBootstrapFunction<String, String> {
        private ValueState<Tuple2<String, String>> state;

        WordMapBootstrapper() {
        }

        /** Registers the "state" value state holding a (String, String) tuple. */
        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Tuple2<String, String>> descriptor =
                    new ValueStateDescriptor<>("state", Types.TUPLE(Types.STRING, Types.STRING));
            this.state = this.getRuntimeContext().getState(descriptor);
        }

        /** Writes the state only the first time a key is seen; repeated words are ignored. */
        @Override
        public void processElement(String value, KeyedStateBootstrapFunction.Context ctx)
                throws Exception {
            if (this.state.value() == null) {
                this.state.update(new Tuple2<>(value, RANDOM_VALUE));
            }
        }
    }
}

