package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.requests.CreateOperatorRequest;
import com.datatorrent.stram.plan.logical.requests.CreateStreamRequest;
import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.plan.physical.PhysicalPlanTest;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Lists;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.MockitoUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/StramRecoveryTest.class */
public class StramRecoveryTest {
    /** Logger shared by all tests in this class. */
    private static final Logger LOG = LoggerFactory.getLogger(StramRecoveryTest.class);

    /** JUnit rule; the tests below use its {@code dir} field as the path they read from and write to. */
    @Rule
    public final StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();

    /* loaded from: input_file:com/datatorrent/stram/StramRecoveryTest$StatsListeningOperator.class */
    public static class StatsListeningOperator extends TestGeneratorInputOperator implements StatsListener {
        int processStatsCnt = 0;

        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
            this.processStatsCnt++;
            return null;
        }
    }

    /**
     * Builds a plan with operators o1, o2 (three partitions) and o3, Java-serializes the resulting
     * {@link PhysicalPlan}, reads it back and verifies the partitions of o2 in the deserialized copy.
     *
     * @param storageAgent agent stored in the plan's {@code STORAGE_AGENT} attribute
     * @throws Exception if building, serializing, deserializing or inspecting the plan fails
     */
    private void testPhysicalPlanSerialization(StorageAgent storageAgent) throws Exception {
        LogicalPlan dag = new LogicalPlan();
        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        PhysicalPlanTest.PartitioningTestOperator o2 = dag.addOperator("o2", PhysicalPlanTest.PartitioningTestOperator.class);
        o2.setPartitionCount(3);
        GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
        dag.addStream("o1.outport1", o1.outport1, o2.inport1, o2.inportWithCodec);
        dag.addStream("mergeStream", o2.outport1, o3.inport1);
        dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
        TestPlanContext planContext = new TestPlanContext();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, storageAgent);
        PhysicalPlan plan = new PhysicalPlan(dag, planContext);

        ByteArrayOutputStream logicalBytes = new ByteArrayOutputStream();
        LogicalPlan.write(dag, logicalBytes);
        LOG.debug("logicalPlan size: {}", logicalBytes.size());

        // Close the object stream before reading the buffer so that everything it holds is written out.
        ByteArrayOutputStream physicalBytes = new ByteArrayOutputStream();
        ObjectOutputStream objectOut = new ObjectOutputStream(physicalBytes);
        try {
            objectOut.writeObject(plan);
        } finally {
            objectOut.close();
        }
        LOG.debug("physicalPlan size: {}", physicalBytes.size());

        PhysicalPlan restoredPlan;
        ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(physicalBytes.toByteArray()));
        try {
            restoredPlan = (PhysicalPlan) objectIn.readObject();
        } finally {
            objectIn.close();
        }
        LogicalPlan restoredDag = restoredPlan.getLogicalPlan();

        // Inject the test's plan context into the restored plan through its "ctx" field
        // (presumably the field is not carried through serialization - confirm against PhysicalPlan).
        Field ctxField = PhysicalPlan.class.getDeclaredField("ctx");
        ctxField.setAccessible(true);
        ctxField.set(restoredPlan, planContext);
        ctxField.setAccessible(false);

        List<PTOperator> partitions = restoredPlan.getOperators(restoredDag.getOperatorMeta("o2"));
        Assert.assertEquals(3L, partitions.size());
        for (PTOperator partition : partitions) {
            Assert.assertNotNull("partition null " + partition, partition.getPartitionKeys());
            Assert.assertEquals("partition keys " + partition + " " + partition.getPartitionKeys(), 2L, partition.getPartitionKeys().size());
            String expectedKeys = partition.getPartitionKeys().values().toString();
            // loadOperator hands back the operator instance; pks is declared on the partitioning test operator.
            PhysicalPlanTest.PartitioningTestOperator loaded = (PhysicalPlanTest.PartitioningTestOperator) restoredPlan.loadOperator(partition);
            Assert.assertEquals("instance per partition", expectedKeys, loaded.pks);
            Assert.assertNotNull("partition stats null " + partition, partition.stats);
        }
    }

    /**
     * Runs the physical plan serialization check with an {@link FSStorageAgent} created from
     * {@code testMeta.dir} and a null configuration.
     */
    @Test
    public void testPhysicalPlanSerializationWithSyncAgent() throws Exception {
        StorageAgent agent = new FSStorageAgent(this.testMeta.dir, (Configuration) null);
        testPhysicalPlanSerialization(agent);
    }

    /**
     * Runs the physical plan serialization check with an {@link AsyncFSStorageAgent} created from
     * {@code testMeta.dir} and a null configuration.
     */
    @Test
    public void testPhysicalPlanSerializationWithAsyncAgent() throws Exception {
        StorageAgent agent = new AsyncFSStorageAgent(this.testMeta.dir, (Configuration) null);
        testPhysicalPlanSerialization(agent);
    }

    /**
     * Saves the operator instance held by the given physical operator's metadata, using the storage
     * agent found in that metadata's {@code STORAGE_AGENT} attribute.
     *
     * @param streamingContainerManager not used by this method
     * @param pTOperator physical operator whose logical operator instance and id are saved
     * @param checkpoint supplies the window id under which the state is saved
     * @throws Exception if the storage agent fails to save
     */
    public static void checkpoint(StreamingContainerManager streamingContainerManager, PTOperator pTOperator, Checkpoint checkpoint) throws Exception {
        StorageAgent agent = (StorageAgent) pTOperator.getOperatorMeta().getValue(Context.OperatorContext.STORAGE_AGENT);
        agent.save(pTOperator.getOperatorMeta().getOperator(), pTOperator.getId(), checkpoint.windowId);
    }

    /**
     * Creates a {@link StreamingContainerManager} for a single-operator plan and then restores it
     * twice from {@code testMeta.dir}: once directly after the initial deployment, and once after a
     * checkpoint, a heartbeat, a logical plan modification and a further checkpoint. After each
     * restore the recovered operator, container and checkpoint state is verified.
     *
     * @param storageAgent agent stored in the plan's {@code STORAGE_AGENT} attribute
     * @throws Exception if any manager, storage or file system operation fails
     */
    private void testContainerManager(StorageAgent storageAgent) throws Exception {
        FileUtils.deleteDirectory(new File(this.testMeta.dir));

        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(LogicalPlan.APPLICATION_PATH, this.testMeta.dir);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, storageAgent);
        StatsListeningOperator o1 = dag.addOperator("o1", StatsListeningOperator.class);

        FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false));
        StreamingContainerManager scm = StreamingContainerManager.getInstance(recoveryHandler, dag, false);
        File snapshotFile = new File(recoveryHandler.getDir(), "snapshot");
        Assert.assertTrue("snapshot file " + snapshotFile, snapshotFile.exists());

        PhysicalPlan plan = scm.getPhysicalPlan();
        Assert.assertEquals("number required containers", 1L, plan.getContainers().size());
        PTOperator o1p1 = plan.getOperators(dag.getMeta(o1)).get(0);

        // Constructed for its side effects only: the assertions below expect the operator's
        // container to be ACTIVE and to have a buffer server address afterwards.
        new MockContainer(scm, o1p1.getContainer());
        PTContainer originalContainer = o1p1.getContainer();
        Assert.assertNotNull(o1p1.getContainer().bufferServerAddress);
        Assert.assertEquals(PTContainer.State.ACTIVE, o1p1.getContainer().getState());
        Assert.assertEquals("state " + o1p1, PTOperator.State.PENDING_DEPLOY, o1p1.getState());

        // First restore: a fresh logical plan that only carries the application path.
        LogicalPlan dag2 = new LogicalPlan();
        dag2.setAttribute(LogicalPlan.APPLICATION_PATH, this.testMeta.dir);
        StreamingContainerManager scm2 = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag2.assertAppPath(), new Configuration(false)), dag2, false);
        LogicalPlan restoredDag = scm2.getLogicalPlan();
        PTOperator o1p1Restored = scm2.getPhysicalPlan().getOperators(restoredDag.getOperatorMeta("o1")).get(0);
        Assert.assertEquals("post restore state " + o1p1Restored, PTOperator.State.PENDING_DEPLOY, o1p1Restored.getState());
        StatsListeningOperator o1Restored = (StatsListeningOperator) o1p1Restored.getOperatorMeta().getOperator();
        Assert.assertEquals("containerId", originalContainer.getExternalId(), o1p1Restored.getContainer().getExternalId());
        Assert.assertEquals("stats listener", 1L, o1p1Restored.statsListeners.size());
        Assert.assertEquals("number stats calls", 0L, o1Restored.processStatsCnt);
        Assert.assertEquals("post restore 1", PTContainer.State.ALLOCATED, o1p1Restored.getContainer().getState());
        Assert.assertEquals("post restore 1", originalContainer.bufferServerAddress, o1p1Restored.getContainer().bufferServerAddress);

        // Keep the agent in a local so its container can be inspected before and after the restart request.
        StreamingContainerAgent containerAgent = scm2.getContainerAgent(originalContainer.getExternalId());
        Assert.assertNotNull("allocated container restored " + originalContainer, containerAgent);
        Assert.assertEquals("memory usage allocated container", ((Integer) Context.OperatorContext.MEMORY_MB.defaultValue).intValue(), containerAgent.container.getAllocatedMemoryMB());
        scm2.scheduleContainerRestart(originalContainer.getExternalId());
        Assert.assertEquals("memory usage of failed container", 0L, containerAgent.container.getAllocatedMemoryMB());

        // Save a checkpoint and report the operator as active at the same window.
        Checkpoint firstCheckpoint = new Checkpoint(3L, 0, 0);
        MockContainer mockContainer = new MockContainer(scm2, o1p1Restored.getContainer());
        checkpoint(scm2, o1p1Restored, firstCheckpoint);
        mockContainer.stats(o1p1Restored.getId()).deployState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE).currentWindowId(3L).checkpointWindowId(3L);
        mockContainer.sendHeartbeat();
        Assert.assertEquals("state " + o1p1Restored, PTOperator.State.ACTIVE, o1p1Restored.getState());

        // Logical plan modification: add operator o2 and a stream from o1 to o2.
        CreateOperatorRequest createOperator = new CreateOperatorRequest();
        createOperator.setOperatorFQCN(GenericTestOperator.class.getName());
        createOperator.setOperatorName("o2");
        CreateStreamRequest createStream = new CreateStreamRequest();
        createStream.setSourceOperatorName("o1");
        createStream.setSourceOperatorPortName(TestGeneratorInputOperator.OUTPUT_PORT);
        createStream.setSinkOperatorName("o2");
        createStream.setSinkOperatorPortName(GenericTestOperator.IPORT1);
        FutureTask<?> modification = scm2.logicalPlanModification(Lists.<LogicalPlanRequest>newArrayList(createOperator, createStream));
        // The task is polled while heartbeats are monitored until it reports completion.
        while (!modification.isDone()) {
            scm2.monitorHeartbeat();
        }
        Assert.assertNull(modification.get());
        Assert.assertSame("dag references", restoredDag, scm2.getLogicalPlan());
        Assert.assertEquals("number operators after plan modification", 2L, restoredDag.getAllOperators().size());

        // Change the operator state and save one more checkpoint before the second restore.
        o1p1Restored.setState(PTOperator.State.INACTIVE);
        Checkpoint secondCheckpoint = new Checkpoint(10L, 0, 0);
        checkpoint(scm2, o1p1Restored, secondCheckpoint);

        // Second restore.
        LogicalPlan dag3 = new LogicalPlan();
        dag3.setAttribute(LogicalPlan.APPLICATION_PATH, this.testMeta.dir);
        StreamingContainerManager scm3 = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag3.assertAppPath(), new Configuration(false)), dag3, false);
        Assert.assertNotSame("dag references", dag3, scm3.getLogicalPlan());
        Assert.assertEquals("number operators after restore", 2L, scm3.getLogicalPlan().getAllOperators().size());
        PTOperator o1p1Final = scm3.getPhysicalPlan().getOperators(scm3.getLogicalPlan().getOperatorMeta("o1")).get(0);
        Assert.assertEquals("post restore state " + o1p1Final, PTOperator.State.INACTIVE, o1p1Final.getState());
        StatsListeningOperator o1Final = (StatsListeningOperator) o1p1Final.getOperatorMeta().getOperator();
        Assert.assertEquals("stats listener", 1L, o1p1Final.statsListeners.size());
        Assert.assertEquals("number stats calls post restore", 1L, o1Final.processStatsCnt);
        Assert.assertEquals("post restore 1", PTContainer.State.ACTIVE, o1p1Final.getContainer().getState());
        Assert.assertEquals("post restore 1", originalContainer.bufferServerAddress, o1p1Final.getContainer().bufferServerAddress);
        Assert.assertEquals("checkpoints after recovery", Lists.newArrayList(new Checkpoint[]{firstCheckpoint, secondCheckpoint}), o1p1Final.checkpoints);
    }

    /**
     * Runs the container manager recovery scenario with an {@link FSStorageAgent} created from
     * {@code testMeta.dir} and a null configuration.
     */
    @Test
    public void testContainerManagerWithSyncAgent() throws Exception {
        // Delegate to the container manager scenario; the serialization scenario has its own tests.
        testContainerManager(new FSStorageAgent(this.testMeta.dir, (Configuration) null));
    }

    /**
     * Runs the container manager recovery scenario with an {@link AsyncFSStorageAgent} created from
     * {@code testMeta.dir} and a null configuration.
     */
    @Test
    public void testContainerManagerWithAsyncAgent() throws Exception {
        // Delegate to the container manager scenario; the serialization scenario has its own tests.
        // NOTE(review): the checkpoint(...) helper only calls StorageAgent.save - confirm that this is
        // enough for checkpoints written through the async agent to be found on restore.
        testContainerManager(new AsyncFSStorageAgent(this.testMeta.dir, (Configuration) null));
    }

    /**
     * Exercises the {@link Journal} of a {@link StreamingContainerManager}. Journal output goes to an
     * in-memory stream that counts flushes and records being closed; the captured bytes are replayed
     * several times to check that operator state, container state and operator property changes are
     * re-applied.
     *
     * @throws Exception if writing to or replaying the journal fails
     */
    @Test
    public void testWriteAheadLog() throws Exception {
        final MutableInt flushCount = new MutableInt();
        final MutableBoolean isClosed = new MutableBoolean(false);

        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(LogicalPlan.APPLICATION_PATH, this.testMeta.dir);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(this.testMeta.dir, (Configuration) null));
        TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
        StreamingContainerManager scm = new StreamingContainerManager(dag);
        PhysicalPlan plan = scm.getPhysicalPlan();
        Journal journal = scm.getJournal();

        // In-memory sink that counts flush() calls and remembers whether close() was called.
        ByteArrayOutputStream journalBytes = new ByteArrayOutputStream() {
            @Override
            public void flush() throws IOException {
                super.flush();
                flushCount.increment();
            }

            @Override
            public void close() throws IOException {
                super.close();
                isClosed.setValue(true);
            }
        };
        journal.setOutputStream(new DataOutputStream(journalBytes));

        PTOperator o1p1 = (PTOperator) plan.getOperators(dag.getMeta(o1)).get(0);
        Assert.assertEquals(PTOperator.State.PENDING_DEPLOY, o1p1.getState());
        MockContainer mockContainer = new MockContainer(scm, o1p1.getContainer());
        String originalExternalId = mockContainer.container.getExternalId();
        Assert.assertEquals("flush count", 1L, flushCount.intValue());

        // An operator state change is expected to add one more flush.
        o1p1.setState(PTOperator.State.ACTIVE);
        Assert.assertEquals(PTOperator.State.ACTIVE, o1p1.getState());
        Assert.assertEquals("flush count", 2L, flushCount.intValue());
        Assert.assertEquals("is closed", false, Boolean.valueOf(isClosed.booleanValue()));

        // Detaching the stream is expected to close it; the flush count is checked again afterwards.
        journal.setOutputStream((OutputStream) null);
        Assert.assertEquals("flush count", 4L, flushCount.intValue());
        Assert.assertEquals("is closed", true, Boolean.valueOf(isClosed.booleanValue()));

        // With no stream attached the state change causes no flush, and the replay below
        // is expected to bring the operator back to ACTIVE.
        o1p1.setState(PTOperator.State.INACTIVE);
        Assert.assertEquals(PTOperator.State.INACTIVE, o1p1.getState());
        Assert.assertEquals("flush count", 4L, flushCount.intValue());
        journal.replay(new DataInputStream(new ByteArrayInputStream(journalBytes.toByteArray())));
        Assert.assertEquals(PTOperator.State.ACTIVE, o1p1.getState());

        // Container state: record it in the journal, clear it on the object, then replay.
        InetSocketAddress bufferServerAddress = InetSocketAddress.createUnresolved("host1", 1);
        PTContainer c1 = (PTContainer) plan.getContainers().get(0);
        c1.setState(PTContainer.State.ALLOCATED);
        c1.host = "host1";
        c1.bufferServerAddress = bufferServerAddress;
        c1.setAllocatedMemoryMB(2);
        c1.setRequiredMemoryMB(1);
        c1.setAllocatedVCores(3);
        c1.setRequiredVCores(4);
        journal.setOutputStream(new DataOutputStream(journalBytes));
        journal.write(c1.getSetContainerState());
        c1.setExternalId((String) null);
        c1.setState(PTContainer.State.NEW);
        c1.setExternalId((String) null);
        c1.host = null;
        c1.bufferServerAddress = null;
        journal.replay(new DataInputStream(new ByteArrayInputStream(journalBytes.toByteArray())));
        Assert.assertEquals(originalExternalId, c1.getExternalId());
        Assert.assertEquals(PTContainer.State.ALLOCATED, c1.getState());
        Assert.assertEquals("host1", c1.host);
        Assert.assertEquals(bufferServerAddress, c1.bufferServerAddress);
        Assert.assertEquals(1L, c1.getRequiredMemoryMB());
        Assert.assertEquals(2L, c1.getAllocatedMemoryMB());
        Assert.assertEquals(3L, c1.getAllocatedVCores());
        Assert.assertEquals(4L, c1.getRequiredVCores());

        // Operator property written directly to the journal: replay is expected to override the local value.
        journal.write(scm.getSetOperatorProperty("o1", "maxTuples", "100"));
        o1.setMaxTuples(10);
        journal.setOutputStream((OutputStream) null);
        journal.replay(new DataInputStream(new ByteArrayInputStream(journalBytes.toByteArray())));
        Assert.assertEquals(100L, o1.getMaxTuples());

        // Operator property set through the manager: replay is expected to restore the manager-set value.
        journal.setOutputStream(new DataOutputStream(journalBytes));
        scm.setOperatorProperty("o1", "maxTuples", "10");
        Assert.assertEquals(10L, o1.getMaxTuples());
        o1.setMaxTuples(100);
        Assert.assertEquals(100L, o1.getMaxTuples());
        journal.setOutputStream((OutputStream) null);
        journal.replay(new DataInputStream(new ByteArrayInputStream(journalBytes.toByteArray())));
        Assert.assertEquals(10L, o1.getMaxTuples());

        // Physical operator property: only checked for completing without an exception.
        journal.setOutputStream(new DataOutputStream(journalBytes));
        scm.setPhysicalOperatorProperty(o1p1.getId(), "maxTuples", "50");
    }

    /**
     * Launches an application "app1" under the given path, restores it, records a container id in its
     * journal, and then starts a second application "app2" whose initial state is copied from the
     * first one. Verifies that the application id and path are updated and that operator, journal
     * and checkpoints were carried over.
     *
     * @param storageAgent agent stored in the first plan's {@code STORAGE_AGENT} attribute
     * @param str application path of the first application
     * @throws Exception if any manager, client or file system operation fails
     */
    private void testRestartApp(StorageAgent storageAgent, String str) throws Exception {
        FileUtils.deleteDirectory(new File(this.testMeta.dir));
        String appPath2 = this.testMeta.dir + "/app2";

        // Initial launch of app1. The returned manager is not needed; the call is made for its effect
        // on the application path, from which the next getInstance call is expected to recover "o1".
        LogicalPlan dag = new LogicalPlan();
        dag.setAttribute(LogicalPlan.APPLICATION_ID, "app1");
        dag.setAttribute(LogicalPlan.APPLICATION_PATH, str);
        dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, 1);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, storageAgent);
        dag.addOperator("o1", StatsListeningOperator.class);
        StreamingContainerManager.getInstance(new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false)), dag, false);

        // Restore app1 from a plan that only carries the application path.
        LogicalPlan restoreDag = new LogicalPlan();
        restoreDag.setAttribute(LogicalPlan.APPLICATION_PATH, str);
        StreamingContainerManager scm = StreamingContainerManager.getInstance(new FSRecoveryHandler(restoreDag.assertAppPath(), new Configuration(false)), restoreDag, false);
        PhysicalPlan plan = scm.getPhysicalPlan();
        LogicalPlan restoredDag = plan.getLogicalPlan();
        Assert.assertNotNull("operator", restoredDag.getOperatorMeta("o1"));
        PTOperator o1p1 = plan.getOperators(restoredDag.getOperatorMeta("o1")).get(0);
        long[] checkpointIds = new FSStorageAgent(str + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, new Configuration()).getWindowIds(o1p1.getId());
        Assert.assertArrayEquals(new long[]{o1p1.getRecoveryCheckpoint().getWindowId()}, checkpointIds);

        // Put a container id into the journal so that it can be looked for in the copied state.
        Assert.assertNull(o1p1.getContainer().getExternalId());
        o1p1.getContainer().setExternalId("cid1");
        scm.writeJournal(o1p1.getContainer().getSetContainerState());

        // Start app2 with the state of app1 as its initial state.
        LogicalPlan dag2 = new LogicalPlan();
        dag2.setAttribute(LogicalPlan.APPLICATION_PATH, appPath2);
        dag2.setAttribute(LogicalPlan.APPLICATION_ID, "app2");
        StramClient stramClient = new StramClient(new Configuration(), dag2);
        try {
            stramClient.start();
            stramClient.copyInitialState(new Path(str));
        } finally {
            // Stop exactly once, whether or not copying succeeded; later assertion failures must
            // not trigger a second stop.
            stramClient.stop();
        }

        PhysicalPlan plan2 = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag2.assertAppPath(), new Configuration(false)), dag2, false).getPhysicalPlan();
        LogicalPlan copiedDag = plan2.getLogicalPlan();
        Assert.assertEquals("modified appId", "app2", copiedDag.getValue(LogicalPlan.APPLICATION_ID));
        Assert.assertEquals("modified appPath", appPath2, copiedDag.getValue(LogicalPlan.APPLICATION_PATH));
        Assert.assertNotNull("operator", copiedDag.getOperatorMeta("o1"));
        PTOperator copiedO1p1 = plan2.getOperators(copiedDag.getOperatorMeta("o1")).get(0);
        Assert.assertEquals("journal copied", "cid1", copiedO1p1.getContainer().getExternalId());
        long[] copiedCheckpointIds = new FSStorageAgent(appPath2 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, new Configuration()).getWindowIds(copiedO1p1.getId());
        Assert.assertArrayEquals("checkpoints copied", new long[]{copiedO1p1.getRecoveryCheckpoint().getWindowId()}, copiedCheckpointIds);
    }

    /**
     * Runs the application restart scenario with an {@link FSStorageAgent} that points at the
     * checkpoint subdirectory of {@code <testMeta.dir>/app1}.
     */
    @Test
    public void testRestartAppWithSyncAgent() throws Exception {
        String appPath = this.testMeta.dir + "/app1";
        String checkpointPath = appPath + "/" + LogicalPlan.SUBDIR_CHECKPOINTS;
        StorageAgent agent = new FSStorageAgent(checkpointPath, (Configuration) null);
        testRestartApp(agent, appPath);
    }

    /**
     * Runs the application restart scenario with an {@link AsyncFSStorageAgent} that points at the
     * checkpoint subdirectory of {@code <testMeta.dir>/app1}.
     */
    @Test
    public void testRestartAppWithAsyncAgent() throws Exception {
        String appPath = this.testMeta.dir + "/app1";
        String checkpointPath = appPath + "/" + LogicalPlan.SUBDIR_CHECKPOINTS;
        StorageAgent agent = new AsyncFSStorageAgent(checkpointPath, (Configuration) null);
        testRestartApp(agent, appPath);
    }

    /**
     * Checks the timeout and retry behaviour of {@link RecoverableRpcProxy} against a mock RPC
     * server whose {@code log("containerId", "timeout")} call sleeps for one second the first time
     * it is invoked after the flag has been reset.
     *
     * <p>The scenario is run twice: first with the timeouts encoded in the connect URI, then with the
     * timeouts supplied through system properties. In each run a short retry timeout is expected to
     * surface a {@link SocketTimeoutException}, while a longer one is expected to let the call
     * succeed.
     *
     * @throws Exception if the server, the recovery handler or a proxy call fails unexpectedly
     */
    @Test
    public void testRpcFailover() throws Exception {
        final String timeoutKey = "com.datatorrent.stram.rpc.timeout";
        final String delayKey = "com.datatorrent.stram.rpc.delay.timeout";
        final String retryKey = "com.datatorrent.stram.rpc.retry.timeout";

        String appPath = this.testMeta.dir;
        Configuration conf = new Configuration(false);
        final AtomicBoolean timedOut = new AtomicBoolean();

        StreamingContainerUmbilicalProtocol impl = (StreamingContainerUmbilicalProtocol) MockitoUtil.mockProtocol(StreamingContainerUmbilicalProtocol.class);
        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) {
                LOG.debug("got call: {}", invocation.getMethod());
                // Only the first call after a reset is delayed; later calls return immediately.
                if (!timedOut.get()) {
                    try {
                        timedOut.set(true);
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the calling thread instead of dropping it.
                        Thread.currentThread().interrupt();
                    }
                }
                return null;
            }
        }).when(impl).log("containerId", "timeout");

        RPC.Server server = new RPC.Builder(conf).setProtocol(StreamingContainerUmbilicalProtocol.class).setInstance(impl).setBindAddress("0.0.0.0").setPort(0).setNumHandlers(1).setVerbose(false).build();
        server.start();
        try {
            InetSocketAddress address = NetUtils.getConnectAddress(server);
            LOG.info("Mock server listening at {}", address);
            FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(appPath, conf);

            // Run 1: timeouts encoded in the connect URI; the short retry timeout must time out.
            recoveryHandler.writeConnectUri(RecoverableRpcProxy.toConnectURI(address, 500, 100, 500).toString());
            RecoverableRpcProxy rpcProxy = new RecoverableRpcProxy(appPath, conf);
            StreamingContainerUmbilicalProtocol protocolProxy = rpcProxy.getProxy();
            protocolProxy.log("containerId", "msg");
            try {
                protocolProxy.log("containerId", "timeout");
                Assert.fail("expected socket timeout");
            } catch (SocketTimeoutException expected) {
                // the timeout is the expected outcome
            }
            Assert.assertTrue("timedout", timedOut.get());
            rpcProxy.close();

            // Longer retry timeout: the same proxy object is used again after close() and the
            // call is expected to go through.
            timedOut.set(false);
            recoveryHandler.writeConnectUri(RecoverableRpcProxy.toConnectURI(address, 500, 100, 1500).toString());
            protocolProxy.log("containerId", "timeout");
            Assert.assertTrue("timedout", timedOut.get());
            rpcProxy.close();

            // Run 2: the same scenario with the timeouts taken from system properties.
            String savedTimeout = System.getProperty(timeoutKey);
            String savedDelay = System.getProperty(delayKey);
            String savedRetry = System.getProperty(retryKey);
            try {
                System.setProperty(timeoutKey, Integer.toString(500));
                System.setProperty(delayKey, Long.toString(100L));
                System.setProperty(retryKey, Long.toString(500L));
                timedOut.set(false);
                recoveryHandler.writeConnectUri(RecoverableRpcProxy.toConnectURI(address).toString());
                RecoverableRpcProxy rpcProxy2 = new RecoverableRpcProxy(appPath, conf);
                StreamingContainerUmbilicalProtocol protocolProxy2 = rpcProxy2.getProxy();
                protocolProxy2.log("containerId", "msg");
                try {
                    protocolProxy2.log("containerId", "timeout");
                    Assert.fail("expected socket timeout");
                } catch (SocketTimeoutException expected) {
                    // the timeout is the expected outcome
                }
                Assert.assertTrue("timedout", timedOut.get());
                rpcProxy2.close();

                timedOut.set(false);
                System.setProperty(retryKey, Long.toString(1500L));
                recoveryHandler.writeConnectUri(RecoverableRpcProxy.toConnectURI(address).toString());
                protocolProxy2.log("containerId", "timeout");
                Assert.assertTrue("timedout", timedOut.get());
                // Close after the final call as well, mirroring what is done for the first proxy.
                rpcProxy2.close();
            } finally {
                // Restore the properties even when an assertion fails so other tests are unaffected.
                restoreSystemProperty(timeoutKey, savedTimeout);
                restoreSystemProperty(delayKey, savedDelay);
                restoreSystemProperty(retryKey, savedRetry);
            }
        } finally {
            // Always shut the mock server down, including on test failure.
            server.stop();
        }
    }

    /**
     * Puts a system property back to a previously captured value.
     *
     * @param str name of the system property
     * @param str2 value to set, or {@code null} to clear the property
     * @return the value the property had before this call, or {@code null} if it had none
     */
    private static String restoreSystemProperty(String str, String str2) {
        if (str2 != null) {
            return System.setProperty(str, str2);
        }
        return System.clearProperty(str);
    }
}
