package org.apache.flink.kubernetes.operator.observer.sessionjob;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.MissingSessionJobException;
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.class */
public class FlinkSessionJobObserver extends AbstractFlinkResourceObserver<FlinkSessionJob> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobObserver.class);
    private final SessionJobStatusObserver jobStatusObserver;
    private final SavepointObserver<FlinkSessionJob, FlinkSessionJobStatus> savepointObserver;

    /* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver$SessionJobStatusObserver.class */
    private static class SessionJobStatusObserver extends JobStatusObserver<FlinkSessionJob> {
        public SessionJobStatusObserver(EventRecorder eventRecorder) {
            super(eventRecorder);
        }

        @Override // org.apache.flink.kubernetes.operator.observer.JobStatusObserver
        protected void onTimeout(FlinkResourceContext<FlinkSessionJob> flinkResourceContext) {
        }

        @Override // org.apache.flink.kubernetes.operator.observer.JobStatusObserver
        protected Optional<JobStatusMessage> filterTargetJob(JobStatus jobStatus, List<JobStatusMessage> list) {
            String str = (String) Preconditions.checkNotNull(jobStatus.getJobId(), "The jobID to be observed should not be null");
            List list2 = (List) list.stream().filter(jobStatusMessage -> {
                return jobStatusMessage.getJobId().toHexString().equals(str);
            }).collect(Collectors.toList());
            Preconditions.checkArgument(list2.size() <= 1, String.format("Expected one job for JobID: %s, but found %d", jobStatus.getJobId(), Integer.valueOf(list2.size())));
            if (list2.size() != 0) {
                return Optional.of((JobStatusMessage) list2.get(0));
            }
            FlinkSessionJobObserver.LOG.warn("No job found for JobID: {}", str);
            return Optional.empty();
        }

        @Override // org.apache.flink.kubernetes.operator.observer.JobStatusObserver
        protected void onTargetJobNotFound(FlinkResourceContext<FlinkSessionJob> flinkResourceContext) {
            ifHaDisabledMarkSessionJobMissing(flinkResourceContext);
        }

        @Override // org.apache.flink.kubernetes.operator.observer.JobStatusObserver
        protected void onNoJobsFound(FlinkResourceContext<FlinkSessionJob> flinkResourceContext) {
            ifHaDisabledMarkSessionJobMissing(flinkResourceContext);
        }

        private void ifHaDisabledMarkSessionJobMissing(FlinkResourceContext<FlinkSessionJob> flinkResourceContext) {
            AbstractFlinkResource<?, ?> abstractFlinkResource = (FlinkSessionJob) flinkResourceContext.getResource();
            if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkResourceContext.getObserveConfig())) {
                return;
            }
            ((FlinkSessionJobStatus) abstractFlinkResource.getStatus()).getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
            FlinkSessionJobObserver.LOG.error(JobStatusObserver.MISSING_SESSION_JOB_ERR);
            ReconciliationUtils.updateForReconciliationError(flinkResourceContext, new MissingSessionJobException(JobStatusObserver.MISSING_SESSION_JOB_ERR));
            this.eventRecorder.triggerEvent(abstractFlinkResource, EventRecorder.Type.Warning, EventRecorder.Reason.Missing, EventRecorder.Component.Job, JobStatusObserver.MISSING_SESSION_JOB_ERR);
        }
    }

    public FlinkSessionJobObserver(EventRecorder eventRecorder) {
        super(eventRecorder);
        this.jobStatusObserver = new SessionJobStatusObserver(eventRecorder);
        this.savepointObserver = new SavepointObserver<>(eventRecorder);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver
    public boolean isResourceReadyToBeObserved(FlinkResourceContext<FlinkSessionJob> flinkResourceContext) {
        return super.isResourceReadyToBeObserved(flinkResourceContext) && flinkResourceContext.getFlinkService() != null;
    }

    @Override // org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver
    protected void observeInternal(FlinkResourceContext<FlinkSessionJob> flinkResourceContext) {
        if (this.jobStatusObserver.observe(flinkResourceContext)) {
            this.savepointObserver.observeSavepointStatus(flinkResourceContext);
        }
    }

    @Override // org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver
    protected boolean checkIfAlreadyUpgraded(FlinkResourceContext<FlinkSessionJob> flinkResourceContext) {
        FlinkSessionJob resource = flinkResourceContext.getResource();
        String uid = resource.getMetadata().getUid();
        try {
            Collection<JobStatusMessage> listJobs = flinkResourceContext.getFlinkService().listJobs(flinkResourceContext.getObserveConfig());
            ArrayList arrayList = new ArrayList();
            for (JobStatusMessage jobStatusMessage : listJobs) {
                JobID jobId = jobStatusMessage.getJobId();
                if (jobId.getLowerPart() == FlinkUtils.generateSessionJobFixedJobID(uid, Long.valueOf(jobId.getUpperPart() + 1)).getLowerPart() && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
                    arrayList.add(jobId);
                }
            }
            if (arrayList.isEmpty()) {
                return false;
            }
            if (arrayList.size() > 1) {
                throw new RuntimeException(String.format("Unexpected case: %d job found for the resource with uid: %s", Integer.valueOf(arrayList.size()), resource.getMetadata().getUid()));
            }
            JobID jobID = (JobID) arrayList.get(0);
            Long upgradeTargetGeneration = ReconciliationUtils.getUpgradeTargetGeneration(resource);
            long upperPart = jobID.getUpperPart();
            String jobId2 = ((FlinkSessionJobStatus) resource.getStatus()).getJobStatus().getJobId();
            if (upgradeTargetGeneration.longValue() != upperPart) {
                throw new RuntimeException(String.format("Running job %s's generation %s doesn't match upgrade target generation %s.", jobID.toHexString(), Long.valueOf(upperPart), upgradeTargetGeneration));
            }
            LOG.info("Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}", jobId2, jobID.toHexString());
            ((FlinkSessionJobStatus) resource.getStatus()).getJobStatus().setJobId(jobID.toHexString());
            return true;
        } catch (Exception e) {
            throw new RuntimeException("Failed to list jobs", e);
        }
    }
}
