package org.apache.reef.vortex.evaluator;

import java.util.ArrayList;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.TaskSide;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.task.HeartBeatTriggerManager;
import org.apache.reef.task.Task;
import org.apache.reef.task.TaskMessage;
import org.apache.reef.task.TaskMessageSource;
import org.apache.reef.task.events.CloseEvent;
import org.apache.reef.task.events.DriverMessage;
import org.apache.reef.util.Optional;
import org.apache.reef.vortex.common.KryoUtils;
import org.apache.reef.vortex.driver.VortexWorkerConf;
import org.apache.reef.vortex.protocol.mastertoworker.MasterToWorkerRequest;
import org.apache.reef.vortex.protocol.mastertoworker.TaskletAggregateExecutionRequest;
import org.apache.reef.vortex.protocol.mastertoworker.TaskletAggregationRequest;
import org.apache.reef.vortex.protocol.mastertoworker.TaskletCancellationRequest;
import org.apache.reef.vortex.protocol.mastertoworker.TaskletExecutionRequest;
import org.apache.reef.vortex.protocol.workertomaster.TaskletCancelledReport;
import org.apache.reef.vortex.protocol.workertomaster.TaskletFailureReport;
import org.apache.reef.vortex.protocol.workertomaster.TaskletResultReport;
import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReports;
import org.apache.reef.wake.EventHandler;

@Unit
@Unstable
@TaskSide
/* loaded from: input_file:org/apache/reef/vortex/evaluator/VortexWorker.class */
public final class VortexWorker implements Task, TaskMessageSource {
    /** Class-wide logger; assigned in the static initializer. */
    private static final Logger LOG;

    /** Source identifier passed to {@code TaskMessage.from} when {@code getMessage()} wraps a report. */
    private static final String MESSAGE_SOURCE_ID = "";

    /** Deserializes incoming requests and serializes outgoing reports. */
    private final KryoUtils kryoUtils;

    /** Triggered after a tasklet's report has been queued; also handed to each AggregateContainer. */
    private final HeartBeatTriggerManager heartBeatTriggerManager;

    /** Size of the fixed thread pool that {@code call()} creates for running tasklets. */
    private final int numOfThreads;

    /** Cached inverse of the class's desired assertion status; assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /** Raw request payloads queued by DriverMessageHandler and drained by the scheduling thread in {@code call()}. */
    private final BlockingDeque<byte[]> pendingRequests = new LinkedBlockingDeque<byte[]>();

    /** Serialized reports waiting to be picked up by {@code getMessage()}. */
    private final BlockingDeque<byte[]> workerReports = new LinkedBlockingDeque<byte[]>();

    /** Aggregate containers keyed by aggregate function id. */
    private final ConcurrentMap<Integer, AggregateContainer> aggregates =
            new ConcurrentHashMap<Integer, AggregateContainer>();

    /** Counted down by TaskCloseHandler; {@code call()} blocks on it. */
    private final CountDownLatch terminated = new CountDownLatch(1);

    /* renamed from: org.apache.reef.vortex.evaluator.VortexWorker$4, reason: invalid class name */
    /* loaded from: input_file:org/apache/reef/vortex/evaluator/VortexWorker$4.class */
    /**
     * Lookup table that maps the ordinal of each {@code MasterToWorkerRequest.Type} constant to the
     * integer case label used by the request-dispatch switch: AggregateTasklets = 1,
     * ExecuteAggregateTasklet = 2, ExecuteTasklet = 3, CancelTasklet = 4. Slots for any other
     * constant stay at 0.
     */
    static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$reef$vortex$protocol$mastertoworker$MasterToWorkerRequest$Type;

        static {
            // Fill a local table first, then publish it to the final field in one assignment.
            final int[] caseLabels = new int[MasterToWorkerRequest.Type.values().length];
            // Each constant is guarded on its own so that one missing enum field
            // (NoSuchFieldError) does not prevent the remaining entries from being filled in.
            try {
                caseLabels[MasterToWorkerRequest.Type.AggregateTasklets.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent: leave its slot unmapped
            }
            try {
                caseLabels[MasterToWorkerRequest.Type.ExecuteAggregateTasklet.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent: leave its slot unmapped
            }
            try {
                caseLabels[MasterToWorkerRequest.Type.ExecuteTasklet.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent: leave its slot unmapped
            }
            try {
                caseLabels[MasterToWorkerRequest.Type.CancelTasklet.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant absent: leave its slot unmapped
            }
            $SwitchMap$org$apache$reef$vortex$protocol$mastertoworker$MasterToWorkerRequest$Type = caseLabels;
        }
    }

    /* loaded from: input_file:org/apache/reef/vortex/evaluator/VortexWorker$DriverMessageHandler.class */
    /**
     * Handles messages sent by the driver by appending their payload to the worker's
     * pending-request queue.
     */
    public final class DriverMessageHandler implements EventHandler<DriverMessage> {
        public DriverMessageHandler() {
        }

        /**
         * Queues the payload of {@code driverMessage}; a message without a payload is ignored.
         *
         * @param driverMessage the message received from the driver
         */
        public void onNext(DriverMessage driverMessage) {
            if (!driverMessage.get().isPresent()) {
                return;
            }
            pendingRequests.addLast(driverMessage.get().get());
        }
    }

    /* loaded from: input_file:org/apache/reef/vortex/evaluator/VortexWorker$TaskCloseHandler.class */
    /**
     * Handles the task close event by releasing the latch that {@code call()} is waiting on.
     */
    public final class TaskCloseHandler implements EventHandler<CloseEvent> {
        public TaskCloseHandler() {
        }

        /**
         * Counts down the termination latch.
         *
         * @param closeEvent the close event; its contents are not inspected
         */
        public void onNext(CloseEvent closeEvent) {
            terminated.countDown();
        }
    }

    /**
     * Injectable constructor; stores the collaborators and the configured pool size.
     *
     * @param heartBeatTriggerManager used to trigger a heartbeat after a tasklet report is queued
     * @param kryoUtils serializer for requests and reports
     * @param i value of {@code VortexWorkerConf.NumOfThreads}; the size of the tasklet thread pool
     */
    @Inject
    private VortexWorker(HeartBeatTriggerManager heartBeatTriggerManager, KryoUtils kryoUtils, @Parameter(VortexWorkerConf.NumOfThreads.class) int i) {
        this.numOfThreads = i;
        this.kryoUtils = kryoUtils;
        this.heartBeatTriggerManager = heartBeatTriggerManager;
    }

    /**
     * Runs the worker: starts a single scheduling thread that drains {@code pendingRequests} and
     * dispatches each request, then blocks until the termination latch is released by
     * {@code TaskCloseHandler}.
     *
     * <p>Both executors are shut down before this method returns, so neither the scheduling
     * thread nor the tasklet pool threads outlive the task.
     *
     * @param bArr not used
     * @return always {@code null}
     * @throws Exception in particular {@link InterruptedException} if the calling thread is
     *     interrupted while waiting for termination
     */
    public byte[] call(byte[] bArr) throws Exception {
        final ExecutorService schedulerThread = Executors.newSingleThreadExecutor();
        final ExecutorService taskletPool = Executors.newFixedThreadPool(this.numOfThreads);
        // Futures of submitted tasklets, keyed by tasklet id, so that they can be cancelled.
        final ConcurrentMap<Integer, Future> runningTasklets = new ConcurrentHashMap<Integer, Future>();
        try {
            schedulerThread.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            final MasterToWorkerRequest request =
                                    (MasterToWorkerRequest) kryoUtils.deserialize(pendingRequests.takeFirst());
                            dispatchRequest(taskletPool, runningTasklets, request);
                        }
                    } catch (InterruptedException e) {
                        // shutdownNow() below interrupts takeFirst() once the task is closed:
                        // restore the flag and let the thread end quietly.
                        Thread.currentThread().interrupt();
                    } catch (java.util.concurrent.RejectedExecutionException e) {
                        // Expected only when the pool was shut down while a request was in flight.
                        if (!taskletPool.isShutdown()) {
                            throw e;
                        }
                    }
                }
            });
            this.terminated.await();
        } finally {
            // Stop the scheduler first so that no new work is handed to the pool, then stop the pool.
            schedulerThread.shutdownNow();
            taskletPool.shutdownNow();
        }
        return null;
    }

    /** Routes one deserialized request to the handler that matches its type. */
    private void dispatchRequest(ExecutorService taskletPool, ConcurrentMap<Integer, Future> runningTasklets, MasterToWorkerRequest request) {
        // Switch on the enum itself rather than on an ordinal-indexed lookup table.
        switch (request.getType()) {
            case AggregateTasklets:
                final TaskletAggregationRequest aggregation = (TaskletAggregationRequest) request;
                this.aggregates.put(
                        aggregation.getAggregateFunctionId(),
                        new AggregateContainer(this.heartBeatTriggerManager, this.kryoUtils, this.workerReports, aggregation));
                break;
            case ExecuteAggregateTasklet:
                executeAggregateTasklet(taskletPool, request);
                break;
            case ExecuteTasklet:
                executeTasklet(taskletPool, runningTasklets, request);
                break;
            case CancelTasklet:
                final int taskletId = ((TaskletCancellationRequest) request).getTaskletId();
                LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", taskletId);
                final Future running = runningTasklets.get(taskletId);
                // No entry means there is nothing to cancel for this id.
                if (running != null) {
                    running.cancel(true);
                }
                break;
            default:
                throw new RuntimeException("Unknown Command");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits a tasklet to {@code executorService} and records its Future in
     * {@code concurrentMap} under the tasklet id.
     *
     * <p>When the tasklet finishes, its outcome (result, cancellation, or failure) is serialized
     * and appended to {@code workerReports}; the Future is then removed from the map and a
     * heartbeat is triggered.
     *
     * @param executorService the pool that runs the tasklet
     * @param concurrentMap map of tasklet id to Future
     * @param masterToWorkerRequest the request; must be a {@code TaskletExecutionRequest}
     */
    public void executeTasklet(ExecutorService executorService, final ConcurrentMap<Integer, Future> concurrentMap, MasterToWorkerRequest masterToWorkerRequest) {
        final TaskletExecutionRequest request = (TaskletExecutionRequest) masterToWorkerRequest;
        final int taskletId = request.getTaskletId();
        // Released only after the Future is in the map, so the job cannot remove its entry too early.
        final CountDownLatch futureRegistered = new CountDownLatch(1);

        final Runnable job = new Runnable() {
            @Override
            public void run() {
                Object report;
                try {
                    report = new TaskletResultReport(taskletId, request.execute());
                } catch (InterruptedException cancelled) {
                    report = new TaskletCancelledReport(taskletId);
                    LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", taskletId);
                } catch (Exception failure) {
                    report = new TaskletFailureReport(taskletId, failure);
                }

                // Raw list: the report element type is not imported in this file.
                final ArrayList reports = new ArrayList();
                reports.add(report);
                workerReports.addLast(kryoUtils.serialize(new WorkerToMasterReports(reports)));

                try {
                    futureRegistered.await();
                } catch (InterruptedException interrupted) {
                    LOG.log(Level.SEVERE, "Cannot wait for Future to be put.");
                    throw new RuntimeException(interrupted);
                }
                concurrentMap.remove(taskletId);
                heartBeatTriggerManager.triggerHeartBeat();
            }
        };

        concurrentMap.put(taskletId, executorService.submit(job));
        futureRegistered.countDown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits an aggregate tasklet to {@code executorService}. The tasklet is first scheduled
     * on the AggregateContainer registered for the request's aggregate function id; the
     * aggregation request's function is then run on the request's input, and the container is
     * told about completion or failure.
     *
     * @param executorService the pool that runs the tasklet
     * @param masterToWorkerRequest the request; must be a {@code TaskletAggregateExecutionRequest}
     */
    public void executeAggregateTasklet(ExecutorService executorService, MasterToWorkerRequest masterToWorkerRequest) {
        final TaskletAggregateExecutionRequest request = (TaskletAggregateExecutionRequest) masterToWorkerRequest;
        final AggregateContainer container = this.aggregates.get(Integer.valueOf(request.getAggregateFunctionId()));
        // With assertions enabled, a missing container is reported as an AssertionError.
        if (container == null && !$assertionsDisabled) {
            throw new AssertionError();
        }
        final TaskletAggregationRequest aggregation = container.getTaskletAggregationRequest();

        final Runnable job = new Runnable() {
            @Override
            public void run() {
                try {
                    container.scheduleTasklet(request.getTaskletId());
                    container.taskletComplete(request.getTaskletId(), aggregation.executeFunction(request.getInput()));
                } catch (Exception failure) {
                    container.taskletFailed(request.getTaskletId(), failure);
                }
            }
        };
        executorService.submit(job);
    }

    /**
     * Takes the oldest queued worker report, if any, and wraps it as a task message.
     *
     * @return a message holding the report at the head of {@code workerReports}, or an empty
     *     Optional when no report is queued
     */
    public Optional<TaskMessage> getMessage() {
        final byte[] report = this.workerReports.pollFirst();
        if (report == null) {
            return Optional.empty();
        }
        return Optional.of(TaskMessage.from(MESSAGE_SOURCE_ID, report));
    }

    // Initializes the two static finals declared without an initializer: the logger and the
    // cached inverse of this class's desired assertion status.
    static {
        LOG = Logger.getLogger(VortexWorker.class.getName());
        $assertionsDisabled = !VortexWorker.class.desiredAssertionStatus();
    }
}
