package org.apache.hama.monitor;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.monitor.Federator;

/* loaded from: input_file:org/apache/hama/monitor/TestFederator.class */
public class TestFederator extends TestCase {
    public static final Log LOG = LogFactory.getLog(TestFederator.class);
    Federator federator;
    static final int expected = 10;

    /* loaded from: input_file:org/apache/hama/monitor/TestFederator$DummyCollector.class */
    public static final class DummyCollector implements Federator.Collector {
        final AtomicInteger sum = new AtomicInteger(0);

        public DummyCollector(int i) {
            this.sum.set(i);
        }

        public Object harvest() throws Exception {
            Assert.assertEquals("Test if value is equal before harvest.", TestFederator.expected, this.sum.get());
            int incrementAndGet = this.sum.incrementAndGet();
            Thread.sleep(2000L);
            Assert.assertEquals("Test if value is equal after harvest.", 11, incrementAndGet);
            return Integer.valueOf(incrementAndGet);
        }
    }

    public void setUp() throws Exception {
        this.federator = new Federator(new HamaConfiguration());
        this.federator.start();
    }

    public void testExecutionFlow() throws Exception {
        LOG.info("Value before submitted: 10");
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        this.federator.register(new Federator.Act(new DummyCollector(expected), new Federator.CollectorHandler() { // from class: org.apache.hama.monitor.TestFederator.1
            public void handle(Future future) {
                try {
                    atomicInteger.set(((Integer) future.get()).intValue());
                    TestFederator.LOG.info("Value after submitted: " + atomicInteger);
                } catch (InterruptedException e) {
                    TestFederator.LOG.error(e);
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e2) {
                    TestFederator.LOG.error(e2);
                }
            }
        }));
        Thread.sleep(3000L);
        assertEquals("Result should be 11.", atomicInteger.get(), 11);
    }

    public void tearDown() throws Exception {
        this.federator.interrupt();
    }
}
