package org.apache.seatunnel.engine.client.job;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.Job;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;

/* loaded from: input_file:org/apache/seatunnel/engine/client/job/ClientJobProxy.class */
public class ClientJobProxy implements Job {
    private static final ILogger LOGGER = Logger.getLogger(ClientJobProxy.class);
    private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
    private final JobImmutableInformation jobImmutableInformation;

    public ClientJobProxy(@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient, @NonNull JobImmutableInformation jobImmutableInformation) {
        if (seaTunnelHazelcastClient == null) {
            throw new NullPointerException("seaTunnelHazelcastClient is marked @NonNull but is null");
        }
        if (jobImmutableInformation == null) {
            throw new NullPointerException("jobImmutableInformation is marked @NonNull but is null");
        }
        this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
        this.jobImmutableInformation = jobImmutableInformation;
        submitJob();
    }

    public long getJobId() {
        return this.jobImmutableInformation.getJobId();
    }

    private void submitJob() {
        LOGGER.info(String.format("start submit job, job id: %s, with plugin jar %s", Long.valueOf(this.jobImmutableInformation.getJobId()), this.jobImmutableInformation.getPluginJarsUrls()));
        this.seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(SeaTunnelSubmitJobCodec.encodeRequest(this.jobImmutableInformation.getJobId(), this.seaTunnelHazelcastClient.getSerializationService().toData(this.jobImmutableInformation))).join();
    }

    public JobStatus waitForJobComplete() {
        try {
            JobStatus jobStatus = (JobStatus) RetryUtils.retryWithException(() -> {
                return (JobStatus) doWaitForJobComplete().get();
            }, new RetryUtils.RetryMaterial(5, true, exc -> {
                return exc instanceof RuntimeException;
            }, 2000L));
            LOGGER.info(String.format("Job %s (%s) end with state %s", this.jobImmutableInformation.getJobConfig().getName(), Long.valueOf(this.jobImmutableInformation.getJobId()), jobStatus));
            return jobStatus;
        } catch (Exception e) {
            LOGGER.info(String.format("Job %s (%s) end with unknown state, and throw Exception: %s", Long.valueOf(this.jobImmutableInformation.getJobId()), this.jobImmutableInformation.getJobConfig().getName(), ExceptionUtils.getMessage(e)));
            throw new RuntimeException(e);
        }
    }

    public PassiveCompletableFuture<JobStatus> doWaitForJobComplete() {
        return this.seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(SeaTunnelWaitForJobCompleteCodec.encodeRequest(this.jobImmutableInformation.getJobId()), clientMessage -> {
            return JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(clientMessage)];
        });
    }

    public void cancelJob() {
        this.seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(SeaTunnelCancelJobCodec.encodeRequest(this.jobImmutableInformation.getJobId())).join();
    }

    public JobStatus getJobStatus() {
        return JobStatus.values()[((Integer) this.seaTunnelHazelcastClient.requestOnMasterAndDecodeResponse(SeaTunnelGetJobStatusCodec.encodeRequest(this.jobImmutableInformation.getJobId()), SeaTunnelGetJobStatusCodec::decodeResponse)).intValue()];
    }
}
