package org.apache.ignite.internal.compute;

import java.lang.reflect.Constructor;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
import org.apache.ignite.internal.compute.message.ExecuteRequest;
import org.apache.ignite.internal.compute.message.ExecuteResponse;
import org.apache.ignite.internal.future.InFlightFutures;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/compute/ComputeComponentImpl.class */
/**
 * {@link ComputeComponent} implementation that runs {@link ComputeJob}s on a local thread pool and
 * forwards remote executions to other nodes through the {@link MessagingService}.
 *
 * <p>Lifecycle: {@link #start()} creates the job thread pool and registers the handler for incoming
 * {@link ExecuteRequest}s; {@link #stop()} blocks the busy lock (after which new executions are rejected
 * with {@link NodeStoppingException}), shuts the pool down and asks {@link InFlightFutures} to cancel
 * whatever is still registered.
 */
public class ComputeComponentImpl implements ComputeComponent {
    /** Timeout passed to {@code MessagingService#invoke}; {@code Long.MAX_VALUE} means "effectively never". */
    private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;

    /** Keep-alive handed to the {@link ThreadPoolExecutor} constructor, in seconds. */
    private static final long THREAD_KEEP_ALIVE_SECONDS = 60;

    private final Ignite ignite;
    private final MessagingService messagingService;
    private final ComputeConfiguration configuration;

    /** Pool that runs the jobs; {@code null} until {@link #start()} has been called. */
    private ExecutorService jobExecutorService;

    /** Class loader used to resolve job classes by name; captured from the constructing thread. */
    private final ClassLoader jobClassLoader = Thread.currentThread().getContextClassLoader();

    private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory();

    /** Guards every entry point against running concurrently with / after {@link #stop()}. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    /** Makes {@link #stop()} run its body at most once. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();

    /** Futures handed out to callers that have been registered for cancellation on stop. */
    private final InFlightFutures inFlightFutures = new InFlightFutures();

    /**
     * Creates the component. No threads are started and no handlers are registered until {@link #start()}.
     *
     * @param ignite Ignite instance passed to jobs through their execution context.
     * @param messagingService Messaging service used for remote execution requests and responses.
     * @param computeConfiguration Source of the thread pool size and the stop timeout.
     */
    public ComputeComponentImpl(Ignite ignite, MessagingService messagingService, ComputeConfiguration computeConfiguration) {
        this.ignite = ignite;
        this.messagingService = messagingService;
        this.configuration = computeConfiguration;
    }

    /**
     * Submits a job of the given class to the local job thread pool.
     *
     * @param jobClass Job class; instantiated through its no-arg constructor.
     * @param args Arguments passed to the job.
     * @return Future of the job result; already failed with {@link NodeStoppingException} if the
     *     component is stopping.
     */
    @Override // org.apache.ignite.internal.compute.ComputeComponent
    public <R> CompletableFuture<R> executeLocally(Class<? extends ComputeJob<R>> jobClass, Object... args) {
        if (!busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }

        try {
            return doExecuteLocally(jobClass, args);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Same as {@link #executeLocally(Class, Object...)}, but the job class is resolved by name.
     *
     * @param jobClassName Fully qualified job class name.
     * @param args Arguments passed to the job.
     * @return Future of the job result. Class resolution happens inside the future chain, so a class
     *     that cannot be loaded yields a failed future instead of a synchronous exception.
     */
    @Override // org.apache.ignite.internal.compute.ComputeComponent
    public <R> CompletableFuture<R> executeLocally(String jobClassName, Object... args) {
        return CompletableFuture.completedFuture(null)
                .thenCompose(ignored -> executeLocally(jobClass(jobClassName), args));
    }

    /** Starts the job on the local pool and registers the resulting future as in-flight. */
    private <R> CompletableFuture<R> doExecuteLocally(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
        assert jobExecutorService != null : "Not started yet!";

        CompletableFuture<R> future = startLocalExecution(jobClass, args);
        inFlightFutures.registerFuture(future);

        return future;
    }

    /** Submits the job to the pool; a rejected submission is reported as a failed future. */
    private <R> CompletableFuture<R> startLocalExecution(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
        try {
            return CompletableFuture.supplyAsync(() -> executeJob(jobClass, args), jobExecutorService);
        } catch (RejectedExecutionException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /** Instantiates the job and runs it with a fresh execution context. */
    private <R> R executeJob(Class<? extends ComputeJob<R>> jobClass, Object[] args) {
        ComputeJob<R> job = instantiateJob(jobClass);

        // Cast retained from the original; it is redundant if execute() is declared to return R.
        return (R) job.execute(new JobExecutionContextImpl(ignite), args);
    }

    /**
     * Creates a job instance through the (possibly non-public) no-arg constructor.
     *
     * @throws IgniteInternalException If the class is not a {@link ComputeJob} or cannot be instantiated.
     */
    private <R> ComputeJob<R> instantiateJob(Class<? extends ComputeJob<R>> jobClass) {
        // Not redundant: jobClass(String) produces its result through an unchecked cast.
        if (!ComputeJob.class.isAssignableFrom(jobClass)) {
            throw new IgniteInternalException("'" + jobClass.getName() + "' does not implement ComputeJob interface");
        }

        try {
            Constructor<? extends ComputeJob<R>> constructor = jobClass.getDeclaredConstructor();

            if (!constructor.canAccess(null)) {
                constructor.setAccessible(true);
            }

            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IgniteInternalException("Cannot instantiate job", e);
        }
    }

    /**
     * Asks a remote node to execute a job of the given class.
     *
     * @param clusterNode Node that should run the job.
     * @param jobClass Job class; only its name is sent to the remote node.
     * @param args Arguments passed to the job.
     * @return Future of the job result; already failed with {@link NodeStoppingException} if the
     *     component is stopping.
     */
    @Override // org.apache.ignite.internal.compute.ComputeComponent
    public <R> CompletableFuture<R> executeRemotely(ClusterNode clusterNode, Class<? extends ComputeJob<R>> jobClass, Object... args) {
        if (!busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }

        try {
            return doExecuteRemotely(clusterNode, jobClass, args);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Same as {@link #executeRemotely(ClusterNode, Class, Object...)}, but the job class is resolved by name.
     *
     * @param clusterNode Node that should run the job.
     * @param jobClassName Fully qualified job class name (resolved locally before the request is sent).
     * @param args Arguments passed to the job.
     * @return Future of the job result. Class resolution happens inside the future chain, so a class
     *     that cannot be loaded yields a failed future instead of a synchronous exception.
     */
    @Override // org.apache.ignite.internal.compute.ComputeComponent
    public <R> CompletableFuture<R> executeRemotely(ClusterNode clusterNode, String jobClassName, Object... args) {
        return CompletableFuture.completedFuture(null)
                .thenCompose(ignored -> executeRemotely(clusterNode, jobClass(jobClassName), args));
    }

    /** Sends the execute request and converts the response into the caller's future. */
    private <R> CompletableFuture<R> doExecuteRemotely(ClusterNode clusterNode, Class<? extends ComputeJob<R>> jobClass, Object[] args) {
        CompletableFuture<R> future = messagingService
                .invoke(
                        clusterNode,
                        messagesFactory.executeRequest().jobClassName(jobClass.getName()).args(args).build(),
                        NETWORK_TIMEOUT_MILLIS
                )
                .thenCompose(response -> resultFromExecuteResponse((ExecuteResponse) response));

        inFlightFutures.registerFuture(future);

        return future;
    }

    /** Maps a response to a failed future when it carries a throwable, otherwise to a completed one. */
    @SuppressWarnings("unchecked")
    private <R> CompletableFuture<R> resultFromExecuteResponse(ExecuteResponse response) {
        Throwable failure = response.throwable();

        if (failure != null) {
            return CompletableFuture.failedFuture(failure);
        }

        // The response carries an untyped result; the caller decides what R is.
        return CompletableFuture.completedFuture((R) response.result());
    }

    /**
     * Creates the job thread pool and registers the handler for incoming compute messages.
     *
     * @throws IgniteInternalException Thrown from the handler (not from this method) for any compute
     *     message that is not an {@link ExecuteRequest}.
     */
    public synchronized void start() {
        // Read the size once: two separate reads could observe different values, and
        // ThreadPoolExecutor rejects corePoolSize > maximumPoolSize.
        int poolSize = (Integer) configuration.threadPoolSize().value();

        jobExecutorService = new ThreadPoolExecutor(
                poolSize,
                poolSize,
                THREAD_KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                newExecutorServiceTaskQueue(),
                (ThreadFactory) new NamedThreadFactory("[" + ignite.name() + "] Compute-")
        );

        messagingService.addMessageHandler(ComputeMessageTypes.class, (message, senderAddress, correlationId) -> {
            assert correlationId != null;

            if (!(message instanceof ExecuteRequest)) {
                throw new IgniteInternalException("Unexpected message type " + message.getClass());
            }

            processExecuteRequest((ExecuteRequest) message, senderAddress, correlationId);
        });
    }

    /** Creates the work queue for the job pool; package-private so it can be substituted. */
    BlockingQueue<Runnable> newExecutorServiceTaskQueue() {
        return new LinkedBlockingQueue<>();
    }

    /**
     * Handles an execute request from another node: runs the job locally and responds with either the
     * result or the failure. Every path sends a response, because the requesting side waits with
     * {@link #NETWORK_TIMEOUT_MILLIS} and would otherwise never complete.
     */
    private void processExecuteRequest(ExecuteRequest request, NetworkAddress senderAddress, long correlationId) {
        if (!busyLock.enterBusy()) {
            sendExecuteResponse(null, new NodeStoppingException(), senderAddress, correlationId);
            return;
        }

        try {
            Class<ComputeJob<Object>> jobClass;

            try {
                jobClass = jobClass(request.jobClassName());
            } catch (IgniteInternalException | LinkageError e) {
                // The class is unknown here or failed to load/initialize: report it to the requester
                // instead of letting the exception escape the handler with no response sent.
                sendExecuteResponse(null, e, senderAddress, correlationId);
                return;
            }

            doExecuteLocally(jobClass, request.args())
                    .handle((result, failure) -> sendExecuteResponse(result, failure, senderAddress, correlationId));
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Sends an execute response for the given correlation id.
     *
     * @return Always {@code null}; the return value only exists so the method fits {@code handle(...)}.
     */
    @Nullable
    private Object sendExecuteResponse(Object result, Throwable failure, NetworkAddress senderAddress, long correlationId) {
        messagingService.respond(
                senderAddress,
                messagesFactory.executeResponse().result(result).throwable(failure).build(),
                correlationId
        );

        return null;
    }

    /**
     * Loads (and initializes) a class by name using the job class loader.
     *
     * <p>The cast is unchecked: nothing here verifies that the class implements {@link ComputeJob};
     * that check is performed at instantiation time.
     *
     * @throws IgniteInternalException If the class cannot be found.
     */
    @SuppressWarnings("unchecked")
    private <R, J extends ComputeJob<R>> Class<J> jobClass(String jobClassName) {
        try {
            return (Class<J>) Class.forName(jobClassName, true, jobClassLoader);
        } catch (ClassNotFoundException e) {
            throw new IgniteInternalException("Cannot load job class by name '" + jobClassName + "'", e);
        }
    }

    /**
     * Stops the component; only the first call has any effect.
     *
     * <p>Order matters: the busy lock is blocked first so no new executions start, then the pool is
     * shut down (waiting up to {@link #stopTimeoutMillis()}), and only then are remaining in-flight
     * futures cancelled.
     *
     * @throws Exception Declared by the original signature; propagated from the calls made here, if any.
     */
    public void stop() throws Exception {
        if (!stopGuard.compareAndSet(false, true)) {
            return;
        }

        busyLock.block();

        // start() may never have been called, in which case there is no pool to shut down.
        if (jobExecutorService != null) {
            IgniteUtils.shutdownAndAwaitTermination(jobExecutorService, stopTimeoutMillis(), TimeUnit.MILLISECONDS);
        }

        inFlightFutures.cancelInFlightFutures();
    }

    /** Returns the configured pool stop timeout in milliseconds; package-private so it can be substituted. */
    long stopTimeoutMillis() {
        return (Long) configuration.threadPoolStopTimeoutMillis().value();
    }
}
