package co.cask.cdap.internal.app.runtime.flow;

import co.cask.cdap.api.flow.flowlet.Callback;
import co.cask.cdap.api.flow.flowlet.FailurePolicy;
import co.cask.cdap.api.flow.flowlet.FailureReason;
import co.cask.cdap.api.flow.flowlet.InputContext;
import co.cask.cdap.app.queue.InputDatum;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.common.metrics.MetricsCollector;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.internal.app.queue.SingleItemQueueReader;
import co.cask.cdap.internal.app.runtime.DataFabricFacade;
import co.cask.cdap.internal.app.runtime.flow.ProcessMethod;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionFailureException;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/FlowletProcessDriver.class */
public final class FlowletProcessDriver extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(FlowletProcessDriver.class);
    private final BasicFlowletContext flowletContext;
    private final DataFabricFacade dataFabricFacade;
    private final Callback txCallback;
    private final LoggingContext loggingContext;
    private final PriorityQueue<FlowletProcessEntry<?>> processQueue;
    private Thread runThread;
    private ExecutorService processExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlowletProcessDriver(BasicFlowletContext basicFlowletContext, DataFabricFacade dataFabricFacade, Callback callback, Collection<? extends ProcessSpecification<?>> collection) {
        this.flowletContext = basicFlowletContext;
        this.dataFabricFacade = dataFabricFacade;
        this.txCallback = callback;
        this.loggingContext = basicFlowletContext.getLoggingContext();
        this.processQueue = new PriorityQueue<>(collection.size());
        Iterator<? extends ProcessSpecification<?>> it = collection.iterator();
        while (it.hasNext()) {
            this.processQueue.offer(FlowletProcessEntry.create(it.next()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlowletProcessDriver(FlowletProcessDriver flowletProcessDriver) {
        Preconditions.checkArgument(flowletProcessDriver.state() == Service.State.TERMINATED, "FlowletProcessDriver is not terminated");
        this.flowletContext = flowletProcessDriver.flowletContext;
        this.dataFabricFacade = flowletProcessDriver.dataFabricFacade;
        this.txCallback = flowletProcessDriver.txCallback;
        this.loggingContext = flowletProcessDriver.loggingContext;
        this.processQueue = new PriorityQueue<>(flowletProcessDriver.processQueue.size());
        Iterables.addAll(this.processQueue, flowletProcessDriver.processQueue);
    }

    protected String getServiceName() {
        return getClass().getSimpleName() + "-" + this.flowletContext.getName() + "-" + this.flowletContext.getInstanceId();
    }

    protected void startUp() throws Exception {
        this.runThread = Thread.currentThread();
        this.processExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory(getServiceName() + "-executor"));
    }

    protected void shutDown() throws Exception {
        this.processExecutor.shutdown();
    }

    protected void triggerShutdown() {
        this.runThread.interrupt();
    }

    protected void run() throws Exception {
        LoggingContextAccessor.setLoggingContext(this.loggingContext);
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(this.processQueue.size() * 2);
        Runnable createProcessRunner = createProcessRunner(this.processQueue, newArrayListWithExpectedSize, this.flowletContext.getProgram().getClassLoader());
        while (isRunning()) {
            try {
                this.processQueue.peek().await();
                newArrayListWithExpectedSize.clear();
                drainQueue(this.processQueue, newArrayListWithExpectedSize);
                Future<?> submit = this.processExecutor.submit(createProcessRunner);
                while (!submit.isDone()) {
                    try {
                        Uninterruptibles.getUninterruptibly(submit, 30L, TimeUnit.SECONDS);
                    } catch (ExecutionException e) {
                        LOG.error("Unexpected execution exception.", e);
                    } catch (TimeoutException e2) {
                        if (!isRunning()) {
                            LOG.info("Flowlet {} takes longer than 30 seconds to quit. Force quitting.", this.flowletContext.getFlowletId());
                            submit.cancel(true);
                        }
                    }
                }
            } catch (InterruptedException e3) {
            }
        }
    }

    private void drainQueue(PriorityQueue<FlowletProcessEntry<?>> priorityQueue, List<? super FlowletProcessEntry<?>> list) {
        FlowletProcessEntry<?> poll = priorityQueue.poll();
        while (true) {
            FlowletProcessEntry<?> flowletProcessEntry = poll;
            if (flowletProcessEntry == null) {
                return;
            }
            list.add(flowletProcessEntry);
            poll = priorityQueue.poll();
        }
    }

    private Runnable createProcessRunner(final PriorityQueue<FlowletProcessEntry<?>> priorityQueue, final List<FlowletProcessEntry<?>> list, final ClassLoader classLoader) {
        return new Runnable() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.1
            @Override // java.lang.Runnable
            public void run() {
                Thread.currentThread().setContextClassLoader(classLoader);
                for (FlowletProcessEntry flowletProcessEntry : list) {
                    if (!FlowletProcessDriver.this.handleProcessEntry(flowletProcessEntry, priorityQueue)) {
                        priorityQueue.offer(flowletProcessEntry);
                    }
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> boolean handleProcessEntry(FlowletProcessEntry<T> flowletProcessEntry, PriorityQueue<FlowletProcessEntry<?>> priorityQueue) {
        if (!flowletProcessEntry.shouldProcess()) {
            return false;
        }
        ProcessMethod<T> processMethod = flowletProcessEntry.getProcessSpec().getProcessMethod();
        if (processMethod.needsInput()) {
            this.flowletContext.getProgramMetrics().increment("process.tuples.attempt.read", 1L);
        }
        TransactionContext createTransactionManager = this.dataFabricFacade.createTransactionManager();
        try {
            createTransactionManager.start();
            try {
                InputDatum<T> dequeue = flowletProcessEntry.getProcessSpec().getQueueReader().dequeue(0L, TimeUnit.MILLISECONDS);
                if (!dequeue.needProcess()) {
                    flowletProcessEntry.backOff();
                    createTransactionManager.finish();
                    return false;
                }
                flowletProcessEntry.resetBackOff();
                postProcess(processMethodCallback(priorityQueue, flowletProcessEntry, dequeue), createTransactionManager, dequeue, processMethod.invoke(dequeue));
                return true;
            } catch (Throwable th) {
                LOG.error("System failure: {}", this.flowletContext, th);
                try {
                    createTransactionManager.abort();
                } catch (Throwable th2) {
                    LOG.error("Fail to abort transaction: {}", this.flowletContext, th2);
                }
                return false;
            }
        } catch (Throwable th3) {
            LOG.error("Failed to start transaction.", th3);
            return false;
        }
    }

    private void postProcess(ProcessMethodCallback processMethodCallback, TransactionContext transactionContext, InputDatum inputDatum, ProcessMethod.ProcessResult processResult) {
        InputContext inputContext = inputDatum.getInputContext();
        Throwable th = null;
        FailureReason.Type type = FailureReason.Type.IO_ERROR;
        try {
            if (processResult.isSuccess()) {
                if (inputDatum.getRetry() > 0) {
                    inputDatum.reclaim();
                }
                transactionContext.finish();
            } else {
                th = processResult.getCause();
                type = FailureReason.Type.USER;
                transactionContext.abort();
            }
        } catch (Throwable th2) {
            LOG.error("Transaction operation failed: {}", th2.getMessage(), th2);
            type = FailureReason.Type.IO_ERROR;
            if (0 == 0) {
                th = th2;
            }
            try {
                if (processResult.isSuccess()) {
                    transactionContext.abort();
                }
            } catch (Throwable th3) {
                LOG.error("Fail to abort transaction: {}", inputContext, th3);
            }
        }
        try {
            if (th == null) {
                processMethodCallback.onSuccess(processResult.getEvent(), inputContext);
            } else {
                processMethodCallback.onFailure(processResult.getEvent(), inputContext, new FailureReason(type, th.getMessage(), th), createInputAcknowledger(inputDatum));
            }
        } catch (Throwable th4) {
            LOG.error("Failed to invoke callback.", th4);
        }
    }

    private InputAcknowledger createInputAcknowledger(final InputDatum inputDatum) {
        return new InputAcknowledger() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.2
            @Override // co.cask.cdap.internal.app.runtime.flow.InputAcknowledger
            public void ack() throws TransactionFailureException {
                TransactionContext createTransactionManager = FlowletProcessDriver.this.dataFabricFacade.createTransactionManager();
                createTransactionManager.start();
                inputDatum.reclaim();
                createTransactionManager.finish();
            }
        };
    }

    private <T> ProcessMethodCallback processMethodCallback(final PriorityQueue<FlowletProcessEntry<?>> priorityQueue, final FlowletProcessEntry<T> flowletProcessEntry, final InputDatum<T> inputDatum) {
        final int size = flowletProcessEntry.getProcessSpec().getProcessMethod().needsInput() ? inputDatum.size() : 1;
        return new ProcessMethodCallback() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.3
            private final LoadingCache<String, MetricsCollector> queueMetricsCollectors = CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).build(new CacheLoader<String, MetricsCollector>() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProcessDriver.3.1
                public MetricsCollector load(String str) throws Exception {
                    return FlowletProcessDriver.this.flowletContext.getProgramMetrics().childCollector("flq", str);
                }
            });

            @Override // co.cask.cdap.internal.app.runtime.flow.ProcessMethodCallback
            public void onSuccess(Object obj, InputContext inputContext) {
                try {
                    try {
                        gaugeEventProcessed(inputDatum.getQueueName());
                        FlowletProcessDriver.this.txCallback.onSuccess(obj, inputContext);
                        enqueueEntry();
                    } catch (Throwable th) {
                        FlowletProcessDriver.LOG.error("Exception on onSuccess call: {}", FlowletProcessDriver.this.flowletContext, th);
                        enqueueEntry();
                    }
                } catch (Throwable th2) {
                    enqueueEntry();
                    throw th2;
                }
            }

            @Override // co.cask.cdap.internal.app.runtime.flow.ProcessMethodCallback
            public void onFailure(Object obj, InputContext inputContext, FailureReason failureReason, InputAcknowledger inputAcknowledger) {
                FailurePolicy failurePolicy;
                FlowletProcessDriver.LOG.warn("Process failure: {}, {}, input: {}", new Object[]{FlowletProcessDriver.this.flowletContext, failureReason.getMessage(), inputDatum, failureReason.getCause()});
                try {
                    FlowletProcessDriver.this.flowletContext.getProgramMetrics().increment("process.errors", 1L);
                    failurePolicy = FlowletProcessDriver.this.txCallback.onFailure(obj, inputContext, failureReason);
                    if (failurePolicy == null) {
                        failurePolicy = FailurePolicy.RETRY;
                        FlowletProcessDriver.LOG.info("Callback returns null for failure policy. Default to {}.", failurePolicy);
                    }
                } catch (Throwable th) {
                    FlowletProcessDriver.LOG.error("Exception on onFailure call: {}", FlowletProcessDriver.this.flowletContext, th);
                    failurePolicy = FailurePolicy.RETRY;
                }
                if (inputDatum.getRetry() >= flowletProcessEntry.getProcessSpec().getProcessMethod().getMaxRetries()) {
                    FlowletProcessDriver.LOG.info("Too many retries, ignores the input: {}", inputDatum);
                    failurePolicy = FailurePolicy.IGNORE;
                }
                if (failurePolicy == FailurePolicy.RETRY) {
                    priorityQueue.offer(flowletProcessEntry.isRetry() ? flowletProcessEntry : FlowletProcessEntry.create(flowletProcessEntry.getProcessSpec(), new ProcessSpecification(new SingleItemQueueReader(inputDatum), flowletProcessEntry.getProcessSpec().getProcessMethod(), null)));
                    return;
                }
                if (failurePolicy == FailurePolicy.IGNORE) {
                    try {
                        try {
                            gaugeEventProcessed(inputDatum.getQueueName());
                            inputAcknowledger.ack();
                            enqueueEntry();
                        } catch (Throwable th2) {
                            FlowletProcessDriver.LOG.error("Fatal problem, fail to ack an input: {}", FlowletProcessDriver.this.flowletContext, th2);
                            enqueueEntry();
                        }
                    } catch (Throwable th3) {
                        enqueueEntry();
                        throw th3;
                    }
                }
            }

            private void enqueueEntry() {
                priorityQueue.offer(flowletProcessEntry.resetRetry());
            }

            private void gaugeEventProcessed(QueueName queueName) {
                if (flowletProcessEntry.isTick()) {
                    FlowletProcessDriver.this.flowletContext.getProgramMetrics().increment("process.ticks.processed", size);
                } else if (queueName == null) {
                    FlowletProcessDriver.this.flowletContext.getProgramMetrics().increment("process.events.processed", size);
                } else {
                    ((MetricsCollector) this.queueMetricsCollectors.getUnchecked("input." + queueName.toString())).increment("process.events.processed", size);
                }
            }
        };
    }
}
