package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Lists;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import javax.ws.rs.core.MediaType;
import org.apache.apex.common.util.JarHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.util.Records;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/StramMiniClusterTest.class */
public class StramMiniClusterTest {

    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
    private static final Logger LOG = LoggerFactory.getLogger(StramMiniClusterTest.class);
    protected static MiniYARNCluster yarnCluster = null;
    protected static Configuration conf = new Configuration();
    private static String APP_NAME = "$test\\\"'";

    /* loaded from: input_file:com/datatorrent/stram/StramMiniClusterTest$AddAttributeToArgsOperator.class */
    public static class AddAttributeToArgsOperator extends BaseOperator implements InputOperator {
        public void emitTuples() {
            if (!StramMiniClusterTest.APP_NAME.equals(System.getProperty(LogicalPlan.APPLICATION_NAME.getLongName()))) {
                throw new RuntimeException();
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StramMiniClusterTest$FailingOperator.class */
    public static class FailingOperator extends BaseOperator implements InputOperator {
        public void emitTuples() {
            throw new RuntimeException("Operator failure");
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StramMiniClusterTest$ShipJarsBaseOperator.class */
    protected class ShipJarsBaseOperator extends BaseOperator {
        protected ShipJarsBaseOperator() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StramMiniClusterTest$ShipJarsOperator.class */
    protected class ShipJarsOperator extends ShipJarsBaseOperator {
        protected ShipJarsOperator() {
            super();
        }
    }

    @Before
    public void setupEachTime() throws IOException {
        StreamingContainer.eventloop.start();
    }

    @After
    public void teardown() {
        StreamingContainer.eventloop.stop();
    }

    @BeforeClass
    public static void setup() throws InterruptedException, IOException {
        LOG.info("Starting up YARN cluster");
        conf = StramClientUtils.addDTDefaultResources(conf);
        conf.setInt("yarn.scheduler.minimum-allocation-mb", 64);
        conf.setInt("yarn.nodemanager.vmem-pmem-ratio", 20);
        conf.set("yarn.scheduler.capacity.root.queues", "default");
        conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
        conf.setBoolean("yarn.nodemanager.disk-health-checker.enable", false);
        conf.setFloat("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", 100.0f);
        conf.set("yarn.nodemanager.admin-env", String.format("JAVA_HOME=%s,CLASSPATH=%s", System.getProperty("java.home"), getTestRuntimeClasspath()));
        conf.set("yarn.nodemanager.env-whitelist", YarnConfiguration.DEFAULT_NM_ENV_WHITELIST.replaceAll("JAVA_HOME,*", ""));
        if (yarnCluster == null) {
            yarnCluster = new MiniYARNCluster(StramMiniClusterTest.class.getName(), 1, 1, 1);
            yarnCluster.init(conf);
            yarnCluster.start();
        }
        conf = yarnCluster.getConfig();
        URL resource = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
        if (resource == null) {
            LOG.error("Could not find 'yarn-site.xml' dummy file in classpath");
            throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
        }
        File file = new File(resource.getPath());
        yarnCluster.getConfig().set("yarn.application.classpath", file.getParent());
        FileOutputStream fileOutputStream = new FileOutputStream(file);
        LOG.debug("Conf file: {}", file);
        yarnCluster.getConfig().writeXml(fileOutputStream);
        fileOutputStream.close();
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            LOG.info("setup thread sleep interrupted. message=" + e.getMessage());
        }
    }

    @AfterClass
    public static void tearDown() throws IOException {
        if (yarnCluster != null) {
            yarnCluster.stop();
            yarnCluster = null;
        }
    }

    private void checkNodeState() throws YarnException {
        List nodeReports = yarnCluster.getResourceManager().getClientRMService().getClusterNodes((GetClusterNodesRequest) Records.newRecord(GetClusterNodesRequest.class)).getNodeReports();
        LOG.info("{}", nodeReports);
        Iterator it = nodeReports.iterator();
        while (it.hasNext()) {
            if (!((NodeReport) it.next()).getNodeState().isUnusable()) {
                return;
            }
        }
        Assert.fail("Yarn Mini cluster should have at least one usable node.");
    }

    @Test
    public void testSetupShutdown() throws Exception {
        checkNodeState();
        JarHelper jarHelper = new JarHelper();
        LOG.info("engine jar: {}", jarHelper.getJar(StreamingAppMaster.class));
        LOG.info("engine test jar: {}", jarHelper.getJar(StramMiniClusterTest.class));
        Properties properties = new Properties();
        properties.put("apex.operator.numGen.classname", TestGeneratorInputOperator.class.getName());
        properties.put("apex.operator.numGen.maxTuples", "1");
        properties.put("apex.operator.module1.classname", GenericTestOperator.class.getName());
        properties.put("apex.operator.module2.classname", GenericTestOperator.class.getName());
        properties.put("apex.stream.fromNumGen.source", "numGen.outport");
        properties.put("apex.stream.fromNumGen.sinks", "module1.inport1");
        properties.put("apex.stream.n1n2.source", "module1.outport1");
        properties.put("apex.stream.n1n2.sinks", "module2.inport1");
        properties.setProperty("apex." + LogicalPlan.MASTER_MEMORY_MB.getName(), "128");
        properties.setProperty("apex." + LogicalPlan.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
        properties.setProperty("apex.operator.*." + Context.OperatorContext.MEMORY_MB.getName(), "64");
        properties.setProperty("apex.operator.*." + Context.OperatorContext.VCORES.getName(), "1");
        properties.setProperty("apex.operator.*.port.*." + Context.PortContext.BUFFER_MEMORY_MB.getName(), "32");
        properties.setProperty("apex." + LogicalPlan.DEBUG.getName(), "true");
        LOG.info("dag properties: {}", properties);
        LOG.info("Initializing Client");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(conf);
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        StramClient stramClient = new StramClient(new Configuration(yarnCluster.getConfig()), createDAG(logicalPlanConfiguration));
        try {
            stramClient.start();
            LOG.info("Running client");
            stramClient.startApplication();
            boolean monitorApplication = stramClient.monitorApplication();
            LOG.info("Client run completed. Result=" + monitorApplication);
            Assert.assertTrue(monitorApplication);
            stramClient.stop();
        } catch (Throwable th) {
            stramClient.stop();
            throw th;
        }
    }

    private LogicalPlan createDAG(LogicalPlanConfiguration logicalPlanConfiguration) throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(LogicalPlan.APPLICATION_PATH, this.testMeta.toURI().toString());
        logicalPlanConfiguration.prepareDAG(logicalPlan, (StreamingApplication) null, "testApp");
        logicalPlan.validate();
        Assert.assertEquals("", 128, logicalPlan.getValue(DAG.MASTER_MEMORY_MB));
        Assert.assertEquals("", "-Dlog4j.properties=custom_log4j.properties", logicalPlan.getValue(DAG.CONTAINER_JVM_OPTIONS));
        return logicalPlan;
    }

    @Test
    @Ignore
    public void testWebService() throws Exception {
        Properties properties = new Properties();
        properties.put("apex.stream.input.classname", TestGeneratorInputOperator.class.getName());
        properties.put("apex.stream.input.outputNode", "module1");
        properties.put("apex.module.module1.classname", GenericTestOperator.class.getName());
        LOG.info("Initializing Client");
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration(false));
        logicalPlanConfiguration.addFromProperties(properties, (Configuration) null);
        StramClient stramClient = new StramClient(new Configuration(yarnCluster.getConfig()), createDAG(logicalPlanConfiguration));
        try {
            stramClient.start();
            stramClient.startApplication();
            ApplicationReport applicationReport = stramClient.getApplicationReport();
            Thread.sleep(5000L);
            Client create = Client.create();
            create.setFollowRedirects(true);
            WebResource path = create.resource("http://" + applicationReport.getTrackingUrl()).path("/ws/v2/stram").path("info");
            LOG.info("Requesting: " + path.getURI());
            ClientResponse clientResponse = (ClientResponse) path.accept(new String[]{"application/json"}).get(ClientResponse.class);
            Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, clientResponse.getType());
            JSONObject jSONObject = (JSONObject) clientResponse.getEntity(JSONObject.class);
            LOG.info("Got response: " + jSONObject.toString());
            Assert.assertEquals("incorrect number of elements", 1L, jSONObject.length());
            Assert.assertEquals("appId", applicationReport.getApplicationId().toString(), jSONObject.get("id"));
            WebResource path2 = create.resource("http://" + applicationReport.getTrackingUrl()).path("/ws/v2/stram").path("physicalPlan/operators");
            LOG.info("Requesting: " + path2.getURI());
            ClientResponse clientResponse2 = (ClientResponse) path2.accept(new String[]{"application/json"}).get(ClientResponse.class);
            Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, clientResponse2.getType());
            LOG.info("Got response: " + ((JSONObject) clientResponse2.getEntity(JSONObject.class)).toString());
            stramClient.killApplication();
            stramClient.stop();
        } catch (Throwable th) {
            stramClient.killApplication();
            stramClient.stop();
            throw th;
        }
    }

    /* JADX WARN: String concatenation convert failed
    jadx.core.utils.exceptions.JadxRuntimeException: Can't remove SSA var: r8v0 java.lang.String, still in use, count: 3, list:
      (r8v0 java.lang.String) from 0x00d7: PHI (r8v1 java.lang.String) = (r8v0 java.lang.String), (r8v3 java.lang.String) binds: [B:26:0x00b9, B:11:0x009a] A[DONT_GENERATE, DONT_INLINE]
      (r8v0 java.lang.String) from 0x0044: RETURN (r8v0 java.lang.String) A[TRY_LEAVE]
      (r8v0 java.lang.String) from STR_CONCAT 
      (r8v0 java.lang.String)
      (wrap:java.lang.String:0x008b: INVOKE (r0v20 java.lang.String) VIRTUAL call: java.lang.String.trim():java.lang.String A[Catch: IOException -> 0x00b8, MD:():java.lang.String (c), WRAPPED])
      (":")
     A[Catch: IOException -> 0x00b8, MD:():java.lang.String (c), SYNTHETIC, WRAPPED]
    	at jadx.core.utils.InsnRemover.removeSsaVar(InsnRemover.java:151)
    	at jadx.core.utils.InsnRemover.unbindResult(InsnRemover.java:116)
    	at jadx.core.utils.InsnRemover.unbindInsn(InsnRemover.java:80)
    	at jadx.core.utils.InsnRemover.unbindArgUsage(InsnRemover.java:163)
    	at jadx.core.utils.InsnRemover.unbindAllArgs(InsnRemover.java:95)
    	at jadx.core.utils.InsnRemover.unbindInsn(InsnRemover.java:79)
    	at jadx.core.utils.InsnRemover.unbindArgUsage(InsnRemover.java:163)
    	at jadx.core.utils.InsnRemover.unbindAllArgs(InsnRemover.java:95)
    	at jadx.core.utils.InsnRemover.unbindInsn(InsnRemover.java:79)
    	at jadx.core.utils.InsnRemover.unbindArgUsage(InsnRemover.java:163)
    	at jadx.core.utils.InsnRemover.unbindAllArgs(InsnRemover.java:95)
    	at jadx.core.utils.InsnRemover.unbindInsn(InsnRemover.java:79)
    	at jadx.core.utils.InsnRemover.unbindArgUsage(InsnRemover.java:163)
    	at jadx.core.utils.InsnRemover.unbindAllArgs(InsnRemover.java:95)
    	at jadx.core.dex.visitors.SimplifyVisitor.removeStringBuilderInsns(SimplifyVisitor.java:495)
    	at jadx.core.dex.visitors.SimplifyVisitor.convertStringBuilderChain(SimplifyVisitor.java:422)
    	at jadx.core.dex.visitors.SimplifyVisitor.convertInvoke(SimplifyVisitor.java:314)
    	at jadx.core.dex.visitors.SimplifyVisitor.simplifyInsn(SimplifyVisitor.java:145)
    	at jadx.core.dex.visitors.SimplifyVisitor.simplifyBlock(SimplifyVisitor.java:86)
    	at jadx.core.dex.visitors.SimplifyVisitor.visit(SimplifyVisitor.java:71)
     */
    private static String getTestRuntimeClasspath() {
        String str;
        ClassLoader contextClassLoader;
        InputStream inputStream = null;
        BufferedReader bufferedReader = null;
        LOG.info("Trying to generate classpath for app master from current thread's classpath");
        try {
            contextClassLoader = Thread.currentThread().getContextClassLoader();
            inputStream = contextClassLoader.getResourceAsStream("mvn-generated-classpath");
        } catch (IOException e) {
            LOG.info("Could not find the necessary resource to generate class path for tests. Error=" + e.getMessage());
        }
        if (inputStream == null) {
            LOG.info("Could not load classpath resource mvn-generated-classpath");
            return str;
        }
        LOG.info("Readable bytes from stream=" + inputStream.available());
        bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        String readLine = bufferedReader.readLine();
        str = new StringBuilder().append(readLine != null ? str + readLine.trim() + ":" : "").append(contextClassLoader.getResource("mvn-generated-classpath").getFile()).toString();
        if (inputStream != null) {
            try {
                inputStream.close();
            } catch (IOException e2) {
                LOG.info("Failed to close class path file stream or reader. Error=" + e2.getMessage());
            }
        }
        if (bufferedReader != null) {
            bufferedReader.close();
        }
        return str;
    }

    @Test
    public void testOperatorFailureRecovery() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(LogicalPlan.APPLICATION_PATH, this.testMeta.toURI().toString());
        logicalPlan.getContextAttributes(logicalPlan.addOperator("badOperator", FailingOperator.class)).put(Context.OperatorContext.RECOVERY_ATTEMPTS, 1);
        LOG.info("Initializing Client");
        StramClient stramClient = new StramClient(conf, logicalPlan);
        try {
            stramClient.start();
            stramClient.startApplication();
            stramClient.setClientTimeout(120000L);
            boolean monitorApplication = stramClient.monitorApplication();
            LOG.info("Client run completed. Result=" + monitorApplication);
            Assert.assertFalse("should fail", monitorApplication);
            Assert.assertEquals("should fail", FinalApplicationStatus.FAILED, stramClient.getApplicationReport().getFinalApplicationStatus());
            stramClient.stop();
        } catch (Throwable th) {
            stramClient.stop();
            throw th;
        }
    }

    @Test
    @Ignore
    public void testUnmanagedAM() throws Exception {
        new InlineAM(conf) { // from class: com.datatorrent.stram.StramMiniClusterTest.1
            @Override // com.datatorrent.stram.InlineAM
            public void runAM(ApplicationAttemptId applicationAttemptId) throws Exception {
                StramMiniClusterTest.LOG.debug("AM running {}", applicationAttemptId);
                ApplicationMasterProtocol connectToRM = new StramClientUtils.YarnClientHelper(StramMiniClusterTest.conf).connectToRM();
                connectToRM.registerApplicationMaster((RegisterApplicationMasterRequest) Records.newRecord(RegisterApplicationMasterRequest.class));
                AllocateRequest allocateRequest = (AllocateRequest) Records.newRecord(AllocateRequest.class);
                int i = 0 + 1;
                allocateRequest.setResponseId(0);
                ArrayList newArrayList = Lists.newArrayList();
                newArrayList.add(setupContainerAskForRM("hdev-vm", 1, 128, 10));
                newArrayList.add(setupContainerAskForRM("/default-rack", 1, 128, 10));
                newArrayList.add(setupContainerAskForRM("*", 1, 128, 10));
                allocateRequest.setAskList(newArrayList);
                StramMiniClusterTest.LOG.info("Requesting: " + allocateRequest.getAskList());
                connectToRM.allocate(allocateRequest);
                for (int i2 = 0; i2 < 100; i2++) {
                    AllocateRequest allocateRequest2 = (AllocateRequest) Records.newRecord(AllocateRequest.class);
                    int i3 = i;
                    i++;
                    allocateRequest2.setResponseId(i3);
                    AllocateResponse allocate = connectToRM.allocate(allocateRequest2);
                    Thread.sleep(1000L);
                    StramMiniClusterTest.LOG.debug("allocateResponse: {}", allocate);
                }
                FinishApplicationMasterRequest finishApplicationMasterRequest = (FinishApplicationMasterRequest) Records.newRecord(FinishApplicationMasterRequest.class);
                finishApplicationMasterRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
                finishApplicationMasterRequest.setDiagnostics("testUnmanagedAM finished");
                connectToRM.finishApplicationMaster(finishApplicationMasterRequest);
            }

            private ResourceRequest setupContainerAskForRM(String str, int i, int i2, int i3) {
                ResourceRequest resourceRequest = (ResourceRequest) Records.newRecord(ResourceRequest.class);
                resourceRequest.setResourceName(str);
                resourceRequest.setNumContainers(i);
                Priority priority = (Priority) Records.newRecord(Priority.class);
                priority.setPriority(i3);
                resourceRequest.setPriority(priority);
                Resource resource = (Resource) Records.newRecord(Resource.class);
                resource.setMemory(i2);
                resourceRequest.setCapability(resource);
                return resourceRequest;
            }
        }.run();
    }

    @Test
    @Ignore
    public void testUnmanagedAM2() throws Exception {
        new InlineAM(conf) { // from class: com.datatorrent.stram.StramMiniClusterTest.2
            @Override // com.datatorrent.stram.InlineAM
            public void runAM(ApplicationAttemptId applicationAttemptId) throws Exception {
                StramMiniClusterTest.LOG.debug("AM running {}", applicationAttemptId);
                AMRMClient createAMRMClient = AMRMClient.createAMRMClient();
                createAMRMClient.init(StramMiniClusterTest.conf);
                createAMRMClient.start();
                createAMRMClient.registerApplicationMaster("", 0, (String) null);
                Resource resource = (Resource) Records.newRecord(Resource.class);
                resource.setMemory(1000);
                Priority priority = (Priority) Records.newRecord(Priority.class);
                priority.setPriority(10);
                AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(resource, new String[]{"vm1"}, new String[]{"somerack"}, priority);
                createAMRMClient.addContainerRequest(containerRequest);
                createAMRMClient.addContainerRequest(containerRequest);
                for (int i = 0; i < 100; i++) {
                    AllocateResponse allocate = createAMRMClient.allocate(0.0f);
                    Thread.sleep(1000L);
                    StramMiniClusterTest.LOG.debug("allocateResponse: {}", allocate);
                    Iterator it = allocate.getAllocatedContainers().iterator();
                    while (it.hasNext()) {
                        StramMiniClusterTest.LOG.debug("*** allocated {}", ((Container) it.next()).getResource());
                        createAMRMClient.removeContainerRequest(containerRequest);
                    }
                }
                createAMRMClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "testUnmanagedAM finished", (String) null);
            }
        }.run();
    }

    @Test
    public void testAddAttributeToArgs() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(LogicalPlan.APPLICATION_NAME, APP_NAME);
        logicalPlan.getContextAttributes(logicalPlan.addOperator("test", AddAttributeToArgsOperator.class)).put(Context.OperatorContext.RECOVERY_ATTEMPTS, 0);
        StramClient stramClient = new StramClient(conf, logicalPlan);
        try {
            stramClient.start();
            stramClient.startApplication();
            Assert.assertTrue(stramClient.monitorApplication());
            stramClient.stop();
        } catch (Throwable th) {
            stramClient.stop();
            throw th;
        }
    }
}
