package cascading.flow.local.planner;

import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.local.LocalFlowStep;
import cascading.flow.local.stream.graph.LocalStepStreamGraph;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.graph.StreamGraph;
import cascading.util.LogUtil;
import cascading.util.Util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/flow/local/planner/LocalStepRunner.class */
/**
 * Executes the stream graph built for the first flow node of a {@link LocalFlowStep}.
 *
 * <p>{@link #call()} prepares the graph, submits every head {@link Duct} to its own pool thread,
 * waits for them, cleans the graph up and publishes the begin/end/duration slice counters.
 * The value returned by {@link #call()} is the failure that ended the run, or {@code null}.
 *
 * <p>{@link #blockUntilStopped()} is meant to be invoked from a different thread than the one
 * running {@link #call()}: it cancels the head tasks and parks on a semaphore that
 * {@link #call()} releases.
 */
public class LocalStepRunner implements Callable<Throwable> {
    private static final Logger LOG = LoggerFactory.getLogger(LocalStepRunner.class);

    private final FlowProcess<Properties> currentProcess;
    private final FlowNode flowNode;
    private final StreamGraph streamGraph;
    private final Collection<Duct> heads;

    private volatile boolean started = false;
    private volatile boolean completed = false;
    private volatile boolean successful = false;
    private volatile boolean stopped = false;
    private Throwable throwable = null;

    /** Released by {@link #call()}; acquired by {@link #blockUntilStopped()}. */
    private final Semaphore markComplete = new Semaphore(0);

    /**
     * Head tasks currently in flight. Volatile because it is written by the thread running
     * {@link #call()} and iterated by whichever thread invokes {@link #blockUntilStopped()}.
     */
    private volatile List<Future<Throwable>> futures = Collections.emptyList();

    /**
     * Builds the stream graph for the first node of the given step's flow node graph.
     *
     * @param flowProcess   process handed to the stream graph and used for counters
     * @param localFlowStep step whose first flow node is executed by this runner
     */
    public LocalStepRunner(FlowProcess<Properties> flowProcess, LocalFlowStep localFlowStep) {
        this.currentProcess = flowProcess;
        this.flowNode = (FlowNode) Util.getFirst(localFlowStep.getFlowNodeGraph().vertexSet());
        this.streamGraph = new LocalStepStreamGraph(this.currentProcess, localFlowStep, this.flowNode);
        this.heads = this.streamGraph.getHeads();
    }

    /** @return the flow process this runner was created with */
    public FlowProcess<Properties> getFlowProcess() {
        return currentProcess;
    }

    /** @return {@code true} once {@link #call()} has recorded an outcome (success or failure) */
    public boolean isCompleted() {
        return completed;
    }

    /**
     * Requests a stop and waits for {@link #call()} to signal it.
     *
     * <p>Returns immediately when {@link #call()} has not started yet or has already completed.
     * Otherwise the stop flag is raised, every in-flight head task is cancelled with
     * interruption, and the calling thread waits on the completion semaphore. If the wait is
     * interrupted the method returns early and the thread's interrupt status is restored.
     */
    public void blockUntilStopped() {
        if (!started || completed) {
            return;
        }

        stopped = true;

        for (Future<Throwable> future : futures) {
            future.cancel(true);
        }

        try {
            markComplete.acquire();
        } catch (InterruptedException exception) {
            // Give up waiting, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /** @return {@code true} once {@link #blockUntilStopped()} has requested a stop */
    public boolean isStopped() {
        return stopped;
    }

    /** @return {@code true} when the run completed without recording a throwable */
    public boolean isSuccessful() {
        return successful;
    }

    /** @return the throwable recorded by {@link #call()}, or {@code null} if none was recorded */
    public Throwable getThrowable() {
        return throwable;
    }

    /**
     * Runs the flow node: prepare, execute all heads, clean up, publish counters.
     *
     * <p>Failures raised by prepare, by the head tasks or by cleanup are caught, logged
     * (except {@link OutOfMemoryError}) and returned rather than thrown. The stream graph is
     * cleaned up at most once per path, and the end-time/duration counters and "mem on close"
     * logs are emitted on every path.
     *
     * @return the throwable that ended the run, or {@code null} when the run succeeded or a stop
     *         was requested before any head was started
     * @throws Exception only if something outside the guarded sections fails
     */
    @Override
    public Throwable call() throws Exception {
        started = true;

        boolean cleanupAttempted = false;
        // Stays 0 on the early-exit paths, so the reported duration then equals the end timestamp.
        long processBeginTime = 0;

        try {
            try {
                streamGraph.prepare();
                LogUtil.logMemory(LOG, "flow node id: " + flowNode.getID() + ", mem on start");
            } catch (Throwable prepareFailure) {
                try {
                    logUnlessOutOfMemory("unable to prepare operation graph", prepareFailure);

                    completed = true;
                    successful = false;
                    throwable = prepareFailure;

                    return throwable;
                } finally {
                    // Wake a waiter in blockUntilStopped() before cleanup runs; the outer
                    // finally releases one more permit after cleanup.
                    markComplete.release();
                }
            }

            if (stopped) {
                // Stop was requested before any head was started: wake the waiter now and let
                // the outer finally perform cleanup and publish counters.
                markComplete.release();
                return null;
            }

            processBeginTime = System.currentTimeMillis();
            currentProcess.increment(SliceCounters.Process_Begin_Time, processBeginTime);

            try {
                List<Future<Throwable>> spawned = spawnHeads();
                futures = spawned;

                for (Future<Throwable> future : spawned) {
                    try {
                        throwable = future.get();

                        if (throwable != null) {
                            break;
                        }
                    } catch (CancellationException ignored) {
                        // This head was cancelled; keep waiting on the remaining ones.
                    }
                }

                futures = Collections.emptyList();
            } catch (Throwable runFailure) {
                logUnlessOutOfMemory("unable to complete step", runFailure);
                throwable = runFailure;
            }

            try {
                cleanupAttempted = true; // set first so the finally block never retries cleanup

                if (!(throwable instanceof OutOfMemoryError)) {
                    streamGraph.cleanup();
                }
            } catch (Throwable cleanupFailure) {
                logUnlessOutOfMemory("unable to cleanup operation graph", cleanupFailure);

                if (throwable == null) {
                    throwable = cleanupFailure;
                }
            }

            completed = true;
            successful = throwable == null;

            return throwable;
        } finally {
            try {
                if (!cleanupAttempted) {
                    streamGraph.cleanup();
                }
            } catch (Throwable cleanupFailure) {
                logUnlessOutOfMemory("unable to cleanup operation graph", cleanupFailure);

                if (throwable == null) {
                    throwable = cleanupFailure;
                }

                successful = false;
            } finally {
                long processEndTime = System.currentTimeMillis();
                currentProcess.increment(SliceCounters.Process_End_Time, processEndTime);
                currentProcess.increment(SliceCounters.Process_Duration, processEndTime - processBeginTime);
                markComplete.release();
            }

            String message = "flow node id: " + flowNode.getID();
            LogUtil.logMemory(LOG, message + ", mem on close");
            LogUtil.logCounters(LOG, message + ", counter:", currentProcess);
        }
    }

    /** Logs {@code failure} at error level unless it is an {@link OutOfMemoryError}. */
    private static void logUnlessOutOfMemory(String message, Throwable failure) {
        if (!(failure instanceof OutOfMemoryError)) {
            LOG.error(message, failure);
        }
    }

    /**
     * Submits every head duct to a fixed pool sized to the number of heads.
     *
     * <p>The pool is shut down (no new tasks accepted, submitted ones keep running) before
     * returning, including when a submission fails. {@link Executors#newFixedThreadPool(int)}
     * rejects a size of zero, so an empty head collection surfaces as an
     * {@link IllegalArgumentException} to the caller.
     *
     * @return one future per head, in iteration order of the head collection
     */
    private List<Future<Throwable>> spawnHeads() {
        ExecutorService executor = Executors.newFixedThreadPool(heads.size());
        List<Future<Throwable>> results = new ArrayList<Future<Throwable>>(heads.size());

        try {
            for (Duct head : heads) {
                // Heads are submitted as Callables yielding a Throwable, as the original cast did.
                @SuppressWarnings("unchecked")
                Callable<Throwable> task = (Callable<Throwable>) head;
                results.add(executor.submit(task));
            }
        } finally {
            executor.shutdown();
        }

        return results;
    }
}
