package org.apache.hama.bsp.message;

import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.Counters;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.queue.DiskTransferProtocolQueue;
import org.apache.hama.bsp.message.queue.MemoryTransferProtocol;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.message.queue.MessageTransferQueue;
import org.apache.hama.util.BSPNetUtils;

/* loaded from: input_file:org/apache/hama/bsp/message/TestHadoopMessageManager.class */
/**
 * Exercises {@link HadoopMessageManagerImpl} with both transfer-queue
 * implementations: one message is sent to the manager's own address, the
 * outgoing queue is inspected, the queued message is transferred back in a
 * bundle, and the received message is verified.
 */
public class TestHadoopMessageManager extends TestCase {
    /** Directory configured as {@code bsp.disk.queue.dir} for both tests. */
    public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue";

    /**
     * Offset added to the free port reported by {@link BSPNetUtils}; bumped on
     * every run of {@link #messagingInternal(Configuration)}.
     * NOTE(review): presumably keeps the two tests from binding the same port -
     * confirm. The post-increment on a volatile field is not atomic, which is
     * only safe while the tests run sequentially.
     */
    public static volatile int increment = 1;

    /** Runs the messaging round trip with {@link MemoryTransferProtocol} as transfer queue. */
    public void testMemoryMessaging() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setClass("hama.messenger.xfer.queue.class", MemoryTransferProtocol.class, MessageTransferQueue.class);
        configuration.set("bsp.disk.queue.dir", TMP_OUTPUT_PATH);
        messagingInternal(configuration);
    }

    /** Runs the messaging round trip with {@link DiskTransferProtocolQueue} as transfer queue. */
    public void testDiskMessaging() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("bsp.disk.queue.dir", TMP_OUTPUT_PATH);
        configuration.setClass("hama.messenger.xfer.queue.class", DiskTransferProtocolQueue.class, MessageTransferQueue.class);
        messagingInternal(configuration);
    }

    /**
     * Shared test body: builds a {@link HadoopMessageManagerImpl} from the given
     * configuration, sends a single {@code IntWritable(1337)} to the manager's
     * own address and checks that it shows up first in the outgoing queue and
     * then, after {@code transfer}, as the current received message.
     *
     * @param configuration configuration with the transfer queue class and the
     *        disk queue directory already set; {@code hama.messenger.class} is
     *        set here
     * @throws Exception if creating, initialising or using the message manager fails
     */
    // BSPPeerImpl is instantiated raw because its type-parameter list is not
    // visible from this file; passing it to the typed manager is unchecked.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void messagingInternal(Configuration configuration) throws Exception {
        configuration.set("hama.messenger.class", "org.apache.hama.bsp.message.HadoopMessageManagerImpl");
        MessageManager<IntWritable> messageManager = MessageManagerFactory.getMessageManager(configuration);
        assertTrue(messageManager instanceof HadoopMessageManagerImpl);

        final int portOffset = increment++;
        InetSocketAddress peerAddress = new InetSocketAddress(
                BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort() + portOffset);
        messageManager.init(new TaskAttemptID("1", 1, 1, 1),
                new BSPPeerImpl(configuration, FileSystem.get(configuration), new Counters()),
                configuration, peerAddress);

        // From here on the manager is initialised, so close it even when an
        // assertion below fails.
        try {
            String peerName = peerAddress.getHostName() + ":" + peerAddress.getPort();
            messageManager.send(peerName, new IntWritable(1337));

            Iterator<Map.Entry<InetSocketAddress, MessageQueue<IntWritable>>> outgoing =
                    messageManager.getMessageIterator();
            Map.Entry<InetSocketAddress, MessageQueue<IntWritable>> entry = outgoing.next();
            assertEquals(peerAddress, entry.getKey());

            MessageQueue<IntWritable> queue = entry.getValue();
            assertEquals(1, queue.size());

            // Re-pack the queued message into a bundle and hand it back to the
            // same manager, which acts as both sender and receiver in this test.
            BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
            for (IntWritable message : queue) {
                bundle.addMessage(message);
            }
            messageManager.transfer(peerAddress, bundle);
            messageManager.clearOutgoingQueues();

            assertEquals(1, messageManager.getNumCurrentMessages());
            assertEquals(1337, messageManager.getCurrentMessage().get());
        } finally {
            messageManager.close();
        }
    }
}
