/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app;

import com.google.common.base.Joiner;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.MockClock;
import org.apache.tez.dag.app.MockDAGAppMaster;
import org.apache.tez.dag.app.MockTezClient;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;

public class TestSpeculation {
    static Configuration defaultConf;
    static FileSystem localFs;
    MockDAGAppMaster mockApp;
    MockDAGAppMaster.MockContainerLauncher mockLauncher;

    MockTezClient createTezSession() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
        MockTezClient tezClient = new MockTezClient("testspeculation", tezconf, true, null, null, new MockClock(), mockAppLauncherGoFlag, false, false, 1, 2);
        tezClient.start();
        this.syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
        return tezClient;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag, MockTezClient tezClient) throws Exception {
        AtomicBoolean atomicBoolean = mockAppLauncherGoFlag;
        synchronized (atomicBoolean) {
            while (!mockAppLauncherGoFlag.get()) {
                mockAppLauncherGoFlag.wait();
            }
            this.mockApp = tezClient.getLocalClient().getMockApp();
            this.mockLauncher = this.mockApp.getContainerLauncher();
            this.mockLauncher.startScheduling(allowScheduling);
            mockAppLauncherGoFlag.notify();
        }
    }

    public void testBasicSpeculation(boolean withProgress) throws Exception {
        DAG dag = DAG.create((String)"test");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)5);
        dag.addVertex(vA);
        MockTezClient tezClient = this.createTezSession();
        DAGClient dagClient = tezClient.submitDAG(dag);
        DAGImpl dagImpl = (DAGImpl)this.mockApp.getContext().getCurrentDAG();
        TezVertexID vertexId = TezVertexID.getInstance((TezDAGID)dagImpl.getID(), (int)0);
        TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexId, (int)0), (int)0);
        TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexId, (int)0), (int)1);
        this.mockLauncher.updateProgress(withProgress);
        this.mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
        this.mockLauncher.startScheduling(true);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        TaskImpl task = dagImpl.getTask(killedTaId.getTaskID());
        Assert.assertEquals((long)2L, (long)task.getAttempts().size());
        Assert.assertEquals((Object)successTaId, (Object)task.getSuccessfulAttempt().getID());
        TaskAttempt killedAttempt = task.getAttempt(killedTaId);
        Joiner.on((String)",").join((Iterable)killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
        Assert.assertEquals((Object)TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, (Object)killedAttempt.getTerminationCause());
        if (withProgress) {
            Assert.assertEquals((long)1L, (long)task.getCounters().findCounter((Enum)TaskCounter.NUM_SPECULATIONS).getValue());
            Assert.assertEquals((long)1L, (long)dagImpl.getAllCounters().findCounter((Enum)TaskCounter.NUM_SPECULATIONS).getValue());
            Vertex v = dagImpl.getVertex(killedTaId.getTaskID().getVertexID());
            Assert.assertEquals((long)1L, (long)v.getAllCounters().findCounter((Enum)TaskCounter.NUM_SPECULATIONS).getValue());
        }
        tezClient.stop();
    }

    @Test(timeout=10000L)
    public void testBasicSpeculationWithProgress() throws Exception {
        this.testBasicSpeculation(true);
    }

    @Test(timeout=10000L)
    public void testBasicSpeculationWithoutProgress() throws Exception {
        this.testBasicSpeculation(false);
    }

    @Test(timeout=10000L)
    public void testBasicSpeculationPerVertexConf() throws Exception {
        DAG dag = DAG.create((String)"test");
        String vNameNoSpec = "A";
        String vNameSpec = "B";
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)vNameNoSpec, (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)5);
        org.apache.tez.dag.api.Vertex vB = org.apache.tez.dag.api.Vertex.create((String)vNameSpec, (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)5);
        vA.setConf("tez.am.speculation.enabled", "false");
        dag.addVertex(vA);
        dag.addVertex(vB);
        dag.addEdge(Edge.create((org.apache.tez.dag.api.Vertex)vA, (org.apache.tez.dag.api.Vertex)vB, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"O"), (InputDescriptor)InputDescriptor.create((String)"I"))));
        MockTezClient tezClient = this.createTezSession();
        DAGClient dagClient = tezClient.submitDAG(dag);
        DAGImpl dagImpl = (DAGImpl)this.mockApp.getContext().getCurrentDAG();
        TezVertexID vertexId = dagImpl.getVertex(vNameSpec).getVertexId();
        TezVertexID vertexIdNoSpec = dagImpl.getVertex(vNameNoSpec).getVertexId();
        TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexId, (int)0), (int)0);
        TezTaskAttemptID noSpecTaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexIdNoSpec, (int)0), (int)0);
        this.mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
        this.mockLauncher.setStatusUpdatesForTask(noSpecTaId, 100);
        this.mockLauncher.startScheduling(true);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        Vertex vSpec = dagImpl.getVertex(vertexId);
        Vertex vNoSpec = dagImpl.getVertex(vertexIdNoSpec);
        Assert.assertTrue((vSpec.getAllCounters().findCounter((Enum)TaskCounter.NUM_SPECULATIONS).getValue() > 0L ? 1 : 0) != 0);
        Assert.assertEquals((long)0L, (long)vNoSpec.getAllCounters().findCounter((Enum)TaskCounter.NUM_SPECULATIONS).getValue());
        tezClient.stop();
    }

    @Test(timeout=10000L)
    public void testBasicSpeculationNotUseful() throws Exception {
        DAG dag = DAG.create((String)"test");
        org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)5);
        dag.addVertex(vA);
        MockTezClient tezClient = this.createTezSession();
        DAGClient dagClient = tezClient.submitDAG(dag);
        DAGImpl dagImpl = (DAGImpl)this.mockApp.getContext().getCurrentDAG();
        TezVertexID vertexId = TezVertexID.getInstance((TezDAGID)dagImpl.getID(), (int)0);
        TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexId, (int)0), (int)0);
        TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexId, (int)0), (int)1);
        this.mockLauncher.setStatusUpdatesForTask(successTaId, 100);
        this.mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
        this.mockLauncher.startScheduling(true);
        dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        TaskImpl task = dagImpl.getTask(killedTaId.getTaskID());
        Assert.assertEquals((long)2L, (long)task.getAttempts().size());
        Assert.assertEquals((Object)successTaId, (Object)task.getSuccessfulAttempt().getID());
        TaskAttempt killedAttempt = task.getAttempt(killedTaId);
        Joiner.on((String)",").join((Iterable)killedAttempt.getDiagnostics()).contains("Killed speculative attempt as");
        Assert.assertEquals((Object)TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION, (Object)killedAttempt.getTerminationCause());
        Assert.assertEquals((long)1L, (long)task.getCounters().findCounter((Enum)TaskCounter.NUM_SPECULATIONS).getValue());
        Assert.assertEquals((long)1L, (long)dagImpl.getAllCounters().findCounter((Enum)TaskCounter.NUM_SPECULATIONS).getValue());
        Vertex v = dagImpl.getVertex(killedTaId.getTaskID().getVertexID());
        Assert.assertEquals((long)1L, (long)v.getAllCounters().findCounter((Enum)TaskCounter.NUM_SPECULATIONS).getValue());
        tezClient.stop();
    }

    static {
        try {
            defaultConf = new Configuration(false);
            defaultConf.set("fs.defaultFS", "file:///");
            defaultConf.setBoolean("tez.local.mode", true);
            defaultConf.setBoolean("tez.am.speculation.enabled", true);
            defaultConf.setFloat("tez.shuffle-vertex-manager.min-src-fraction", 1.0f);
            defaultConf.setFloat("tez.shuffle-vertex-manager.max-src-fraction", 1.0f);
            localFs = FileSystem.getLocal((Configuration)defaultConf);
            String stagingDir = "target/" + TestSpeculation.class.getName() + "-tmpDir";
            defaultConf.set("tez.staging-dir", stagingDir);
        }
        catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }
}

