package org.apache.flink.kubernetes.operator.observer.sessionjob;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
import org.apache.flink.kubernetes.operator.observer.context.VoidObserverContext;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobHelper;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
import org.apache.flink.kubernetes.operator.utils.StatusHelper;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.class */
public class SessionJobObserver implements Observer<FlinkSessionJob> {
    private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
    private final FlinkConfigManager configManager;
    private final SavepointObserver savepointObserver;
    private final JobStatusObserver<VoidObserverContext> jobStatusObserver;

    public SessionJobObserver(FlinkService flinkService, FlinkConfigManager flinkConfigManager, StatusHelper<FlinkSessionJobStatus> statusHelper) {
        this.configManager = flinkConfigManager;
        this.savepointObserver = new SavepointObserver(flinkService, flinkConfigManager, statusHelper);
        this.jobStatusObserver = new JobStatusObserver<VoidObserverContext>(flinkService) { // from class: org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.flink.kubernetes.operator.observer.JobStatusObserver
            public void onTimeout(VoidObserverContext voidObserverContext) {
            }

            @Override // org.apache.flink.kubernetes.operator.observer.JobStatusObserver
            protected Optional<String> updateJobStatus(JobStatus jobStatus, List<JobStatusMessage> list) {
                String str = (String) Preconditions.checkNotNull(jobStatus.getJobId(), "The jobID to be observed should not be null");
                List list2 = (List) list.stream().filter(jobStatusMessage -> {
                    return jobStatusMessage.getJobId().toHexString().equals(str);
                }).collect(Collectors.toList());
                Preconditions.checkArgument(list2.size() <= 1, String.format("Expected one job for JobID: %s, but %d founded", jobStatus.getJobId(), Integer.valueOf(list2.size())));
                if (list2.size() == 0) {
                    SessionJobObserver.LOG.info("No job found for JobID: {}", str);
                    return Optional.empty();
                }
                JobStatusMessage jobStatusMessage2 = (JobStatusMessage) list2.get(0);
                jobStatus.setState(jobStatusMessage2.getJobState().name());
                jobStatus.setJobName(jobStatusMessage2.getJobName());
                jobStatus.setStartTime(String.valueOf(jobStatusMessage2.getStartTime()));
                jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
                return Optional.of(jobStatus.getState());
            }
        };
    }

    @Override // org.apache.flink.kubernetes.operator.observer.Observer
    public void observe(FlinkSessionJob flinkSessionJob, Context context) {
        if (((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getReconciliationStatus2().getLastReconciledSpec() == null) {
            return;
        }
        Optional<FlinkDeployment> secondaryResource = OperatorUtils.getSecondaryResource(flinkSessionJob, context, this.configManager.getOperatorConfiguration());
        if (new SessionJobHelper(flinkSessionJob, LOG).sessionClusterReady(secondaryResource)) {
            Configuration sessionJobConfig = this.configManager.getSessionJobConfig(secondaryResource.get(), flinkSessionJob);
            if (this.jobStatusObserver.observe(((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus(), sessionJobConfig, VoidObserverContext.INSTANCE)) {
                this.savepointObserver.observeSavepointStatus(flinkSessionJob, sessionJobConfig);
            }
        }
    }
}
