package org.apache.flink.kubernetes.operator.health;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/health/CanaryResourceManager.class */
public class CanaryResourceManager<CR extends AbstractFlinkResource<?, ?>> {
    private static final Logger LOG = LoggerFactory.getLogger(CanaryResourceManager.class);
    public static final String CANARY_LABEL = "flink.apache.org/canary";
    private final ConcurrentHashMap<ResourceID, CanaryResourceManager<CR>.CanaryResourceState> canaryResources = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
    private final FlinkConfigManager configManager;
    private final KubernetesClient kubernetesClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/kubernetes/operator/health/CanaryResourceManager$CanaryResourceState.class */
    public class CanaryResourceState {
        CR resource;
        long previousGeneration;
        boolean isHealthy = true;

        private CanaryResourceState() {
        }

        void onReconcile(CR cr) {
            this.resource = cr;
        }

        boolean canaryReconciledSinceUpdate() {
            return this.resource.getMetadata().getGeneration().longValue() > this.previousGeneration;
        }
    }

    public boolean allCanariesHealthy() {
        return this.canaryResources.values().stream().allMatch(canaryResourceState -> {
            return canaryResourceState.isHealthy;
        });
    }

    public boolean handleCanaryResourceReconciliation(CR cr) {
        if (!isCanaryResource(cr)) {
            return false;
        }
        ResourceID fromResource = ResourceID.fromResource(cr);
        LOG.info("Reconciling canary resource");
        this.canaryResources.compute(fromResource, (resourceID, canaryResourceState) -> {
            boolean z = false;
            if (canaryResourceState == null) {
                z = true;
                canaryResourceState = new CanaryResourceState();
            }
            canaryResourceState.onReconcile(cr);
            if (z) {
                updateSpecAndScheduleHealthCheck(fromResource, canaryResourceState);
            }
            return canaryResourceState;
        });
        return true;
    }

    public boolean handleCanaryResourceDeletion(CR cr) {
        if (!isCanaryResource(cr)) {
            return false;
        }
        ResourceID fromResource = ResourceID.fromResource(cr);
        LOG.info("Deleting canary resource");
        this.canaryResources.remove(fromResource);
        return true;
    }

    /* JADX WARN: Type inference failed for: r0v11, types: [org.apache.flink.kubernetes.operator.api.AbstractFlinkResource, CR extends org.apache.flink.kubernetes.operator.api.AbstractFlinkResource<?, ?>] */
    /* JADX WARN: Type inference failed for: r0v6, types: [org.apache.flink.kubernetes.operator.api.AbstractFlinkResource, CR extends org.apache.flink.kubernetes.operator.api.AbstractFlinkResource<?, ?>] */
    /* JADX WARN: Type inference failed for: r1v8, types: [org.apache.flink.kubernetes.operator.api.AbstractFlinkResource, CR extends org.apache.flink.kubernetes.operator.api.AbstractFlinkResource<?, ?>] */
    private void updateSpecAndScheduleHealthCheck(ResourceID resourceID, CanaryResourceManager<CR>.CanaryResourceState canaryResourceState) {
        Duration duration = (Duration) this.configManager.getDefaultConfig().get(KubernetesOperatorConfigOptions.CANARY_RESOURCE_TIMEOUT);
        Long restartNonce = ((AbstractFlinkSpec) canaryResourceState.resource.getSpec()).getRestartNonce();
        ((AbstractFlinkSpec) canaryResourceState.resource.getSpec()).setRestartNonce(Long.valueOf(restartNonce == null ? 1L : restartNonce.longValue() + 1));
        canaryResourceState.previousGeneration = canaryResourceState.resource.getMetadata().getGeneration().longValue();
        LOG.info("Scheduling canary check for {} in {}s", resourceID, Long.valueOf(duration.toSeconds()));
        try {
            this.kubernetesClient.resource((AbstractFlinkResource) ReconciliationUtils.clone(canaryResourceState.resource)).replace();
        } catch (Throwable th) {
            LOG.warn("Could not bump canary deployment, it may have been deleted", th);
        }
        this.executorService.schedule(() -> {
            checkHealth(resourceID);
        }, duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    /* JADX WARN: Type inference failed for: r5v5, types: [org.apache.flink.kubernetes.operator.api.AbstractFlinkResource, CR extends org.apache.flink.kubernetes.operator.api.AbstractFlinkResource<?, ?>] */
    @VisibleForTesting
    protected void checkHealth(ResourceID resourceID) {
        CanaryResourceManager<CR>.CanaryResourceState canaryResourceState = this.canaryResources.get(resourceID);
        if (canaryResourceState == null) {
            LOG.info("Canary resource {} not found. Stopping health checks", resourceID);
            return;
        }
        if (canaryResourceState.canaryReconciledSinceUpdate()) {
            LOG.info("Canary deployment healthy");
            canaryResourceState.isHealthy = true;
        } else {
            LOG.error("Canary deployment {} latest spec not reconciled. Expected generation larger than {}, received {}", new Object[]{resourceID, Long.valueOf(canaryResourceState.previousGeneration), canaryResourceState.resource.getMetadata().getGeneration()});
            canaryResourceState.isHealthy = false;
        }
        updateSpecAndScheduleHealthCheck(resourceID, canaryResourceState);
    }

    @VisibleForTesting
    public int getNumberOfActiveCanaries() {
        return this.canaryResources.size();
    }

    public static boolean isCanaryResource(HasMetadata hasMetadata) {
        Map labels = hasMetadata.getMetadata().getLabels();
        if (labels == null) {
            return false;
        }
        return "true".equalsIgnoreCase((String) labels.getOrDefault(CANARY_LABEL, "false"));
    }

    public CanaryResourceManager(FlinkConfigManager flinkConfigManager, KubernetesClient kubernetesClient) {
        this.configManager = flinkConfigManager;
        this.kubernetesClient = kubernetesClient;
    }
}
