package org.apache.flink.state.api.utils;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;

/* loaded from: input_file:org/apache/flink/state/api/utils/SavepointTestBase.class */
public abstract class SavepointTestBase extends AbstractTestBase {
    public <T> String takeSavepoint(Collection<T> collection, Function<SourceFunction<T>, StreamExecutionEnvironment> function) throws Exception {
        StreamExecutionEnvironment.getExecutionEnvironment().getConfig().disableClosureCleaner();
        WaitingSource createSource = createSource(collection);
        JobGraph jobGraph = function.apply(createSource).getStreamGraph().getJobGraph();
        JobID jobID = jobGraph.getJobID();
        ClusterClient clusterClient = miniClusterResource.getClusterClient();
        try {
            try {
                JobExecutionResult submitJob = ClientUtils.submitJob(clusterClient, jobGraph);
                createSource.getClass();
                String str = (String) CompletableFuture.runAsync(createSource::awaitSource).thenCompose(r7 -> {
                    return triggerSavepoint(clusterClient, submitJob.getJobID());
                }).get(5L, TimeUnit.MINUTES);
                clusterClient.cancel(jobID);
                return str;
            } catch (Exception e) {
                throw new RuntimeException("Failed to take savepoint", e);
            }
        } catch (Throwable th) {
            clusterClient.cancel(jobID);
            throw th;
        }
    }

    private <T> WaitingSource<T> createSource(Collection<T> collection) throws Exception {
        T next = collection.iterator().next();
        if (next == null) {
            throw new IllegalArgumentException("Collection must not contain null elements");
        }
        TypeInformation forObject = TypeExtractor.getForObject(next);
        return new WaitingSource<>(new FromElementsFunction(forObject.createSerializer(new ExecutionConfig()), collection), forObject);
    }

    private CompletableFuture<String> triggerSavepoint(ClusterClient<?> clusterClient, JobID jobID) throws RuntimeException {
        try {
            return clusterClient.triggerSavepoint(jobID, getTempDirPath(new AbstractID().toHexString()));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
