package org.apache.flink.kubernetes.operator.observer.sessionjob;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
import org.apache.flink.kubernetes.operator.observer.context.VoidObserverContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.class */
/**
 * {@link Observer} implementation for {@link FlinkSessionJob} resources.
 *
 * <p>Once the session cluster backing the job is ready, this observer refreshes the job status
 * through a {@link JobStatusObserver} and, when that observation succeeds, the savepoint status
 * through a {@link SavepointObserver}. While the resource is in
 * {@link ReconciliationState#UPGRADING} it first checks whether the pending upgrade has already
 * been deployed to the cluster.
 */
public class SessionJobObserver implements Observer<FlinkSessionJob> {
    private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
    private final FlinkConfigManager configManager;
    private final EventRecorder eventRecorder;
    private final SavepointObserver<FlinkSessionJobStatus> savepointObserver;
    private final JobStatusObserver<VoidObserverContext> jobStatusObserver;
    private final FlinkService flinkService;

    /**
     * Creates the observer and wires up the savepoint and job status observers it delegates to.
     *
     * @param flinkService service used to talk to the Flink cluster (e.g. to list jobs)
     * @param flinkConfigManager source of the effective configuration for a session job
     * @param statusRecorder passed through to the {@link SavepointObserver}
     * @param eventRecorder passed through to the delegated observers and used when resetting
     *     savepoint triggers
     */
    public SessionJobObserver(FlinkService flinkService, FlinkConfigManager flinkConfigManager, StatusRecorder<FlinkSessionJobStatus> statusRecorder, EventRecorder eventRecorder) {
        this.configManager = flinkConfigManager;
        this.eventRecorder = eventRecorder;
        this.flinkService = flinkService;
        this.savepointObserver = new SavepointObserver<>(flinkService, flinkConfigManager, statusRecorder, eventRecorder);
        this.jobStatusObserver = new JobStatusObserver<VoidObserverContext>(flinkService, eventRecorder) {
            /** No action is taken on timeout for session jobs. */
            @Override
            public void onTimeout(VoidObserverContext voidObserverContext) {
            }

            /**
             * Picks the job whose hex job id equals the job id recorded in {@code jobStatus}.
             *
             * @return the single matching job, or empty if the cluster reports no such job
             * @throws NullPointerException if the recorded job id is null
             * @throws IllegalArgumentException if more than one job matches
             */
            @Override
            protected Optional<JobStatusMessage> filterTargetJob(JobStatus jobStatus, List<JobStatusMessage> list) {
                String jobId = Preconditions.checkNotNull(jobStatus.getJobId(), "The jobID to be observed should not be null");
                List<JobStatusMessage> matches = list.stream()
                        .filter(message -> message.getJobId().toHexString().equals(jobId))
                        .collect(Collectors.toList());
                Preconditions.checkArgument(matches.size() <= 1, String.format("Expected one job for JobID: %s, but %d founded", jobStatus.getJobId(), matches.size()));
                if (matches.isEmpty()) {
                    // Qualified on purpose: keeps the reference unambiguous inside the anonymous subclass.
                    SessionJobObserver.LOG.info("No job found for JobID: {}", jobId);
                    return Optional.empty();
                }
                return Optional.of(matches.get(0));
            }
        };
    }

    /**
     * Observes the current state of the given session job.
     *
     * <p>Nothing is observed when the resource has not been deployed yet or when the session
     * cluster is not ready. If the resource is still {@code UPGRADING} after checking for an
     * already-deployed upgrade, observation stops as well.
     *
     * @param flinkSessionJob the session job resource whose status is updated in place
     * @param context operator context used to look up the {@link FlinkDeployment} secondary
     *     resource
     */
    @Override
    public void observe(FlinkSessionJob flinkSessionJob, Context context) {
        FlinkSessionJobStatus status = (FlinkSessionJobStatus) flinkSessionJob.getStatus();
        if (status.getReconciliationStatus2().isFirstDeployment()) {
            return;
        }

        // The Context parameter is a raw type in this signature, so the lookup result is unchecked.
        @SuppressWarnings("unchecked")
        Optional<FlinkDeployment> sessionCluster = context.getSecondaryResource(FlinkDeployment.class);
        if (!SessionJobReconciler.sessionClusterReady(sessionCluster)) {
            return;
        }

        Configuration sessionJobConfig = this.configManager.getSessionJobConfig(sessionCluster.get(), (FlinkSessionJobSpec) flinkSessionJob.getSpec());
        ReconciliationStatus<FlinkSessionJobSpec> reconciliationStatus = status.getReconciliationStatus2();
        if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
            checkIfAlreadyUpgraded(flinkSessionJob, sessionJobConfig);
            // The state is re-read because the check above may have updated the status;
            // if it is still UPGRADING there is nothing to observe yet.
            if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
                return;
            }
        }

        if (this.jobStatusObserver.observe(flinkSessionJob, sessionJobConfig, VoidObserverContext.INSTANCE)) {
            this.savepointObserver.observeSavepointStatus(flinkSessionJob, sessionJobConfig);
        }
        SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, this.eventRecorder);
    }

    /**
     * Checks whether a pending upgrade of the given resource is already running on the cluster
     * and, if so, updates the resource status to reflect it.
     *
     * <p>A listed job is considered to belong to the resource when the lower part of its
     * {@link JobID} equals the hash code of the resource uid and the job is not in a globally
     * terminal state. The upper part of the matched {@link JobID} is compared against the upgrade
     * target generation.
     *
     * @param flinkSessionJob the resource that is currently upgrading
     * @param configuration configuration used to list the jobs of the session cluster
     * @throws RuntimeException if the jobs cannot be listed, if more than one job matches the
     *     resource, or if the running job's generation differs from the upgrade target generation
     */
    private void checkIfAlreadyUpgraded(FlinkSessionJob flinkSessionJob, Configuration configuration) {
        String uid = flinkSessionJob.getMetadata().getUid();

        // Only the remote call is guarded, so that "Failed to list jobs" is reported exclusively
        // for listing failures and the validation errors below surface with their own messages.
        Collection<JobStatusMessage> clusterJobs;
        try {
            clusterJobs = this.flinkService.listJobs(configuration);
        } catch (Exception e) {
            throw new RuntimeException("Failed to list jobs", e);
        }

        List<JobID> matchedJobs = new ArrayList<>();
        for (JobStatusMessage clusterJob : clusterJobs) {
            JobID candidate = clusterJob.getJobId();
            if (candidate.getLowerPart() == uid.hashCode() && !clusterJob.getJobState().isGloballyTerminalState()) {
                matchedJobs.add(candidate);
            }
        }

        if (matchedJobs.isEmpty()) {
            // No live job for this resource: the upgrade has not been deployed yet.
            return;
        }
        if (matchedJobs.size() > 1) {
            throw new RuntimeException(String.format("Unexpected case: %d job found for the resource with uid: %s", matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
        }

        JobID runningJobId = matchedJobs.get(0);
        Long upgradeTargetGeneration = ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
        long runningGeneration = runningJobId.getUpperPart();
        if (upgradeTargetGeneration.longValue() != runningGeneration) {
            throw new RuntimeException(String.format("Running job %s's generation %s doesn't match upgrade target generation %s.", runningJobId.toHexString(), runningGeneration, upgradeTargetGeneration));
        }

        String previousJobId = ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getJobId();
        LOG.info("Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}", previousJobId, runningJobId.toHexString());
        ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);

        // Fetched after the status update so the writes land on the current job status object.
        JobStatus jobStatus = ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus();
        jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
        jobStatus.setJobId(runningJobId.toHexString());
    }
}
