package org.apache.nemo.runtime.executor;

import com.google.protobuf.ByteString;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nemo.common.coder.BytesDecoderFactory;
import org.apache.nemo.common.coder.BytesEncoderFactory;
import org.apache.nemo.common.coder.DecoderFactory;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.exception.IllegalMessageException;
import org.apache.nemo.common.exception.UnknownFailureCauseException;
import org.apache.nemo.common.ir.edge.executionproperty.CompressionProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DecompressionProperty;
import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageContext;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.MessageListener;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import org.apache.nemo.runtime.common.plan.Task;
import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
import org.apache.nemo.runtime.executor.data.SerializerManager;
import org.apache.nemo.runtime.executor.datatransfer.IntermediateDataIOFactory;
import org.apache.nemo.runtime.executor.datatransfer.NemoEventDecoderFactory;
import org.apache.nemo.runtime.executor.datatransfer.NemoEventEncoderFactory;
import org.apache.nemo.runtime.executor.task.TaskExecutor;
import org.apache.reef.tang.annotations.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/runtime/executor/Executor.class */
public final class Executor {
    private static final Logger LOG = LoggerFactory.getLogger(Executor.class.getName());
    private final String executorId;
    private final ExecutorService executorService = Executors.newCachedThreadPool(new BasicThreadFactory.Builder().namingPattern("TaskExecutor thread-%d").build());
    private final SerializerManager serializerManager;
    private final IntermediateDataIOFactory intermediateDataIOFactory;
    private final BroadcastManagerWorker broadcastManagerWorker;
    private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
    private final MetricMessageSender metricMessageSender;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.nemo.runtime.executor.Executor$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/nemo/runtime/executor/Executor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType = new int[ControlMessage.MessageType.values().length];

        static {
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[ControlMessage.MessageType.ScheduleTask.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[ControlMessage.MessageType.RequestMetricFlush.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:org/apache/nemo/runtime/executor/Executor$ExecutorMessageReceiver.class */
    private final class ExecutorMessageReceiver implements MessageListener<ControlMessage.Message> {
        private ExecutorMessageReceiver() {
        }

        public void onMessage(ControlMessage.Message message) {
            switch (AnonymousClass1.$SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[message.getType().ordinal()]) {
                case 1:
                    Executor.this.onTaskReceived((Task) SerializationUtils.deserialize(message.getScheduleTaskMsg().getTask().toByteArray()));
                    return;
                case 2:
                    Executor.this.metricMessageSender.flush();
                    return;
                default:
                    throw new IllegalMessageException(new Exception("This message should not be received by an executor :" + message.getType()));
            }
        }

        public void onMessageWithContext(ControlMessage.Message message, MessageContext messageContext) {
            switch (AnonymousClass1.$SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[message.getType().ordinal()]) {
                default:
                    throw new IllegalMessageException(new Exception("This message should not be requested to an executor :" + message.getType()));
            }
        }

        /* synthetic */ ExecutorMessageReceiver(Executor executor, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    @Inject
    private Executor(@Parameter(JobConf.ExecutorId.class) String str, PersistentConnectionToMasterMap persistentConnectionToMasterMap, MessageEnvironment messageEnvironment, SerializerManager serializerManager, IntermediateDataIOFactory intermediateDataIOFactory, BroadcastManagerWorker broadcastManagerWorker, MetricManagerWorker metricManagerWorker) {
        this.executorId = str;
        this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
        this.serializerManager = serializerManager;
        this.intermediateDataIOFactory = intermediateDataIOFactory;
        this.broadcastManagerWorker = broadcastManagerWorker;
        this.metricMessageSender = metricManagerWorker;
        messageEnvironment.setupListener("EXECUTOR_MESSAGE_LISTENER_ID", new ExecutorMessageReceiver(this, null));
    }

    public String getExecutorId() {
        return this.executorId;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void onTaskReceived(Task task) {
        LOG.debug("Executor [{}] received Task [{}] to execute.", new Object[]{this.executorId, task.getTaskId()});
        this.executorService.execute(() -> {
            launchTask(task);
        });
    }

    private void launchTask(Task task) {
        LOG.info("Launch task: {}", task.getTaskId());
        try {
            DAG dag = (DAG) SerializationUtils.deserialize(task.getSerializedIRDag());
            TaskStateManager taskStateManager = new TaskStateManager(task, this.executorId, this.persistentConnectionToMasterMap, this.metricMessageSender);
            task.getTaskIncomingEdges().forEach(stageEdge -> {
                this.serializerManager.register(stageEdge.getId(), getEncoderFactory((EncoderFactory) stageEdge.getPropertyValue(EncoderProperty.class).get()), getDecoderFactory((DecoderFactory) stageEdge.getPropertyValue(DecoderProperty.class).get()), (CompressionProperty.Value) stageEdge.getPropertyValue(CompressionProperty.class).orElse(null), (CompressionProperty.Value) stageEdge.getPropertyValue(DecompressionProperty.class).orElse(null));
            });
            task.getTaskOutgoingEdges().forEach(stageEdge2 -> {
                this.serializerManager.register(stageEdge2.getId(), getEncoderFactory((EncoderFactory) stageEdge2.getPropertyValue(EncoderProperty.class).get()), getDecoderFactory((DecoderFactory) stageEdge2.getPropertyValue(DecoderProperty.class).get()), (CompressionProperty.Value) stageEdge2.getPropertyValue(CompressionProperty.class).orElse(null), (CompressionProperty.Value) stageEdge2.getPropertyValue(DecompressionProperty.class).orElse(null));
            });
            dag.getVertices().forEach(iRVertex -> {
                dag.getOutgoingEdgesOf(iRVertex).forEach(runtimeEdge -> {
                    this.serializerManager.register(runtimeEdge.getId(), getEncoderFactory((EncoderFactory) runtimeEdge.getPropertyValue(EncoderProperty.class).get()), getDecoderFactory((DecoderFactory) runtimeEdge.getPropertyValue(DecoderProperty.class).get()), (CompressionProperty.Value) runtimeEdge.getPropertyValue(CompressionProperty.class).orElse(null), (CompressionProperty.Value) runtimeEdge.getPropertyValue(DecompressionProperty.class).orElse(null));
                });
            });
            new TaskExecutor(task, dag, taskStateManager, this.intermediateDataIOFactory, this.broadcastManagerWorker, this.metricMessageSender, this.persistentConnectionToMasterMap).execute();
        } catch (Exception e) {
            this.persistentConnectionToMasterMap.getMessageSender("RUNTIME_MASTER_MESSAGE_LISTENER_ID").send(ControlMessage.Message.newBuilder().setId(RuntimeIdManager.generateMessageId()).setListenerId("RUNTIME_MASTER_MESSAGE_LISTENER_ID").setType(ControlMessage.MessageType.ExecutorFailed).setExecutorFailedMsg(ControlMessage.ExecutorFailedMsg.newBuilder().setExecutorId(this.executorId).setException(ByteString.copyFrom(SerializationUtils.serialize(e))).build()).build());
            throw e;
        }
    }

    private EncoderFactory getEncoderFactory(EncoderFactory encoderFactory) {
        return encoderFactory instanceof BytesEncoderFactory ? encoderFactory : new NemoEventEncoderFactory(encoderFactory);
    }

    private DecoderFactory getDecoderFactory(DecoderFactory decoderFactory) {
        return decoderFactory instanceof BytesDecoderFactory ? decoderFactory : new NemoEventDecoderFactory(decoderFactory);
    }

    public void terminate() {
        try {
            this.metricMessageSender.close();
        } catch (UnknownFailureCauseException e) {
            throw new UnknownFailureCauseException(new Exception("Closing MetricManagerWorker failed in executor " + this.executorId));
        }
    }
}
