package org.apache.hama.bsp.sync;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.bsp.BSPJobID;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.TaskID;
import org.apache.hama.util.BSPNetUtils;
import org.junit.Test;

/* loaded from: input_file:org/apache/hama/bsp/sync/TestSyncServiceFactory.class */
public class TestSyncServiceFactory extends TestCase {
    public static final Log LOG = LogFactory.getLog(TestCase.class);

    /* loaded from: input_file:org/apache/hama/bsp/sync/TestSyncServiceFactory$ListenerTest.class */
    public static class ListenerTest extends ZKSyncEventListener {
        private Text value = new Text("init");

        public String getValue() {
            return this.value.toString();
        }

        public void onDelete() {
        }

        public void onChange() {
            this.LOG.info("ZK value changed event triggered.");
            this.value.set("Changed");
        }

        public void onChildKeySetChange() {
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/sync/TestSyncServiceFactory$ZKServerThread.class */
    private static class ZKServerThread implements Runnable {
        SyncServer server;

        ZKServerThread(SyncServer syncServer) {
            this.server = syncServer;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.server.start();
            } catch (Exception e) {
                TestSyncServiceFactory.LOG.error("Error running server.", e);
            }
        }
    }

    @Test
    public void testClientInstantiation() throws Exception {
        assertTrue(SyncServiceFactory.getPeerSyncClient(new Configuration()) instanceof ZooKeeperSyncClientImpl);
    }

    @Test
    public void testServerInstantiation() throws Exception {
        assertTrue(SyncServiceFactory.getSyncServer(new Configuration()) instanceof ZooKeeperSyncServerImpl);
    }

    @Test
    public void testZKSyncStore() throws Exception {
        Configuration configuration = new Configuration();
        int freePort = BSPNetUtils.getFreePort(21811);
        configuration.set("bsp.local.dir", "/tmp/hama-test");
        configuration.set("bsp.output.dir", "/tmp/hama-test_out");
        configuration.setInt("bsp.peer.port", freePort);
        configuration.set("hama.zookeeper.quorum", "localhost");
        configuration.setInt("hama.zookeeper.property.clientPort", freePort);
        configuration.set("hama.zookeeper.session.timeout", "12000");
        System.setProperty("user.dir", "/tmp");
        final SyncServer syncServer = SyncServiceFactory.getSyncServer(configuration);
        syncServer.init(configuration);
        assertTrue(syncServer instanceof ZooKeeperSyncServerImpl);
        ZKServerThread zKServerThread = new ZKServerThread(syncServer);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        newFixedThreadPool.submit(zKServerThread);
        newFixedThreadPool.awaitTermination(10L, TimeUnit.SECONDS);
        PeerSyncClient peerSyncClient = SyncServiceFactory.getPeerSyncClient(configuration);
        assertTrue(peerSyncClient instanceof ZooKeeperSyncClientImpl);
        BSPJobID bSPJobID = new BSPJobID("abc", 1);
        peerSyncClient.init(configuration, bSPJobID, new TaskAttemptID(new TaskID(bSPJobID, 1), 1));
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.apache.hama.bsp.sync.TestSyncServiceFactory.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    syncServer.stopServer();
                } catch (Exception e) {
                }
            }
        });
        IntWritable intWritable = new IntWritable(5);
        peerSyncClient.storeInformation(peerSyncClient.constructKey(bSPJobID, new String[]{String.valueOf(1L), "test"}), intWritable, true, (SyncEventListener) null);
        ListenerTest listenerTest = new ListenerTest();
        peerSyncClient.registerListener(peerSyncClient.constructKey(bSPJobID, new String[]{String.valueOf(1L), "test"}), ZKSyncEventFactory.getValueChangeEvent(), listenerTest);
        IntWritable intWritable2 = new IntWritable();
        assertTrue(peerSyncClient.getInformation(peerSyncClient.constructKey(bSPJobID, new String[]{String.valueOf(1L), "test"}), intWritable2));
        assertTrue(intWritable2.get() == intWritable.get());
        intWritable.set(6);
        peerSyncClient.storeInformation(peerSyncClient.constructKey(bSPJobID, new String[]{String.valueOf(1L), "test"}), intWritable, true, (SyncEventListener) null);
        IntWritable intWritable3 = new IntWritable();
        assertTrue(peerSyncClient.getInformation(peerSyncClient.constructKey(bSPJobID, new String[]{String.valueOf(1L), "test"}), intWritable3));
        assertTrue(intWritable3.get() == intWritable.get());
        Thread.sleep(5000L);
        assertEquals(true, listenerTest.getValue().equals("Changed"));
        syncServer.stopServer();
    }
}
