package org.apache.hama.bsp;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncEventListener;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
import org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl;
import org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl;
import org.apache.hama.util.BSPNetUtils;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.mortbay.log.Log;

/* loaded from: input_file:org/apache/hama/bsp/TestZooKeeper.class */
/**
 * Exercises the ZooKeeper-backed sync service: starts a
 * {@link ZooKeeperSyncServerImpl}, then uses a master sync client and a peer
 * sync client to register a job and tasks, store and read information, and
 * checks that the job's key is gone after the job is deregistered.
 */
public class TestZooKeeper extends TestCase {
    /** Configuration shared by the ZooKeeper sync server and both sync clients. */
    private final HamaConfiguration configuration = new HamaConfiguration();

    /**
     * Prepares the shared configuration: sets the {@code user.dir} system
     * property to {@code /tmp}, points the master address and ZooKeeper quorum
     * at {@code localhost}, sets the local/output directories under
     * {@code /tmp}, takes the ZooKeeper client port from
     * {@code BSPNetUtils.getFreePort(20000)} and selects
     * {@link ZooKeeperSyncClientImpl} as the sync client class.
     */
    public TestZooKeeper() {
        System.setProperty("user.dir", "/tmp");
        this.configuration.set("bsp.master.address", "localhost");
        assertEquals("Make sure master addr is set to localhost:", "localhost", this.configuration.get("bsp.master.address"));
        this.configuration.set("bsp.local.dir", "/tmp/hama-test");
        this.configuration.set("bsp.output.dir", "/tmp/hama-test_out");
        this.configuration.set("hama.zookeeper.quorum", "localhost");
        this.configuration.setInt("hama.zookeeper.property.clientPort", BSPNetUtils.getFreePort(20000));
        this.configuration.set("hama.sync.client.class", ZooKeeperSyncClientImpl.class.getCanonicalName());
    }

    /**
     * Runs the full register / store / deregister scenario against a sync
     * server started on a background thread.
     *
     * <p>Any unexpected exception is rethrown as an {@link AssertionError}
     * carrying the original exception as its cause, so the test report shows
     * the real failure. The server is always stopped and the background
     * executor always shut down, whether the test passes or fails.
     *
     * @throws IOException declared for compatibility with the original signature
     * @throws KeeperException declared for compatibility with the original signature
     * @throws InterruptedException declared for compatibility with the original signature
     */
    @Test
    public void testClearZKNodes() throws IOException, KeeperException, InterruptedException {
        final ZooKeeperSyncServerImpl zkServer = new ZooKeeperSyncServerImpl();
        final ExecutorService serverExecutor = Executors.newCachedThreadPool();
        try {
            zkServer.init(this.configuration);
            serverExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        zkServer.start();
                    } catch (Exception e) {
                        // Runs on the pool thread; there is no caller to propagate to,
                        // so at least surface the failure on stderr.
                        e.printStackTrace();
                    }
                }
            });
            // The pool has not been shut down at this point, so awaitTermination
            // cannot observe termination and blocks for the whole timeout.
            // Presumably this is a fixed pause to let the server come up.
            serverExecutor.awaitTermination(10L, TimeUnit.SECONDS);

            ZooKeeperSyncClientImpl peerSyncClient = SyncServiceFactory.getPeerSyncClient(this.configuration);
            ZKSyncBSPMasterClient masterSyncClient = SyncServiceFactory.getMasterSyncClient(this.configuration);
            masterSyncClient.init(this.configuration);
            Thread.sleep(100L);
            Log.info("Created master and client sync clients");
            assertTrue(masterSyncClient.hasKey("/bsp"));
            Log.info("BSP root exists");

            // Register one job with two task attempts.
            BSPJobID jobId = new BSPJobID("test1", 1);
            masterSyncClient.registerJob(jobId.toString());
            TaskAttemptID firstAttempt = new TaskAttemptID(new TaskID(jobId, 1), 1);
            TaskAttemptID secondAttempt = new TaskAttemptID(new TaskID(jobId, 2), 1);
            this.configuration.setInt("bsp.peer.port", BSPNetUtils.getFreePort(21815));
            peerSyncClient.init(this.configuration, jobId, firstAttempt);
            peerSyncClient.registerTask(jobId, "hamanode1", 5000L, firstAttempt);
            peerSyncClient.registerTask(jobId, "hamanode2", 5000L, secondAttempt);
            peerSyncClient.storeInformation(peerSyncClient.constructKey(jobId, new String[]{"info", "level2"}), new IntWritable(5), true, (SyncEventListener) null);

            // Both registered tasks must be reported as peers.
            String[] allPeerNames = peerSyncClient.getAllPeerNames(firstAttempt);
            Log.info("Found child count = " + allPeerNames.length);
            assertEquals(2, allPeerNames.length);
            Log.info("Passed the child count test");

            // Keys added under "peer" must be listed as its children.
            masterSyncClient.addKey(masterSyncClient.constructKey(jobId, new String[]{"peer", "1"}), true, (SyncEventListener) null);
            masterSyncClient.addKey(masterSyncClient.constructKey(jobId, new String[]{"peer", "2"}), true, (SyncEventListener) null);
            String[] childKeySet = masterSyncClient.getChildKeySet(masterSyncClient.constructKey(jobId, new String[]{"peer"}), (SyncEventListener) null);
            Log.info("Found child count = " + childKeySet.length);
            assertEquals(2, childKeySet.length);
            Log.info(" Peer name " + childKeySet[0]);
            Log.info(" Peer name " + childKeySet[1]);
            Log.info("Passed the child key set test");

            // After deregistering, the job's key must no longer exist.
            masterSyncClient.deregisterJob(jobId.toString());
            Log.info(masterSyncClient.constructKey(jobId, new String[0]));
            Thread.sleep(200L);
            assertEquals(false, masterSyncClient.hasKey(masterSyncClient.constructKey(jobId, new String[0])));
            Log.info("Passed the key presence test");

            // Reading a key that was never stored must report false; after
            // storing an ArrayWritable there, it must read back element-for-element.
            assertEquals(false, masterSyncClient.getInformation(masterSyncClient.constructKey(jobId, new String[]{"info", "level3"}), new IntWritable()));
            Writable[] storedValues = {new LongWritable(3L), new LongWritable(5L)};
            ArrayWritable stored = new ArrayWritable(LongWritable.class);
            stored.set(storedValues);
            masterSyncClient.storeInformation(masterSyncClient.constructKey(jobId, new String[]{"info", "level3"}), stored, true, (SyncEventListener) null);
            ArrayWritable readBack = new ArrayWritable(LongWritable.class);
            assertTrue(masterSyncClient.getInformation(masterSyncClient.constructKey(jobId, new String[]{"info", "level3"}), readBack));
            assertEquals(stored.get()[0], readBack.get()[0]);
            assertEquals(stored.get()[1], readBack.get()[1]);
            Log.info("Passed array writable test");
        } catch (Exception e) {
            // Fail the test with the real cause attached instead of printing the
            // stack trace and failing later on an uninformative boolean flag.
            AssertionError failure = new AssertionError("testClearZKNodes failed with an unexpected exception: " + e);
            failure.initCause(e);
            throw failure;
        } finally {
            try {
                zkServer.stopServer();
            } finally {
                // Release the pool thread that ran the server so it does not
                // outlive the test.
                serverExecutor.shutdownNow();
            }
        }
    }
}
