package org.apache.nemo.runtime.executor.task;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.Edge;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
import org.apache.nemo.common.punctuation.Finishmark;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.common.plan.Task;
import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.executor.MetricMessageSender;
import org.apache.nemo.runtime.executor.TaskStateManager;
import org.apache.nemo.runtime.executor.TransformContextImpl;
import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
import org.apache.nemo.runtime.executor.datatransfer.DataFetcherOutputCollector;
import org.apache.nemo.runtime.executor.datatransfer.DynOptDataOutputCollector;
import org.apache.nemo.runtime.executor.datatransfer.InputReader;
import org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager;
import org.apache.nemo.runtime.executor.datatransfer.IntermediateDataIOFactory;
import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager;
import org.apache.nemo.runtime.executor.datatransfer.NextIntraTaskOperatorInfo;
import org.apache.nemo.runtime.executor.datatransfer.OperatorVertexOutputCollector;
import org.apache.nemo.runtime.executor.datatransfer.OperatorWatermarkCollector;
import org.apache.nemo.runtime.executor.datatransfer.OutputWriter;
import org.apache.nemo.runtime.executor.datatransfer.PipeInputReader;
import org.apache.nemo.runtime.executor.datatransfer.SingleInputWatermarkManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executes a single task.
 *
 * <p>On construction, every vertex of the task's DAG is wrapped in a {@link VertexHarness}
 * and a {@link DataFetcher} is created for each source vertex and for each incoming stage
 * edge that targets an operator vertex. {@link #execute()} then repeatedly pulls events from
 * those fetchers, forwards them to the matching output collectors, and finally closes the
 * transforms and output writers while reporting task state and metrics.
 *
 * <p>Instances are not thread-safe and are meant to be executed once.
 */
@NotThreadSafe
public final class TaskExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class.getName());

    /** Metric type under which all metrics of this class are sent. */
    private static final String TASK_METRIC = "TaskMetric";
    /** Listener id used when sending collected data to the master. */
    private static final String RUNTIME_MASTER_LISTENER_ID = "RUNTIME_MASTER_MESSAGE_LISTENER_ID";
    /** Interval (ms) between two polls of pending fetchers, and sleep time when nothing is available. */
    private static final long POLLING_INTERVAL_MS = 100L;

    private final String taskId;
    private final TaskStateManager taskStateManager;
    private final List<DataFetcher> dataFetchers;
    private final BroadcastManagerWorker broadcastManagerWorker;
    private final List<VertexHarness> sortedHarnesses;
    private final MetricMessageSender metricMessageSender;
    private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;

    // Accumulated when a fetcher delivers its Finishmark; reported as metrics in doExecute().
    private long boundedSourceReadTime = 0;
    private long serializedReadBytes = 0;
    private long encodedReadBytes = 0;

    private boolean isExecuted = false;
    // Non-null once setIRVertexPutOnHold() was called; makes the task end ON_HOLD instead of COMPLETE.
    private String idOfVertexPutOnHold = null;

    /**
     * Creates the executor and eagerly prepares all harnesses and data fetchers.
     *
     * @param task the task to execute
     * @param dag the DAG of IR vertices making up the task
     * @param taskStateManager receives the state transitions of this task
     * @param intermediateDataIOFactory creates readers/writers for edges crossing the task boundary
     * @param broadcastManagerWorker handed to the transform context of every vertex
     * @param metricMessageSender receives the task metrics
     * @param persistentConnectionToMasterMap used to send collected data to the master
     */
    public TaskExecutor(Task task,
                        DAG<IRVertex, RuntimeEdge<IRVertex>> dag,
                        TaskStateManager taskStateManager,
                        IntermediateDataIOFactory intermediateDataIOFactory,
                        BroadcastManagerWorker broadcastManagerWorker,
                        MetricMessageSender metricMessageSender,
                        PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
        this.taskId = task.getTaskId();
        this.taskStateManager = taskStateManager;
        this.broadcastManagerWorker = broadcastManagerWorker;
        this.metricMessageSender = metricMessageSender;
        this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
        // prepare() reads the fields assigned above, so it must run after them.
        final Pair<List<DataFetcher>, List<VertexHarness>> prepared = prepare(task, dag, intermediateDataIOFactory);
        this.dataFetchers = prepared.left();
        this.sortedHarnesses = prepared.right();
    }

    /**
     * Collects every edge that ends at {@code childVertex}: first the intra-task edges of the
     * DAG, then the task's incoming stage edges whose destination vertex id matches.
     */
    private List<Edge> getAllIncomingEdges(Task task, DAG<IRVertex, RuntimeEdge<IRVertex>> dag, IRVertex childVertex) {
        final List<Edge> edges = new ArrayList<>();
        edges.addAll(dag.getIncomingEdgesOf(childVertex));
        final List<StageEdge> stageEdges = task.getTaskIncomingEdges().stream()
            .filter(stageEdge -> stageEdge.getDstIRVertex().getId().equals(childVertex.getId()))
            .collect(Collectors.toList());
        edges.addAll(stageEdges);
        return edges;
    }

    /**
     * Builds the data fetchers and vertex harnesses of the task.
     *
     * <p>Vertices are visited in reverse topological order. The returned harness list is in
     * (forward) topological order.
     *
     * @return a pair of (data fetchers, harnesses in topological order)
     * @throws IllegalStateException if a source vertex has no readable registered in the task
     */
    private Pair<List<DataFetcher>, List<VertexHarness>> prepare(Task task,
                                                                 DAG<IRVertex, RuntimeEdge<IRVertex>> dag,
                                                                 IntermediateDataIOFactory intermediateDataIOFactory) {
        final int taskIndex = RuntimeIdManager.getIndexFromTaskId(task.getTaskId());
        final List<IRVertex> reverseTopologicalOrder = Lists.reverse(dag.getTopologicalSort());

        final Map<Edge, Integer> edgeIndexMap = indexIncomingEdges(task, dag, reverseTopologicalOrder);
        final Map<IRVertex, InputWatermarkManager> watermarkManagers =
            createWatermarkManagers(task, dag, reverseTopologicalOrder);

        final List<DataFetcher> dataFetcherList = new ArrayList<>();
        final Map<String, VertexHarness> harnessById = new HashMap<>();

        for (final IRVertex irVertex : reverseTopologicalOrder) {
            final Optional<Readable> sourceReader = getSourceVertexReader(irVertex, task.getIrVertexIdToReadable());
            if (sourceReader.isPresent() != (irVertex instanceof SourceVertex)) {
                throw new IllegalStateException(irVertex.toString());
            }

            // Outputs of this vertex: intra-task (next operators) and external (writers to child tasks).
            final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputMap =
                getInternalAdditionalOutputMap(irVertex, dag, edgeIndexMap, watermarkManagers);
            final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
                getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
            final List<NextIntraTaskOperatorInfo> internalMainOutputs =
                getInternalMainOutputs(irVertex, dag, edgeIndexMap, watermarkManagers);
            final List<OutputWriter> externalMainOutputs =
                getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);

            final OutputCollector outputCollector;
            if (irVertex instanceof OperatorVertex
                && ((OperatorVertex) irVertex).getTransform() instanceof AggregateMetricTransform) {
                outputCollector = new DynOptDataOutputCollector(irVertex, this.persistentConnectionToMasterMap, this);
            } else {
                outputCollector = new OperatorVertexOutputCollector(irVertex, internalMainOutputs,
                    internalAdditionalOutputMap, externalMainOutputs, externalAdditionalOutputMap);
            }

            final VertexHarness harness = new VertexHarness(irVertex, outputCollector,
                new TransformContextImpl(this.broadcastManagerWorker), externalMainOutputs, externalAdditionalOutputMap);
            prepareTransform(harness);
            harnessById.put(irVertex.getId(), harness);

            if (irVertex instanceof SourceVertex) {
                dataFetcherList.add(
                    new SourceVertexDataFetcher((SourceVertex) irVertex, sourceReader.get(), outputCollector));
            }

            for (final StageEdge incomingEdge : task.getTaskIncomingEdges()) {
                if (!incomingEdge.getDstIRVertex().getId().equals(irVertex.getId())) {
                    continue;
                }
                // The reader is created for every matching edge, even if no fetcher ends up using it.
                final InputReader parentTaskReader =
                    intermediateDataIOFactory.createReader(taskIndex, incomingEdge.getSrcIRVertex(), incomingEdge);
                if (irVertex instanceof OperatorVertex) {
                    final int edgeIndex = edgeIndexMap.get(incomingEdge);
                    final InputWatermarkManager watermarkManager = watermarkManagers.get(irVertex);
                    final OutputCollector fetcherCollector =
                        new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager);
                    if (parentTaskReader instanceof PipeInputReader) {
                        dataFetcherList.add(new MultiThreadParentTaskDataFetcher(
                            parentTaskReader.getSrcIrVertex(), parentTaskReader, fetcherCollector));
                    } else {
                        dataFetcherList.add(new ParentTaskDataFetcher(
                            parentTaskReader.getSrcIrVertex(), parentTaskReader, fetcherCollector));
                    }
                }
            }
        }

        final List<VertexHarness> harnessesInTopologicalOrder = dag.getTopologicalSort().stream()
            .map(irVertex -> harnessById.get(irVertex.getId()))
            .collect(Collectors.toList());
        return Pair.of(dataFetcherList, harnessesInTopologicalOrder);
    }

    /** Maps each incoming edge to its position in the incoming-edge list of its destination vertex. */
    private Map<Edge, Integer> indexIncomingEdges(Task task,
                                                  DAG<IRVertex, RuntimeEdge<IRVertex>> dag,
                                                  List<IRVertex> vertices) {
        final Map<Edge, Integer> edgeIndexMap = new HashMap<>();
        for (final IRVertex vertex : vertices) {
            final List<Edge> incomingEdges = getAllIncomingEdges(task, dag, vertex);
            for (int index = 0; index < incomingEdges.size(); index++) {
                edgeIndexMap.putIfAbsent(incomingEdges.get(index), index);
            }
        }
        return edgeIndexMap;
    }

    /**
     * Creates one watermark manager per operator vertex: a single-input manager when the
     * vertex has exactly one incoming edge, a multi-input manager otherwise.
     */
    private Map<IRVertex, InputWatermarkManager> createWatermarkManagers(Task task,
                                                                         DAG<IRVertex, RuntimeEdge<IRVertex>> dag,
                                                                         List<IRVertex> vertices) {
        final Map<IRVertex, InputWatermarkManager> managers = new HashMap<>();
        for (final IRVertex vertex : vertices) {
            if (!(vertex instanceof OperatorVertex)) {
                continue;
            }
            final OperatorVertex operator = (OperatorVertex) vertex;
            final int numIncomingEdges = getAllIncomingEdges(task, dag, vertex).size();
            if (numIncomingEdges == 1) {
                managers.putIfAbsent(vertex,
                    new SingleInputWatermarkManager(new OperatorWatermarkCollector(operator)));
            } else {
                managers.putIfAbsent(vertex,
                    new MultiInputWatermarkManager(numIncomingEdges, new OperatorWatermarkCollector(operator)));
            }
        }
        return managers;
    }

    /** Emits a data element through the given collector. */
    private void processElement(OutputCollector outputCollector, Object dataElement) {
        outputCollector.emit(dataElement);
    }

    /** Emits a watermark through the given collector. */
    private void processWatermark(OutputCollector outputCollector, Watermark watermark) {
        outputCollector.emitWatermark(watermark);
    }

    /**
     * Runs the task. Any {@link Throwable} raised during execution is caught here: the task is
     * reported as {@code FAILED} and the stack trace is logged; nothing propagates to the caller.
     */
    public void execute() {
        try {
            doExecute();
        } catch (Throwable throwable) {
            this.taskStateManager.onTaskStateChanged(TaskState.State.FAILED, Optional.empty(), Optional.empty());
            LOG.error(ExceptionUtils.getStackTrace(throwable));
        }
    }

    /**
     * Performs the actual execution: consumes all fetchers, sends read metrics, finalizes every
     * vertex in topological order, then reports {@code COMPLETE} or {@code ON_HOLD}.
     *
     * @throws RuntimeException if this executor has already been executed
     */
    private void doExecute() {
        if (this.isExecuted) {
            throw new RuntimeException("Task {" + this.taskId + "} execution called again");
        }
        // Record the execution so that the guard above actually rejects a second call.
        this.isExecuted = true;

        LOG.info("{} started", this.taskId);
        this.taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty());

        if (!handleDataFetchers(this.dataFetchers)) {
            // The failure has already been reported to the state manager.
            return;
        }

        sendTaskMetric("boundedSourceReadTime", this.boundedSourceReadTime);
        sendTaskMetric("serializedReadBytes", this.serializedReadBytes);
        sendTaskMetric("encodedReadBytes", this.encodedReadBytes);

        for (final VertexHarness harness : this.sortedHarnesses) {
            finalizeVertex(harness);
        }

        if (this.idOfVertexPutOnHold == null) {
            this.taskStateManager.onTaskStateChanged(TaskState.State.COMPLETE, Optional.empty(), Optional.empty());
            LOG.info("{} completed", this.taskId);
        } else {
            this.taskStateManager.onTaskStateChanged(
                TaskState.State.ON_HOLD, Optional.of(this.idOfVertexPutOnHold), Optional.empty());
            LOG.info("{} on hold", this.taskId);
        }
    }

    /** Sends a single {@code long}-valued metric of this task. */
    private void sendTaskMetric(String metricName, long value) {
        this.metricMessageSender.send(TASK_METRIC, this.taskId, metricName,
            SerializationUtils.serialize(Long.valueOf(value)));
    }

    /** Closes the vertex's transform, then its output writers. */
    private void finalizeVertex(VertexHarness vertexHarness) {
        closeTransform(vertexHarness);
        finalizeOutputWriters(vertexHarness);
    }

    /**
     * Dispatches one event obtained from a fetcher.
     *
     * <p>A {@link Finishmark} only updates the read statistics taken from the fetcher; a
     * {@link Watermark} and any other object are forwarded to the fetcher's output collector.
     */
    private void onEventFromDataFetcher(Object event, DataFetcher dataFetcher) {
        if (event instanceof Finishmark) {
            if (dataFetcher instanceof SourceVertexDataFetcher) {
                this.boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
            } else if (dataFetcher instanceof ParentTaskDataFetcher) {
                final ParentTaskDataFetcher parentFetcher = (ParentTaskDataFetcher) dataFetcher;
                this.serializedReadBytes += parentFetcher.getSerializedBytes();
                this.encodedReadBytes += parentFetcher.getEncodedBytes();
            } else if (dataFetcher instanceof MultiThreadParentTaskDataFetcher) {
                final MultiThreadParentTaskDataFetcher parentFetcher = (MultiThreadParentTaskDataFetcher) dataFetcher;
                this.serializedReadBytes += parentFetcher.getSerializedBytes();
                this.encodedReadBytes += parentFetcher.getEncodedBytes();
            }
        } else if (event instanceof Watermark) {
            processWatermark(dataFetcher.getOutputCollector(), (Watermark) event);
        } else {
            processElement(dataFetcher.getOutputCollector(), event);
        }
    }

    /** Returns whether at least {@code pollingInterval} has elapsed between the two timestamps. */
    private boolean isPollingTime(long pollingInterval, long currentTime, long prevTime) {
        return currentTime - prevTime >= pollingInterval;
    }

    /**
     * Drains all fetchers.
     *
     * <p>Fetchers are kept in two lists. "Available" fetchers are polled on every round; one
     * that throws {@link NoSuchElementException} is moved to the "pending" list. Pending
     * fetchers are retried at most once per {@link #POLLING_INTERVAL_MS} and move back to
     * "available" once they deliver an event. A fetcher is dropped when it delivers a
     * {@link Finishmark}. When both lists are empty every fetcher is closed.
     *
     * @return {@code true} when all fetchers finished; {@code false} if an {@link IOException}
     *     occurred, in which case {@code SHOULD_RETRY} has been reported and the fetchers are
     *     left unclosed (unchanged from the previous behavior)
     * @throws RuntimeException if the thread is interrupted while waiting, or closing a fetcher fails
     */
    private boolean handleDataFetchers(List<DataFetcher> fetchers) {
        final List<DataFetcher> availableFetchers = new LinkedList<>(fetchers);
        final List<DataFetcher> pendingFetchers = new LinkedList<>();
        long prevPollingTime = System.currentTimeMillis();

        while (!availableFetchers.isEmpty() || !pendingFetchers.isEmpty()) {
            final Iterator<DataFetcher> availableIterator = availableFetchers.iterator();
            while (availableIterator.hasNext()) {
                final DataFetcher fetcher = availableIterator.next();
                try {
                    final Object event = fetcher.fetchDataElement();
                    onEventFromDataFetcher(event, fetcher);
                    if (event instanceof Finishmark) {
                        availableIterator.remove();
                    }
                } catch (IOException e) {
                    reportInputReadFailure(e);
                    return false;
                } catch (NoSuchElementException noElementYet) {
                    // Nothing to read right now: park the fetcher and move on to the next one.
                    availableIterator.remove();
                    pendingFetchers.add(fetcher);
                }
            }

            final long currentTime = System.currentTimeMillis();
            if (isPollingTime(POLLING_INTERVAL_MS, currentTime, prevPollingTime)) {
                prevPollingTime = currentTime;
                final Iterator<DataFetcher> pendingIterator = pendingFetchers.iterator();
                while (pendingIterator.hasNext()) {
                    final DataFetcher fetcher = pendingIterator.next();
                    try {
                        final Object event = fetcher.fetchDataElement();
                        onEventFromDataFetcher(event, fetcher);
                        // The fetcher produced something, so it is no longer pending.
                        pendingIterator.remove();
                        if (!(event instanceof Finishmark)) {
                            availableFetchers.add(fetcher);
                        }
                    } catch (IOException e) {
                        reportInputReadFailure(e);
                        return false;
                    } catch (NoSuchElementException ignored) {
                        // Still nothing to read: the fetcher stays pending until the next poll.
                    }
                }
            }

            // Only pending fetchers left: wait instead of spinning.
            if (availableFetchers.isEmpty() && !pendingFetchers.isEmpty()) {
                try {
                    Thread.sleep(POLLING_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }

        for (final DataFetcher fetcher : fetchers) {
            try {
                fetcher.close();
            } catch (Exception e) {
                // Propagated to execute(), which logs the full stack trace including this cause.
                throw new RuntimeException(e);
            }
        }
        return true;
    }

    /** Reports a recoverable input read failure to the state manager and logs it. */
    private void reportInputReadFailure(IOException cause) {
        this.taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY, Optional.empty(),
            Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
        LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", this.taskId, cause);
    }

    /**
     * Creates writers for the outgoing stage edges of {@code irVertex} that carry an
     * additional-output tag, grouped by that tag.
     */
    private Map<String, List<OutputWriter>> getExternalAdditionalOutputMap(IRVertex irVertex,
                                                                           List<StageEdge> outEdgesToChildrenTasks,
                                                                           IntermediateDataIOFactory intermediateDataIOFactory) {
        final Map<String, List<OutputWriter>> writersByTag = new HashMap<>();
        for (final StageEdge edge : outEdgesToChildrenTasks) {
            if (!edge.getSrcIRVertex().getId().equals(irVertex.getId())) {
                continue;
            }
            final Optional<String> tag = edge.getPropertyValue(AdditionalOutputTagProperty.class);
            if (tag.isPresent()) {
                final OutputWriter writer = intermediateDataIOFactory.createWriter(this.taskId, edge);
                writersByTag.computeIfAbsent(tag.get(), key -> new ArrayList<>()).add(writer);
            }
        }
        return writersByTag;
    }

    /**
     * Describes the next intra-task operators reached from {@code irVertex} through DAG edges
     * that carry an additional-output tag, grouped by that tag.
     */
    private Map<String, List<NextIntraTaskOperatorInfo>> getInternalAdditionalOutputMap(
        IRVertex irVertex,
        DAG<IRVertex, RuntimeEdge<IRVertex>> dag,
        Map<Edge, Integer> edgeIndexMap,
        Map<IRVertex, InputWatermarkManager> watermarkManagers) {
        final Map<String, List<NextIntraTaskOperatorInfo>> infosByTag = new HashMap<>();
        for (final RuntimeEdge<IRVertex> edge : dag.getOutgoingEdgesOf(irVertex.getId())) {
            final Optional<String> tag = edge.getPropertyValue(AdditionalOutputTagProperty.class);
            if (tag.isPresent()) {
                final NextIntraTaskOperatorInfo info = toNextOperatorInfo(edge, edgeIndexMap, watermarkManagers);
                infosByTag.computeIfAbsent(tag.get(), key -> new ArrayList<>()).add(info);
            }
        }
        return infosByTag;
    }

    /**
     * Describes the next intra-task operators reached from {@code irVertex} through DAG edges
     * without an additional-output tag.
     */
    private List<NextIntraTaskOperatorInfo> getInternalMainOutputs(IRVertex irVertex,
                                                                   DAG<IRVertex, RuntimeEdge<IRVertex>> dag,
                                                                   Map<Edge, Integer> edgeIndexMap,
                                                                   Map<IRVertex, InputWatermarkManager> watermarkManagers) {
        return dag.getOutgoingEdgesOf(irVertex.getId()).stream()
            .filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
            .map(edge -> toNextOperatorInfo(edge, edgeIndexMap, watermarkManagers))
            .collect(Collectors.toList());
    }

    /**
     * Builds the descriptor of the operator at the destination of an intra-task edge.
     * The destination is cast to {@link OperatorVertex}, so a non-operator destination fails
     * with a {@link ClassCastException}.
     */
    private NextIntraTaskOperatorInfo toNextOperatorInfo(RuntimeEdge<IRVertex> edge,
                                                         Map<Edge, Integer> edgeIndexMap,
                                                         Map<IRVertex, InputWatermarkManager> watermarkManagers) {
        final int edgeIndex = edgeIndexMap.get(edge);
        final OperatorVertex nextOperator = (OperatorVertex) edge.getDst();
        return new NextIntraTaskOperatorInfo(edgeIndex, nextOperator, watermarkManagers.get(nextOperator));
    }

    /**
     * Creates writers for the outgoing stage edges of {@code irVertex} that carry no
     * additional-output tag.
     */
    private List<OutputWriter> getExternalMainOutputs(IRVertex irVertex,
                                                      List<StageEdge> outEdgesToChildrenTasks,
                                                      IntermediateDataIOFactory intermediateDataIOFactory) {
        return outEdgesToChildrenTasks.stream()
            .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
            .filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
            .map(edge -> intermediateDataIOFactory.createWriter(this.taskId, edge))
            .collect(Collectors.toList());
    }

    /**
     * Looks up the readable of a source vertex.
     *
     * @return the readable registered under the vertex id, or empty if the vertex is not a source vertex
     * @throws IllegalStateException if the vertex is a source vertex but has no readable
     */
    private Optional<Readable> getSourceVertexReader(IRVertex irVertex, Map<String, Readable> irVertexIdToReadable) {
        if (!(irVertex instanceof SourceVertex)) {
            return Optional.empty();
        }
        final Readable readable = irVertexIdToReadable.get(irVertex.getId());
        if (readable == null) {
            throw new IllegalStateException(irVertex.toString());
        }
        return Optional.of(readable);
    }

    /**
     * Creates one reader per given stage edge. Not called from within this class at present.
     */
    private List<InputReader> getParentTaskReaders(int taskIndex,
                                                   List<StageEdge> inEdgesFromParentTasks,
                                                   IntermediateDataIOFactory intermediateDataIOFactory) {
        return inEdgesFromParentTasks.stream()
            .map(edge -> intermediateDataIOFactory.createReader(taskIndex, edge.getSrcIRVertex(), edge))
            .collect(Collectors.toList());
    }

    /** Prepares the transform of the harness's vertex, if that vertex is an operator vertex. */
    private void prepareTransform(VertexHarness vertexHarness) {
        final IRVertex irVertex = vertexHarness.getIRVertex();
        if (irVertex instanceof OperatorVertex) {
            ((OperatorVertex) irVertex).getTransform()
                .prepare(vertexHarness.getContext(), vertexHarness.getOutputCollector());
        }
    }

    /**
     * Closes the transform of the harness's vertex (operator vertices only) and, if the
     * vertex's context holds serialized data, sends it to the master.
     */
    private void closeTransform(VertexHarness vertexHarness) {
        final IRVertex irVertex = vertexHarness.getIRVertex();
        if (irVertex instanceof OperatorVertex) {
            ((OperatorVertex) irVertex).getTransform().close();
        }
        vertexHarness.getContext().getSerializedData().ifPresent(data ->
            this.persistentConnectionToMasterMap.getMessageSender(RUNTIME_MASTER_LISTENER_ID).send(
                ControlMessage.Message.newBuilder()
                    .setId(RuntimeIdManager.generateMessageId())
                    .setListenerId(RUNTIME_MASTER_LISTENER_ID)
                    .setType(ControlMessage.MessageType.ExecutorDataCollected)
                    .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(data).build())
                    .build()));
    }

    /**
     * Marks the given vertex as put on hold; the task then finishes in the {@code ON_HOLD}
     * state (carrying this vertex's id) instead of {@code COMPLETE}.
     *
     * @param iRVertex the vertex to put on hold
     */
    public void setIRVertexPutOnHold(IRVertex iRVertex) {
        this.idOfVertexPutOnHold = iRVertex.getId();
    }

    /**
     * Closes every output writer of the harness (main writers first, then the tagged ones)
     * and sends the sum of the bytes they report as the {@code writtenBytes} metric.
     */
    private void finalizeOutputWriters(VertexHarness vertexHarness) {
        long totalWrittenBytes = 0;
        for (final OutputWriter writer : vertexHarness.getWritersToMainChildrenTasks()) {
            totalWrittenBytes += closeAndGetWrittenBytes(writer);
        }
        for (final List<OutputWriter> writers : vertexHarness.getWritersToAdditionalChildrenTasks().values()) {
            for (final OutputWriter writer : writers) {
                totalWrittenBytes += closeAndGetWrittenBytes(writer);
            }
        }
        sendTaskMetric("writtenBytes", totalWrittenBytes);
    }

    /** Closes the writer and returns the bytes it reports as written, or 0 if it reports none. */
    private long closeAndGetWrittenBytes(OutputWriter writer) {
        writer.close();
        final Optional<Long> writtenBytes = writer.getWrittenBytes();
        return writtenBytes.isPresent() ? writtenBytes.get().longValue() : 0L;
    }
}
