package org.apache.flink.kubernetes.operator.controller;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/controller/FlinkResourceContextTest.class */
public class FlinkResourceContextTest {
    private Context<HasMetadata> josdkCtx;
    private FlinkConfigManager configManager;
    private Function<FlinkResourceContext<?>, FlinkService> serviceFactory;

    @BeforeEach
    void setup() {
        this.josdkCtx = TestUtils.createContextWithReadyFlinkDeployment();
        this.configManager = new FlinkConfigManager(new Configuration());
        this.serviceFactory = flinkResourceContext -> {
            return new TestingFlinkService();
        };
    }

    @MethodSource({"crTypes"})
    @ParameterizedTest
    void testCreateService(AbstractFlinkResource<?, ?> abstractFlinkResource) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        FlinkService testingFlinkService = new TestingFlinkService();
        this.serviceFactory = flinkResourceContext -> {
            atomicInteger.incrementAndGet();
            return testingFlinkService;
        };
        FlinkResourceContext context = getContext(abstractFlinkResource);
        Assertions.assertTrue(context.getFlinkService() == testingFlinkService);
        Assertions.assertTrue(context.getFlinkService() == testingFlinkService);
        Assertions.assertTrue(context.getFlinkService() == testingFlinkService);
        Assertions.assertEquals(1, atomicInteger.get());
    }

    @Test
    void testOperatorConfigHandling() {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        Configuration configuration = new Configuration();
        configuration.set(KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_INTERVAL, Duration.ofMinutes(1L));
        configuration.set(KubernetesOperatorConfigOptions.OPERATOR_OBSERVER_REST_READY_DELAY, Duration.ofMinutes(2L));
        configuration.setString("kubernetes.operator.default-configuration.flink-version.v1_17.kubernetes.operator.reconcile.interval", "17m");
        configuration.setString("kubernetes.operator.default-configuration.flink-version.v1_18.kubernetes.operator.reconcile.interval", "18m");
        this.configManager = new FlinkConfigManager(configuration) { // from class: org.apache.flink.kubernetes.operator.controller.FlinkResourceContextTest.1
            public Configuration getDefaultConfig(String str, FlinkVersion flinkVersion) {
                atomicInteger.incrementAndGet();
                return super.getDefaultConfig(str, flinkVersion);
            }
        };
        FlinkResourceContext context = getContext(TestUtils.buildApplicationCluster(FlinkVersion.v1_18));
        Assertions.assertEquals(Duration.ofMinutes(18L), context.getOperatorConfig().getReconcileInterval());
        Assertions.assertEquals(Duration.ofMinutes(2L), context.getOperatorConfig().getRestApiReadyDelay());
        Assertions.assertEquals(1, atomicInteger.get());
        FlinkResourceContext context2 = getContext(TestUtils.buildApplicationCluster(FlinkVersion.v1_17));
        Assertions.assertEquals(Duration.ofMinutes(17L), context2.getOperatorConfig().getReconcileInterval());
        Assertions.assertEquals(Duration.ofMinutes(2L), context2.getOperatorConfig().getRestApiReadyDelay());
        Assertions.assertEquals(2, atomicInteger.get());
    }

    @Test
    void testObserveConfigHandling() {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final Configuration configuration = new Configuration();
        this.configManager = new FlinkConfigManager(new Configuration()) { // from class: org.apache.flink.kubernetes.operator.controller.FlinkResourceContextTest.2
            public Configuration getObserveConfig(FlinkDeployment flinkDeployment) {
                atomicInteger.incrementAndGet();
                return configuration;
            }
        };
        FlinkResourceContext context = getContext(TestUtils.buildApplicationCluster());
        Assertions.assertTrue(context.getObserveConfig() == configuration);
        Assertions.assertTrue(context.getObserveConfig() == configuration);
        Assertions.assertTrue(context.getObserveConfig() == configuration);
        Assertions.assertEquals(1, atomicInteger.get());
        this.josdkCtx = TestUtils.createEmptyContext();
        Assertions.assertNull(getContext(TestUtils.buildSessionJob()).getObserveConfig());
        Assertions.assertEquals(1, atomicInteger.get());
        this.josdkCtx = TestUtils.createContextWithReadyFlinkDeployment();
        FlinkResourceContext context2 = getContext(TestUtils.buildSessionJob());
        Assertions.assertTrue(context2.getObserveConfig() == configuration);
        Assertions.assertTrue(context2.getObserveConfig() == configuration);
        Assertions.assertEquals(2, atomicInteger.get());
    }

    FlinkResourceContext getContext(AbstractFlinkResource<?, ?> abstractFlinkResource) {
        return abstractFlinkResource instanceof FlinkDeployment ? new FlinkDeploymentContext((FlinkDeployment) abstractFlinkResource, this.josdkCtx, (KubernetesResourceMetricGroup) null, this.configManager, this.serviceFactory) : new FlinkSessionJobContext((FlinkSessionJob) abstractFlinkResource, this.josdkCtx, (KubernetesResourceMetricGroup) null, this.configManager, this.serviceFactory);
    }

    private static Stream<AbstractFlinkResource<?, ?>> crTypes() {
        return Stream.of((Object[]) new AbstractFlinkResource[]{TestUtils.buildApplicationCluster(), TestUtils.buildSessionJob()});
    }
}
