package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingClusterClient;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext;
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import org.apache.flink.util.concurrent.Executors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.class */
public class NativeFlinkServiceTest {
    KubernetesClient client;
    private final Configuration configuration = new Configuration();
    private final FlinkConfigManager configManager = new FlinkConfigManager(this.configuration);
    private final EventCollector eventCollector = new EventCollector();
    private EventRecorder eventRecorder;
    private FlinkOperatorConfiguration operatorConfig;
    private ExecutorService executorService;

    /* loaded from: input_file:org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest$TestingNativeFlinkService.class */
    class TestingNativeFlinkService extends NativeFlinkService {
        private Configuration runtimeConfig;

        public TestingNativeFlinkService(NativeFlinkService nativeFlinkService) {
            super(nativeFlinkService.kubernetesClient, nativeFlinkService.artifactManager, nativeFlinkService.executorService, NativeFlinkServiceTest.this.operatorConfig, NativeFlinkServiceTest.this.eventRecorder);
        }

        protected void deployApplicationCluster(JobSpec jobSpec, Configuration configuration) {
            this.runtimeConfig = configuration;
        }

        protected void submitClusterInternal(Configuration configuration) throws Exception {
            this.runtimeConfig = configuration;
        }

        public Configuration getRuntimeConfig() {
            return this.runtimeConfig;
        }
    }

    @BeforeEach
    public void setup() {
        this.configuration.set(KubernetesConfigOptions.CLUSTER_ID, "test-cluster");
        this.configuration.set(KubernetesConfigOptions.NAMESPACE, "flink-operator-test");
        this.configuration.set(FlinkConfigBuilder.FLINK_VERSION, FlinkVersion.v1_15);
        this.eventRecorder = new EventRecorder(this.client, this.eventCollector);
        this.operatorConfig = FlinkOperatorConfiguration.fromConfiguration(this.configuration);
        this.executorService = Executors.newDirectExecutorService();
    }

    @Test
    public void testDeleteClusterInternal() {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        ReconciliationUtils.updateStatusForDeployedSpec(buildApplicationCluster, new Configuration());
        AbstractFlinkService createFlinkService = createFlinkService(null);
        this.client.resource(((DeploymentBuilder) ((DeploymentBuilder) new DeploymentBuilder().withNewMetadata().withName("test-cluster").withNamespace("flink-operator-test").endMetadata()).withNewSpec().endSpec()).build()).create();
        Assertions.assertNotNull(((RollableScalableResource) ((NonNamespaceOperation) this.client.apps().deployments().inNamespace("flink-operator-test")).withName("test-cluster")).get());
        createFlinkService.deleteClusterInternal(buildApplicationCluster.getMetadata(), this.configManager.getObserveConfig(buildApplicationCluster), false, DeletionPropagation.FOREGROUND);
        Assertions.assertNull(((RollableScalableResource) ((NonNamespaceOperation) this.client.apps().deployments().inNamespace("flink-operator-test")).withName("test-cluster")).get());
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.flink.kubernetes.operator.service.NativeFlinkServiceTest$1] */
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    @ParameterizedTest
    public void testDeleteOnSavepointBefore1_15(final FlinkVersion flinkVersion) throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        new NativeFlinkService(this.client, null, this.executorService, this.operatorConfig, this.eventRecorder) { // from class: org.apache.flink.kubernetes.operator.service.NativeFlinkServiceTest.1
            protected void cancelJob(FlinkDeployment flinkDeployment, UpgradeMode upgradeMode, Configuration configuration, boolean z) {
                Assertions.assertEquals(Boolean.valueOf(!flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)), Boolean.valueOf(z));
                atomicBoolean.set(true);
            }
        }.cancelJob(TestUtils.buildApplicationCluster(flinkVersion), UpgradeMode.SAVEPOINT, new Configuration());
        Assertions.assertTrue(atomicBoolean.get());
    }

    @Test
    public void testSubmitApplicationClusterConfigRemoval() throws Exception {
        TestingClusterClient testingClusterClient = new TestingClusterClient(this.configuration, "test-cluster");
        Configuration createOperatorConfig = createOperatorConfig();
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        TestingNativeFlinkService testingNativeFlinkService = new TestingNativeFlinkService(createFlinkService(testingClusterClient));
        testingNativeFlinkService.submitApplicationCluster(((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob(), createOperatorConfig, false);
        Assertions.assertFalse(testingNativeFlinkService.getRuntimeConfig().containsKey(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key()));
    }

    @Test
    public void testSubmitSessionClusterConfigRemoval() throws Exception {
        TestingClusterClient testingClusterClient = new TestingClusterClient(this.configuration, "test-cluster");
        Configuration createOperatorConfig = createOperatorConfig();
        TestingNativeFlinkService testingNativeFlinkService = new TestingNativeFlinkService(createFlinkService(testingClusterClient));
        testingNativeFlinkService.submitSessionCluster(createOperatorConfig);
        Assertions.assertFalse(testingNativeFlinkService.getRuntimeConfig().containsKey(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key()));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v4, types: [org.apache.flink.kubernetes.operator.service.NativeFlinkService, org.apache.flink.kubernetes.operator.service.NativeFlinkServiceTest$2] */
    @Test
    public void testScaling() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        final AtomicReference atomicReference = new AtomicReference();
        final AtomicReference atomicReference2 = new AtomicReference();
        ?? r0 = new NativeFlinkService(this.client, null, this.executorService, this.operatorConfig, this.eventRecorder) { // from class: org.apache.flink.kubernetes.operator.service.NativeFlinkServiceTest.2
            protected Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(RestClusterClient<String> restClusterClient, AbstractFlinkResource<?, ?> abstractFlinkResource) {
                return (Map) atomicReference.get();
            }

            protected void updateVertexResources(RestClusterClient<String> restClusterClient, AbstractFlinkResource<?, ?> abstractFlinkResource, Map<JobVertexID, JobVertexResourceRequirements> map) {
                atomicReference2.set(map);
            }
        };
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) buildApplicationCluster.getSpec();
        flinkDeploymentSpec.setFlinkVersion(FlinkVersion.v1_18);
        Configuration fromMap = Configuration.fromMap(flinkDeploymentSpec.getFlinkConfiguration());
        fromMap.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        flinkDeploymentSpec.setFlinkConfiguration(fromMap.toMap());
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(flinkDeploymentSpec, buildApplicationCluster);
        fromMap.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(jobVertexID.toHexString(), "4"));
        flinkDeploymentSpec.setFlinkConfiguration(fromMap.toMap());
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().setState("RUNNING");
        atomicReference.set(Map.of(jobVertexID, new JobVertexResourceRequirements(new JobVertexResourceRequirements.Parallelism(1, 1)), jobVertexID2, new JobVertexResourceRequirements(new JobVertexResourceRequirements.Parallelism(2, 2))));
        Assertions.assertEquals(FlinkService.ScalingResult.SCALING_TRIGGERED, r0.scale(new FlinkDeploymentContext(buildApplicationCluster, TestUtils.createEmptyContext(), (KubernetesResourceMetricGroup) null, this.configManager, flinkResourceContext -> {
            return r0;
        }), this.configManager.getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec())));
        Assertions.assertEquals(Map.of(jobVertexID, new JobVertexResourceRequirements(new JobVertexResourceRequirements.Parallelism(4, 4)), jobVertexID2, new JobVertexResourceRequirements(new JobVertexResourceRequirements.Parallelism(2, 2))), atomicReference2.get());
        testScaleConditionDep(buildApplicationCluster, r0, flinkDeployment -> {
        }, FlinkService.ScalingResult.SCALING_TRIGGERED);
        testScaleConditionLastSpec(buildApplicationCluster, r0, flinkDeploymentSpec2 -> {
        }, FlinkService.ScalingResult.SCALING_TRIGGERED);
        testScaleConditionDep(buildApplicationCluster, r0, flinkDeployment2 -> {
            ((FlinkDeploymentSpec) flinkDeployment2.getSpec()).getFlinkConfiguration().put(KubernetesOperatorConfigOptions.JOB_UPGRADE_INPLACE_SCALING_ENABLED.key(), "false");
        }, FlinkService.ScalingResult.CANNOT_SCALE);
        testScaleConditionLastSpec(buildApplicationCluster, r0, flinkDeploymentSpec3 -> {
            flinkDeploymentSpec3.getFlinkConfiguration().put(JobManagerOptions.SCHEDULER.key(), JobManagerOptions.SchedulerType.Default.name());
        }, FlinkService.ScalingResult.CANNOT_SCALE);
        testScaleConditionLastSpec(buildApplicationCluster, r0, flinkDeploymentSpec4 -> {
            flinkDeploymentSpec4.setFlinkVersion(FlinkVersion.v1_17);
        }, FlinkService.ScalingResult.CANNOT_SCALE);
        testScaleConditionLastSpec(buildApplicationCluster, r0, flinkDeploymentSpec5 -> {
            flinkDeploymentSpec5.setFlinkVersion(FlinkVersion.v1_18);
        }, FlinkService.ScalingResult.SCALING_TRIGGERED);
        testScaleConditionDep(buildApplicationCluster, r0, flinkDeployment3 -> {
            ((FlinkDeploymentStatus) flinkDeployment3.getStatus()).getJobStatus().setState("FAILED");
        }, FlinkService.ScalingResult.CANNOT_SCALE);
        testScaleConditionDep(buildApplicationCluster, r0, flinkDeployment4 -> {
            ((FlinkDeploymentStatus) flinkDeployment4.getStatus()).getJobStatus().setState("RECONCILING");
        }, FlinkService.ScalingResult.CANNOT_SCALE);
        testScaleConditionDep(buildApplicationCluster, r0, flinkDeployment5 -> {
            ((FlinkDeploymentStatus) flinkDeployment5.getStatus()).getJobStatus().setState("RUNNING");
        }, FlinkService.ScalingResult.SCALING_TRIGGERED);
        testScaleConditionDep(buildApplicationCluster, r0, flinkDeployment6 -> {
            ((FlinkDeploymentSpec) flinkDeployment6.getSpec()).setJob((JobSpec) null);
        }, FlinkService.ScalingResult.CANNOT_SCALE);
        testScaleConditionLastSpec(buildApplicationCluster, r0, flinkDeploymentSpec6 -> {
            flinkDeploymentSpec6.getFlinkConfiguration().put(PipelineOptions.PARALLELISM_OVERRIDES.key(), jobVertexID2 + ":3");
        }, FlinkService.ScalingResult.CANNOT_SCALE);
        testScaleConditionLastSpec(buildApplicationCluster, r0, flinkDeploymentSpec7 -> {
            flinkDeploymentSpec7.getFlinkConfiguration().put(PipelineOptions.PARALLELISM_OVERRIDES.key(), jobVertexID + ":1," + new JobVertexID() + ":5");
        }, FlinkService.ScalingResult.SCALING_TRIGGERED);
        FlinkDeployment flinkDeployment7 = (FlinkDeployment) ReconciliationUtils.clone(buildApplicationCluster);
        ((FlinkDeploymentSpec) flinkDeployment7.getSpec()).getFlinkConfiguration().remove(PipelineOptions.PARALLELISM_OVERRIDES.key());
        testScaleConditionLastSpec(flinkDeployment7, r0, flinkDeploymentSpec8 -> {
            flinkDeploymentSpec8.getFlinkConfiguration().put(PipelineOptions.PARALLELISM_OVERRIDES.key(), jobVertexID2 + ":3");
        }, FlinkService.ScalingResult.CANNOT_SCALE);
        testScaleConditionDep(flinkDeployment7, r0, flinkDeployment8 -> {
            ((FlinkDeploymentSpec) flinkDeployment8.getSpec()).getFlinkConfiguration().remove(PipelineOptions.PARALLELISM_OVERRIDES.key());
        }, FlinkService.ScalingResult.CANNOT_SCALE);
        atomicReference.set(Map.of(jobVertexID, new JobVertexResourceRequirements(new JobVertexResourceRequirements.Parallelism(1, 1))));
        atomicReference2.set(null);
        testScaleConditionDep(buildApplicationCluster, r0, flinkDeployment9 -> {
            ((FlinkDeploymentSpec) flinkDeployment9.getSpec()).getFlinkConfiguration().put(PipelineOptions.PARALLELISM_OVERRIDES.key(), jobVertexID2 + ":5");
        }, FlinkService.ScalingResult.ALREADY_SCALED);
        Assertions.assertNull(atomicReference2.get());
        testScaleConditionDep(buildApplicationCluster, r0, flinkDeployment10 -> {
            ((FlinkDeploymentSpec) flinkDeployment10.getSpec()).getFlinkConfiguration().put(PipelineOptions.PARALLELISM_OVERRIDES.key(), jobVertexID2 + ":5," + jobVertexID + ":1");
        }, FlinkService.ScalingResult.ALREADY_SCALED);
        Assertions.assertNull(atomicReference2.get());
        atomicReference.set(Map.of(jobVertexID, new JobVertexResourceRequirements(new JobVertexResourceRequirements.Parallelism(1, 2))));
        testScaleConditionDep(buildApplicationCluster, r0, flinkDeployment11 -> {
            ((FlinkDeploymentSpec) flinkDeployment11.getSpec()).getFlinkConfiguration().put(PipelineOptions.PARALLELISM_OVERRIDES.key(), jobVertexID2 + ":5," + jobVertexID + ":1");
        }, FlinkService.ScalingResult.SCALING_TRIGGERED);
        Assertions.assertEquals(new JobVertexResourceRequirements.Parallelism(1, 1), ((JobVertexResourceRequirements) ((Map) atomicReference2.get()).get(jobVertexID)).getParallelism());
        atomicReference.set(null);
        testScaleConditionDep(buildApplicationCluster, r0, flinkDeployment12 -> {
        }, FlinkService.ScalingResult.CANNOT_SCALE);
    }

    private void testScaleConditionDep(FlinkDeployment flinkDeployment, NativeFlinkService nativeFlinkService, Consumer<FlinkDeployment> consumer, FlinkService.ScalingResult scalingResult) throws Exception {
        FlinkDeployment flinkDeployment2 = (FlinkDeployment) ReconciliationUtils.clone(flinkDeployment);
        consumer.accept(flinkDeployment2);
        Assertions.assertEquals(scalingResult, nativeFlinkService.scale(new FlinkDeploymentContext(flinkDeployment2, TestUtils.createEmptyContext(), (KubernetesResourceMetricGroup) null, this.configManager, flinkResourceContext -> {
            return nativeFlinkService;
        }), this.configManager.getDeployConfig(flinkDeployment2.getMetadata(), (FlinkDeploymentSpec) flinkDeployment2.getSpec())));
    }

    private void testScaleConditionLastSpec(FlinkDeployment flinkDeployment, NativeFlinkService nativeFlinkService, Consumer<FlinkDeploymentSpec> consumer, FlinkService.ScalingResult scalingResult) throws Exception {
        testScaleConditionDep(flinkDeployment, nativeFlinkService, flinkDeployment2 -> {
            FlinkDeploymentReconciliationStatus reconciliationStatus = ((FlinkDeploymentStatus) flinkDeployment2.getStatus()).getReconciliationStatus();
            FlinkDeploymentSpec deserializeLastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
            consumer.accept(deserializeLastReconciledSpec);
            reconciliationStatus.serializeAndSetLastReconciledSpec(deserializeLastReconciledSpec, flinkDeployment2);
        }, scalingResult);
    }

    @Test
    public void testScalingCompleted() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        TestingClusterClient testingClusterClient = new TestingClusterClient(this.configuration, "test-cluster");
        NativeFlinkService nativeFlinkService = (NativeFlinkService) createFlinkService(testingClusterClient);
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) buildApplicationCluster.getSpec();
        flinkDeploymentSpec.setFlinkVersion(FlinkVersion.v1_18);
        Configuration fromMap = Configuration.fromMap(flinkDeploymentSpec.getFlinkConfiguration());
        fromMap.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(jobVertexID.toHexString(), "4", jobVertexID2.toHexString(), "1"));
        FlinkDeploymentReconciliationStatus reconciliationStatus = ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus();
        flinkDeploymentSpec.setFlinkConfiguration(fromMap.toMap());
        reconciliationStatus.serializeAndSetLastReconciledSpec(flinkDeploymentSpec, buildApplicationCluster);
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().setJobId(new JobID().toHexString());
        FlinkDeploymentContext flinkDeploymentContext = new FlinkDeploymentContext(buildApplicationCluster, TestUtils.createEmptyContext(), (KubernetesResourceMetricGroup) null, this.configManager, flinkResourceContext -> {
            return nativeFlinkService;
        });
        AtomicReference atomicReference = new AtomicReference();
        testingClusterClient.setRequestProcessor((messageHeaders, messageParameters, requestBody) -> {
            return messageHeaders instanceof JobDetailsHeaders ? CompletableFuture.completedFuture((ResponseBody) atomicReference.get()) : CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
        });
        atomicReference.set(createJobDetailsFor(List.of()));
        Assertions.assertFalse(nativeFlinkService.scalingCompleted(flinkDeploymentContext));
        atomicReference.set(createJobDetailsFor(List.of(jobVertexDetailsInfo(jobVertexID, 1), jobVertexDetailsInfo(jobVertexID2, 1))));
        Assertions.assertFalse(nativeFlinkService.scalingCompleted(flinkDeploymentContext));
        atomicReference.set(createJobDetailsFor(List.of(jobVertexDetailsInfo(jobVertexID, 4), jobVertexDetailsInfo(jobVertexID2, 1))));
        Assertions.assertTrue(nativeFlinkService.scalingCompleted(flinkDeploymentContext));
        flinkDeploymentSpec.getFlinkConfiguration().put(PipelineOptions.PARALLELISM_OVERRIDES.key(), jobVertexID + ":4," + jobVertexID2 + ":1," + new JobVertexID() + ":100");
        reconciliationStatus.serializeAndSetLastReconciledSpec(flinkDeploymentSpec, buildApplicationCluster);
        Assertions.assertTrue(nativeFlinkService.scalingCompleted(new FlinkDeploymentContext(buildApplicationCluster, TestUtils.createEmptyContext(), (KubernetesResourceMetricGroup) null, this.configManager, flinkResourceContext2 -> {
            return nativeFlinkService;
        })));
    }

    private JobDetailsInfo.JobVertexDetailsInfo jobVertexDetailsInfo(JobVertexID jobVertexID, int i) {
        return new JobDetailsInfo.JobVertexDetailsInfo(jobVertexID, "", 900, i, ExecutionState.RUNNING, 0L, 0L, 0L, Map.of(), new IOMetricsInfo(0L, false, 0L, false, 0L, false, 0L, false, 0L, 0L, Double.valueOf(0.0d)));
    }

    @Test
    public void resourceRestApiTest() throws Exception {
        TestingClusterClient testingClusterClient = new TestingClusterClient(this.configuration);
        NativeFlinkService nativeFlinkService = (NativeFlinkService) createFlinkService(testingClusterClient);
        JobID jobID = new JobID();
        JobResourceRequirements jobResourceRequirements = new JobResourceRequirements(Map.of(new JobVertexID(), new JobVertexResourceRequirements(new JobVertexResourceRequirements.Parallelism(0, 2))));
        testingClusterClient.setRequestProcessor((messageHeaders, messageParameters, requestBody) -> {
            if (messageHeaders instanceof JobResourceRequirementsHeaders) {
                if (jobID.equals(((JobMessageParameters) messageParameters).jobPathParameter.getValue())) {
                    return CompletableFuture.completedFuture(new JobResourceRequirementsBody(jobResourceRequirements));
                }
            } else if ((requestBody instanceof JobResourceRequirementsBody) && jobID.equals(((JobMessageParameters) messageParameters).jobPathParameter.getValue())) {
                Assertions.assertEquals(Optional.of(jobResourceRequirements), ((JobResourceRequirementsBody) requestBody).asJobResourceRequirements());
                return CompletableFuture.completedFuture(null);
            }
            Assertions.fail("unknown request");
            return null;
        });
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().setJobId(jobID.toString());
        Assertions.assertEquals(jobResourceRequirements.getJobVertexParallelisms(), nativeFlinkService.getVertexResources(testingClusterClient, buildApplicationCluster));
        nativeFlinkService.updateVertexResources(testingClusterClient, buildApplicationCluster, jobResourceRequirements.getJobVertexParallelisms());
    }

    public static JobDetailsInfo createJobDetailsFor(List<JobDetailsInfo.JobVertexDetailsInfo> list) {
        return new JobDetailsInfo(new JobID(), "", false, JobStatus.RUNNING, 0L, 0L, 0L, 0L, 0L, Map.of(), list, Map.of(), new JobPlanInfo.RawJson(""));
    }

    private AbstractFlinkService createFlinkService(final RestClusterClient<String> restClusterClient) {
        return new NativeFlinkService(this.client, null, this.executorService, this.operatorConfig, this.eventRecorder) { // from class: org.apache.flink.kubernetes.operator.service.NativeFlinkServiceTest.3
            public RestClusterClient<String> getClusterClient(Configuration configuration) {
                return restClusterClient;
            }
        };
    }

    private Configuration createOperatorConfig() {
        return Configuration.fromMap(Map.of(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key(), "80"));
    }
}
