package org.apache.tez.test;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManagerOnDemand;
import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.Writer;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.test.TestAMRecovery;
import org.apache.tez.test.dag.MultiAttemptDAG;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/test/TestExceptionPropagation.class */
public class TestExceptionPropagation {
    // Tez configuration rebuilt by startSessionClient()/startNonSessionClient(); the non-session
    // test also uses it to initialise its YarnClient.
    private static TezConfiguration tezConf;
    private static final Logger LOG = LoggerFactory.getLogger(TestExceptionPropagation.class);
    // Base Hadoop configuration given to the MiniDFSCluster builder and copied for the MiniTezCluster.
    private static Configuration conf = new Configuration();
    // Mini Tez cluster started by startMiniTezCluster(); null until then.
    private static MiniTezCluster miniTezCluster = null;
    // Directory set as "hdfs.minidfs.basedir" before the mini DFS cluster is built.
    private static String TEST_ROOT_DIR = "target/" + TestExceptionPropagation.class.getName() + "-tmpDir";
    // Mini DFS cluster started by startMiniTezCluster(); null until then.
    private static MiniDFSCluster dfsCluster = null;
    // File system obtained from dfsCluster; its URI becomes fs.defaultFS for the mini Tez cluster.
    private static FileSystem remoteFs = null;
    // Client created by startSessionClient() and cleared by stopSessionClient().
    private static TezClient tezSession = null;
    // Client created by startNonSessionClient() and cleared by stopNonSessionClient().
    private static TezClient tezClient = null;

    /* loaded from: input_file:org/apache/tez/test/TestExceptionPropagation$CustomEdgeManager.class */
    /**
     * Edge manager that createDAG() installs on the v1 -> v2 edge when the selected
     * {@link ExceptionLocation} name starts with {@code EM_}.
     *
     * <p>The location name is read from the plugin's user payload. The callback matching that
     * location throws a {@link RuntimeException} whose message is the location name; every other
     * callback delegates to {@link OneToOneEdgeManagerOnDemand}.
     */
    public static class CustomEdgeManager extends OneToOneEdgeManagerOnDemand {
        /** Message of the only TezUncheckedException that {@link #initialize()} swallows. */
        private static final String NO_BIPARTITE_SOURCE_MESSAGE = "Atleast 1 bipartite source should exist";

        private ExceptionLocation exLocation;

        /**
         * @param edgeManagerPluginContext plugin context whose user payload holds the name of an
         *        {@link ExceptionLocation} constant
         */
        public CustomEdgeManager(EdgeManagerPluginContext edgeManagerPluginContext) {
            super(edgeManagerPluginContext);
            this.exLocation = ExceptionLocation.valueOf(new String(edgeManagerPluginContext.getUserPayload().deepCopyAsArray()));
        }

        /** Throws a RuntimeException named after the configured location when it equals {@code trigger}. */
        private void failIfSelected(ExceptionLocation trigger) {
            if (this.exLocation == trigger) {
                throw new RuntimeException(this.exLocation.name());
            }
        }

        /**
         * Fails for {@code EM_Initialize}; otherwise initializes the parent, tolerating only the
         * "no bipartite source" TezUncheckedException.
         */
        public void initialize() {
            failIfSelected(ExceptionLocation.EM_Initialize);
            try {
                super.initialize();
            } catch (TezUncheckedException e) {
                // Constant-first comparison: an exception without a message must be rethrown as-is
                // instead of being replaced by a NullPointerException.
                if (!NO_BIPARTITE_SOURCE_MESSAGE.equals(e.getMessage())) {
                    throw e;
                }
            }
        }

        public int getNumDestinationConsumerTasks(int i) {
            failIfSelected(ExceptionLocation.EM_GetNumDestinationConsumerTasks);
            return super.getNumDestinationConsumerTasks(i);
        }

        public int getNumSourceTaskPhysicalOutputs(int i) {
            failIfSelected(ExceptionLocation.EM_GetNumSourceTaskPhysicalOutputs);
            TestExceptionPropagation.LOG.info("ExLocation:" + this.exLocation);
            return super.getNumSourceTaskPhysicalOutputs(i);
        }

        public int getNumDestinationTaskPhysicalInputs(int i) {
            failIfSelected(ExceptionLocation.EM_GetNumDestinationTaskPhysicalInputs);
            return super.getNumDestinationTaskPhysicalInputs(i);
        }

        public void routeDataMovementEventToDestination(DataMovementEvent dataMovementEvent, int i, int i2, Map<Integer, List<Integer>> map) {
            failIfSelected(ExceptionLocation.EM_RouteDataMovementEventToDestination);
            super.routeDataMovementEventToDestination(dataMovementEvent, i, i2, map);
        }

        public void prepareForRouting() throws Exception {
            failIfSelected(ExceptionLocation.EM_PrepareForRouting);
            super.prepareForRouting();
        }

        public EdgeManagerPluginOnDemand.EventRouteMetadata routeDataMovementEventToDestination(int i, int i2, int i3) throws Exception {
            failIfSelected(ExceptionLocation.EM_RouteDataMovementEventToDestination);
            return super.routeDataMovementEventToDestination(i, i2, i3);
        }

        /** Shares the {@code EM_RouteDataMovementEventToDestination} trigger with the non-composite routes. */
        public EdgeManagerPluginOnDemand.EventRouteMetadata routeCompositeDataMovementEventToDestination(int i, int i2) throws Exception {
            failIfSelected(ExceptionLocation.EM_RouteDataMovementEventToDestination);
            return super.routeCompositeDataMovementEventToDestination(i, i2);
        }

        public int routeInputErrorEventToSource(InputReadErrorEvent inputReadErrorEvent, int i, int i2) {
            failIfSelected(ExceptionLocation.EM_RouteInputErrorEventToSource);
            return super.routeInputErrorEventToSource(inputReadErrorEvent, i, i2);
        }

        public int routeInputErrorEventToSource(int i, int i2) {
            failIfSelected(ExceptionLocation.EM_RouteInputErrorEventToSource);
            return super.routeInputErrorEventToSource(i, i2);
        }

        /** Never fails: no location is associated with this callback. */
        public void routeInputSourceTaskFailedEventToDestination(int i, Map<Integer, List<Integer>> map) {
            super.routeInputSourceTaskFailedEventToDestination(i, map);
        }
    }

    /* loaded from: input_file:org/apache/tez/test/TestExceptionPropagation$ExceptionLocation.class */
    /**
     * Places where a test plugin can be told to fail.
     *
     * <p>The constant name is shipped to the plugins as user payload, parsed back with
     * {@code valueOf}, used as the exception message, and finally searched for in the DAG
     * diagnostics, so names must not be changed. createDAG() also keys off the {@code EM_}
     * prefix to decide whether the custom edge manager is installed. The session test iterates
     * {@code values()} in declaration order.
     */
    public enum ExceptionLocation {
        // Thrown by InputWithException.
        INPUT_START,
        INPUT_GET_READER,
        INPUT_HANDLE_EVENTS,
        INPUT_CLOSE,
        INPUT_INITIALIZE,
        // Thrown by OutputWithException.
        OUTPUT_START,
        OUTPUT_GET_WRITER,
        OUTPUT_CLOSE,
        OUTPUT_INITIALIZE,
        // Thrown by ProcessorWithException: *_ERROR raises java.lang.Error, *_EXCEPTION raises Exception.
        PROCESSOR_RUN_ERROR,
        PROCESSOR_CLOSE_ERROR,
        PROCESSOR_INITIALIZE_ERROR,
        PROCESSOR_RUN_EXCEPTION,
        PROCESSOR_CLOSE_EXCEPTION,
        PROCESSOR_INITIALIZE_EXCEPTION,
        // Thrown by the vertex managers (RootInputVertexManagerWithException /
        // InputReadyVertexManagerWithException).
        VM_INITIALIZE,
        VM_ON_ROOTVERTEX_INITIALIZE,
        VM_ON_SOURCETASK_COMPLETED,
        VM_ON_VERTEX_STARTED,
        VM_ON_VERTEXMANAGEREVENT_RECEIVED,
        // Thrown by CustomEdgeManager.
        EM_Initialize,
        EM_GetNumDestinationTaskPhysicalInputs,
        EM_GetNumSourceTaskPhysicalOutputs,
        EM_RouteDataMovementEventToDestination,
        EM_GetNumDestinationConsumerTasks,
        EM_RouteInputErrorEventToSource,
        EM_PrepareForRouting,
        // Thrown by InputInitializerWithException2.
        II_Initialize,
        II_HandleInputInitializerEvents,
        II_OnVertexStateUpdated
    }

    /* loaded from: input_file:org/apache/tez/test/TestExceptionPropagation$InputInitializerWithException.class */
    /**
     * Input initializer attached to v1's "input" data source. Despite its name it never throws on
     * purpose: it only validates that the user payload names an {@link ExceptionLocation} and then
     * produces a single data-information event.
     */
    public static class InputInitializerWithException extends InputInitializer {
        // Parsed for validation only; no callback of this class consults it.
        private ExceptionLocation exLocation;

        /**
         * @param inputInitializerContext context whose user payload holds the name of an
         *        {@link ExceptionLocation} constant
         */
        public InputInitializerWithException(InputInitializerContext inputInitializerContext) {
            super(inputInitializerContext);
            this.exLocation = ExceptionLocation.valueOf(new String(getContext().getUserPayload().deepCopyAsArray()));
        }

        /**
         * @return a mutable list holding one {@link InputDataInformationEvent} with index 0 and a
         *         null object payload
         */
        public List<Event> initialize() throws Exception {
            // Typed list instead of a raw ArrayList so the compiler checks the element type.
            List<Event> events = new ArrayList<Event>();
            events.add(InputDataInformationEvent.createWithObjectPayload(0, (Object) null));
            return events;
        }

        /** Ignores all initializer events. */
        public void handleInputInitializerEvent(List<InputInitializerEvent> list) throws Exception {
        }

        /** Builds the descriptor for this initializer carrying {@code userPayload}. */
        public static InputInitializerDescriptor getIIDesc(UserPayload userPayload) {
            return InputInitializerDescriptor.create(InputInitializerWithException.class.getName()).setUserPayload(userPayload);
        }
    }

    /* loaded from: input_file:org/apache/tez/test/TestExceptionPropagation$InputInitializerWithException2.class */
    /**
     * Input initializer attached to v2's "input2" data source. It fails in the callback selected
     * by the {@link ExceptionLocation} named in its user payload ({@code II_*} locations).
     */
    public static class InputInitializerWithException2 extends InputInitializer {
        private ExceptionLocation exLocation;
        // Monitor that initialize() parks on; nothing in this class ever notifies it.
        private Object condition;

        /**
         * @param inputInitializerContext context whose user payload holds the name of an
         *        {@link ExceptionLocation} constant
         */
        public InputInitializerWithException2(InputInitializerContext inputInitializerContext) {
            super(inputInitializerContext);
            this.condition = new Object();
            this.exLocation = ExceptionLocation.valueOf(new String(getContext().getUserPayload().deepCopyAsArray()));
        }

        /**
         * Throws for {@code II_Initialize}. For {@code II_HandleInputInitializerEvents} and
         * {@code II_OnVertexStateUpdated} it blocks so that the corresponding callback gets a
         * chance to run (and throw) while initialization is still in progress.
         *
         * @return always null when it returns at all
         * @throws InterruptedException if the blocked thread is interrupted
         */
        public List<Event> initialize() throws Exception {
            if (this.exLocation == ExceptionLocation.II_Initialize) {
                throw new Exception(this.exLocation.name());
            }
            if (this.exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
                getContext().registerForVertexStateUpdates("v1", (Set) null);
            }
            if (this.exLocation == ExceptionLocation.II_HandleInputInitializerEvents
                    || this.exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
                synchronized (this.condition) {
                    // Object.wait() may wake up spuriously; since no one notifies this monitor,
                    // keep waiting so initialization cannot complete before the callback fires.
                    // The only way out is an InterruptedException.
                    while (true) {
                        this.condition.wait();
                    }
                }
            }
            return null;
        }

        /** Throws a RuntimeException for {@code II_HandleInputInitializerEvents}; otherwise a no-op. */
        public void handleInputInitializerEvent(List<InputInitializerEvent> list) throws Exception {
            if (this.exLocation == ExceptionLocation.II_HandleInputInitializerEvents) {
                throw new RuntimeException(this.exLocation.name());
            }
        }

        /** Throws for {@code II_OnVertexStateUpdated}; otherwise delegates to the parent. */
        public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) throws Exception {
            if (this.exLocation == ExceptionLocation.II_OnVertexStateUpdated) {
                throw new Exception(this.exLocation.name());
            }
            super.onVertexStateUpdated(vertexStateUpdate);
        }

        /** Builds the descriptor for this initializer carrying {@code userPayload}. */
        public static InputInitializerDescriptor getIIDesc(UserPayload userPayload) {
            return InputInitializerDescriptor.create(InputInitializerWithException2.class.getName()).setUserPayload(userPayload);
        }
    }

    /* loaded from: input_file:org/apache/tez/test/TestExceptionPropagation$InputReadyVertexManagerWithException.class */
    /**
     * Vertex manager that createDAG() installs on v2 for every non-{@code EM_} location. It
     * throws from the callback selected by the {@link ExceptionLocation} stored in its
     * configuration-style user payload.
     */
    public static class InputReadyVertexManagerWithException extends InputReadyVertexManager {
        /** Configuration key under which the location name travels in the user payload. */
        private static final String EXCEPTION_LOCATION_KEY = "Test.ExceptionLocation";
        /** Message of the only TezUncheckedException that {@link #initialize()} swallows. */
        private static final String NO_BIPARTITE_SOURCE_MESSAGE = "Atleast 1 bipartite source should exist";

        // Assigned at the end of initialize(); null before that.
        private ExceptionLocation exLocation;

        public InputReadyVertexManagerWithException(VertexManagerPluginContext vertexManagerPluginContext) {
            super(vertexManagerPluginContext);
        }

        /**
         * Initializes the parent (tolerating only the "no bipartite source" failure) and then reads
         * the configured location from the user payload.
         *
         * @throws TezUncheckedException if the parent fails for another reason or the payload
         *         cannot be decoded
         */
        public void initialize() {
            try {
                super.initialize();
            } catch (TezUncheckedException e) {
                // Constant-first comparison: an exception without a message must be rethrown as-is
                // instead of being replaced by a NullPointerException.
                if (!NO_BIPARTITE_SOURCE_MESSAGE.equals(e.getMessage())) {
                    throw e;
                }
            }
            try {
                this.exLocation = ExceptionLocation.valueOf(TezUtils.createConfFromUserPayload(getContext().getUserPayload()).get(EXCEPTION_LOCATION_KEY));
            } catch (IOException e2) {
                throw new TezUncheckedException(e2);
            }
        }

        /** Throws for {@code VM_ON_SOURCETASK_COMPLETED}; otherwise delegates to the parent. */
        public void onSourceTaskCompleted(TaskAttemptIdentifier taskAttemptIdentifier) {
            if (this.exLocation == ExceptionLocation.VM_ON_SOURCETASK_COMPLETED) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onSourceTaskCompleted(taskAttemptIdentifier);
        }

        /** Throws for {@code VM_ON_VERTEXMANAGEREVENT_RECEIVED}; otherwise delegates to the parent. */
        public void onVertexManagerEventReceived(VertexManagerEvent vertexManagerEvent) {
            if (this.exLocation == ExceptionLocation.VM_ON_VERTEXMANAGEREVENT_RECEIVED) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onVertexManagerEventReceived(vertexManagerEvent);
        }

        /**
         * Builds the descriptor for this vertex manager.
         *
         * @param exceptionLocation location whose name is stored under "Test.ExceptionLocation"
         * @throws IOException if the configuration cannot be turned into a user payload
         */
        public static VertexManagerPluginDescriptor getVMDesc(ExceptionLocation exceptionLocation) throws IOException {
            Configuration configuration = new Configuration();
            configuration.set(EXCEPTION_LOCATION_KEY, exceptionLocation.name());
            return VertexManagerPluginDescriptor.create(InputReadyVertexManagerWithException.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(configuration));
        }
    }

    /* loaded from: input_file:org/apache/tez/test/TestExceptionPropagation$InputWithException.class */
    /**
     * Logical input that fails in the lifecycle method selected by the {@link ExceptionLocation}
     * named in its user payload ({@code INPUT_*} locations). When it reads from vertex "v1" it can
     * also emit an {@link InputReadErrorEvent} to drive two of the edge-manager locations.
     */
    public static class InputWithException extends AbstractLogicalInput {
        private ExceptionLocation exLocation;
        // Monitor that getReader() parks on; nothing in this class ever notifies it.
        private Object condition;

        /**
         * @param inputContext context whose user payload holds the name of an
         *        {@link ExceptionLocation} constant
         * @param i passed through to the parent constructor
         */
        public InputWithException(InputContext inputContext, int i) {
            super(inputContext, i);
            this.condition = new Object();
            this.exLocation = ExceptionLocation.valueOf(new String(getContext().getUserPayload().deepCopyAsArray()));
        }

        /** Throws for {@code INPUT_START}; otherwise a no-op. */
        public void start() throws Exception {
            if (this.exLocation == ExceptionLocation.INPUT_START) {
                throw new Exception(this.exLocation.name());
            }
        }

        /**
         * Blocks for {@code INPUT_HANDLE_EVENTS} so that handleEvents() can throw while the caller
         * is still waiting, and throws for {@code INPUT_GET_READER}.
         *
         * @return always null when it returns at all
         * @throws InterruptedException if the blocked thread is interrupted
         */
        public Reader getReader() throws Exception {
            if (this.exLocation == ExceptionLocation.INPUT_HANDLE_EVENTS) {
                synchronized (this.condition) {
                    // Object.wait() may wake up spuriously; since no one notifies this monitor,
                    // keep waiting so the caller cannot proceed before handleEvents() has thrown.
                    // The only way out is an InterruptedException.
                    while (true) {
                        this.condition.wait();
                    }
                }
            }
            if (this.exLocation == ExceptionLocation.INPUT_GET_READER) {
                throw new Exception(this.exLocation.name());
            }
            return null;
        }

        /** Throws for {@code INPUT_HANDLE_EVENTS}; otherwise ignores the events. */
        public void handleEvents(List<Event> list) throws Exception {
            if (this.exLocation == ExceptionLocation.INPUT_HANDLE_EVENTS) {
                throw new Exception(this.exLocation.name());
            }
        }

        /** Throws for {@code INPUT_CLOSE}; otherwise returns null. */
        public List<Event> close() throws Exception {
            if (this.exLocation == ExceptionLocation.INPUT_CLOSE) {
                throw new Exception(this.exLocation.name());
            }
            return null;
        }

        /**
         * Requests zero initial memory, then throws for {@code INPUT_INITIALIZE}.
         *
         * @return a single "read error" {@link InputReadErrorEvent} when the source vertex is "v1"
         *         and the location is {@code EM_RouteInputErrorEventToSource} or
         *         {@code EM_GetNumDestinationConsumerTasks}; null otherwise
         */
        public List<Event> initialize() throws Exception {
            getContext().requestInitialMemory(0L, (MemoryUpdateCallback) null);
            if (this.exLocation == ExceptionLocation.INPUT_INITIALIZE) {
                throw new Exception(this.exLocation.name());
            }
            if (!getContext().getSourceVertexName().equals("v1")) {
                return null;
            }
            if (this.exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource || this.exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks) {
                return Lists.newArrayList(new Event[]{InputReadErrorEvent.create("read error", 0, 0)});
            }
            return null;
        }

        /** Builds the descriptor for this input carrying {@code userPayload}. */
        public static InputDescriptor getInputDesc(UserPayload userPayload) {
            return InputDescriptor.create(InputWithException.class.getName()).setUserPayload(userPayload);
        }
    }

    /* loaded from: input_file:org/apache/tez/test/TestExceptionPropagation$OutputWithException.class */
    /**
     * Logical output that fails in the lifecycle method selected by the {@link ExceptionLocation}
     * named in its user payload ({@code OUTPUT_*} locations). On close it can also emit one event
     * that drives a vertex-manager, edge-manager or input-initializer location.
     */
    public static class OutputWithException extends AbstractLogicalOutput {
        private ExceptionLocation exLocation;

        /**
         * @param outputContext context whose user payload holds the name of an
         *        {@link ExceptionLocation} constant
         * @param i passed through to the parent constructor
         */
        public OutputWithException(OutputContext outputContext, int i) {
            super(outputContext, i);
            byte[] payloadBytes = getContext().getUserPayload().deepCopyAsArray();
            this.exLocation = ExceptionLocation.valueOf(new String(payloadBytes));
        }

        /** Wraps one event in a fresh mutable list. */
        private static List<Event> listOf(Event event) {
            List<Event> events = new ArrayList<Event>();
            events.add(event);
            return events;
        }

        /** Throws for {@code OUTPUT_START}; otherwise a no-op. */
        public void start() throws Exception {
            if (ExceptionLocation.OUTPUT_START == this.exLocation) {
                throw new Exception(this.exLocation.name());
            }
        }

        /** Throws for {@code OUTPUT_GET_WRITER}; otherwise returns null. */
        public Writer getWriter() throws Exception {
            if (ExceptionLocation.OUTPUT_GET_WRITER == this.exLocation) {
                throw new Exception(this.exLocation.name());
            }
            return null;
        }

        /** Ignores all events. */
        public void handleEvents(List<Event> list) {
        }

        /**
         * Throws a RuntimeException for {@code OUTPUT_CLOSE}.
         *
         * @return one event for the three locations that need a trigger (a vertex-manager event to
         *         "v2", a data-movement event, or an initializer event for "v2"/"input2"), each
         *         with an empty payload; null for every other location
         */
        public List<Event> close() throws Exception {
            ExceptionLocation location = this.exLocation;
            if (location == ExceptionLocation.OUTPUT_CLOSE) {
                throw new RuntimeException(location.name());
            }
            if (location == ExceptionLocation.VM_ON_VERTEXMANAGEREVENT_RECEIVED) {
                return listOf(VertexManagerEvent.create("v2", ByteBuffer.wrap(new byte[0])));
            }
            if (location == ExceptionLocation.EM_RouteDataMovementEventToDestination) {
                return listOf(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])));
            }
            if (location == ExceptionLocation.II_HandleInputInitializerEvents) {
                return listOf(InputInitializerEvent.create("v2", "input2", ByteBuffer.wrap(new byte[0])));
            }
            return null;
        }

        /** Requests zero initial memory, then throws a RuntimeException for {@code OUTPUT_INITIALIZE}. */
        public List<Event> initialize() throws Exception {
            getContext().requestInitialMemory(0L, (MemoryUpdateCallback) null);
            if (ExceptionLocation.OUTPUT_INITIALIZE == this.exLocation) {
                throw new RuntimeException(this.exLocation.name());
            }
            return null;
        }

        /** Builds the descriptor for this output carrying {@code userPayload}. */
        public static OutputDescriptor getOutputDesc(UserPayload userPayload) {
            String className = OutputWithException.class.getName();
            return OutputDescriptor.create(className).setUserPayload(userPayload);
        }
    }

    /* loaded from: input_file:org/apache/tez/test/TestExceptionPropagation$ProcessorWithException.class */
    /**
     * Processor for vertex v1. It fails in the lifecycle method selected by the
     * {@link ExceptionLocation} named in its user payload ({@code PROCESSOR_*} locations), raising
     * an {@link Error} for the {@code *_ERROR} variants and an {@link Exception} for the
     * {@code *_EXCEPTION} variants.
     */
    public static class ProcessorWithException extends AbstractLogicalIOProcessor {
        private ExceptionLocation exLocation;

        /**
         * @param processorContext context whose user payload holds the name of an
         *        {@link ExceptionLocation} constant
         */
        public ProcessorWithException(ProcessorContext processorContext) {
            super(processorContext);
            this.exLocation = ExceptionLocation.valueOf(new String(getContext().getUserPayload().deepCopyAsArray()));
        }

        /**
         * Starts the "input" input and the "v2" output, fetches their reader and writer, sleeps
         * three seconds and finally throws for the {@code PROCESSOR_RUN_*} locations.
         *
         * @param inputs logical inputs by name; "input" must be an {@link InputWithException}
         * @param outputs logical outputs by name; "v2" must be an {@link OutputWithException}
         */
        public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
            // The maps are typed to the interfaces, so an explicit downcast is required here;
            // without it the assignments do not compile.
            InputWithException input = (InputWithException) inputs.get("input");
            input.start();
            input.getReader();
            OutputWithException output = (OutputWithException) outputs.get("v2");
            output.start();
            output.getWriter();
            Thread.sleep(3000L);
            if (this.exLocation == ExceptionLocation.PROCESSOR_RUN_ERROR) {
                throw new Error(this.exLocation.name());
            }
            if (this.exLocation == ExceptionLocation.PROCESSOR_RUN_EXCEPTION) {
                throw new Exception(this.exLocation.name());
            }
        }

        /** Ignores all events. */
        public void handleEvents(List<Event> list) {
        }

        /** Throws for the {@code PROCESSOR_CLOSE_*} locations; otherwise a no-op. */
        public void close() throws Exception {
            if (this.exLocation == ExceptionLocation.PROCESSOR_CLOSE_ERROR) {
                throw new Error(this.exLocation.name());
            }
            if (this.exLocation == ExceptionLocation.PROCESSOR_CLOSE_EXCEPTION) {
                throw new Exception(this.exLocation.name());
            }
        }

        /** Throws for the {@code PROCESSOR_INITIALIZE_*} locations; otherwise a no-op. */
        public void initialize() throws Exception {
            if (this.exLocation == ExceptionLocation.PROCESSOR_INITIALIZE_ERROR) {
                throw new Error(this.exLocation.name());
            }
            if (this.exLocation == ExceptionLocation.PROCESSOR_INITIALIZE_EXCEPTION) {
                throw new Exception(this.exLocation.name());
            }
        }

        /** Builds the descriptor for this processor carrying {@code userPayload}. */
        public static ProcessorDescriptor getProcDesc(UserPayload userPayload) {
            return ProcessorDescriptor.create(ProcessorWithException.class.getName()).setUserPayload(userPayload);
        }
    }

    /* loaded from: input_file:org/apache/tez/test/TestExceptionPropagation$RootInputVertexManagerWithException.class */
    /**
     * Vertex manager that createDAG() installs on v1. It throws a {@link RuntimeException} from
     * the callback selected by the {@link ExceptionLocation} named in its user payload.
     */
    public static class RootInputVertexManagerWithException extends RootInputVertexManager {
        // Assigned in initialize(); null before that.
        private ExceptionLocation exLocation;

        public RootInputVertexManagerWithException(VertexManagerPluginContext vertexManagerPluginContext) {
            super(vertexManagerPluginContext);
        }

        /** Initializes the parent, reads the location from the payload, and throws for {@code VM_INITIALIZE}. */
        public void initialize() {
            super.initialize();
            byte[] payloadBytes = getContext().getUserPayload().deepCopyAsArray();
            this.exLocation = ExceptionLocation.valueOf(new String(payloadBytes));
            if (ExceptionLocation.VM_INITIALIZE == this.exLocation) {
                throw new RuntimeException(this.exLocation.name());
            }
        }

        /** Throws for {@code VM_ON_ROOTVERTEX_INITIALIZE}; otherwise delegates to the parent. */
        public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
            if (ExceptionLocation.VM_ON_ROOTVERTEX_INITIALIZE == this.exLocation) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onRootVertexInitialized(inputName, inputDescriptor, events);
        }

        /** Throws for {@code VM_ON_VERTEX_STARTED}; otherwise delegates to the parent. */
        public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
            if (ExceptionLocation.VM_ON_VERTEX_STARTED == this.exLocation) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onVertexStarted(completions);
        }

        /** Builds the descriptor for this vertex manager carrying {@code userPayload}. */
        public static VertexManagerPluginDescriptor getVMDesc(UserPayload userPayload) {
            String className = RootInputVertexManagerWithException.class.getName();
            return VertexManagerPluginDescriptor.create(className).setUserPayload(userPayload);
        }
    }

    /**
     * Brings up a three-datanode mini DFS cluster rooted at TEST_ROOT_DIR and a mini Tez cluster
     * that uses it as its default file system.
     *
     * @throws RuntimeException wrapping any IOException raised during start-up
     */
    private void startMiniTezCluster() {
        LOG.info("Starting mini clusters");
        try {
            conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
            MiniDFSCluster.Builder dfsBuilder =
                    new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).racks((String[]) null);
            dfsCluster = dfsBuilder.build();
            remoteFs = dfsCluster.getFileSystem();

            String clusterName = TestExceptionPropagation.class.getName();
            miniTezCluster = new MiniTezCluster(clusterName, 1, 1, 1);

            // Work on a copy so the cluster-specific settings do not leak into the shared conf.
            Configuration clusterConf = new Configuration(conf);
            clusterConf.setInt("yarn.resourcemanager.am.max-attempts", 4);
            clusterConf.set("fs.defaultFS", remoteFs.getUri().toString());
            miniTezCluster.init(clusterConf);
            miniTezCluster.start();
        } catch (IOException ioe) {
            throw new RuntimeException("problem starting mini dfs cluster", ioe);
        }
    }

    /**
     * Best-effort shutdown of the mini Tez cluster followed by the mini DFS cluster. A failure to
     * stop one of them is logged and does not prevent the other from being stopped.
     */
    private void stopTezMiniCluster() {
        if (miniTezCluster != null) {
            try {
                LOG.info("Stopping MiniTezCluster");
                miniTezCluster.stop();
            } catch (Exception e) {
                // Report through the test logger (not stderr) so the failure shows up with the rest
                // of the test output.
                LOG.warn("Failed to stop MiniTezCluster", e);
            }
        }
        if (dfsCluster != null) {
            try {
                LOG.info("Stopping DFSCluster");
                dfsCluster.shutdown();
            } catch (Exception e) {
                LOG.warn("Failed to stop DFSCluster", e);
            }
        }
    }

    /**
     * Builds a fresh local-mode, session-mode Tez configuration, publishes it in {@code tezConf},
     * and starts a client for it in {@code tezSession}.
     */
    private void startSessionClient() throws Exception {
        LOG.info("Starting session");
        TezConfiguration sessionConf = new TezConfiguration();
        tezConf = sessionConf;

        // Recovery / retry behaviour.
        sessionConf.setInt("tez.dag.recovery.max.unflushed.events", 0);
        sessionConf.setBoolean("tez.am.node-blacklisting.enabled", false);
        sessionConf.setInt("tez.am.max.app.attempts", 4);
        sessionConf.setBoolean("tez.am.one-to-one.routing.use.on-demand-routing", true);

        // AM sizing.
        sessionConf.setInt("tez.am.resource.memory.mb", 500);
        sessionConf.set("tez.am.launch.cmd-opts", " -Xmx256m");

        // Session + local mode on the local file system.
        sessionConf.setBoolean("tez.am.mode.session", true);
        sessionConf.setBoolean("tez.local.mode", true);
        sessionConf.set("fs.defaultFS", "file:///");
        sessionConf.setBoolean("tez.runtime.optimize.local.fetch", true);

        TezClient session = TezClient.create("TestExceptionPropagation", sessionConf);
        tezSession = session;
        session.start();
    }

    /**
     * Best-effort stop of the session client. The {@code tezSession} reference is always cleared,
     * even when stopping fails.
     */
    private void stopSessionClient() {
        if (tezSession == null) {
            return;
        }
        try {
            LOG.info("Stopping Tez Session");
            tezSession.stop();
        } catch (Exception e) {
            // Report through the test logger (not stderr) so the failure shows up with the rest of
            // the test output.
            LOG.warn("Failed to stop Tez Session", e);
        } finally {
            tezSession = null;
        }
    }

    /**
     * Builds a non-session Tez configuration on top of the mini Tez cluster's configuration,
     * publishes it in {@code tezConf}, and starts a client for it in {@code tezClient}.
     * startMiniTezCluster() must have run first.
     */
    private void startNonSessionClient() throws Exception {
        LOG.info("Starting Client");
        TezConfiguration clientConf = new TezConfiguration(miniTezCluster.getConfig());
        tezConf = clientConf;

        // Recovery / retry behaviour.
        clientConf.setInt("tez.dag.recovery.max.unflushed.events", 0);
        clientConf.setBoolean("tez.am.node-blacklisting.enabled", false);
        clientConf.setInt("tez.am.max.app.attempts", 4);

        // AM sizing.
        clientConf.setInt("tez.am.resource.memory.mb", 500);
        clientConf.set("tez.am.launch.cmd-opts", " -Xmx256m");

        clientConf.setBoolean("tez.am.mode.session", false);

        TezClient client = TezClient.create("TestExceptionPropagation", clientConf);
        tezClient = client;
        client.start();
    }

    /**
     * Best-effort stop of the non-session client. The {@code tezClient} reference is always
     * cleared, even when stopping fails.
     */
    private void stopNonSessionClient() {
        if (tezClient == null) {
            return;
        }
        try {
            LOG.info("Stopping Tez Client");
            tezClient.stop();
        } catch (Exception e) {
            // Report through the test logger (not stderr) so the failure shows up with the rest of
            // the test output.
            LOG.warn("Failed to stop Tez Client", e);
        } finally {
            tezClient = null;
        }
    }

    /**
     * Session mode: submits one DAG per {@link ExceptionLocation}, in declaration order, to a
     * single session and checks that the DAG diagnostics mention the location's name.
     */
    @Test(timeout = 600000)
    public void testExceptionPropagationSession() throws Exception {
        try {
            startSessionClient();
            ExceptionLocation[] locations = ExceptionLocation.values();
            for (int idx = 0; idx < locations.length; idx++) {
                ExceptionLocation location = locations[idx];
                LOG.info("Session mode, Test for Exception from:" + location.name());
                DAG dag = createDAG(location);
                DAGStatus status = tezSession.submitDAG(dag).waitForCompletion();
                String diagnostics = StringUtils.join(status.getDiagnostics(), ",");
                LOG.info("Diagnostics:" + diagnostics);
                Assert.assertTrue(diagnostics.contains(location.name()));
            }
        } finally {
            stopSessionClient();
        }
    }

    /**
     * Non-session mode on the mini cluster: runs a DAG configured to fail in
     * {@code EM_GetNumSourceTaskPhysicalOutputs} and checks that the location name appears both in
     * the DAG diagnostics and in the YARN application report, and that the two diagnostics agree.
     */
    @Test(timeout = 120000)
    public void testExceptionPropagationNonSession() throws Exception {
        try {
            startMiniTezCluster();
            startNonSessionClient();
            ExceptionLocation exLocation = ExceptionLocation.EM_GetNumSourceTaskPhysicalOutputs;
            LOG.info("NonSession mode, Test for Exception from:" + exLocation.name());
            DAGStatus dagStatus = tezClient.submitDAG(createDAG(exLocation)).waitForCompletion();
            String diagnostics = StringUtils.join(dagStatus.getDiagnostics(), ",");
            LOG.info("Diagnostics:" + diagnostics);
            Assert.assertTrue(diagnostics.contains(exLocation.name()));

            ApplicationId appId = tezClient.getAppMasterApplicationId();
            YarnClient yarnClient = YarnClient.createYarnClient();
            try {
                yarnClient.init(tezConf);
                yarnClient.start();
                EnumSet<YarnApplicationState> terminalStates =
                        EnumSet.of(YarnApplicationState.KILLED, YarnApplicationState.FAILED, YarnApplicationState.FINISHED);

                // Poll once per second until YARN reports the application as finished.
                ApplicationReport appReport;
                do {
                    appReport = yarnClient.getApplicationReport(appId);
                    Thread.sleep(1000L);
                    LOG.info("FinalAppStatus:" + appReport.getFinalApplicationStatus());
                    LOG.info("Diagnostics from appReport:" + appReport.getDiagnostics());
                } while (!terminalStates.contains(appReport.getYarnApplicationState()));

                // Fetch the report once more after a short pause before asserting on it.
                Thread.sleep(1000L);
                appReport = yarnClient.getApplicationReport(appId);
                LOG.info("FinalAppStatus:" + appReport.getFinalApplicationStatus());
                LOG.info("Diagnostics from appReport:" + appReport.getDiagnostics());
                Assert.assertTrue(appReport.getDiagnostics().contains(exLocation.name()));
                Assert.assertEquals(StringUtils.join(dagStatus.getDiagnostics(), "\n").trim(), appReport.getDiagnostics().trim());
            } finally {
                // The client was previously left running; stop it, but never let a stop failure
                // hide an assertion failure from the body above.
                try {
                    yarnClient.stop();
                } catch (Exception e) {
                    LOG.warn("Failed to stop YarnClient", e);
                }
            }
        } finally {
            try {
                stopNonSessionClient();
                Thread.sleep(10000L);
            } finally {
                // Runs even if the sleep above is interrupted, so the mini clusters never leak.
                stopTezMiniCluster();
            }
        }
    }

    /**
     * Builds the two-vertex DAG ("v1" -> "v2") used by both tests. Every test plugin receives the
     * location name as its user payload.
     *
     * <p>For {@code EM_*} locations the edge is driven by {@link CustomEdgeManager}; for all other
     * locations the edge is a plain one-to-one edge and v2 gets
     * {@link InputReadyVertexManagerWithException}.
     *
     * @param exceptionLocation where the DAG should fail
     * @return the DAG, named {@code "dag_" + exceptionLocation.name()}
     * @throws IOException if the v2 vertex-manager payload cannot be created
     */
    private DAG createDAG(ExceptionLocation exceptionLocation) throws IOException {
        String locationName = exceptionLocation.name();
        DAG dag = DAG.create("dag_" + locationName);
        UserPayload payload = UserPayload.create(ByteBuffer.wrap(locationName.getBytes()));

        Vertex v1 = Vertex.create("v1", ProcessorWithException.getProcDesc(payload), 1);
        DataSourceDescriptor v1Source = DataSourceDescriptor.create(
                InputWithException.getInputDesc(payload),
                InputInitializerWithException.getIIDesc(payload),
                (Credentials) null);
        v1.addDataSource("input", v1Source);
        v1.setVertexManagerPlugin(RootInputVertexManagerWithException.getVMDesc(payload));

        Vertex v2 = Vertex.create("v2", TestAMRecovery.DoNothingProcessor.getProcDesc(), 1);
        DataSourceDescriptor v2Source = DataSourceDescriptor.create(
                InputDescriptor.create(MultiAttemptDAG.NoOpInput.class.getName()),
                InputInitializerWithException2.getIIDesc(payload),
                (Credentials) null);
        v2.addDataSource("input2", v2Source);

        dag.addVertex(v1).addVertex(v2);

        EdgeProperty edgeProperty;
        if (locationName.startsWith("EM_")) {
            edgeProperty = EdgeProperty.create(
                    EdgeManagerPluginDescriptor.create(CustomEdgeManager.class.getName()).setUserPayload(payload),
                    EdgeProperty.DataSourceType.PERSISTED,
                    EdgeProperty.SchedulingType.SEQUENTIAL,
                    OutputWithException.getOutputDesc(payload),
                    InputWithException.getInputDesc(payload));
        } else {
            v2.setVertexManagerPlugin(InputReadyVertexManagerWithException.getVMDesc(exceptionLocation));
            edgeProperty = EdgeProperty.create(
                    EdgeProperty.DataMovementType.ONE_TO_ONE,
                    EdgeProperty.DataSourceType.PERSISTED,
                    EdgeProperty.SchedulingType.SEQUENTIAL,
                    OutputWithException.getOutputDesc(payload),
                    InputWithException.getInputDesc(payload));
        }
        dag.addEdge(Edge.create(v1, v2, edgeProperty));
        return dag;
    }
}
