package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingClusterClient;
import org.apache.flink.kubernetes.operator.TestingRestClient;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.TriFunction;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.class */
public class AbstractFlinkServiceTest {

    // Temporary directory for the test; annotated with JUnit's @TempDir.
    @TempDir
    Path tempDir;
    // File returned by the stub ArtifactManager; setup() points it at "test.jar" inside tempDir.
    File testJar;
    // Never assigned in the visible code; presumably injected by the
    // @EnableKubernetesMockClient extension declared on the class (TODO confirm).
    private KubernetesClient client;
    // Configuration shared by all fixtures; setup() and several tests mutate it in place.
    private final Configuration configuration = new Configuration();
    // Config manager built on top of the shared configuration instance above.
    private final FlinkConfigManager configManager = new FlinkConfigManager(this.configuration);
    // Derived from the shared configuration in setup(); deletionPropagationTest rebuilds it.
    private FlinkOperatorConfiguration operatorConfig;
    // Set to a direct executor service in setup().
    private ExecutorService executorService;
    // Stub created in setup() whose fetch() writes and returns testJar.
    private ArtifactManager artifactManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest$TestingService.class */
    /**
     * {@link AbstractFlinkService} stand-in used by the tests of the enclosing class.
     *
     * <p>The cluster and REST clients are whatever the test passes in, pod lookups are answered
     * from the in-memory {@link #jmPods} / {@link #tmPods} maps, and cluster deletions are only
     * recorded in {@link #deleted}. Deployment, session submission, the three-argument cancel and
     * the scaling operations throw {@link UnsupportedOperationException}.
     */
    public class TestingService extends AbstractFlinkService {
        /** Client handed back by {@link #getClusterClient(Configuration)}. */
        RestClusterClient<String> clusterClient;
        /** Client handed back by {@link #getRestClient(Configuration)}; may be {@code null}. */
        RestClient restClient;
        /** Metadata of every cluster passed to {@code deleteClusterInternal}, in call order. */
        List<ObjectMeta> deleted = new ArrayList<>();
        /** Canned JobManager pod lists, keyed by the two string arguments of the lookup. */
        Map<Tuple2<String, String>, PodList> jmPods = new HashMap<>();
        /** Canned TaskManager pod lists, keyed by the two string arguments of the lookup. */
        Map<Tuple2<String, String>, PodList> tmPods = new HashMap<>();

        /**
         * Creates a service without a REST client.
         *
         * @param abstractFlinkServiceTest not used by this constructor
         * @param restClusterClient client returned by {@link #getClusterClient(Configuration)}
         */
        TestingService(AbstractFlinkServiceTest abstractFlinkServiceTest, RestClusterClient<String> restClusterClient) {
            this(restClusterClient, null);
        }

        /**
         * Creates a service wired to the fixtures of the enclosing test instance.
         *
         * @param restClusterClient client returned by {@link #getClusterClient(Configuration)}
         * @param restClient client returned by {@link #getRestClient(Configuration)}
         */
        TestingService(RestClusterClient<String> restClusterClient, RestClient restClient) {
            super(AbstractFlinkServiceTest.this.client, AbstractFlinkServiceTest.this.artifactManager, AbstractFlinkServiceTest.this.executorService, AbstractFlinkServiceTest.this.operatorConfig);
            this.clusterClient = restClusterClient;
            this.restClient = restClient;
        }

        /** Returns the injected cluster client; the configuration argument is ignored. */
        public RestClusterClient<String> getClusterClient(Configuration configuration) {
            return this.clusterClient;
        }

        /** Returns the injected REST client; the configuration argument is ignored. */
        protected RestClient getRestClient(Configuration configuration) throws ConfigurationException {
            return this.restClient;
        }

        /** Looks up the canned JobManager pods, falling back to an empty {@link PodList}. */
        protected PodList getJmPodList(String str, String str2) {
            return this.jmPods.getOrDefault(Tuple2.of(str, str2), new PodList());
        }

        /** Looks up the canned TaskManager pods, falling back to an empty {@link PodList}. */
        protected PodList getTmPodList(String str, String str2) {
            return this.tmPods.getOrDefault(Tuple2.of(str, str2), new PodList());
        }

        /** Not supported by this test double. */
        protected void deployApplicationCluster(JobSpec jobSpec, Configuration configuration) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this test double. */
        public void submitSessionCluster(Configuration configuration) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this test double. */
        public void cancelJob(FlinkDeployment flinkDeployment, UpgradeMode upgradeMode, Configuration configuration) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this test double. */
        public FlinkService.ScalingResult scale(FlinkResourceContext<?> flinkResourceContext, Configuration configuration) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this test double. */
        public boolean scalingCompleted(FlinkResourceContext<?> flinkResourceContext) {
            throw new UnsupportedOperationException();
        }

        /** Records the metadata in {@link #deleted} instead of deleting anything. */
        protected void deleteClusterInternal(ObjectMeta objectMeta, Configuration configuration, boolean z, DeletionPropagation deletionPropagation) {
            this.deleted.add(objectMeta);
        }
    }

    /**
     * Rebuilds the shared fixtures before each test: seeds the configuration with a cluster id,
     * namespace and Flink version, derives the operator configuration from it, and installs a
     * direct executor plus a stub artifact manager.
     */
    @BeforeEach
    public void setup() {
        // The operator configuration is derived from these values, so set them first.
        configuration.set(KubernetesConfigOptions.CLUSTER_ID, "test-cluster");
        configuration.set(KubernetesConfigOptions.NAMESPACE, "flink-operator-test");
        configuration.set(FlinkConfigBuilder.FLINK_VERSION, FlinkVersion.v1_18);
        operatorConfig = FlinkOperatorConfiguration.fromConfiguration(configuration);

        executorService = Executors.newDirectExecutorService();

        testJar = tempDir.resolve("test.jar").toFile();
        // Stub artifact manager: every fetch (re)writes the test jar and hands it back.
        artifactManager = new ArtifactManager(configManager) {
            public File fetch(String str, Configuration configuration, String str2) throws IOException {
                File jar = AbstractFlinkServiceTest.this.testJar;
                Files.writeString(jar.toPath(), "test");
                return jar;
            }
        };
    }

    /**
     * Submits a session job through a request processor that captures the jar-run request, then
     * checks the Flink configuration attached to it: the full session job configuration for
     * versions newer than 1.16, an empty one otherwise.
     *
     * @param flinkVersion Flink version of the session cluster under test
     */
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    @ParameterizedTest
    public void sessionJobSubmissionTest(FlinkVersion flinkVersion) throws Exception {
        List<JarRunRequestBody> jarRuns = new ArrayList<>();
        TestingService service = getTestingService((headers, parameters, body) -> {
            if (body instanceof JarRunRequestBody) {
                jarRuns.add((JarRunRequestBody) body);
                return CompletableFuture.completedFuture(null);
            }
            if (headers instanceof JarUploadHeaders) {
                return CompletableFuture.completedFuture(new JarUploadResponseBody("test"));
            }
            if (headers instanceof JarDeleteHeaders) {
                return CompletableFuture.completedFuture(null);
            }
            throw new UnsupportedOperationException("Unknown request");
        });

        FlinkDeployment sessionCluster = TestUtils.buildSessionCluster(flinkVersion);
        ((FlinkDeploymentStatus) sessionCluster.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec((FlinkDeploymentSpec) sessionCluster.getSpec(), sessionCluster);
        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
        Configuration sessionJobConfig = this.configManager.getSessionJobConfig(sessionCluster, (FlinkSessionJobSpec) sessionJob.getSpec());

        service.submitJobToSessionCluster(sessionJob.getMetadata(), (FlinkSessionJobSpec) sessionJob.getSpec(), sessionJobConfig, null);

        JarRunRequestBody firstRun = jarRuns.get(0);
        if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_16)) {
            Assertions.assertEquals(sessionJobConfig.toMap(), firstRun.getFlinkConfiguration().toMap());
        } else {
            Assertions.assertTrue(firstRun.getFlinkConfiguration().toMap().isEmpty());
        }
    }

    /**
     * Makes both the jar-run and the jar-delete request fail and checks that {@code runJar}
     * surfaces a {@link FlinkRuntimeException}, that the run request carried the expected job id,
     * and that the jar delete was still attempted.
     */
    @Test
    public void jarRunErrorHandlingTest() throws Exception {
        List<JarRunRequestBody> jarRuns = new ArrayList<>();
        AtomicBoolean deleteAttempted = new AtomicBoolean(false);
        TestingService service = getTestingService((headers, parameters, body) -> {
            if (body instanceof JarRunRequestBody) {
                jarRuns.add((JarRunRequestBody) body);
                return CompletableFuture.failedFuture(new Exception("RunException"));
            }
            if (headers instanceof JarDeleteHeaders) {
                deleteAttempted.set(true);
                return CompletableFuture.failedFuture(new Exception("DeleteException"));
            }
            // Any other request is unexpected here; say so instead of failing without a message.
            Assertions.fail("unknown request");
            return null;
        });

        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
        JobID jobId = new JobID();
        Assertions.assertThrows(FlinkRuntimeException.class, () -> {
            service.runJar(((FlinkSessionJobSpec) sessionJob.getSpec()).getJob(), jobId, new JarUploadResponseBody("test"), this.configuration, null);
        });

        Assertions.assertEquals(jobId, jarRuns.get(0).getJobId());
        Assertions.assertTrue(deleteAttempted.get());
    }

    /**
     * Builds a {@link TestingService} whose cluster client and REST client both route their
     * requests to the given processor.
     *
     * @param triFunction request processor installed on both clients
     * @return a service backed by the two testing clients
     */
    private TestingService getTestingService(TriFunction<MessageHeaders<?, ?, ?>, MessageParameters, RequestBody, CompletableFuture<ResponseBody>> triFunction) throws Exception {
        TestingClusterClient clusterClient = new TestingClusterClient(this.configuration);
        clusterClient.setRequestProcessor(triFunction);

        TestingRestClient restClient = new TestingRestClient(this.configuration);
        restClient.setRequestProcessor(triFunction);

        return new TestingService(clusterClient, restClient);
    }

    /**
     * Cancels a running application job in {@link UpgradeMode#STATELESS} mode and checks that the
     * cluster client received a cancel call for the job id and that no savepoint was recorded.
     */
    @Test
    public void cancelJobWithStatelessUpgradeModeTest() throws Exception {
        TestingClusterClient clusterClient = new TestingClusterClient(this.configuration, "test-cluster");
        CompletableFuture<JobID> cancelledJob = new CompletableFuture<>();
        clusterClient.setCancelFunction(jobID -> {
            // Explicit cast: the client is referenced through its raw type here.
            cancelledJob.complete((JobID) jobID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        TestingService service = new TestingService(this, clusterClient);

        JobID jobId = JobID.generate();
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        JobStatus jobStatus = ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus();
        jobStatus.setJobId(jobId.toHexString());
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
        ((FlinkDeploymentStatus) deployment.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        // Same value as the literal "RUNNING", spelled the way the sibling tests do it.
        ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus().setState(org.apache.flink.api.common.JobStatus.RUNNING.name());

        service.cancelJob(deployment, UpgradeMode.STATELESS, this.configManager.getObserveConfig(deployment), false);

        Assertions.assertTrue(cancelledJob.isDone());
        Assertions.assertEquals(jobId, cancelledJob.get());
        Assertions.assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
    }

    /**
     * Cancels a running application job in {@link UpgradeMode#SAVEPOINT} mode and checks the
     * stop-with-savepoint call, the recorded savepoint location, the resulting job state, and
     * whether the cluster was deleted afterwards.
     *
     * @param z value passed as the last argument of {@code cancelJob}; when {@code true} the test
     *     expects the cluster to be deleted and the JobManager status to become {@code MISSING}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void cancelJobWithSavepointUpgradeModeTest(boolean z) throws Exception {
        final boolean expectClusterDeleted = z;
        final String savepointPath = "file:///path/of/svp-1";

        TestingClusterClient clusterClient = new TestingClusterClient(this.configuration, "test-cluster");
        CompletableFuture<Tuple3<JobID, Boolean, String>> stopCall = new CompletableFuture<>();
        this.configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
        clusterClient.setStopWithSavepointFunction((jobID, bool, str2) -> {
            // Explicit casts: the client is referenced through its raw type here.
            stopCall.complete(new Tuple3<>((JobID) jobID, (Boolean) bool, (String) str2));
            return CompletableFuture.completedFuture(savepointPath);
        });
        TestingService service = new TestingService(this, clusterClient);

        JobID jobId = JobID.generate();
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ((FlinkDeploymentSpec) deployment.getSpec()).getFlinkConfiguration().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath);
        ((FlinkDeploymentStatus) deployment.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        JobStatus jobStatus = ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus();
        jobStatus.setJobId(jobId.toHexString());
        jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());

        service.cancelJob(deployment, UpgradeMode.SAVEPOINT, this.configManager.getObserveConfig(deployment), expectClusterDeleted);

        Assertions.assertTrue(stopCall.isDone());
        Tuple3<JobID, Boolean, String> call = stopCall.get();
        Assertions.assertEquals(jobId, call.f0);
        Assertions.assertFalse(call.f1);
        Assertions.assertEquals(savepointPath, call.f2);
        Assertions.assertEquals(savepointPath, jobStatus.getSavepointInfo().getLastSavepoint().getLocation());

        // Expected value first, so a failure message reports expected/actual the right way round.
        Assertions.assertEquals(org.apache.flink.api.common.JobStatus.FINISHED.name(), jobStatus.getState());
        Assertions.assertEquals(
                expectClusterDeleted ? JobManagerDeploymentStatus.MISSING : JobManagerDeploymentStatus.READY,
                ((FlinkDeploymentStatus) deployment.getStatus()).getJobManagerDeploymentStatus());

        if (expectClusterDeleted) {
            Assertions.assertEquals(List.of(deployment.getMetadata()), service.deleted);
        } else {
            Assertions.assertTrue(service.deleted.isEmpty());
        }
    }

    /**
     * Cancels an application job in {@link UpgradeMode#LAST_STATE} mode and checks that no
     * savepoint was recorded on the job status.
     */
    @Test
    public void cancelJobWithLastStateUpgradeModeTest() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());

        TestingClusterClient clusterClient = new TestingClusterClient(this.configuration, "test-cluster");
        TestingService service = new TestingService(this, clusterClient);

        JobID jobId = JobID.generate();
        FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) deployment.getStatus();
        JobStatus jobStatus = deploymentStatus.getJobStatus();
        jobStatus.setJobId(jobId.toHexString());

        Configuration observeConfig = this.configManager.getObserveConfig(deployment);
        service.cancelJob(deployment, UpgradeMode.LAST_STATE, observeConfig, false);

        Assertions.assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
    }

    /**
     * Checks which {@link DeletionPropagation} reaches {@code deleteClusterInternal}: first with
     * the operator configuration built in {@code setup()} (expected {@code FOREGROUND}), then
     * after {@code RESOURCE_DELETION_PROPAGATION} is set to {@code BACKGROUND} and the operator
     * configuration is rebuilt.
     */
    @Test
    public void deletionPropagationTest() {
        final List<DeletionPropagation> propagations = new ArrayList<>();

        // TestingService declares no single-argument constructor; pass the enclosing test
        // instance first, as the other call sites in this class do.
        new TestingService(this, null) {
            @Override
            protected void deleteClusterInternal(ObjectMeta objectMeta, Configuration configuration, boolean z, DeletionPropagation deletionPropagation) {
                propagations.add(deletionPropagation);
            }
        }.deleteClusterDeployment(new ObjectMeta(), new FlinkDeploymentStatus(), this.configuration, true);
        Assertions.assertEquals(DeletionPropagation.FOREGROUND, propagations.get(0));

        this.configuration.set(KubernetesOperatorConfigOptions.RESOURCE_DELETION_PROPAGATION, DeletionPropagation.BACKGROUND);
        // The service reads operatorConfig in its constructor, so rebuild it before creating the
        // second service.
        this.operatorConfig = FlinkOperatorConfiguration.fromConfiguration(this.configuration);

        new TestingService(this, null) {
            @Override
            protected void deleteClusterInternal(ObjectMeta objectMeta, Configuration configuration, boolean z, DeletionPropagation deletionPropagation) {
                propagations.add(deletionPropagation);
            }
        }.deleteClusterDeployment(new ObjectMeta(), new FlinkDeploymentStatus(), this.configuration, true);
        Assertions.assertEquals(DeletionPropagation.BACKGROUND, propagations.get(1));
    }

    /**
     * Triggers a manual savepoint and checks the request that reached the client: job id,
     * target directory taken from the configured savepoint directory, and cancel-job flag unset.
     */
    @Test
    public void triggerSavepointTest() throws Exception {
        final String savepointDir = "file:///path/of/svp";
        CompletableFuture<Tuple3<JobID, String, Boolean>> triggerCall = new CompletableFuture<>();
        this.configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        TestingService service = getTestingService((headers, parameters, body) -> {
            SavepointTriggerRequestBody trigger = (SavepointTriggerRequestBody) body;
            triggerCall.complete(new Tuple3<>(
                    (JobID) ((SavepointTriggerMessageParameters) parameters).jobID.getValue(),
                    (String) trigger.getTargetDirectory().get(),
                    Boolean.valueOf(trigger.isCancelJob())));
            return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId()));
        });

        JobID jobId = JobID.generate();
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
        JobStatus jobStatus = new JobStatus();
        jobStatus.setJobId(jobId.toString());
        ((FlinkDeploymentStatus) deployment.getStatus()).setJobStatus(jobStatus);

        service.triggerSavepoint(((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus().getJobId(), SavepointTriggerType.MANUAL, ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus().getSavepointInfo(), this.configuration);

        Assertions.assertTrue(triggerCall.isDone());
        Tuple3<JobID, String, Boolean> call = triggerCall.get();
        Assertions.assertEquals(jobId, call.f0);
        Assertions.assertEquals(savepointDir, call.f1);
        Assertions.assertFalse(call.f2);
    }

    /**
     * Disposes a savepoint and checks that a {@link SavepointDisposalRequest} for exactly that
     * path reached the client.
     */
    @Test
    public void disposeSavepointTest() throws Exception {
        final String savepointPath = "file:///path/of/svp";
        final AtomicBoolean disposeRequested = new AtomicBoolean(false);
        this.configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);

        TestingService service = getTestingService((headers, parameters, body) -> {
            if (body instanceof SavepointDisposalRequest) {
                Assertions.assertEquals(savepointPath, ((SavepointDisposalRequest) body).getSavepointPath());
                disposeRequested.set(true);
                return CompletableFuture.completedFuture(null);
            }
            // Only disposal requests are expected in this test.
            Assertions.fail("unknown request");
            return null;
        });

        service.disposeSavepoint(savepointPath, this.configuration);
        Assertions.assertTrue(disposeRequested.get());
    }

    /**
     * Checks that the {@code NATIVE} savepoint format configured through
     * {@code OPERATOR_SAVEPOINT_FORMAT_TYPE} is passed on both when a savepoint is triggered
     * manually and when a job is cancelled in {@link UpgradeMode#SAVEPOINT} mode.
     */
    @Test
    public void nativeSavepointFormatTest() throws Exception {
        final String savepointDir = "file:///path/of/svp";
        TestingClusterClient clusterClient = new TestingClusterClient(this.configuration, "test-cluster");

        CompletableFuture<Tuple4<JobID, String, Boolean, SavepointFormatType>> triggerCall = new CompletableFuture<>();
        this.configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        clusterClient.setRequestProcessor((messageHeaders, messageParameters, requestBody) -> {
            SavepointTriggerRequestBody trigger = (SavepointTriggerRequestBody) requestBody;
            triggerCall.complete(new Tuple4<>(
                    (JobID) ((SavepointTriggerMessageParameters) messageParameters).jobID.getValue(),
                    (String) trigger.getTargetDirectory().get(),
                    Boolean.valueOf(trigger.isCancelJob()),
                    trigger.getFormatType()));
            return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId()));
        });

        CompletableFuture<Tuple3<JobID, SavepointFormatType, String>> stopCall = new CompletableFuture<>();
        clusterClient.setStopWithSavepointFormat((jobID, savepointFormatType, str) -> {
            // Explicit casts: the client is referenced through its raw type here.
            stopCall.complete(new Tuple3<>((JobID) jobID, (SavepointFormatType) savepointFormatType, (String) str));
            return CompletableFuture.completedFuture(savepointDir);
        });

        TestingService service = new TestingService(this, clusterClient);
        JobID jobId = JobID.generate();
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ((FlinkDeploymentSpec) deployment.getSpec()).getFlinkConfiguration().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointDir);
        ((FlinkDeploymentStatus) deployment.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        JobStatus jobStatus = ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus();
        jobStatus.setJobId(jobId.toHexString());
        jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
        jobStatus.setJobId(jobId.toString());
        ((FlinkDeploymentStatus) deployment.getStatus()).setJobStatus(jobStatus);

        // Part 1: manual savepoint trigger with the NATIVE format configured.
        service.triggerSavepoint(((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus().getJobId(), SavepointTriggerType.MANUAL, ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus().getSavepointInfo(), new Configuration(this.configuration).set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE));
        Assertions.assertTrue(triggerCall.isDone());
        Tuple4<JobID, String, Boolean, SavepointFormatType> triggered = triggerCall.get();
        Assertions.assertEquals(jobId, triggered.f0);
        Assertions.assertEquals(savepointDir, triggered.f1);
        Assertions.assertFalse(triggered.f2);
        Assertions.assertEquals(SavepointFormatType.NATIVE, triggered.f3);

        // Part 2: cancel with savepoint, again with the NATIVE format configured.
        service.cancelJob(deployment, UpgradeMode.SAVEPOINT, new Configuration(this.configManager.getObserveConfig(deployment)).set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE), false);
        Assertions.assertTrue(stopCall.isDone());
        Tuple3<JobID, SavepointFormatType, String> stopped = stopCall.get();
        Assertions.assertEquals(jobId, stopped.f0);
        Assertions.assertEquals(SavepointFormatType.NATIVE, stopped.f1);
        Assertions.assertEquals(savepointDir, stopped.f2);
    }

    @Test
    public void getLastCheckpointTest() throws Exception {
        ObjectMapper strictObjectMapper = RestMapperUtils.getStrictObjectMapper();
        ArrayList arrayList = new ArrayList();
        TestingService testingService = getTestingService((messageHeaders, messageParameters, requestBody) -> {
            if (messageHeaders instanceof CustomCheckpointingStatisticsHeaders) {
                return CompletableFuture.completedFuture((ResponseBody) arrayList.get(0));
            }
            Assertions.fail("unknown request");
            return null;
        });
        arrayList.add((CheckpointHistoryWrapper) strictObjectMapper.readValue("{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":117,\"p999\":117},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0},\"processed_data\":{\"min\":0,\"max\":1274,\"avg\":280,\"p50\":112,\"p90\":840,\"p95\":1071,\"p99\":1274,\"p999\":1274},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0}},\"latest\":{\"completed\":{\"className\":\"completed\",\"id\":96,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212837604,\"latest_ack_timestamp\":1653212837621,\"checkpointed_size\":28437,\"state_size\":28437,\"end_to_end_duration\":17,\"alignment_buffered\":0,\"processed_data\":560,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96\",\"discarded\":false},\"savepoint\":{\"className\":\"completed\",\"id\":51,\"status\":\"COMPLETED\",\"is_savepoint\":true,\"trigger_timestamp\":1653212748176,\"latest_ack_timestamp\":1653212748233,\"checkpointed_size\":53670,\"state_size\":53670,\"end_to_end_duration\":57,\"alignment_buffered\":0,\"processed_data\":483,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"SAVEPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/savepoints/savepoint-000000-e8ea2482ce4f\",\"discarded\":false},\"failed\":null,\"restored\":{\"id\":2
7,\"restore_timestamp\":1653212683022,\"is_savepoint\":true,\"external_path\":\"file:/flink-data/savepoints/savepoint-000000-5930e5326ca7\"}},\"history\":[{\"className\":\"completed\",\"id\":96,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212837604,\"latest_ack_timestamp\":1653212837621,\"checkpointed_size\":28437,\"state_size\":28437,\"end_to_end_duration\":17,\"alignment_buffered\":0,\"processed_data\":560,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96\",\"discarded\":false},{\"className\":\"completed\",\"id\":95,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212835603,\"latest_ack_timestamp\":1653212835622,\"checkpointed_size\":28473,\"state_size\":28473,\"end_to_end_duration\":19,\"alignment_buffered\":0,\"processed_data\":42,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-95\",\"discarded\":true},{\"className\":\"completed\",\"id\":94,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212833603,\"latest_ack_timestamp\":1653212833623,\"checkpointed_size\":27969,\"state_size\":27969,\"end_to_end_duration\":20,\"alignment_buffered\":0,\"processed_data\":28,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-94\",\"discarded\":true},{\"className\":\"completed\",\"id\":93,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212831603,\"latest_ack_timestamp\":1653212831621,\"checkpointed_size\":28113,\"state_size\":28113,\"end_to_end_duration\":18,\"alignment_buffered\":0,\"processed_data\":138,\"persist
ed_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-93\",\"discarded\":true},{\"className\":\"completed\",\"id\":92,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212829603,\"latest_ack_timestamp\":1653212829621,\"checkpointed_size\":28293,\"state_size\":28293,\"end_to_end_duration\":18,\"alignment_buffered\":0,\"processed_data\":196,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-92\",\"discarded\":true},{\"className\":\"completed\",\"id\":91,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212827603,\"latest_ack_timestamp\":1653212827629,\"checkpointed_size\":27969,\"state_size\":27969,\"end_to_end_duration\":26,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-91\",\"discarded\":true},{\"className\":\"completed\",\"id\":90,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212825603,\"latest_ack_timestamp\":1653212825641,\"checkpointed_size\":27735,\"state_size\":27735,\"end_to_end_duration\":38,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-90\",\"discarded\":true},{\"className\":\"completed\",\"id\":89,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212823603,\"latest_ack_timestamp\":1653212823618,\"checkpointed_size\":28545,\"state_size\":28545,\"end_to_end_duratio
n\":15,\"alignment_buffered\":0,\"processed_data\":364,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-89\",\"discarded\":true},{\"className\":\"completed\",\"id\":88,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212821603,\"latest_ack_timestamp\":1653212821619,\"checkpointed_size\":28275,\"state_size\":28275,\"end_to_end_duration\":16,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-88\",\"discarded\":true},{\"className\":\"completed\",\"id\":87,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212819604,\"latest_ack_timestamp\":1653212819622,\"checkpointed_size\":28518,\"state_size\":28518,\"end_to_end_duration\":18,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-87\",\"discarded\":true}]}", CheckpointHistoryWrapper.class));
        Assertions.assertEquals("file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96", ((Savepoint) testingService.getLastCheckpoint(new JobID(), new Configuration()).get()).getLocation());
        arrayList.set(0, (CheckpointHistoryWrapper) strictObjectMapper.readValue("{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":117,\"p999\":117},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0},\"processed_data\":{\"min\":0,\"max\":1274,\"avg\":280,\"p50\":112,\"p90\":840,\"p95\":1071,\"p99\":1274,\"p999\":1274},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0}},\"latest\":{\"completed\":null,\"savepoint\":null,\"failed\":null,\"restored\":{\"id\":27,\"restore_timestamp\":1653212683022,\"is_savepoint\":true,\"external_path\":\"file:/flink-data/savepoints/savepoint-000000-5930e5326ca7\"}},\"history\":[]}", CheckpointHistoryWrapper.class));
        Assertions.assertEquals("file:/flink-data/savepoints/savepoint-000000-5930e5326ca7", ((Savepoint) testingService.getLastCheckpoint(new JobID(), new Configuration()).get()).getLocation());
        arrayList.set(0, (CheckpointHistoryWrapper) strictObjectMapper.readValue("{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":117,\"p999\":117},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0},\"processed_data\":{\"min\":0,\"max\":1274,\"avg\":280,\"p50\":112,\"p90\":840,\"p95\":1071,\"p99\":1274,\"p999\":1274},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0}},\"latest\":{\"completed\":null,\"savepoint\":null,\"failed\":null,\"restored\":{\"id\":27,\"restore_timestamp\":1653212683022,\"is_savepoint\":true,\"external_path\":\"<checkpoint-not-externally-addressable>\"}},\"history\":[]}", CheckpointHistoryWrapper.class));
        try {
            testingService.getLastCheckpoint(new JobID(), new Configuration());
            Assertions.fail();
        } catch (RecoveryFailureException e) {
        }
    }

    /**
     * Drives {@code fetchSavepointInfo} through each response the stubbed REST endpoint can
     * produce: a completed savepoint, an in-progress operation, a completed operation carrying a
     * failure cause, and a failed request future.
     */
    @Test
    public void fetchSavepointInfoTest() throws Exception {
        JobID jobID = new JobID();
        TriggerId triggerId = new TriggerId();
        // Canned reply for the next status request; null makes the stub fail the request.
        AtomicReference<ResponseBody> cannedResponse = new AtomicReference<>();
        TestingService testingService = getTestingService((headers, parameters, body) -> {
            if (parameters instanceof SavepointStatusMessageParameters) {
                SavepointStatusMessageParameters statusParameters =
                        (SavepointStatusMessageParameters) parameters;
                // The service must address exactly the job and trigger it was asked about.
                Assertions.assertEquals(jobID, statusParameters.jobIdPathParameter.getValue());
                Assertions.assertEquals(
                        triggerId, statusParameters.triggerIdPathParameter.getValue());
                ResponseBody response = cannedResponse.get();
                if (response == null) {
                    return CompletableFuture.failedFuture(new Exception("fail"));
                }
                return CompletableFuture.completedFuture(response);
            }
            Assertions.fail("unknown request");
            return null;
        });
        String triggerIdString = triggerId.toString();
        String jobIdString = jobID.toString();

        // Completed operation with a savepoint location.
        cannedResponse.set(
                AsynchronousOperationResult.completed(
                        new SavepointInfo("l", (SerializedThrowable) null)));
        Assertions.assertEquals(
                SavepointFetchResult.completed("l"),
                testingService.fetchSavepointInfo(
                        triggerIdString, jobIdString, this.configuration));

        // Operation still running.
        cannedResponse.set(AsynchronousOperationResult.inProgress());
        Assertions.assertEquals(
                SavepointFetchResult.pending(),
                testingService.fetchSavepointInfo(
                        triggerIdString, jobIdString, this.configuration));

        // Completed operation that carries a failure cause instead of a location.
        cannedResponse.set(
                AsynchronousOperationResult.completed(
                        new SavepointInfo(
                                (String) null,
                                new SerializedThrowable(new Exception("testErr")))));
        Assertions.assertTrue(
                testingService
                        .fetchSavepointInfo(triggerIdString, jobIdString, this.configuration)
                        .getError()
                        .contains("testErr"));

        // The request itself fails.
        cannedResponse.set(null);
        Assertions.assertTrue(
                testingService
                        .fetchSavepointInfo(triggerIdString, jobIdString, this.configuration)
                        .getError()
                        .contains("fail"));
    }

    /**
     * Checks that a key under the {@code kubernetes.operator.} prefix is no longer present after
     * {@code AbstractFlinkService.removeOperatorConfigs} has processed the configuration.
     */
    @Test
    public void removeOperatorConfigTest() {
        String operatorKey = "kubernetes.operator.meyKey";
        Configuration withOperatorKey = Configuration.fromMap(Map.of(operatorKey, "v"));
        Assertions.assertFalse(
                AbstractFlinkService.removeOperatorConfigs(withOperatorKey)
                        .containsKey(operatorKey));
    }

    /**
     * Checks {@code getMetrics} against a stub that echoes every requested metric name back as
     * both the metric id and its value, and expects a name-to-value map in return.
     */
    @Test
    public void getMetricsTest() throws Exception {
        JobID jobID = new JobID();
        List<String> requestedMetrics = List.of("m1", "m2");
        TestingService testingService = getTestingService((headers, parameters, body) -> {
            if (parameters instanceof JobMetricsMessageParameters) {
                JobMetricsMessageParameters metricsParameters =
                        (JobMetricsMessageParameters) parameters;
                Assertions.assertEquals(jobID, metricsParameters.jobPathParameter.getValue());
                // Echo each requested name back as a metric whose value equals its id.
                List<String> filter = metricsParameters.metricsFilterParameter.getValue();
                List<Metric> echoed = new ArrayList<>();
                for (String metricName : filter) {
                    echoed.add(new Metric(metricName, metricName));
                }
                return CompletableFuture.completedFuture(
                        new MetricCollectionResponseBody(echoed));
            }
            Assertions.fail("unknown request");
            return null;
        });
        Assertions.assertEquals(
                Map.of("m1", "m1", "m2", "m2"),
                testingService.getMetrics(
                        this.configuration, jobID.toHexString(), requestedMetrics));
    }

    /**
     * Checks {@code getClusterInfo} against a stubbed dashboard-configuration endpoint and a
     * stubbed task-manager listing containing a single task manager.
     *
     * <p>The expected "total-memory" entry is rendered as a string. The other expected entries
     * are all strings (including the numeric "total-cpu" value "2.0"), and a boxed {@code Long}
     * would never compare equal to a string value in the map comparison.
     * NOTE(review): assumes {@code getClusterInfo} reports every value as a string, as the other
     * expected entries indicate - confirm against the service implementation.
     */
    @Test
    public void getClusterInfoTest() throws Exception {
        CustomDashboardConfiguration dashboardConfiguration = new CustomDashboardConfiguration();
        dashboardConfiguration.setFlinkVersion("testVersion");
        dashboardConfiguration.setFlinkRevision("testRevision");

        TaskManagerInfo taskManagerInfo =
                new TaskManagerInfo(
                        ResourceID.generate(),
                        "",
                        0,
                        0,
                        0L,
                        0,
                        0,
                        ResourceProfile.UNKNOWN,
                        ResourceProfile.UNKNOWN,
                        new HardwareDescription(1, 0L, 0L, 0L),
                        new TaskExecutorMemoryConfiguration(
                                0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L),
                        (Boolean) null);
        TaskManagersInfo taskManagersInfo = new TaskManagersInfo(List.of(taskManagerInfo));

        TestingService testingService = getTestingService((messageHeaders, messageParameters, requestBody) -> {
            if (messageHeaders instanceof CustomDashboardConfigurationHeaders) {
                return CompletableFuture.completedFuture(dashboardConfiguration);
            }
            if (messageHeaders instanceof TaskManagersHeaders) {
                return CompletableFuture.completedFuture(taskManagersInfo);
            }
            Assertions.fail("unknown request");
            return null;
        });

        // Job manager and task manager are each configured with the same process memory, so the
        // expected total is twice that amount.
        MemorySize processMemory = MemorySize.ofMebiBytes(1000L);
        Configuration clusterConfiguration = new Configuration();
        clusterConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, processMemory);
        clusterConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, processMemory);

        Map<String, String> expected =
                Map.of(
                        "flink-version", "testVersion",
                        "flink-revision", "testRevision",
                        "total-cpu", "2.0",
                        "total-memory", String.valueOf(processMemory.getBytes() * 2));
        Assertions.assertEquals(expected, testingService.getClusterInfo(clusterConfiguration));
    }

    /**
     * Checks {@code AbstractFlinkService.getEffectiveStatus} for four task-state mixes: all tasks
     * running, running plus finished, running plus scheduled, and all finished.
     */
    @Test
    public void effectiveStatusTest() {
        // Every task is running: the job-level status is reported unchanged.
        JobDetails allRunning =
                getJobDetails(
                        org.apache.flink.api.common.JobStatus.RUNNING,
                        Tuple2.of(ExecutionState.RUNNING, 4));
        Assertions.assertEquals(
                org.apache.flink.api.common.JobStatus.RUNNING,
                AbstractFlinkService.getEffectiveStatus(allRunning));

        // A mix of running and finished tasks is still expected to be RUNNING.
        JobDetails runningAndFinished =
                getJobDetails(
                        org.apache.flink.api.common.JobStatus.RUNNING,
                        Tuple2.of(ExecutionState.RUNNING, 2),
                        Tuple2.of(ExecutionState.FINISHED, 2));
        Assertions.assertEquals(
                org.apache.flink.api.common.JobStatus.RUNNING,
                AbstractFlinkService.getEffectiveStatus(runningAndFinished));

        // Tasks that are only scheduled make a RUNNING job expected to be CREATED.
        JobDetails runningAndScheduled =
                getJobDetails(
                        org.apache.flink.api.common.JobStatus.RUNNING,
                        Tuple2.of(ExecutionState.RUNNING, 2),
                        Tuple2.of(ExecutionState.SCHEDULED, 2));
        Assertions.assertEquals(
                org.apache.flink.api.common.JobStatus.CREATED,
                AbstractFlinkService.getEffectiveStatus(runningAndScheduled));

        // A finished job with only finished tasks stays FINISHED.
        JobDetails allFinished =
                getJobDetails(
                        org.apache.flink.api.common.JobStatus.FINISHED,
                        Tuple2.of(ExecutionState.FINISHED, 4));
        Assertions.assertEquals(
                org.apache.flink.api.common.JobStatus.FINISHED,
                AbstractFlinkService.getEffectiveStatus(allFinished));
    }

    /**
     * Builds a {@link JobDetails} named "test-job" with a fresh {@link JobID}, the given job
     * status and the given number of tasks per execution state.
     *
     * @param jobStatus job-level status to put into the details
     * @param tuple2Arr pairs of (execution state, task count); states not listed get a count of 0
     * @return job details whose total task count is the sum of all per-state counts
     */
    private JobDetails getJobDetails(org.apache.flink.api.common.JobStatus jobStatus, Tuple2<ExecutionState, Integer>... tuple2Arr) {
        // One slot per execution state, indexed by the state's ordinal.
        int[] tasksPerState = new int[ExecutionState.values().length];
        for (Tuple2<ExecutionState, Integer> stateAndCount : tuple2Arr) {
            ExecutionState state = stateAndCount.f0;
            int count = stateAndCount.f1;
            tasksPerState[state.ordinal()] = count;
        }
        int totalTasks = Arrays.stream(tasksPerState).sum();
        return new JobDetails(
                new JobID(),
                "test-job",
                System.currentTimeMillis(),
                -1L,
                0L,
                jobStatus,
                System.currentTimeMillis(),
                tasksPerState,
                totalTasks);
    }

    /**
     * Checks {@code isJobManagerPortReady} in three situations: the cluster client reports no web
     * interface URL (a {@code FlinkRuntimeException} is expected), the URL points at a local port
     * nobody listens on (expected {@code false}), and the same port with a listening socket
     * (expected {@code true}).
     *
     * <p>NOTE(review): the test uses the fixed local port 6868 and will misbehave if another
     * process already listens on it.
     */
    @Test
    public void isJobManagerReadyTest() throws Exception {
        final int port = 6868;
        // URL handed out by the stubbed cluster client; starts out unset (null).
        final AtomicReference<String> webInterfaceUrl = new AtomicReference<>();
        TestingService testingService = new TestingService(this, new TestingClusterClient<String>(this.configuration) {
            @Override // org.apache.flink.kubernetes.operator.TestingClusterClient
            public String getWebInterfaceURL() {
                return webInterfaceUrl.get();
            }
        });

        // No URL available yet.
        Assertions.assertThrows(FlinkRuntimeException.class, () -> {
            testingService.isJobManagerPortReady(this.configuration);
        });

        // URL set, but nothing is listening on the port.
        webInterfaceUrl.set("http://127.0.0.1:" + port);
        Assertions.assertFalse(testingService.isJobManagerPortReady(this.configuration));

        // try-with-resources closes the listener on both the success and the failure path and
        // records a close failure as a suppressed exception, replacing the manual sequence.
        try (ServerSocket listener = new ServerSocket(port)) {
            Assertions.assertTrue(testingService.isJobManagerPortReady(this.configuration));
        }
    }
}
