/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.SavepointReaderITCase;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/**
 * Shared scaffolding for integration tests that read operator state back out of a savepoint.
 *
 * <p>The single test runs a small streaming job whose operator (registered under {@link #UID})
 * copies every input element into list state, union list state and broadcast state. It then takes
 * a savepoint of that job and checks that each kind of state can be read from the savepoint
 * through the batch {@link ExecutionEnvironment}.
 *
 * <p>Subclasses provide the state descriptors through the constructor and decide how each state is
 * read by implementing the three {@code read*State} methods.
 */
public abstract class SavepointReaderITTestBase extends AbstractTestBase {
    /** Uid assigned to the stateful operator of the test job. */
    static final String UID = "stateful-operator";

    // The three names below are not referenced in this class; presumably subclasses use them when
    // building the descriptors they pass to the constructor -- confirm against the subclasses.
    static final String LIST_NAME = "list";
    static final String UNION_NAME = "union";
    static final String BROADCAST_NAME = "broadcast";

    private final ListStateDescriptor<Integer> list;
    private final ListStateDescriptor<Integer> union;
    private final MapStateDescriptor<Integer, String> broadcast;

    /**
     * @param list descriptor of the operator list state written by the job
     * @param union descriptor of the operator union list state written by the job
     * @param broadcast descriptor of the broadcast state written by the job
     */
    SavepointReaderITTestBase(
            ListStateDescriptor<Integer> list,
            ListStateDescriptor<Integer> union,
            MapStateDescriptor<Integer, String> broadcast) {
        this.list = list;
        this.union = union;
        this.broadcast = broadcast;
    }

    /**
     * Builds the streaming job, takes a savepoint of it and verifies list, union and broadcast
     * state against the elements emitted by {@link SavepointSource}.
     */
    @Test
    public void testOperatorStateInputFormat() throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(4);

        DataStream<Integer> data = streamEnv.addSource(new SavepointSource()).rebalance();

        // The same stream feeds both the regular and the broadcast input of the operator.
        data.connect(data.broadcast(broadcast))
            .process(new StatefulOperator(list, union, broadcast))
            .uid(UID)
            .addSink(new DiscardingSink<>());

        JobGraph jobGraph = streamEnv.getStreamGraph().getJobGraph();
        String savepoint = takeSavepoint(jobGraph);

        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        verifyListState(savepoint, batchEnv);
        verifyUnionState(savepoint, batchEnv);
        verifyBroadcastState(savepoint, batchEnv);
    }

    /** Reads the operator list state of the test operator from the given savepoint. */
    abstract DataSet<Integer> readListState(ExistingSavepoint savepoint) throws IOException;

    /** Reads the operator union list state of the test operator from the given savepoint. */
    abstract DataSet<Integer> readUnionState(ExistingSavepoint savepoint) throws IOException;

    /** Reads the broadcast state of the test operator from the given savepoint. */
    abstract DataSet<Tuple2<Integer, String>> readBroadcastState(ExistingSavepoint savepoint) throws IOException;

    /** Asserts that the list state, once sorted, equals the elements emitted by the source. */
    private void verifyListState(String path, ExecutionEnvironment batchEnv) throws Exception {
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, path, new MemoryStateBackend());

        List<Integer> listResult = readListState(savepoint).collect();
        listResult.sort(Comparator.naturalOrder());

        Assert.assertEquals(
            "Unexpected elements read from list state", SavepointSource.getElements(), listResult);
    }

    /** Asserts that the union state, once sorted, equals the elements emitted by the source. */
    private void verifyUnionState(String path, ExecutionEnvironment batchEnv) throws Exception {
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, path, new MemoryStateBackend());

        List<Integer> unionResult = readUnionState(savepoint).collect();
        unionResult.sort(Comparator.naturalOrder());

        Assert.assertEquals(
            "Unexpected elements read from union state", SavepointSource.getElements(), unionResult);
    }

    /**
     * Asserts that the broadcast state keys equal the emitted elements and that the values equal
     * their string representations (both compared in sorted order).
     */
    private void verifyBroadcastState(String path, ExecutionEnvironment batchEnv) throws Exception {
        ExistingSavepoint savepoint = Savepoint.load(batchEnv, path, new MemoryStateBackend());

        List<Tuple2<Integer, String>> broadcastResult = readBroadcastState(savepoint).collect();

        List<Integer> broadcastStateKeys = broadcastResult.stream()
            .map(entry -> entry.f0)
            .sorted(Comparator.naturalOrder())
            .collect(Collectors.toList());

        List<String> broadcastStateValues = broadcastResult.stream()
            .map(entry -> entry.f1)
            .sorted(Comparator.naturalOrder())
            .collect(Collectors.toList());

        List<String> expectedValues = SavepointSource.getElements().stream()
            .map(Object::toString)
            .sorted()
            .collect(Collectors.toList());

        Assert.assertEquals(
            "Unexpected element in broadcast state keys", SavepointSource.getElements(), broadcastStateKeys);
        Assert.assertEquals(
            "Unexpected element in broadcast state values", expectedValues, broadcastStateValues);
    }

    /**
     * Submits the job in detached mode, waits until {@link SavepointSource} reports that it has
     * emitted all of its elements, triggers a savepoint into a fresh temporary directory and
     * returns the savepoint path. The job is cancelled afterwards in every case.
     *
     * @param jobGraph the job to run and snapshot
     * @return the path returned by the savepoint trigger
     * @throws Exception if submission, the savepoint or the cancellation fails, if the calling
     *     thread is interrupted while waiting, or if the savepoint does not complete within the
     *     five-minute deadline shared by the wait and the savepoint
     */
    private String takeSavepoint(JobGraph jobGraph) throws Exception {
        SavepointSource.initializeForTest();

        ClusterClient<?> client = miniClusterResource.getClusterClient();
        client.setDetached(true);

        JobID jobId = jobGraph.getJobID();
        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5L));
        String dirPath = getTempDirPath(new AbstractID().toHexString());

        try {
            JobSubmissionResult result =
                client.submitJob(jobGraph, SavepointReaderITCase.class.getClassLoader());

            boolean finished = false;
            while (deadline.hasTimeLeft()) {
                if (SavepointSource.isFinished()) {
                    finished = true;
                    break;
                }
                // An InterruptedException is allowed to propagate: catching it and re-setting the
                // interrupt flag would make every following sleep() throw immediately and turn
                // this loop into a busy spin for the rest of the deadline.
                Thread.sleep(2L);
            }

            if (!finished) {
                Assert.fail("Failed to initialize state within deadline");
            }

            CompletableFuture<String> path = client.triggerSavepoint(result.getJobID(), dirPath);
            return path.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            client.cancel(jobId);
        }
    }

    /**
     * Operator under test: buffers every regular element in memory and writes the buffer to both
     * the list state and the union list state on each snapshot; every broadcast element is stored
     * in broadcast state as {@code value -> value.toString()}.
     */
    private static class StatefulOperator extends BroadcastProcessFunction<Integer, Integer, Void>
            implements CheckpointedFunction {
        private final ListStateDescriptor<Integer> list;
        private final ListStateDescriptor<Integer> union;
        private final MapStateDescriptor<Integer, String> broadcast;

        /** Elements seen on the regular input; created in {@link #open(Configuration)}. */
        private List<Integer> elements;
        private ListState<Integer> listState;
        private ListState<Integer> unionState;

        private StatefulOperator(
                ListStateDescriptor<Integer> list,
                ListStateDescriptor<Integer> union,
                MapStateDescriptor<Integer, String> broadcast) {
            this.list = list;
            this.union = union;
            this.broadcast = broadcast;
        }

        @Override
        public void open(Configuration parameters) {
            elements = new ArrayList<>();
        }

        @Override
        public void processElement(Integer value, ReadOnlyContext ctx, Collector<Void> out) {
            elements.add(value);
        }

        @Override
        public void processBroadcastElement(Integer value, Context ctx, Collector<Void> out) throws Exception {
            ctx.getBroadcastState(broadcast).put(value, value.toString());
        }

        /** Replaces the contents of both list states with the buffered elements. */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            listState.clear();
            listState.addAll(elements);

            unionState.clear();
            unionState.addAll(elements);
        }

        /** Obtains the list state and the union list state from the operator state store. */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            listState = context.getOperatorStateStore().getListState(list);
            unionState = context.getOperatorStateStore().getUnionListState(union);
        }
    }

    /**
     * Source that emits a fixed set of elements once, records that fact in a static flag, and
     * then idles until it is cancelled so that the job is still running when the savepoint is
     * triggered.
     */
    private static class SavepointSource implements SourceFunction<Integer> {
        private static final Integer[] elements = {1, 2, 3};

        /** Set once all elements have been emitted; static so the test thread can poll it. */
        private static volatile boolean finished;

        private volatile boolean running = true;

        /**
         * Emits every element and sets the finished flag while holding the checkpoint lock, then
         * sleeps in 100 ms steps until {@link #cancel()} clears the running flag.
         */
        @Override
        public void run(SourceFunction.SourceContext<Integer> ctx) {
            synchronized (ctx.getCheckpointLock()) {
                for (Integer element : elements) {
                    ctx.collect(element);
                }
                finished = true;
            }

            while (running) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the loop only ends through cancel(), and the
                    // condition above re-checks the running flag right after an interrupt.
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        /** Resets the finished flag; called before each job submission. */
        private static void initializeForTest() {
            finished = false;
        }

        private static boolean isFinished() {
            return finished;
        }

        /** Returns the emitted elements as a fixed-size list view of the backing array. */
        private static List<Integer> getElements() {
            return Arrays.asList(elements);
        }
    }
}

