package org.apache.seatunnel.engine.client.job;

import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.Job;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;

/* loaded from: input_file:org/apache/seatunnel/engine/client/job/ClientJobProxy.class */
public class ClientJobProxy implements Job {
    private static final ILogger LOGGER = Logger.getLogger(ClientJobProxy.class);
    private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
    private final Long jobId;
    private JobResult jobResult;

    public ClientJobProxy(@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient, @NonNull JobImmutableInformation jobImmutableInformation) {
        if (seaTunnelHazelcastClient == null) {
            throw new NullPointerException("seaTunnelHazelcastClient is marked non-null but is null");
        }
        if (jobImmutableInformation == null) {
            throw new NullPointerException("jobImmutableInformation is marked non-null but is null");
        }
        this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
        this.jobId = Long.valueOf(jobImmutableInformation.getJobId());
        submitJob(jobImmutableInformation);
    }

    public ClientJobProxy(@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient, Long l) {
        if (seaTunnelHazelcastClient == null) {
            throw new NullPointerException("seaTunnelHazelcastClient is marked non-null but is null");
        }
        this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
        this.jobId = l;
    }

    public long getJobId() {
        return this.jobId.longValue();
    }

    private void submitJob(JobImmutableInformation jobImmutableInformation) {
        LOGGER.info(String.format("start submit job, job id: %s, with plugin jar %s", Long.valueOf(jobImmutableInformation.getJobId()), jobImmutableInformation.getPluginJarsUrls()));
        this.seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(SeaTunnelSubmitJobCodec.encodeRequest(jobImmutableInformation.getJobId(), this.seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation))).join();
    }

    public JobStatus waitForJobComplete() {
        try {
            this.jobResult = (JobResult) RetryUtils.retryWithException(() -> {
                return (JobResult) doWaitForJobComplete().get();
            }, new RetryUtils.RetryMaterial(100000, true, exc -> {
                return exc.getCause() instanceof OperationTimeoutException;
            }, 2000L));
            if (this.jobResult == null) {
                throw new SeaTunnelEngineException("failed to fetch job result");
            }
            LOGGER.info(String.format("Job (%s) end with state %s", this.jobId, this.jobResult.getStatus()));
            if (StringUtils.isNotEmpty(this.jobResult.getError()) || this.jobResult.getStatus().equals(JobStatus.FAILED)) {
                throw new SeaTunnelEngineException(this.jobResult.getError());
            }
            return this.jobResult.getStatus();
        } catch (Exception e) {
            LOGGER.info(String.format("Job (%s) end with unknown state, and throw Exception: %s", this.jobId, ExceptionUtils.getMessage(e)));
            throw new RuntimeException(e);
        }
    }

    public JobResult getJobResultCache() {
        return this.jobResult;
    }

    public PassiveCompletableFuture<JobResult> doWaitForJobComplete() {
        return new PassiveCompletableFuture<>(this.seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(SeaTunnelWaitForJobCompleteCodec.encodeRequest(this.jobId.longValue()), SeaTunnelWaitForJobCompleteCodec::decodeResponse).thenApply(obj -> {
            return (JobResult) this.seaTunnelHazelcastClient.getSerializationService().toObject(obj);
        }));
    }

    public void cancelJob() {
        this.seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(SeaTunnelCancelJobCodec.encodeRequest(this.jobId.longValue())).join();
    }

    public JobStatus getJobStatus() {
        return JobStatus.values()[((Integer) this.seaTunnelHazelcastClient.requestOnMasterAndDecodeResponse(SeaTunnelGetJobStatusCodec.encodeRequest(this.jobId.longValue()), SeaTunnelGetJobStatusCodec::decodeResponse)).intValue()];
    }
}
