/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.ExecutionContext;

public class ExecutionGraphDeploymentTest {
    /**
     * Verifies the {@link TaskDeploymentDescriptor} that is handed to the task manager gateway
     * when subtask 3 of vertex {@code v2} is deployed to a slot.
     *
     * <p>The job built here is {@code v1 -> v2 -> {v3, v4}}: every vertex has parallelism 10 and
     * every edge is all-to-all. The test therefore expects {@code v2} to expose two produced
     * partitions with ten subpartitions each and one input gate with ten input channels.
     *
     * @throws Exception if building the graph or deploying the vertex fails; the exception is
     *     propagated so the test runner reports the original cause and stack trace
     */
    @Test
    public void testBuildDeploymentDescriptor() throws Exception {
        final JobID jobId = new JobID();

        final JobVertexID jid1 = new JobVertexID();
        final JobVertexID jid2 = new JobVertexID();
        final JobVertexID jid3 = new JobVertexID();
        final JobVertexID jid4 = new JobVertexID();

        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        JobVertex v3 = new JobVertex("v3", jid3);
        JobVertex v4 = new JobVertex("v4", jid4);

        v1.setParallelism(10);
        v2.setParallelism(10);
        v3.setParallelism(10);
        v4.setParallelism(10);

        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);
        v3.setInvokableClass(BatchTask.class);
        v4.setInvokableClass(BatchTask.class);

        // v2 consumes v1; both v3 and v4 consume v2, giving v2 two produced data sets.
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);

        ExecutionGraph eg = new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                jobId,
                "some job",
                new Configuration(),
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy());

        List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
        eg.attachJobGraph(ordered);

        ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
        ExecutionVertex vertex = ejv.getTaskVertices()[3];

        // The gateway records the last descriptor it received in its lastTDD field.
        ExecutionGraphTestUtils.SimpleActorGateway instanceGateway =
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext());
        Instance instance = ExecutionGraphTestUtils.getInstance(instanceGateway);
        SimpleSlot slot = instance.allocateSimpleSlot(jobId);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        TaskDeploymentDescriptor descr = instanceGateway.lastTDD;
        Assert.assertNotNull(descr);

        Assert.assertEquals(jobId, descr.getJobID());
        Assert.assertEquals(jid2, descr.getVertexID());
        Assert.assertEquals(3, descr.getIndexInSubtaskGroup());
        Assert.assertEquals(10, descr.getNumberOfSubtasks());
        Assert.assertEquals(BatchTask.class.getName(), descr.getInvokableClassName());
        Assert.assertEquals("v2", descr.getTaskName());

        List<ResultPartitionDeploymentDescriptor> producedPartitions = descr.getProducedPartitions();
        List<InputGateDeploymentDescriptor> consumedPartitions = descr.getInputGates();

        Assert.assertEquals(2, producedPartitions.size());
        Assert.assertEquals(1, consumedPartitions.size());

        Assert.assertEquals(10, producedPartitions.get(0).getNumberOfSubpartitions());
        Assert.assertEquals(10, producedPartitions.get(1).getNumberOfSubpartitions());
        Assert.assertEquals(10, consumedPartitions.get(0).getInputChannelDeploymentDescriptors().length);
    }

    /**
     * Checks that no execution remains in the graph's registered-executions map after every
     * execution has been marked as finished.
     *
     * @throws Exception if setting up or scheduling the graph fails; propagated so the test
     *     runner reports the original cause and stack trace
     */
    @Test
    public void testRegistrationOfExecutionsFinishing() throws Exception {
        JobVertex v1 = new JobVertex("v1", new JobVertexID());
        JobVertex v2 = new JobVertex("v2", new JobVertexID());

        Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7650, v2, 2350);

        for (Execution execution : executions.values()) {
            execution.markFinished();
        }

        Assert.assertEquals(0, executions.size());
    }

    /**
     * Checks that no execution remains in the graph's registered-executions map after every
     * execution has been marked as failed (with a {@code null} cause).
     *
     * @throws Exception if setting up or scheduling the graph fails; propagated so the test
     *     runner reports the original cause and stack trace
     */
    @Test
    public void testRegistrationOfExecutionsFailing() throws Exception {
        JobVertex v1 = new JobVertex("v1", new JobVertexID());
        JobVertex v2 = new JobVertex("v2", new JobVertexID());

        Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);

        for (Execution execution : executions.values()) {
            execution.markFailed(null);
        }

        Assert.assertEquals(0, executions.size());
    }

    /**
     * Checks that no execution remains in the graph's registered-executions map after
     * {@code fail(null)} has been called on every execution.
     *
     * @throws Exception if setting up or scheduling the graph fails; propagated so the test
     *     runner reports the original cause and stack trace
     */
    @Test
    public void testRegistrationOfExecutionsFailedExternally() throws Exception {
        JobVertex v1 = new JobVertex("v1", new JobVertexID());
        JobVertex v2 = new JobVertex("v2", new JobVertexID());

        Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);

        for (Execution execution : executions.values()) {
            execution.fail(null);
        }

        Assert.assertEquals(0, executions.size());
    }

    /**
     * Checks that no execution remains in the graph's registered-executions map after every
     * execution has been canceled and its cancellation has been confirmed.
     *
     * @throws Exception if setting up or scheduling the graph fails; propagated so the test
     *     runner reports the original cause and stack trace
     */
    @Test
    public void testRegistrationOfExecutionsCanceled() throws Exception {
        JobVertex v1 = new JobVertex("v1", new JobVertexID());
        JobVertex v2 = new JobVertex("v2", new JobVertexID());

        Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 19, v2, 37);

        for (Execution execution : executions.values()) {
            // Request the cancellation, then acknowledge it as completed.
            execution.cancel();
            execution.cancelingComplete();
        }

        Assert.assertEquals(0, executions.size());
    }

    /**
     * Checks execution states and deregistration when the first vertex is a
     * {@link FailingFinalizeJobVertex}, whose {@code finalizeOnMaster} always throws.
     *
     * <p>Executions are marked finished in order of their vertex's simple name. The test expects
     * the first {@code v1}-parallelism many executions to report {@code FINISHED}, every later
     * one to report {@code CANCELED}, and the registered-executions map to be empty afterwards.
     *
     * @throws Exception if setting up or scheduling the graph fails; propagated so the test
     *     runner reports the original cause and stack trace
     */
    @Test
    public void testRegistrationOfExecutionsFailingFinalize() throws Exception {
        final int failingVertexParallelism = 6;
        final int otherVertexParallelism = 4;

        JobVertex v1 = new FailingFinalizeJobVertex("v1", new JobVertexID());
        JobVertex v2 = new JobVertex("v2", new JobVertexID());

        Map<ExecutionAttemptID, Execution> executions =
                setupExecution(v1, failingVertexParallelism, v2, otherVertexParallelism);

        // Work on a sorted snapshot so that the order in which executions finish is fixed.
        List<Execution> execList = new ArrayList<Execution>(executions.values());
        Collections.sort(execList, new Comparator<Execution>() {
            @Override
            public int compare(Execution o1, Execution o2) {
                return o1.getVertex().getSimpleName().compareTo(o2.getVertex().getSimpleName());
            }
        });

        int finishedSoFar = 0;
        for (Execution execution : execList) {
            execution.markFinished();
            finishedSoFar++;

            ExecutionState expected = finishedSoFar <= failingVertexParallelism
                    ? ExecutionState.FINISHED
                    : ExecutionState.CANCELED;
            Assert.assertEquals(expected, execution.getState());
        }

        Assert.assertEquals(0, executions.size());
    }

    /**
     * Builds an execution graph from two unconnected vertices, schedules it, and returns the
     * graph's registered executions.
     *
     * <p>Both vertices run {@link BatchTask}. Queued scheduling is disabled, and one instance is
     * registered with the scheduler per subtask; the method asserts that this yields exactly
     * {@code dop1 + dop2} available slots before scheduling and exactly {@code dop1 + dop2}
     * registered executions afterwards.
     *
     * @param v1 first job vertex; its parallelism and invokable class are overwritten here
     * @param dop1 parallelism to assign to {@code v1}
     * @param v2 second job vertex; its parallelism and invokable class are overwritten here
     * @param dop2 parallelism to assign to {@code v2}
     * @return the map obtained from {@code ExecutionGraph#getRegisteredExecutions()}; the calling
     *     tests expect it to shrink as executions reach a terminal state
     * @throws Exception if creating, attaching, or scheduling the graph fails
     */
    private Map<ExecutionAttemptID, Execution> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
        final JobID jobId = new JobID();
        final int totalParallelism = dop1 + dop2;

        v1.setParallelism(dop1);
        v2.setParallelism(dop2);

        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);

        ExecutionGraph eg = new ExecutionGraph(
                TestingUtils.directExecutionContext(),
                jobId,
                "some job",
                new Configuration(),
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy());
        eg.setQueuedSchedulingAllowed(false);

        List<JobVertex> ordered = Arrays.asList(v1, v2);
        eg.attachJobGraph(ordered);

        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        for (int i = 0; i < totalParallelism; i++) {
            scheduler.newInstanceAvailable(
                    ExecutionGraphTestUtils.getInstance(
                            new ExecutionGraphTestUtils.SimpleActorGateway(
                                    TestingUtils.directExecutionContext())));
        }
        Assert.assertEquals(totalParallelism, scheduler.getNumberOfAvailableSlots());

        eg.scheduleForExecution(scheduler);

        Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions();
        Assert.assertEquals(totalParallelism, executions.size());
        return executions;
    }

    public static class FailingFinalizeJobVertex
    extends JobVertex {
        public FailingFinalizeJobVertex(String name) {
            super(name);
        }

        public FailingFinalizeJobVertex(String name, JobVertexID id) {
            super(name, id);
        }

        public void finalizeOnMaster(ClassLoader cl) throws Exception {
            throw new Exception();
        }
    }
}

