/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.MockDAGAppMaster;
import org.apache.tez.dag.app.MockTezClient;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;

/**
 * Exercises container preemption against a mock DAG app master running in Tez local mode.
 *
 * <p>Every scenario submits a two-vertex DAG (A -> B, five tasks each), asks the mock container
 * launcher to preempt the container of task 0 of one vertex for attempts
 * {@code 0..upToTaskVersion}, and then verifies that the DAG still reaches
 * {@link DAGStatus.State#SUCCEEDED} while each of those attempts ends in the internal state
 * {@link TaskAttemptStateInternal#KILLED}.
 *
 * <p>All clients are created with {@value #MAX_FAILED_ATTEMPTS_KEY} set to 0.
 * NOTE(review): presumably this makes the DAG fail if a preempted attempt were counted as a
 * failure rather than a kill - confirm against the AM's handling of that setting.
 */
public class TestPreemption {
    /** Configuration key set to 0 on every client created by these tests. */
    private static final String MAX_FAILED_ATTEMPTS_KEY = "tez.am.task.max.failed.attempts";

    /** Attempt version used by the "single preemption" scenarios: only attempt 0 is preempted. */
    private static final int SINGLE_PREEMPTION_VERSION = 0;

    /** Attempt version used by the "multiple preemption" scenarios: attempts 0..3 are preempted. */
    private static final int MULTIPLE_PREEMPTION_VERSION = 3;

    static Configuration defaultConf;
    static FileSystem localFs;
    static Path workDir;

    static {
        try {
            // Start from an empty configuration (no default resources) and force the local
            // file system plus Tez local mode.
            defaultConf = new Configuration(false);
            defaultConf.set("fs.defaultFS", "file:///");
            defaultConf.setBoolean("tez.local.mode", true);
            localFs = FileSystem.getLocal(defaultConf);
            // NOTE(review): the directory name "TestDAGAppMaster" looks copied from another
            // test; workDir is not read anywhere in this class, so it is left unchanged.
            Path buildDataDir = new Path(System.getProperty("test.build.data", "/tmp"));
            workDir = new Path(buildDataDir, "TestDAGAppMaster").makeQualified(localFs);
        }
        catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }

    /** Mock app master captured by {@link #syncWithMockAppLauncher}. */
    MockDAGAppMaster mockApp;
    /** Container launcher of {@link #mockApp}; used to gate scheduling and trigger preemption. */
    MockDAGAppMaster.MockContainerLauncher mockLauncher;
    /** Number of DAGs created so far; gives every DAG a unique "test-N" name. */
    int dagCount = 0;

    /**
     * Builds a DAG named {@code test-<n>} with vertices "A" and "B" (five tasks each) joined by
     * a single persisted, sequentially scheduled edge A -> B.
     *
     * @param dmType data movement type of the A -> B edge
     * @return the newly created DAG
     */
    DAG createDAG(EdgeProperty.DataMovementType dmType) {
        DAG dag = DAG.create("test-" + dagCount++);
        Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
        Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 5);
        EdgeProperty edgeProperty = EdgeProperty.create(
                dmType,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("O.class"),
                InputDescriptor.create("I.class"));
        dag.addVertex(vA).addVertex(vB).addEdge(Edge.create(vA, vB, edgeProperty));
        return dag;
    }

    /**
     * Preempts attempts 0..3 of task 0 of vertex 0 in a scatter-gather DAG submitted through a
     * non-session client, and verifies the outcome.
     */
    @Test(timeout=5000L)
    public void testPreemptionWithoutSession() throws Exception {
        System.out.println("TestPreemptionWithoutSession");
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        tezconf.setInt(MAX_FAILED_ATTEMPTS_KEY, 0);
        AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
        MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, false, null, null, mockAppLauncherGoFlag);
        tezClient.start();
        try {
            DAGClient dagClient = tezClient.submitDAG(createDAG(EdgeProperty.DataMovementType.SCATTER_GATHER));
            // Keep scheduling off until the preemption request has been registered.
            syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
            preemptAndVerify(dagClient, 0, MULTIPLE_PREEMPTION_VERSION);
        }
        finally {
            // Stop the client even when an assertion fails so the mock AM is not leaked.
            tezClient.stop();
        }
    }

    /**
     * Runs the single and multiple preemption scenarios for every data movement type, first
     * against vertex 0 and then against vertex 1, reusing one session client for all DAGs.
     */
    @Test(timeout=30000L)
    public void testPreemptionWithSession() throws Exception {
        System.out.println("TestPreemptionWithSession");
        EdgeProperty.DataMovementType[] movementTypes = {
            EdgeProperty.DataMovementType.SCATTER_GATHER,
            EdgeProperty.DataMovementType.BROADCAST,
            EdgeProperty.DataMovementType.ONE_TO_ONE,
        };
        // Labels printed for the movement type at the same index.
        String[] labels = {"Scatter-Gather", "Broadcast", "1-1"};

        MockTezClient tezClient = createTezSession();
        try {
            for (int vertexIndex = 0; vertexIndex <= 1; ++vertexIndex) {
                for (int t = 0; t < movementTypes.length; ++t) {
                    testPreemptionSingle(tezClient, createDAG(movementTypes[t]), vertexIndex, labels[t]);
                    testPreemptionMultiple(tezClient, createDAG(movementTypes[t]), vertexIndex, labels[t]);
                }
            }
        }
        finally {
            // Stop the session even when one of the scenarios fails.
            tezClient.stop();
        }
    }

    /**
     * Creates and starts a session-mode mock client, then waits for the mock app launcher and
     * leaves scheduling turned off.
     *
     * @return the started session client; the caller is responsible for stopping it
     */
    MockTezClient createTezSession() throws Exception {
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        tezconf.setInt(MAX_FAILED_ATTEMPTS_KEY, 0);
        AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
        MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, true, null, null, mockAppLauncherGoFlag);
        tezClient.start();
        syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
        return tezClient;
    }

    /**
     * Blocks on the monitor of {@code mockAppLauncherGoFlag} until the flag is {@code true},
     * then captures {@link #mockApp} and {@link #mockLauncher} from the client, applies the
     * requested scheduling mode and notifies one thread waiting on the same monitor.
     *
     * <p>NOTE(review): presumably the mock app master sets the flag once its launcher is ready
     * and waits for this notification before proceeding - confirm against MockDAGAppMaster.
     *
     * @param allowScheduling value passed to the launcher's {@code startScheduling}
     * @param mockAppLauncherGoFlag flag (and monitor) shared with the mock app launcher
     * @param tezClient client from which the mock app master is obtained
     */
    void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag, MockTezClient tezClient) throws Exception {
        synchronized (mockAppLauncherGoFlag) {
            // Loop guards against spurious wakeups.
            while (!mockAppLauncherGoFlag.get()) {
                mockAppLauncherGoFlag.wait();
            }
            mockApp = tezClient.getLocalClient().getMockApp();
            mockLauncher = mockApp.getContainerLauncher();
            mockLauncher.startScheduling(allowScheduling);
            mockAppLauncherGoFlag.notify();
        }
    }

    /** Runs {@link #testPreemptionJob} preempting only attempt 0; appends "-Single" to the label. */
    void testPreemptionSingle(MockTezClient tezClient, DAG dag, int vertexIndex, String info) throws Exception {
        testPreemptionJob(tezClient, dag, vertexIndex, SINGLE_PREEMPTION_VERSION, info + "-Single");
    }

    /** Runs {@link #testPreemptionJob} preempting attempts 0..3; appends "-Multiple" to the label. */
    void testPreemptionMultiple(MockTezClient tezClient, DAG dag, int vertexIndex, String info) throws Exception {
        testPreemptionJob(tezClient, dag, vertexIndex, MULTIPLE_PREEMPTION_VERSION, info + "-Multiple");
    }

    /**
     * Submits {@code dag} to an already running session with scheduling turned off, requests
     * preemption for task 0 of the given vertex and verifies the outcome.
     *
     * @param tezClient session client to submit the DAG through
     * @param dag DAG to run
     * @param vertexIndex index of the vertex whose task 0 is preempted
     * @param upToTaskVersion highest attempt number expected to be killed
     * @param info label used in the progress output
     */
    void testPreemptionJob(MockTezClient tezClient, DAG dag, int vertexIndex, int upToTaskVersion, String info) throws Exception {
        System.out.println("TestPreemption - Running - " + info);
        // Turn scheduling off before submitting so no task starts ahead of the preemption request.
        mockLauncher.startScheduling(false);
        DAGClient dagClient = tezClient.submitDAG(dag);
        preemptAndVerify(dagClient, vertexIndex, upToTaskVersion);
        System.out.println("TestPreemption - Done running - " + info);
    }

    /**
     * Shared tail of every scenario: registers the preemption request for task 0 of the given
     * vertex of the current DAG, turns scheduling on, waits for the DAG to finish and asserts
     * that it succeeded and that attempts {@code 0..upToTaskVersion} of that task were killed.
     *
     * @param dagClient client of the DAG that was just submitted
     * @param vertexIndex index of the vertex whose task 0 is preempted
     * @param upToTaskVersion highest attempt number expected to be killed
     */
    private void preemptAndVerify(DAGClient dagClient, int vertexIndex, int upToTaskVersion) throws Exception {
        DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
        TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), vertexIndex);
        TezTaskID taskId = TezTaskID.getInstance(vertexId, 0);

        mockLauncher.preemptContainerForTask(taskId, upToTaskVersion);
        mockLauncher.startScheduling(true);
        dagClient.waitForCompletion();

        Assert.assertEquals("DAG should succeed despite preemption",
                DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
        for (int attempt = 0; attempt <= upToTaskVersion; ++attempt) {
            TezTaskAttemptID attemptId = TezTaskAttemptID.getInstance(taskId, attempt);
            TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(attemptId);
            // Fail with a readable message instead of a NullPointerException below.
            Assert.assertNotNull("Missing task attempt " + attemptId, taImpl);
            Assert.assertEquals("Unexpected state for preempted attempt " + attemptId,
                    TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
        }
    }
}

