package org.apache.hama.bsp;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.Counters;
import org.apache.hama.bsp.TaskStatus;
import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
import org.apache.hama.bsp.ft.FaultTolerantPeerService;
import org.apache.hama.bsp.message.MessageEventListener;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.sync.BSPPeerSyncClient;
import org.apache.hama.bsp.sync.PeerSyncClient;
import org.apache.hama.bsp.sync.SyncEvent;
import org.apache.hama.bsp.sync.SyncEventListener;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.util.BSPNetUtils;
import org.apache.hama.util.KeyValuePair;

/* loaded from: input_file:org/apache/hama/bsp/TestCheckpoint.class */
public class TestCheckpoint extends TestCase {
    public static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
    static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";

    /* loaded from: input_file:org/apache/hama/bsp/TestCheckpoint$TempSyncClient.class */
    public static class TempSyncClient extends BSPPeerSyncClient {
        Map<String, Writable> valueMap = new HashMap();

        public String constructKey(BSPJobID bSPJobID, String... strArr) {
            StringBuffer stringBuffer = new StringBuffer(100);
            stringBuffer.append(bSPJobID.toString()).append("/");
            for (String str : strArr) {
                stringBuffer.append(str).append("/");
            }
            return stringBuffer.toString();
        }

        public boolean storeInformation(String str, Writable writable, boolean z, SyncEventListener syncEventListener) {
            ArrayWritable arrayWritable = (ArrayWritable) writable;
            TestCheckpoint.LOG.info("SyncClient Storing value step = " + arrayWritable.get()[0].get() + " count = " + arrayWritable.get()[1].get() + " for key " + str);
            this.valueMap.put(str, writable);
            return true;
        }

        public boolean getInformation(String str, Writable writable) {
            TestCheckpoint.LOG.info("Getting value for key " + str);
            if (!this.valueMap.containsKey(str)) {
                return false;
            }
            Writable writable2 = this.valueMap.get(str);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
            try {
                try {
                    writable2.write(dataOutputStream);
                    dataOutputStream.flush();
                    writable.readFields(new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())));
                    try {
                        byteArrayOutputStream.close();
                        dataOutputStream.close();
                    } catch (IOException e) {
                        TestCheckpoint.LOG.error("Error closing byte stream.", e);
                    }
                    return true;
                } catch (IOException e2) {
                    TestCheckpoint.LOG.error("Error writing data to write buffer.", e2);
                    try {
                        byteArrayOutputStream.close();
                        dataOutputStream.close();
                        return false;
                    } catch (IOException e3) {
                        TestCheckpoint.LOG.error("Error closing byte stream.", e3);
                        return false;
                    }
                }
            } catch (Throwable th) {
                try {
                    byteArrayOutputStream.close();
                    dataOutputStream.close();
                } catch (IOException e4) {
                    TestCheckpoint.LOG.error("Error closing byte stream.", e4);
                }
                throw th;
            }
        }

        public boolean addKey(String str, boolean z, SyncEventListener syncEventListener) {
            this.valueMap.put(str, NullWritable.get());
            return true;
        }

        public boolean hasKey(String str) {
            return this.valueMap.containsKey(str);
        }

        public String[] getChildKeySet(String str, SyncEventListener syncEventListener) {
            ArrayList arrayList = new ArrayList();
            for (String str2 : this.valueMap.keySet()) {
                if (str2.startsWith(str + "/")) {
                    arrayList.add(str2);
                }
            }
            String[] strArr = new String[arrayList.size()];
            arrayList.toArray(strArr);
            return strArr;
        }

        public boolean registerListener(String str, SyncEvent syncEvent, SyncEventListener syncEventListener) {
            return false;
        }

        public boolean remove(String str, SyncEventListener syncEventListener) {
            this.valueMap.remove(str);
            return false;
        }

        public void init(Configuration configuration, BSPJobID bSPJobID, TaskAttemptID taskAttemptID) throws Exception {
        }

        public void enterBarrier(BSPJobID bSPJobID, TaskAttemptID taskAttemptID, long j) throws SyncException {
            TestCheckpoint.LOG.info("Enter barrier called - " + j);
        }

        public void leaveBarrier(BSPJobID bSPJobID, TaskAttemptID taskAttemptID, long j) throws SyncException {
            TestCheckpoint.LOG.info("Exit barrier called - " + j);
        }

        public void register(BSPJobID bSPJobID, TaskAttemptID taskAttemptID, String str, long j) {
        }

        public String[] getAllPeerNames(TaskAttemptID taskAttemptID) {
            return null;
        }

        public void deregisterFromBarrier(BSPJobID bSPJobID, TaskAttemptID taskAttemptID, String str, long j) {
        }

        public void stopServer() {
        }

        public void close() throws IOException {
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/TestCheckpoint$TestBSPPeer.class */
    public static class TestBSPPeer implements BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> {
        Configuration conf;
        long superstepCount;
        FaultTolerantPeerService<Text> fService;

        public TestBSPPeer(BSPJob bSPJob, Configuration configuration, TaskAttemptID taskAttemptID, Counters counters, long j, BSPPeerSyncClient bSPPeerSyncClient, MessageManager<Text> messageManager, TaskStatus.State state) {
            this.conf = configuration;
            if (j > 0) {
                this.superstepCount = j;
            } else {
                this.superstepCount = 0L;
            }
            try {
                this.fService = new AsyncRcvdMsgCheckpointImpl().constructPeerFaultTolerance(bSPJob, this, bSPPeerSyncClient, (InetSocketAddress) null, taskAttemptID, j, configuration, messageManager);
                this.fService.onPeerInitialized(state);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void send(String str, Text text) throws IOException {
        }

        /* renamed from: getCurrentMessage, reason: merged with bridge method [inline-methods] */
        public Text m8getCurrentMessage() throws IOException {
            return new Text("data");
        }

        public int getNumCurrentMessages() {
            return 1;
        }

        public void sync() throws IOException, SyncException, InterruptedException {
            this.superstepCount++;
            try {
                this.fService.afterBarrier();
            } catch (Exception e) {
                e.printStackTrace();
            }
            TestCheckpoint.LOG.info("After barrier " + this.superstepCount);
        }

        public long getSuperstepCount() {
            return this.superstepCount;
        }

        public String getPeerName() {
            return null;
        }

        public String getPeerName(int i) {
            return null;
        }

        public int getPeerIndex() {
            return 1;
        }

        public String[] getAllPeerNames() {
            return null;
        }

        public int getNumPeers() {
            return 0;
        }

        public void clear() {
        }

        public void write(NullWritable nullWritable, NullWritable nullWritable2) throws IOException {
        }

        public boolean readNext(NullWritable nullWritable, NullWritable nullWritable2) throws IOException {
            return false;
        }

        public KeyValuePair<NullWritable, NullWritable> readNext() throws IOException {
            return null;
        }

        public void reopenInput() throws IOException {
        }

        public Configuration getConfiguration() {
            return null;
        }

        public Counters.Counter getCounter(Enum<?> r3) {
            return null;
        }

        public Counters.Counter getCounter(String str, String str2) {
            return null;
        }

        public void incrementCounter(Enum<?> r2, long j) {
        }

        public void incrementCounter(String str, String str2, long j) {
        }

        public long getSplitSize() {
            return 0L;
        }

        public long getPos() throws IOException {
            return 0L;
        }

        public TaskAttemptID getTaskId() {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/TestCheckpoint$TestMessageManager.class */
    public static class TestMessageManager implements MessageManager<Text> {
        List<Text> messageQueue = new ArrayList();
        BSPMessageBundle<Text> loopbackBundle = new BSPMessageBundle<>();
        Iterator<Text> iter = null;
        MessageEventListener<Text> listener;

        public void init(TaskAttemptID taskAttemptID, BSPPeer<?, ?, ?, ?, Text> bSPPeer, Configuration configuration, InetSocketAddress inetSocketAddress) {
        }

        public void close() {
        }

        /* renamed from: getCurrentMessage, reason: merged with bridge method [inline-methods] */
        public Text m9getCurrentMessage() throws IOException {
            if (this.iter == null) {
                this.iter = this.messageQueue.iterator();
            }
            if (this.iter.hasNext()) {
                return this.iter.next();
            }
            return null;
        }

        public void send(String str, Text text) throws IOException {
        }

        public void finishSendPhase() throws IOException {
        }

        public Iterator<Map.Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() {
            return null;
        }

        public void transfer(InetSocketAddress inetSocketAddress, BSPMessageBundle<Text> bSPMessageBundle) throws IOException {
        }

        public void clearOutgoingQueues() {
        }

        public int getNumCurrentMessages() {
            return this.messageQueue.size();
        }

        public BSPMessageBundle<Text> getLoopbackBundle() {
            return this.loopbackBundle;
        }

        public void addMessage(Text text) throws IOException {
            this.messageQueue.add(text);
            this.listener.onMessageReceived(text);
        }

        public void loopBackMessages(BSPMessageBundle<? extends Writable> bSPMessageBundle) {
            this.loopbackBundle = bSPMessageBundle;
        }

        public void loopBackMessage(Writable writable) {
        }

        public void registerListener(MessageEventListener<Text> messageEventListener) throws IOException {
            this.listener = messageEventListener;
        }

        public InetSocketAddress getListenerAddress() {
            return null;
        }
    }

    private static void checkSuperstepMsgCount(PeerSyncClient peerSyncClient, BSPPeer bSPPeer, BSPJob bSPJob, long j, long j2) {
        ArrayWritable arrayWritable = new ArrayWritable(LongWritable.class);
        assertTrue(peerSyncClient.getInformation(peerSyncClient.constructKey(bSPJob.getJobID(), new String[]{"checkpoint", "" + bSPPeer.getPeerIndex()}), arrayWritable));
        LongWritable longWritable = arrayWritable.get()[0];
        LongWritable longWritable2 = arrayWritable.get()[1];
        assertEquals(j, longWritable.get());
        assertEquals(j2, longWritable2.get());
    }

    public void testCheckpointInterval() throws Exception {
        Configuration configuration = new Configuration();
        System.setProperty("user.dir", "/tmp");
        configuration.set("hama.sync.peer.class", TempSyncClient.class.getName());
        configuration.set("bsp.ft.class", AsyncRcvdMsgCheckpointImpl.class.getName());
        configuration.setBoolean("bsp.ft.enabled", true);
        configuration.setBoolean("bsp.checkpoint.enabled", true);
        configuration.setInt("bsp.checkpoint.interval", 2);
        configuration.set("bsp.output.dir", "/tmp/hama-test_out");
        configuration.set("bsp.local.dir", "/tmp/hama-test");
        FileSystem fileSystem = FileSystem.get(configuration);
        BSPJob bSPJob = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
        TaskAttemptID taskAttemptID = new TaskAttemptID(new TaskID(bSPJob.getJobID(), 1), 1);
        TestMessageManager testMessageManager = new TestMessageManager();
        BSPPeerSyncClient peerSyncClient = SyncServiceFactory.getPeerSyncClient(configuration);
        TestBSPPeer testBSPPeer = new TestBSPPeer(bSPJob, configuration, taskAttemptID, new Counters(), -1L, peerSyncClient, testMessageManager, TaskStatus.State.RUNNING);
        assertNotNull("BSPPeerImpl should not be null.", testBSPPeer);
        LOG.info("Created bsp peer and other parameters");
        LOG.info("Got port = " + BSPNetUtils.getFreePort(12502));
        assertFalse(peerSyncClient.getInformation(peerSyncClient.constructKey(bSPJob.getJobID(), new String[]{"checkpoint", "" + testBSPPeer.getPeerIndex()}), new ArrayWritable(LongWritable.class)));
        testBSPPeer.sync();
        checkSuperstepMsgCount(peerSyncClient, testBSPPeer, bSPJob, 1L, 0L);
        Text text = new Text("data");
        testMessageManager.addMessage(text);
        testBSPPeer.sync();
        checkSuperstepMsgCount(peerSyncClient, testBSPPeer, bSPJob, 1L, 0L);
        testMessageManager.addMessage(text);
        testBSPPeer.sync();
        checkSuperstepMsgCount(peerSyncClient, testBSPPeer, bSPJob, 3L, 1L);
        testBSPPeer.sync();
        checkSuperstepMsgCount(peerSyncClient, testBSPPeer, bSPJob, 3L, 1L);
        testMessageManager.addMessage(text);
        testMessageManager.addMessage(text);
        testBSPPeer.sync();
        checkSuperstepMsgCount(peerSyncClient, testBSPPeer, bSPJob, 5L, 2L);
        testBSPPeer.sync();
        checkSuperstepMsgCount(peerSyncClient, testBSPPeer, bSPJob, 5L, 2L);
        fileSystem.delete(new Path("checkpoint"), true);
    }

    public void testCheckpoint() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hama.sync.peer.class", TempSyncClient.class.getName());
        configuration.setBoolean("bsp.ft.enabled", true);
        configuration.set("bsp.ft.class", AsyncRcvdMsgCheckpointImpl.class.getName());
        configuration.setBoolean("bsp.checkpoint.enabled", true);
        int freePort = BSPNetUtils.getFreePort(12502);
        LOG.info("Got port = " + freePort);
        configuration.set("bsp.peer.hostname", "0.0.0.0");
        configuration.setInt("bsp.peer.port", freePort);
        configuration.set("bsp.output.dir", "/tmp/hama-test_out");
        configuration.set("bsp.local.dir", "/tmp/hama-test");
        FileSystem fileSystem = FileSystem.get(configuration);
        BSPJob bSPJob = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
        TaskAttemptID taskAttemptID = new TaskAttemptID(new TaskID(bSPJob.getJobID(), 1), 1);
        TestMessageManager testMessageManager = new TestMessageManager();
        BSPPeerSyncClient peerSyncClient = SyncServiceFactory.getPeerSyncClient(configuration);
        TestBSPPeer testBSPPeer = new TestBSPPeer(bSPJob, configuration, taskAttemptID, new Counters(), -1L, peerSyncClient, testMessageManager, TaskStatus.State.RUNNING);
        assertNotNull("BSPPeerImpl should not be null.", testBSPPeer);
        LOG.info("Created bsp peer and other parameters");
        testBSPPeer.sync();
        LOG.info("Completed first sync.");
        checkSuperstepMsgCount(peerSyncClient, testBSPPeer, bSPJob, 1L, 0L);
        testMessageManager.addMessage(new Text("data"));
        testBSPPeer.sync();
        LOG.info("Completed second sync.");
        checkSuperstepMsgCount(peerSyncClient, testBSPPeer, bSPJob, 2L, 1L);
        FSDataInputStream open = fileSystem.open(new Path("checkpoint/job_checkpttest_0001/2/1"));
        Text text = (Text) ReflectionUtils.newInstance(Class.forName(open.readUTF()), configuration);
        text.readFields(open);
        assertEquals("data", text.toString());
        fileSystem.delete(new Path("checkpoint"), true);
    }

    public void testPeerRecovery() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hama.sync.peer.class", TempSyncClient.class.getName());
        configuration.set("bsp.ft.class", AsyncRcvdMsgCheckpointImpl.class.getName());
        configuration.setBoolean("bsp.checkpoint.enabled", true);
        int freePort = BSPNetUtils.getFreePort(12502);
        LOG.info("Got port = " + freePort);
        configuration.set("bsp.peer.hostname", "0.0.0.0");
        configuration.setInt("bsp.peer.port", freePort);
        configuration.set("bsp.output.dir", "/tmp/hama-test_out");
        configuration.set("bsp.local.dir", "/tmp/hama-test");
        FileSystem fileSystem = FileSystem.get(configuration);
        BSPJob bSPJob = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
        TaskAttemptID taskAttemptID = new TaskAttemptID(new TaskID(bSPJob.getJobID(), 1), 1);
        TestMessageManager testMessageManager = new TestMessageManager();
        BSPPeerSyncClient peerSyncClient = SyncServiceFactory.getPeerSyncClient(configuration);
        Text text = new Text("data");
        Writable[] writableArr = {new LongWritable(3L), new LongWritable(5L)};
        ArrayWritable arrayWritable = new ArrayWritable(LongWritable.class);
        arrayWritable.set(writableArr);
        peerSyncClient.storeInformation("job_checkpttest_0001/checkpoint/1/", arrayWritable, true, (SyncEventListener) null);
        FSDataOutputStream create = fileSystem.create(new Path("checkpoint/job_checkpttest_0001/3/1"));
        for (int i = 0; i < 5; i++) {
            create.writeUTF(text.getClass().getCanonicalName());
            text.write(create);
        }
        create.close();
        new TestBSPPeer(bSPJob, configuration, taskAttemptID, new Counters(), 3L, peerSyncClient, testMessageManager, TaskStatus.State.RECOVERING);
        BSPMessageBundle<Text> loopbackBundle = testMessageManager.getLoopbackBundle();
        assertEquals(5, loopbackBundle.getMessages().size());
        assertEquals(((Text) loopbackBundle.getMessages().get(0)).toString(), "data");
        fileSystem.delete(new Path("checkpoint"), true);
    }
}
