package org.apache.hama.bsp;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.HamaTestCase;
import org.apache.hama.bsp.LocalBSPRunner;
import org.apache.hama.bsp.sync.SyncClient;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.util.BSPNetUtils;

/* loaded from: input_file:org/apache/hama/bsp/TestBSPTaskFaults.class */
public class TestBSPTaskFaults extends TestCase {
    public static final String TEST_POINT = "bsp.ft.test.point";
    public static final String TEST_GROOM_PORT = "bsp.ft.test.groomport";
    private volatile MinimalGroomServer groom;
    private volatile BSPPeerProtocol umbilical;
    private Server workerServer;
    private TaskAttemptID taskid = new TaskAttemptID(new TaskID(new BSPJobID("job_201110302255", 1), 1), 1);
    public volatile HamaConfiguration conf;
    private ScheduledExecutorService testBSPTaskService;
    public static final Log LOG = LogFactory.getLog(HamaTestCase.class);
    private static int TEST_NUMBER = 0;

    /* loaded from: input_file:org/apache/hama/bsp/TestBSPTaskFaults$FaulTestBSP.class */
    private static class FaulTestBSP extends BSP<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> {
        private FaulTestBSP() {
        }

        public void setup(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> bSPPeer) throws IOException, SyncException, InterruptedException {
            if (bSPPeer.getConfiguration().getInt(TestBSPTaskFaults.TEST_POINT, 0) == 1) {
                throw new RuntimeException("Error injected in setup");
            }
            Thread.sleep(500L);
            super.setup(bSPPeer);
            TestBSPTaskFaults.LOG.info("Succesfully completed setup for bsp.");
        }

        public void cleanup(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> bSPPeer) throws IOException {
            if (bSPPeer.getConfiguration().getInt(TestBSPTaskFaults.TEST_POINT, 0) == 3) {
                throw new RuntimeException("Error injected in cleanup");
            }
            try {
                Thread.sleep(500L);
            } catch (Exception e) {
                TestBSPTaskFaults.LOG.error("Interrupted BSP thread.", e);
            }
            super.cleanup(bSPPeer);
            TestBSPTaskFaults.LOG.info("Succesfully cleaned up after bsp.");
        }

        public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> bSPPeer) throws IOException, SyncException, InterruptedException {
            if (bSPPeer.getConfiguration().getInt(TestBSPTaskFaults.TEST_POINT, 0) == 2) {
                throw new RuntimeException("Error injected in bsp function");
            }
            Thread.sleep(500L);
            TestBSPTaskFaults.LOG.info("Succesfully completed bsp.");
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/TestBSPTaskFaults$MinimalGroomServer.class */
    public static class MinimalGroomServer implements BSPPeerProtocol {
        private volatile int pingCount;
        private volatile long firstPingTime;
        private volatile long lastPingTime;
        private boolean isShutDown = false;
        private boolean taskComplete = false;
        private boolean errorCondition = false;
        private Configuration conf;

        public MinimalGroomServer(Configuration configuration) throws IOException {
            this.conf = configuration;
        }

        public long getProtocolVersion(String str, long j) throws IOException {
            return 1L;
        }

        public void close() throws IOException {
            this.isShutDown = true;
        }

        public Task getTask(TaskAttemptID taskAttemptID) throws IOException {
            return new BSPTask();
        }

        public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
            TestBSPTaskFaults.LOG.error("Pinged");
            this.pingCount++;
            if (this.pingCount == 1) {
                this.firstPingTime = System.currentTimeMillis();
            }
            this.lastPingTime = System.currentTimeMillis();
            return true;
        }

        public void done(TaskAttemptID taskAttemptID) throws IOException {
            this.taskComplete = true;
        }

        public void fsError(TaskAttemptID taskAttemptID, String str) throws IOException {
            this.errorCondition = true;
        }

        public void fatalError(TaskAttemptID taskAttemptID, String str) throws IOException {
            this.errorCondition = true;
        }

        public boolean statusUpdate(TaskAttemptID taskAttemptID, TaskStatus taskStatus) throws IOException, InterruptedException {
            return true;
        }

        public int getAssignedPortNum(TaskAttemptID taskAttemptID) {
            return 0;
        }

        public synchronized int getPingCount() {
            return this.pingCount;
        }

        public void setPingCount(int i) {
            this.pingCount = i;
            if (i == 0) {
                this.firstPingTime = 0L;
                this.lastPingTime = 0L;
            }
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/TestBSPTaskFaults$TestBSPProcessRunner.class */
    public static class TestBSPProcessRunner implements Callable<Integer> {
        private final ScheduledExecutorService sched = Executors.newScheduledThreadPool(1);
        private final AtomicReference<ScheduledFuture<Integer>> future = new AtomicReference<>();
        private Process bspTaskProcess = null;
        private Thread errorLog;
        private Thread infoLog;
        private int testPoint;
        private int testPort;

        TestBSPProcessRunner(int i, int i2) {
            this.testPoint = i;
            this.testPort = i2;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static void readStream(InputStream inputStream) throws IOException {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            while (true) {
                String readLine = bufferedReader.readLine();
                if (readLine == null) {
                    return;
                } else {
                    TestBSPTaskFaults.LOG.info(readLine);
                }
            }
        }

        public void startBSPProcess() {
            this.future.set(this.sched.schedule(this, 0L, TimeUnit.SECONDS));
            TestBSPTaskFaults.LOG.debug("Start building BSPPeer process.");
        }

        public int getBSPExitCode() {
            try {
                return this.future.get().get().intValue();
            } catch (Exception e) {
                TestBSPTaskFaults.LOG.error("Error while fetching exit status from BSPTask", e);
                return -1;
            }
        }

        public void destroyProcess() {
            this.bspTaskProcess.destroy();
            this.sched.shutdown();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Integer call() throws Exception {
            String property = System.getProperty("path.separator");
            ArrayList arrayList = new ArrayList();
            String absolutePath = new File(".").getAbsolutePath();
            arrayList.add(new File(new File(System.getProperty("java.home"), "bin"), "java").toString());
            StringBuffer stringBuffer = new StringBuffer();
            stringBuffer.append(System.getProperty("java.class.path"));
            stringBuffer.append(property);
            stringBuffer.append(new File(absolutePath, "core/target/test-classes"));
            stringBuffer.append(property);
            stringBuffer.append(absolutePath);
            arrayList.add("-classpath");
            arrayList.add(stringBuffer.toString());
            arrayList.add(TestBSPProcessRunner.class.getName());
            TestBSPTaskFaults.LOG.info("starting process for failure case - " + this.testPoint);
            arrayList.add("" + this.testPoint);
            arrayList.add("" + this.testPort);
            TestBSPTaskFaults.LOG.info(arrayList.toString());
            try {
                this.bspTaskProcess = new ProcessBuilder(arrayList).start();
                this.errorLog = new Thread() { // from class: org.apache.hama.bsp.TestBSPTaskFaults.TestBSPProcessRunner.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            TestBSPProcessRunner.readStream(TestBSPProcessRunner.this.bspTaskProcess.getErrorStream());
                        } catch (Exception e) {
                        }
                    }
                };
                this.errorLog.start();
                this.infoLog = new Thread() { // from class: org.apache.hama.bsp.TestBSPTaskFaults.TestBSPProcessRunner.2
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            TestBSPProcessRunner.readStream(TestBSPProcessRunner.this.bspTaskProcess.getInputStream());
                        } catch (Exception e) {
                        }
                    }
                };
                this.infoLog.start();
                return Integer.valueOf(this.bspTaskProcess.waitFor());
            } catch (Exception e) {
                TestBSPTaskFaults.LOG.error("Error getting exit code of child process", e);
                return -1;
            }
        }

        public static void main(String[] strArr) {
            HamaConfiguration hamaConfiguration = new HamaConfiguration();
            hamaConfiguration.setInt("bsp.groomserver.pingperiod", 200);
            hamaConfiguration.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
            hamaConfiguration.setClass("hama.sync.peer.class", LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
            hamaConfiguration.setInt("bsp.master.port", 610002);
            TaskAttemptID taskAttemptID = new TaskAttemptID(new TaskID(new BSPJobID("job_201110102255", 1), 1), 1);
            hamaConfiguration.setInt(TestBSPTaskFaults.TEST_POINT, Integer.parseInt(strArr[0]));
            int parseInt = Integer.parseInt(strArr[1]);
            try {
                BSPJob bSPJob = new BSPJob(hamaConfiguration);
                bSPJob.setInputFormat(NullInputFormat.class);
                bSPJob.setOutputFormat(NullOutputFormat.class);
                final BSPPeerProtocol proxy = RPC.getProxy(BSPPeerProtocol.class, 1L, new InetSocketAddress("127.0.0.1", parseInt), hamaConfiguration);
                BSPTask bSPTask = new BSPTask();
                bSPTask.setConf(bSPJob);
                TestBSPTaskFaults.LOG.info("Testing failure case in process - " + hamaConfiguration.getInt(TestBSPTaskFaults.TEST_POINT, 0));
                Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.apache.hama.bsp.TestBSPTaskFaults.TestBSPProcessRunner.3
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            proxy.close();
                        } catch (Exception e) {
                        }
                    }
                });
                bSPTask.run(bSPJob, new BSPPeerImpl(bSPJob, hamaConfiguration, taskAttemptID, proxy, 0, (String) null, (BytesWritable) null, new Counters()), proxy);
            } catch (Exception e) {
                TestBSPTaskFaults.LOG.error("Error in bsp child process.", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/TestBSPTaskFaults$TestBSPTaskThreadRunner.class */
    private class TestBSPTaskThreadRunner extends Thread {
        BSPJob job;

        TestBSPTaskThreadRunner(BSPJob bSPJob) {
            this.job = bSPJob;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            BSPTask bSPTask = new BSPTask();
            bSPTask.setConf(this.job);
            try {
                bSPTask.run(this.job, new BSPPeerImpl(this.job, TestBSPTaskFaults.this.conf, TestBSPTaskFaults.this.taskid, TestBSPTaskFaults.this.umbilical, 0, (String) null, (BytesWritable) null, new Counters()), TestBSPTaskFaults.this.umbilical);
            } catch (Exception e) {
                TestBSPTaskFaults.LOG.error("Error in BSPTask execution.", e);
            }
        }
    }

    private static synchronized int incrementTestNumber() {
        int i = TEST_NUMBER + 1;
        TEST_NUMBER = i;
        return i;
    }

    protected void setUp() throws Exception {
        super.setUp();
        this.conf = new HamaConfiguration();
        this.conf.setInt("bsp.groomserver.pingperiod", 200);
        this.conf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
        this.conf.setClass("hama.sync.peer.class", LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
        InetSocketAddress inetSocketAddress = new InetSocketAddress(BSPNetUtils.getFreePort(34321) + incrementTestNumber());
        this.groom = new MinimalGroomServer(this.conf);
        this.workerServer = RPC.getServer(this.groom, inetSocketAddress.getHostName(), inetSocketAddress.getPort(), this.conf);
        this.workerServer.start();
        LOG.info("Started RPC server");
        this.conf.setInt("bsp.groom.rpc.port", inetSocketAddress.getPort());
        this.umbilical = RPC.getProxy(BSPPeerProtocol.class, 1L, inetSocketAddress, this.conf);
        LOG.info("Started the proxy connections");
        this.testBSPTaskService = Executors.newScheduledThreadPool(1);
    }

    private int getExpectedPingCounts() {
        return (int) ((2 * (this.groom.lastPingTime - this.groom.firstPingTime)) / this.conf.getInt("bsp.groomserver.pingperiod", 5000));
    }

    private void checkIfPingTestPassed() {
        int expectedPingCounts = getExpectedPingCounts();
        LOG.info("Counted " + this.groom.pingCount + " pings and expected " + expectedPingCounts + " pings.");
        assertEquals(true, this.groom.getPingCount() >= expectedPingCounts);
    }

    public void testPing() {
        this.conf.setInt(TEST_POINT, 0);
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.testBSPTaskService);
        TestBSPProcessRunner testBSPProcessRunner = new TestBSPProcessRunner(0, this.workerServer.getListenerAddress().getPort());
        try {
            executorCompletionService.submit(testBSPProcessRunner).get(20000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.error("Interrupted Exception.", e);
        } catch (ExecutionException e2) {
            LOG.error("ExecutionException Exception.", e2);
        } catch (TimeoutException e3) {
            LOG.error("TimeoutException Exception.", e3);
        }
        checkIfPingTestPassed();
        this.groom.setPingCount(0);
        this.testBSPTaskService.shutdownNow();
        testBSPProcessRunner.destroyProcess();
    }

    public void testPingOnTaskSetupFailure() {
        LOG.info("Testing ping failure case - 1");
        this.conf.setInt(TEST_POINT, 1);
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.testBSPTaskService);
        TestBSPProcessRunner testBSPProcessRunner = new TestBSPProcessRunner(1, this.workerServer.getListenerAddress().getPort());
        try {
            executorCompletionService.submit(testBSPProcessRunner).get(20000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.error("Interrupted Exception.", e);
        } catch (ExecutionException e2) {
            LOG.error("ExecutionException Exception.", e2);
        } catch (TimeoutException e3) {
            LOG.error("TimeoutException Exception.", e3);
        }
        checkIfPingTestPassed();
        this.groom.setPingCount(0);
        this.testBSPTaskService.shutdownNow();
        testBSPProcessRunner.destroyProcess();
    }

    public void testPingOnTaskExecFailure() {
        LOG.info("Testing ping failure case - 2");
        this.conf.setInt(TEST_POINT, 2);
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.testBSPTaskService);
        TestBSPProcessRunner testBSPProcessRunner = new TestBSPProcessRunner(2, this.workerServer.getListenerAddress().getPort());
        try {
            executorCompletionService.submit(testBSPProcessRunner).get(20000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.error("Interrupted Exception.", e);
        } catch (ExecutionException e2) {
            LOG.error("ExecutionException Exception.", e2);
        } catch (TimeoutException e3) {
            LOG.error("TimeoutException Exception.", e3);
        }
        checkIfPingTestPassed();
        this.groom.setPingCount(0);
        this.testBSPTaskService.shutdownNow();
        testBSPProcessRunner.destroyProcess();
    }

    public void testPingOnTaskCleanupFailure() {
        LOG.info("Testing ping failure case - 3");
        this.conf.setInt(TEST_POINT, 3);
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.testBSPTaskService);
        TestBSPProcessRunner testBSPProcessRunner = new TestBSPProcessRunner(3, this.workerServer.getListenerAddress().getPort());
        try {
            executorCompletionService.submit(testBSPProcessRunner).get(20000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.error("Interrupted Exception.", e);
        } catch (ExecutionException e2) {
            LOG.error("ExecutionException Exception.", e2);
        } catch (TimeoutException e3) {
            LOG.error("TimeoutException Exception.", e3);
        }
        checkIfPingTestPassed();
        this.groom.setPingCount(0);
        this.testBSPTaskService.shutdownNow();
        testBSPProcessRunner.destroyProcess();
    }

    public void testBSPTaskSelfDestroy() {
        LOG.info("Testing self kill on lost contact.");
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.testBSPTaskService);
        TestBSPProcessRunner testBSPProcessRunner = new TestBSPProcessRunner(0, this.workerServer.getListenerAddress().getPort());
        Future submit = executorCompletionService.submit(testBSPProcessRunner);
        while (this.groom.pingCount == 0) {
            try {
                Thread.sleep(100L);
            } catch (Exception e) {
                LOG.error("Interrupted the timer for 1 sec.", e);
            }
        }
        this.workerServer.stop();
        this.umbilical = null;
        this.workerServer = null;
        Integer num = -1;
        try {
            num = (Integer) submit.get(20000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e2) {
            LOG.error("Interrupted Exception.", e2);
        } catch (ExecutionException e3) {
            LOG.error("ExecutionException Exception.", e3);
        } catch (TimeoutException e4) {
            LOG.error("TimeoutException Exception.", e4);
        }
        assertEquals(69, num.intValue());
        testBSPProcessRunner.destroyProcess();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        if (this.groom != null) {
            this.groom.setPingCount(0);
        }
        if (this.umbilical != null) {
            this.umbilical.close();
            Thread.sleep(2000L);
        }
        if (this.workerServer != null) {
            this.workerServer.stop();
        }
        this.testBSPTaskService.shutdownNow();
        Thread.sleep(2000L);
    }
}
