/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api.utils;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.state.api.utils.WaitingSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;

public abstract class SavepointTestBase
extends AbstractTestBase {
    public <T> String takeSavepoint(T[] data, Function<SourceFunction<T>, StreamExecutionEnvironment> jobGraphFactory) throws Exception {
        return this.takeSavepoint(Arrays.asList(data), jobGraphFactory);
    }

    public <T> String takeSavepoint(Collection<T> data, Function<SourceFunction<T>, StreamExecutionEnvironment> jobGraphFactory) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableClosureCleaner();
        WaitingSource<T> waitingSource = this.createSource(data);
        JobGraph jobGraph = jobGraphFactory.apply((SourceFunction<WaitingSource<T>>)waitingSource).getStreamGraph().getJobGraph();
        JobID jobId = jobGraph.getJobID();
        ClusterClient client = miniClusterResource.getClusterClient();
        try {
            JobID jobID = (JobID)client.submitJob(jobGraph).get();
            String string = (String)((CompletableFuture)CompletableFuture.runAsync(waitingSource::awaitSource).thenCompose(ignore -> this.triggerSavepoint(client, jobID))).get(5L, TimeUnit.MINUTES);
            return string;
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to take savepoint", e);
        }
        finally {
            client.cancel(jobId);
        }
    }

    private <T> WaitingSource<T> createSource(Collection<T> data) throws Exception {
        T first = data.iterator().next();
        if (first == null) {
            throw new IllegalArgumentException("Collection must not contain null elements");
        }
        TypeInformation typeInfo = TypeExtractor.getForObject(first);
        FromElementsFunction inner = new FromElementsFunction(typeInfo.createSerializer(new ExecutionConfig()), data);
        return new WaitingSource(inner, typeInfo);
    }

    private CompletableFuture<String> triggerSavepoint(ClusterClient<?> client, JobID jobID) throws RuntimeException {
        try {
            String dirPath = this.getTempDirPath(new AbstractID().toHexString());
            return client.triggerSavepoint(jobID, dirPath);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

