package org.apache.flink.state.api;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.StateBootstrapFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.util.StreamCollector;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

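/**
 * Integration test that bootstraps a savepoint, validates it by restoring a streaming job from it,
 * then modifies the savepoint and validates the modified version, once per state backend.
 */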
public class SavepointWriterITCase extends AbstractTestBase {
    /** Operator uid under which the keyed account state is written and restored. */
    private static final String ACCOUNT_UID = "accounts";

    /** Operator uid under which the broadcast currency state is written and restored. */
    private static final String CURRENCY_UID = "currency";

    /** Operator uid (and list-state name) used for the state added when modifying the savepoint. */
    private static final String MODIFY_UID = "numbers";

    /** Second constructor argument handed to {@code FsStateBackend} in the backend-specific tests. */
    private static final int FILE_STATE_SIZE = 1;

    /** JUnit rule used by the validation jobs to collect the records a stream emits. */
    @Rule
    public StreamCollector collector = new StreamCollector();

    /** Descriptor of the broadcast map state holding currency name to rate. */
    private static final MapStateDescriptor<String, Double> descriptor = new MapStateDescriptor<>("currency-rate", Types.STRING, Types.DOUBLE);

    // Account ids are plain literals: they are unrelated to FILE_STATE_SIZE even though the first id
    // happens to share its value.
    private static final Collection<Account> accounts = Arrays.asList(new Account(1, 100.0d), new Account(2, 100.0d), new Account(3, 100.0d));

    private static final Collection<CurrencyRate> currencyRates = Arrays.asList(new CurrencyRate("USD", 1.0d), new CurrencyRate("EUR", 1.3d));

    /**
     * Test record describing an account: an id, an amount and a fixed timestamp.
     *
     * <p>Equality and hashing consider {@code id} and {@code amount} only; {@code timestamp} is
     * ignored.
     */
    public static class Account {
        public int id;
        public double amount;
        public long timestamp = 1000;

        Account(int i, double d) {
            this.id = i;
            this.amount = d;
        }

        /**
         * Compares {@code id} and {@code amount}.
         *
         * @param obj the object to compare with; may be {@code null}
         * @return {@code true} when {@code obj} is an {@code Account} with the same id and amount
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Account)) {
                return false;
            }
            Account other = (Account) obj;
            // Double.compare (rather than ==) keeps equals reflexive for NaN and consistent with
            // hashCode for 0.0 vs -0.0, because both are then based on the same bit representation.
            return other.id == this.id && Double.compare(other.amount, this.amount) == 0;
        }

        /** @return a hash derived from {@code id} and {@code amount}, matching {@link #equals}. */
        @Override
        public int hashCode() {
            return Objects.hash(this.id, this.amount);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/SavepointWriterITCase$AccountBootstrapper.class */
    public static class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
        ValueState<Double> state;

        public void open(Configuration configuration) {
            this.state = getRuntimeContext().getState(new ValueStateDescriptor("total", Types.DOUBLE));
        }

        public void processElement(Account account, KeyedStateBootstrapFunction<Integer, Account>.Context context) throws Exception {
            this.state.update(Double.valueOf(account.amount));
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedStateBootstrapFunction.Context context) throws Exception {
            processElement((Account) obj, (KeyedStateBootstrapFunction<Integer, Account>.Context) context);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/SavepointWriterITCase$CurrencyBootstrapFunction.class */
    public static class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction<CurrencyRate> {
        public void processElement(CurrencyRate currencyRate, BroadcastStateBootstrapFunction.Context context) throws Exception {
            context.getBroadcastState(SavepointWriterITCase.descriptor).put(currencyRate.currency, currencyRate.rate);
        }
    }

    /** Test record pairing a currency name with its rate. */
    public static class CurrencyRate {
        public String currency;
        public Double rate;

        CurrencyRate(String str, double d) {
            this.currency = str;
            this.rate = d;
        }

        /**
         * Compares {@code currency} and {@code rate}.
         *
         * @param obj the object to compare with; may be {@code null}
         * @return {@code true} when {@code obj} is a {@code CurrencyRate} with equal currency and rate
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof CurrencyRate)) {
                return false;
            }
            CurrencyRate other = (CurrencyRate) obj;
            // Objects.equals tolerates null fields, matching the null-safe Objects.hash in hashCode;
            // the fields are public and mutable, so null cannot be ruled out.
            return Objects.equals(other.currency, this.currency) && Objects.equals(other.rate, this.rate);
        }

        /** @return a hash derived from {@code currency} and {@code rate}, matching {@link #equals}. */
        @Override
        public int hashCode() {
            return Objects.hash(this.currency, this.rate);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/SavepointWriterITCase$CurrencyValidationFunction.class */
    public static class CurrencyValidationFunction extends BroadcastProcessFunction<CurrencyRate, CurrencyRate, Void> {
        public void processElement(CurrencyRate currencyRate, BroadcastProcessFunction<CurrencyRate, CurrencyRate, Void>.ReadOnlyContext readOnlyContext, Collector<Void> collector) throws Exception {
            Assert.assertEquals("Incorrect currency rate", currencyRate.rate.doubleValue(), ((Double) readOnlyContext.getBroadcastState(SavepointWriterITCase.descriptor).get(currencyRate.currency)).doubleValue(), 1.0E-4d);
        }

        public void processBroadcastElement(CurrencyRate currencyRate, BroadcastProcessFunction<CurrencyRate, CurrencyRate, Void>.Context context, Collector<Void> collector) {
        }

        public /* bridge */ /* synthetic */ void processBroadcastElement(Object obj, BroadcastProcessFunction.Context context, Collector collector) throws Exception {
            processBroadcastElement((CurrencyRate) obj, (BroadcastProcessFunction<CurrencyRate, CurrencyRate, Void>.Context) context, (Collector<Void>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, BroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
            processElement((CurrencyRate) obj, (BroadcastProcessFunction<CurrencyRate, CurrencyRate, Void>.ReadOnlyContext) readOnlyContext, (Collector<Void>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/SavepointWriterITCase$ModifyProcessFunction.class */
    public static class ModifyProcessFunction extends StateBootstrapFunction<Integer> {
        List<Integer> numbers;
        ListState<Integer> state;

        public void open(Configuration configuration) {
            this.numbers = new ArrayList();
        }

        public void processElement(Integer num, StateBootstrapFunction.Context context) {
            this.numbers.add(num);
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.state.clear();
            this.state.addAll(this.numbers);
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.state = functionInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor(SavepointWriterITCase.MODIFY_UID, Types.INT));
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/SavepointWriterITCase$StatefulOperator.class */
    public static class StatefulOperator extends RichMapFunction<Integer, Integer> implements CheckpointedFunction {
        List<Integer> numbers;
        ListState<Integer> state;

        public void open(Configuration configuration) {
            this.numbers = new ArrayList();
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.state.clear();
            this.state.addAll(this.numbers);
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.state = functionInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor(SavepointWriterITCase.MODIFY_UID, Types.INT));
            if (functionInitializationContext.isRestored()) {
                HashSet hashSet = new HashSet();
                hashSet.add(Integer.valueOf(SavepointWriterITCase.FILE_STATE_SIZE));
                hashSet.add(2);
                hashSet.add(3);
                for (Integer num : (Iterable) this.state.get()) {
                    Assert.assertTrue("Duplicate state", hashSet.contains(num));
                    hashSet.remove(num);
                }
                Assert.assertTrue("Failed to bootstrap all state elements: " + Arrays.toString(hashSet.toArray()), hashSet.isEmpty());
            }
        }

        public Integer map(Integer num) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/SavepointWriterITCase$UpdateAndGetAccount.class */
    public static class UpdateAndGetAccount extends RichFlatMapFunction<Account, Account> {
        ValueState<Double> state;

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.state = getRuntimeContext().getState(new ValueStateDescriptor("total", Types.DOUBLE));
        }

        public void flatMap(Account account, Collector<Account> collector) throws Exception {
            Double d = (Double) this.state.value();
            if (d != null) {
                account.amount += d.doubleValue();
            }
            this.state.update(Double.valueOf(account.amount));
            collector.collect(account);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Account) obj, (Collector<Account>) collector);
        }
    }

    /** Runs the bootstrap-and-modify scenario with an {@code FsStateBackend} on a fresh temp folder. */
    @Test
    public void testFsStateBackend() throws Exception {
        final StateBackend backend = new FsStateBackend(TEMPORARY_FOLDER.newFolder().toURI(), FILE_STATE_SIZE);
        testStateBootstrapAndModification(backend);
    }

    /**
     * Runs the bootstrap-and-modify scenario with a {@code RocksDBStateBackend} that wraps an
     * {@code FsStateBackend} on a fresh temp folder.
     */
    @Test
    public void testRocksDBStateBackend() throws Exception {
        final FsStateBackend fileBackend = new FsStateBackend(TEMPORARY_FOLDER.newFolder().toURI(), FILE_STATE_SIZE);
        final StateBackend backend = new RocksDBStateBackend(fileBackend);
        testStateBootstrapAndModification(backend);
    }

    /** Runs the bootstrap-and-modify scenario with a default-constructed {@code HashMapStateBackend}. */
    @Test
    public void testHashMapStateBackend() throws Exception {
        final StateBackend backend = new HashMapStateBackend();
        testStateBootstrapAndModification(backend);
    }

    /**
     * Runs the bootstrap-and-modify scenario with a default-constructed
     * {@code EmbeddedRocksDBStateBackend}.
     */
    @Test
    public void testEmbeddedRocksDBStateBackend() throws Exception {
        final StateBackend backend = new EmbeddedRocksDBStateBackend();
        testStateBootstrapAndModification(backend);
    }

    /**
     * Drives the full scenario for one backend: write a savepoint, validate it, derive a modified
     * savepoint from it and validate that one too.
     *
     * @param stateBackend the backend used for writing and for the validation jobs
     * @throws Exception if any of the four steps fails
     */
    public void testStateBootstrapAndModification(StateBackend stateBackend) throws Exception {
        // Step 1 + 2: create the initial savepoint in a uniquely named temp directory and check it.
        final String bootstrapPath = getTempDirPath(new AbstractID().toHexString());
        bootstrapState(stateBackend, bootstrapPath);
        validateBootstrap(stateBackend, bootstrapPath);

        // Step 3 + 4: write the modified savepoint to a second unique directory and check it.
        final String modifiedPath = getTempDirPath(new AbstractID().toHexString());
        modifySavepoint(stateBackend, bootstrapPath, modifiedPath);
        validateModification(stateBackend, modifiedPath);
    }

    /**
     * Writes a new savepoint containing keyed account state under {@code ACCOUNT_UID} and broadcast
     * currency state under {@code CURRENCY_UID}.
     *
     * @param stateBackend the backend handed to {@code Savepoint.create}
     * @param str the path the savepoint is written to
     * @throws Exception if building or executing the batch job fails
     */
    private void bootstrapState(StateBackend stateBackend, String str) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        BootstrapTransformation<Account> accountState = OperatorTransformation
                .bootstrapWith(executionEnvironment.fromCollection(accounts))
                .keyBy(account -> account.id)
                .transform(new AccountBootstrapper());

        BootstrapTransformation<CurrencyRate> currencyState = OperatorTransformation
                .bootstrapWith(executionEnvironment.fromCollection(currencyRates))
                .transform(new CurrencyBootstrapFunction());

        // 128 is the second argument of Savepoint.create — presumably the max parallelism recorded in
        // the savepoint; confirm against the State Processor API docs.
        Savepoint.create(stateBackend, 128)
                .withOperator(ACCOUNT_UID, accountState)
                .withOperator(CURRENCY_UID, currencyState)
                .write(str);

        // write(...) only defines the job; the savepoint is produced when the batch job executes.
        executionEnvironment.execute("Bootstrap");
    }

    /**
     * Restores a streaming job from the bootstrapped savepoint and checks that it completes without
     * error and emits three accounts.
     *
     * @param stateBackend the backend configured on the streaming environment
     * @param str the savepoint path to restore from
     * @throws Exception if job submission or result retrieval fails
     */
    private void validateBootstrap(StateBackend stateBackend, String str) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(stateBackend);

        // Keyed branch: reads the bootstrapped "total" state through the ACCOUNT_UID operator.
        CompletableFuture<Collection<Account>> results = this.collector.collect(
                executionEnvironment.fromCollection(accounts)
                        .keyBy(account -> account.id)
                        .flatMap(new UpdateAndGetAccount())
                        .uid(ACCOUNT_UID));

        // Broadcast branch: CurrencyValidationFunction asserts on the restored broadcast state.
        executionEnvironment.fromCollection(currencyRates)
                .connect(executionEnvironment.fromCollection(currencyRates).broadcast(descriptor))
                .process(new CurrencyValidationFunction())
                .uid(CURRENCY_UID)
                .addSink(new DiscardingSink<>());

        JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str, false));

        ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
        JobResult jobResult = clusterClient.submitJob(jobGraph)
                .thenCompose(clusterClient::requestJobResult)
                .get();

        // Surface a job failure as an assertion error that keeps the original cause attached.
        jobResult.getSerializedThrowable().ifPresent(serializedThrowable -> {
            throw new AssertionError("Unexpected exception during bootstrapping", serializedThrowable);
        });
        Assert.assertEquals("Unexpected output", 3, results.get().size());
    }

    /**
     * Loads an existing savepoint, removes the {@code CURRENCY_UID} operator, adds a new operator
     * under {@code MODIFY_UID} bootstrapped with the values 1, 2 and 3, and writes the result.
     *
     * @param stateBackend the backend handed to {@code Savepoint.load}
     * @param str the path of the savepoint to load
     * @param str2 the path the modified savepoint is written to
     * @throws Exception if loading, building or executing the batch job fails
     */
    private void modifySavepoint(StateBackend stateBackend, String str, String str2) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        // Plain literals: these are the data values StatefulOperator later expects to restore.
        BootstrapTransformation<Integer> numbersState = OperatorTransformation
                .bootstrapWith(executionEnvironment.fromElements(1, 2, 3))
                .transform(new ModifyProcessFunction());

        Savepoint.load(executionEnvironment, str, stateBackend)
                .removeOperator(CURRENCY_UID)
                .withOperator(MODIFY_UID, numbersState)
                .write(str2);

        // write(...) only defines the job; the savepoint is produced when the batch job executes.
        executionEnvironment.execute("Modifying");
    }

    /**
     * Restores a streaming job from the modified savepoint and checks that it completes without
     * error and emits three accounts.
     *
     * @param stateBackend the backend configured on the streaming environment
     * @param str the savepoint path to restore from
     * @throws Exception if job submission or result retrieval fails
     */
    private void validateModification(StateBackend stateBackend, String str) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(stateBackend);

        // Keyed branch: reads the account state through the ACCOUNT_UID operator.
        SingleOutputStreamOperator<Account> updatedAccounts = executionEnvironment.fromCollection(accounts)
                .keyBy(account -> account.id)
                .flatMap(new UpdateAndGetAccount())
                .uid(ACCOUNT_UID);
        CompletableFuture<Collection<Account>> results = this.collector.collect(updatedAccounts);

        // StatefulOperator asserts on the restored MODIFY_UID list state in initializeState.
        updatedAccounts
                .map(account -> account.id)
                .map(new StatefulOperator())
                .uid(MODIFY_UID)
                .addSink(new DiscardingSink<>());

        JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str, false));

        ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
        JobResult jobResult = clusterClient.submitJob(jobGraph)
                .thenCompose(clusterClient::requestJobResult)
                .get();

        // Same reporting style as validateBootstrap: still an AssertionError on failure, but with the
        // job's throwable attached as the cause instead of a message-less assertFalse.
        jobResult.getSerializedThrowable().ifPresent(serializedThrowable -> {
            throw new AssertionError("Unexpected exception while validating the modified savepoint", serializedThrowable);
        });
        Assert.assertEquals("Unexpected output", 3, results.get().size());
    }

    /*
     * Decompiled form of the synthetic lambda-deserialization hook (note the "synthetic" marker on
     * the declaration). It looks up the implementation method name of the serialized lambda, maps it
     * to an index, verifies the functional-interface and implementation signatures, and returns an
     * equivalent lambda; anything unrecognized ends in IllegalArgumentException.
     *
     * NOTE(review): this appears to be decompiler output rather than hand-written code. The
     * `boolean z = -1` declaration, the `case false:` / repeated `case true:` labels and the
     * `z = FILE_STATE_SIZE` assignment look like artifacts of an int selector (0..3) being mistyped
     * as boolean, and presumably do not compile as written. A method with this name is normally
     * generated by the compiler for serializable lambdas — confirm whether this declaration should
     * be dropped from the source altogether.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First switch: resolve the lambda's implementation method name to a selector value,
        // using the string hash as a fast pre-filter and equals() as the actual check.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1708148361:
                if (implMethodName.equals("lambda$validateBootstrap$2c4ffb13$1")) {
                    z = FILE_STATE_SIZE;
                    break;
                }
                break;
            case -1226531559:
                if (implMethodName.equals("lambda$validateModification$bedaaf0b$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1198595489:
                if (implMethodName.equals("lambda$bootstrapState$bfbbf2aa$1")) {
                    z = false;
                    break;
                }
                break;
            case 1462560963:
                if (implMethodName.equals("lambda$validateModification$2c4ffb13$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        // Second switch: for the selected lambda, check every recorded signature component before
        // handing back a fresh lambda with the same body (each one maps an Account to its id).
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/state/api/SavepointWriterITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/state/api/SavepointWriterITCase$Account;)Ljava/lang/Integer;")) {
                    return account -> {
                        return Integer.valueOf(account.id);
                    };
                }
                break;
            case FILE_STATE_SIZE /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/state/api/SavepointWriterITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/state/api/SavepointWriterITCase$Account;)Ljava/lang/Integer;")) {
                    return account2 -> {
                        return Integer.valueOf(account2.id);
                    };
                }
                break;
            // NOTE(review): presumably selector 2 (the MapFunction lambda in validateModification).
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/state/api/SavepointWriterITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/state/api/SavepointWriterITCase$Account;)Ljava/lang/Integer;")) {
                    return account22 -> {
                        return Integer.valueOf(account22.id);
                    };
                }
                break;
            // NOTE(review): presumably selector 3 (the KeySelector lambda in validateModification).
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/state/api/SavepointWriterITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/state/api/SavepointWriterITCase$Account;)Ljava/lang/Integer;")) {
                    return account3 -> {
                        return Integer.valueOf(account3.id);
                    };
                }
                break;
        }
        // No known lambda matched the serialized description.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
