package org.apache.flink.state.api;

import java.lang.invoke.SerializedLambda;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/state/api/SavepointWriterUidModificationITCase.class */
public class SavepointWriterUidModificationITCase {

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).build());
    private static final Collection<Integer> STATE_1 = Arrays.asList(1, 2, 3);
    private static final Collection<Integer> STATE_2 = Arrays.asList(4, 5, 6);
    private static final ValueStateDescriptor<Integer> STATE_DESCRIPTOR = new ValueStateDescriptor<>("number", Types.INT);

    /* loaded from: input_file:org/apache/flink/state/api/SavepointWriterUidModificationITCase$StateBootstrapper.class */
    public static class StateBootstrapper extends KeyedStateBootstrapFunction<Integer, Integer> {
        private transient ValueState<Integer> state;

        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(SavepointWriterUidModificationITCase.STATE_DESCRIPTOR);
        }

        public void processElement(Integer num, KeyedStateBootstrapFunction<Integer, Integer>.Context context) throws Exception {
            this.state.update(num);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedStateBootstrapFunction.Context context) throws Exception {
            processElement((Integer) obj, (KeyedStateBootstrapFunction<Integer, Integer>.Context) context);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/SavepointWriterUidModificationITCase$StateReader.class */
    public static class StateReader extends RichMapFunction<Integer, Integer> {
        private transient ValueState<Integer> state;

        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(SavepointWriterUidModificationITCase.STATE_DESCRIPTOR);
        }

        public Integer map(Integer num) throws Exception {
            return (Integer) this.state.value();
        }
    }

    @Test
    public void testAddUid(@TempDir Path path) throws Exception {
        String hexString = new AbstractID().toHexString();
        runAndValidate(modifySavepoint(path, bootstrapState(path, (streamExecutionEnvironment, savepointWriter) -> {
            savepointWriter.withOperator(OperatorIdentifier.forUidHash(hexString), bootstrap(streamExecutionEnvironment, STATE_1));
        }), savepointWriter2 -> {
            savepointWriter2.changeOperatorIdentifier(OperatorIdentifier.forUidHash(hexString), OperatorIdentifier.forUid("uid"));
        }), Tuple2.of(STATE_1, "uid"));
    }

    @Test
    public void testChangeUid(@TempDir Path path) throws Exception {
        runAndValidate(modifySavepoint(path, bootstrapState(path, (streamExecutionEnvironment, savepointWriter) -> {
            savepointWriter.withOperator(OperatorIdentifier.forUid("uid"), bootstrap(streamExecutionEnvironment, STATE_1));
        }), savepointWriter2 -> {
            savepointWriter2.changeOperatorIdentifier(OperatorIdentifier.forUid("uid"), OperatorIdentifier.forUid("fabulous"));
        }), Tuple2.of(STATE_1, "fabulous"));
    }

    @Test
    public void testSwapUid(@TempDir Path path) throws Exception {
        runAndValidate(modifySavepoint(path, bootstrapState(path, (streamExecutionEnvironment, savepointWriter) -> {
            savepointWriter.withOperator(OperatorIdentifier.forUid("uid1"), bootstrap(streamExecutionEnvironment, STATE_1)).withOperator(OperatorIdentifier.forUid("uid2"), bootstrap(streamExecutionEnvironment, STATE_2));
        }), savepointWriter2 -> {
            savepointWriter2.changeOperatorIdentifier(OperatorIdentifier.forUid("uid1"), OperatorIdentifier.forUid("uid2")).changeOperatorIdentifier(OperatorIdentifier.forUid("uid2"), OperatorIdentifier.forUid("uid1"));
        }), Tuple2.of(STATE_1, "uid2"), Tuple2.of(STATE_2, "uid1"));
    }

    private static String bootstrapState(Path path, BiConsumer<StreamExecutionEnvironment, SavepointWriter> biConsumer) throws Exception {
        String path2 = path.resolve(new AbstractID().toHexString()).toAbsolutePath().toString();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        SavepointWriter newSavepoint = SavepointWriter.newSavepoint(executionEnvironment, 128);
        biConsumer.accept(executionEnvironment, newSavepoint);
        newSavepoint.write(path2);
        executionEnvironment.execute("Bootstrap");
        return path2;
    }

    private static StateBootstrapTransformation<Integer> bootstrap(StreamExecutionEnvironment streamExecutionEnvironment, Collection<Integer> collection) {
        return OperatorTransformation.bootstrapWith(streamExecutionEnvironment.fromCollection(collection)).keyBy(num -> {
            return num;
        }).transform(new StateBootstrapper());
    }

    private static String modifySavepoint(Path path, String str, Consumer<SavepointWriter> consumer) throws Exception {
        String path2 = path.resolve(new AbstractID().toHexString()).toAbsolutePath().toString();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        SavepointWriter fromExistingSavepoint = SavepointWriter.fromExistingSavepoint(executionEnvironment, str);
        consumer.accept(fromExistingSavepoint);
        fromExistingSavepoint.write(path2);
        executionEnvironment.execute("Modifying");
        return path2;
    }

    @SafeVarargs
    private static void runAndValidate(String str, Tuple2<Collection<Integer>, String>... tuple2Arr) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        ArrayList arrayList = new ArrayList();
        for (Tuple2<Collection<Integer>, String> tuple2 : tuple2Arr) {
            arrayList.add(executionEnvironment.fromCollection((Collection) tuple2.f0).keyBy(num -> {
                return num;
            }).map(new StateReader()).uid((String) tuple2.f1).collectAsync());
        }
        StreamGraph streamGraph = executionEnvironment.getStreamGraph();
        streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str, false));
        executionEnvironment.executeAsync(streamGraph);
        for (int i = 0; i < tuple2Arr.length; i++) {
            Assertions.assertThat((Iterator) arrayList.get(i)).toIterable().containsExactlyInAnyOrderElementsOf((Iterable) tuple2Arr[i].f0);
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((CloseableIterator) it.next()).close();
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1153607641:
                if (implMethodName.equals("lambda$bootstrap$7db1cb31$1")) {
                    z = true;
                    break;
                }
                break;
            case 1371267261:
                if (implMethodName.equals("lambda$runAndValidate$9face57c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/state/api/SavepointWriterUidModificationITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return num;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/state/api/SavepointWriterUidModificationITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num2 -> {
                        return num2;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
