package org.apache.reef.vortex.evaluator;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import javax.inject.Inject;
import org.apache.commons.lang.SerializationUtils;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.TaskSide;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.task.HeartBeatTriggerManager;
import org.apache.reef.task.Task;
import org.apache.reef.task.TaskMessage;
import org.apache.reef.task.TaskMessageSource;
import org.apache.reef.task.events.CloseEvent;
import org.apache.reef.task.events.DriverMessage;
import org.apache.reef.util.Optional;
import org.apache.reef.vortex.common.TaskletExecutionRequest;
import org.apache.reef.vortex.common.TaskletFailureReport;
import org.apache.reef.vortex.common.TaskletResultReport;
import org.apache.reef.vortex.common.VortexRequest;
import org.apache.reef.vortex.driver.VortexWorkerConf;
import org.apache.reef.wake.EventHandler;

@Unit
@Unstable
@TaskSide
/* loaded from: input_file:org/apache/reef/vortex/evaluator/VortexWorker.class */
public final class VortexWorker implements Task, TaskMessageSource {
    private static final String MESSAGE_SOURCE_ID = "";
    private final HeartBeatTriggerManager heartBeatTriggerManager;
    private final int numOfThreads;
    private final BlockingDeque<byte[]> pendingRequests = new LinkedBlockingDeque();
    private final BlockingDeque<byte[]> workerReports = new LinkedBlockingDeque();
    private final CountDownLatch terminated = new CountDownLatch(1);

    /* renamed from: org.apache.reef.vortex.evaluator.VortexWorker$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/reef/vortex/evaluator/VortexWorker$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$reef$vortex$common$VortexRequest$RequestType = new int[VortexRequest.RequestType.values().length];

        static {
            try {
                $SwitchMap$org$apache$reef$vortex$common$VortexRequest$RequestType[VortexRequest.RequestType.ExecuteTasklet.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
        }
    }

    /* loaded from: input_file:org/apache/reef/vortex/evaluator/VortexWorker$DriverMessageHandler.class */
    public final class DriverMessageHandler implements EventHandler<DriverMessage> {
        public DriverMessageHandler() {
        }

        public void onNext(DriverMessage driverMessage) {
            if (driverMessage.get().isPresent()) {
                VortexWorker.this.pendingRequests.addLast(driverMessage.get().get());
            }
        }
    }

    /* loaded from: input_file:org/apache/reef/vortex/evaluator/VortexWorker$TaskCloseHandler.class */
    public final class TaskCloseHandler implements EventHandler<CloseEvent> {
        public TaskCloseHandler() {
        }

        public void onNext(CloseEvent closeEvent) {
            VortexWorker.this.terminated.countDown();
        }
    }

    @Inject
    private VortexWorker(HeartBeatTriggerManager heartBeatTriggerManager, @Parameter(VortexWorkerConf.NumOfThreads.class) int i) {
        this.heartBeatTriggerManager = heartBeatTriggerManager;
        this.numOfThreads = i;
    }

    public byte[] call(byte[] bArr) throws Exception {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        final ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.numOfThreads);
        newSingleThreadExecutor.execute(new Runnable() { // from class: org.apache.reef.vortex.evaluator.VortexWorker.1
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        final byte[] bArr2 = (byte[]) VortexWorker.this.pendingRequests.takeFirst();
                        newFixedThreadPool.execute(new Runnable() { // from class: org.apache.reef.vortex.evaluator.VortexWorker.1.1
                            @Override // java.lang.Runnable
                            public void run() {
                                VortexRequest vortexRequest = (VortexRequest) SerializationUtils.deserialize(bArr2);
                                switch (AnonymousClass2.$SwitchMap$org$apache$reef$vortex$common$VortexRequest$RequestType[vortexRequest.getType().ordinal()]) {
                                    case 1:
                                        TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
                                        try {
                                            VortexWorker.this.workerReports.addLast(SerializationUtils.serialize(new TaskletResultReport(taskletExecutionRequest.getTaskletId(), taskletExecutionRequest.execute())));
                                        } catch (Exception e) {
                                            VortexWorker.this.workerReports.addLast(SerializationUtils.serialize(new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e)));
                                        }
                                        VortexWorker.this.heartBeatTriggerManager.triggerHeartBeat();
                                        return;
                                    default:
                                        throw new RuntimeException("Unknown Command");
                                }
                            }
                        });
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        });
        this.terminated.await();
        return null;
    }

    public Optional<TaskMessage> getMessage() {
        byte[] pollFirst = this.workerReports.pollFirst();
        return pollFirst != null ? Optional.of(TaskMessage.from(MESSAGE_SOURCE_ID, pollFirst)) : Optional.empty();
    }
}
