package cascading.flow.tez;

import cascading.CascadingException;
import cascading.flow.FlowElements;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowSession;
import cascading.flow.SliceCounters;
import cascading.flow.StepCounters;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowNode;
import cascading.flow.stream.element.ElementDuct;
import cascading.flow.stream.element.InputSource;
import cascading.flow.tez.stream.graph.Hadoop2TezStreamGraph;
import cascading.flow.tez.util.TezUtil;
import cascading.tap.Tap;
import cascading.util.LogUtil;
import cascading.util.Util;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/flow/tez/FlowProcessor.class */
public class FlowProcessor extends AbstractLogicalIOProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(FlowProcessor.class);
    private TezConfiguration configuration;
    private Hadoop2TezFlowProcess currentProcess;
    private FlowNode flowNode;
    private Hadoop2TezStreamGraph streamGraph;

    public FlowProcessor(ProcessorContext processorContext) {
        super(processorContext);
    }

    public void initialize() throws Exception {
        this.configuration = new TezConfiguration(TezUtils.createConfFromUserPayload(getContext().getUserPayload()));
        TezUtil.setMRProperties(getContext(), this.configuration, true);
        try {
            HadoopUtil.initLog4j(this.configuration);
            LOG.info("cascading version: {}", this.configuration.get("cascading.version", ""));
            this.currentProcess = new Hadoop2TezFlowProcess(new FlowSession(), getContext(), this.configuration);
            this.flowNode = (FlowNode) HadoopUtil.deserializeBase64(this.configuration.getRaw("cascading.flow.node"), this.configuration, BaseFlowNode.class);
            LOG.info("flow node id: {}, ordinal: {}", this.flowNode.getID(), Integer.valueOf(this.flowNode.getOrdinal()));
            LogUtil.logMemory(LOG, "flow node id: " + this.flowNode.getID() + ", mem on start");
        } catch (Throwable th) {
            if (!(th instanceof CascadingException)) {
                throw new FlowException("internal error during processor configuration", th);
            }
            throw th;
        }
    }

    /* JADX WARN: Finally extract failed */
    public void run(Map<String, LogicalInput> map, Map<String, LogicalOutput> map2) throws Exception {
        try {
            this.streamGraph = new Hadoop2TezStreamGraph(this.currentProcess, this.flowNode, map, map2);
            Collection<ElementDuct> heads = this.streamGraph.getHeads();
            ElementDuct streamedHead = this.streamGraph.getStreamedHead();
            for (ElementDuct elementDuct : heads) {
                Logger logger = LOG;
                Object[] objArr = new Object[3];
                objArr[0] = elementDuct.getFlowElement();
                objArr[1] = Boolean.valueOf(elementDuct == streamedHead);
                objArr[2] = FlowElements.id(elementDuct.getFlowElement());
                logger.info("sourcing from: {} streamed: {}, id: {}", objArr);
            }
            for (ElementDuct elementDuct2 : this.streamGraph.getTails()) {
                LOG.info("sinking to: {}, id: {}", elementDuct2.getFlowElement(), FlowElements.id(elementDuct2.getFlowElement()));
            }
            for (Tap tap : this.flowNode.getTraps()) {
                LOG.info("trapping to: {}, id: {}", tap, FlowElements.id(tap));
            }
            this.streamGraph.prepare();
            waitForInputsReady(map);
            long currentTimeMillis = System.currentTimeMillis();
            this.currentProcess.increment(SliceCounters.Process_Begin_Time, currentTimeMillis);
            this.currentProcess.increment(StepCounters.Process_Begin_Time, currentTimeMillis);
            for (ElementDuct elementDuct3 : heads) {
                try {
                    try {
                        try {
                            if (elementDuct3 != streamedHead) {
                                ((InputSource) elementDuct3).run((Object) null);
                                LogUtil.logMemory(LOG, "mem after accumulating source: " + elementDuct3.getFlowElement() + ", ");
                            }
                        } catch (IOException | OutOfMemoryError e) {
                            throw e;
                        }
                    } catch (Throwable th) {
                        if (!(th instanceof CascadingException)) {
                            throw new FlowException("internal error during processor execution on node: " + this.flowNode.getOrdinal(), th);
                        }
                        throw th;
                    }
                } catch (Throwable th2) {
                    try {
                        this.streamGraph.cleanup();
                        long currentTimeMillis2 = System.currentTimeMillis();
                        this.currentProcess.increment(SliceCounters.Process_End_Time, currentTimeMillis2);
                        this.currentProcess.increment(SliceCounters.Process_Duration, currentTimeMillis2 - currentTimeMillis);
                        this.currentProcess.increment(StepCounters.Process_End_Time, currentTimeMillis2);
                        this.currentProcess.increment(StepCounters.Process_Duration, currentTimeMillis2 - currentTimeMillis);
                        throw th2;
                    } catch (Throwable th3) {
                        long currentTimeMillis3 = System.currentTimeMillis();
                        this.currentProcess.increment(SliceCounters.Process_End_Time, currentTimeMillis3);
                        this.currentProcess.increment(SliceCounters.Process_Duration, currentTimeMillis3 - currentTimeMillis);
                        this.currentProcess.increment(StepCounters.Process_End_Time, currentTimeMillis3);
                        this.currentProcess.increment(StepCounters.Process_Duration, currentTimeMillis3 - currentTimeMillis);
                        throw th3;
                    }
                }
            }
            streamedHead.run((Object) null);
            try {
                this.streamGraph.cleanup();
                long currentTimeMillis4 = System.currentTimeMillis();
                this.currentProcess.increment(SliceCounters.Process_End_Time, currentTimeMillis4);
                this.currentProcess.increment(SliceCounters.Process_Duration, currentTimeMillis4 - currentTimeMillis);
                this.currentProcess.increment(StepCounters.Process_End_Time, currentTimeMillis4);
                this.currentProcess.increment(StepCounters.Process_Duration, currentTimeMillis4 - currentTimeMillis);
            } catch (Throwable th4) {
                long currentTimeMillis5 = System.currentTimeMillis();
                this.currentProcess.increment(SliceCounters.Process_End_Time, currentTimeMillis5);
                this.currentProcess.increment(SliceCounters.Process_Duration, currentTimeMillis5 - currentTimeMillis);
                this.currentProcess.increment(StepCounters.Process_End_Time, currentTimeMillis5);
                this.currentProcess.increment(StepCounters.Process_Duration, currentTimeMillis5 - currentTimeMillis);
                throw th4;
            }
        } catch (Throwable th5) {
            if (!(th5 instanceof CascadingException)) {
                throw new FlowException("internal error during processor configuration", th5);
            }
            throw th5;
        }
    }

    protected void waitForInputsReady(Map<String, LogicalInput> map) throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        HashSet hashSet = new HashSet(map.values());
        getContext().waitForAllInputsReady(hashSet);
        LOG.info("flow node id: {}, all {} inputs ready in: {}", new Object[]{this.flowNode.getID(), Integer.valueOf(hashSet.size()), Util.formatDurationHMSms(System.currentTimeMillis() - currentTimeMillis)});
    }

    public void handleEvents(List<Event> list) {
        LOG.debug("in events");
    }

    public void close() throws Exception {
        String str = "flow node id: " + this.flowNode.getID();
        LogUtil.logMemory(LOG, str + ", mem on close");
        LogUtil.logCounters(LOG, str + ", counter:", this.currentProcess);
    }
}
