/*
 * Decompiled with CFR 0.152.
 */
package org.apache.openjpa.persistence.event;

import java.util.Collection;
import java.util.HashMap;
import javax.persistence.EntityManager;
import org.apache.openjpa.event.RemoteCommitEvent;
import org.apache.openjpa.event.RemoteCommitListener;
import org.apache.openjpa.event.TCPRemoteCommitProvider;
import org.apache.openjpa.lib.conf.Configurations;
import org.apache.openjpa.persistence.OpenJPAEntityManager;
import org.apache.openjpa.persistence.OpenJPAEntityManagerFactory;
import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
import org.apache.openjpa.persistence.common.utils.AbstractTestCase;
import org.apache.openjpa.persistence.event.common.apps.Duration;
import org.apache.openjpa.persistence.event.common.apps.RuntimeTest1;

/**
 * Tests that a pair of entity manager factories configured with
 * {@link TCPRemoteCommitProvider} keep exchanging remote commit events after
 * one of the two factories has been closed and recreated.
 *
 * <p>Both tests rely on wall-clock pauses between steps, so they are slow and
 * bind fixed local TCP ports (5636/6636 and 5637/6637).</p>
 */
public class TestTCPRemoteRecovery extends AbstractTestCase {
    /** Number of objects added, modified and deleted per round of work. */
    private static final int NUM_OBJECTS = 1;

    /** Provider properties used by {@link #testReceiverRecovers()}. */
    private static final String RECEIVER_TEST_SENDER_PROPS =
        "Port=5636, Addresses=127.0.0.1:5636;127.0.0.1:6636";
    private static final String RECEIVER_TEST_RECEIVER_PROPS =
        "Port=6636, Addresses=127.0.0.1:6636;127.0.0.1:5636";

    /** Provider properties used by {@link #testSenderRecovers()}. */
    private static final String SENDER_TEST_SENDER_PROPS =
        "Port=5637, Addresses=127.0.0.1:5637;127.0.0.1:6637";
    private static final String SENDER_TEST_RECEIVER_PROPS =
        "Port=6637, Addresses=127.0.0.1:6637;127.0.0.1:5637";

    /**
     * Suffix appended to the fetch group name of every factory created by
     * {@link #createDistinctFactory(Class, String)}, so that each call uses a
     * different property map.
     */
    static int _fetchGroupSerial = 0;

    public TestTCPRemoteRecovery(String s) {
        super(s, "eventcactusapp");
    }

    /** Removes all {@link RuntimeTest1} instances before each test. */
    public void setUp() {
        this.deleteAll(RuntimeTest1.class);
    }

    public void tearDownTestClass() throws Exception {
    }

    /**
     * Yields, then sleeps the current thread for the given number of seconds.
     *
     * @param seconds time to sleep; fractional values are honoured to the
     *        nearest millisecond, and non-positive values do not sleep
     */
    private void pause(double seconds) {
        // Multiply BEFORE converting to an integral type: a cast binds tighter
        // than '*', so "(int)seconds * 1000" silently turned 15.1s into 15s.
        long millis = Math.round(seconds * 1000.0);
        Thread.yield();
        if (millis <= 0) {
            return;
        }
        try {
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            // Keep the interrupt visible to the test runner instead of hiding it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes the receiving factory while the sender keeps committing, recreates
     * the receiver, and verifies that events only reach the listener while a
     * receiver is up and again after it has been recovered.
     */
    public void testReceiverRecovers() {
        OpenJPAEntityManagerFactory pmfSender = null;
        OpenJPAEntityManagerFactory pmfReceiver = null;
        OpenJPAEntityManager pmSender = null;
        try {
            pmfSender = this.createDistinctFactory(TCPRemoteCommitProvider.class, RECEIVER_TEST_SENDER_PROPS);
            pmfReceiver = this.createDistinctFactory(TCPRemoteCommitProvider.class, RECEIVER_TEST_RECEIVER_PROPS);
            RemoteCommitListenerTestImpl listenerAtReceiver = new RemoteCommitListenerTestImpl();
            attachRemoteCommitListener(pmfReceiver, listenerAtReceiver);
            pmSender = pmfSender.createEntityManager();

            System.out.println("-------------------");
            System.out.println("2 PMFs created, acting as a cluster using ports 5636 and 6636");
            System.out.println("Testing scenario where receiver is failed, then recovered ");
            System.out.println("after two timeouts all the while with the sending pm continuing");
            System.out.println("to send.");

            // Round 1: receiver is up, so its listener should see everything.
            this.performAddsModifiesDeletes(pmSender, NUM_OBJECTS);
            this.pause(1.0);
            System.out.println("About to close the receiving pmf.");
            pmfReceiver.close();
            this.pause(1.0);
            assertListenerTotalsAfterRounds(1, listenerAtReceiver);

            // Rounds 2 and 3: receiver is down; the sender keeps committing.
            System.out.println("You should now see 1 WARN triggered as the sender-pmf tries to send.");
            this.performAddsModifiesDeletes(pmSender, NUM_OBJECTS);
            this.pause(15.1);
            System.out.println("Waited for a while. Should see 1 INFO for next transaction.");
            this.performAddsModifiesDeletes(pmSender, NUM_OBJECTS);
            this.pause(1.1);

            System.out.println("Recovering receiver pmf.");
            pmfReceiver = this.createDistinctFactory(TCPRemoteCommitProvider.class, RECEIVER_TEST_RECEIVER_PROPS);
            this.pause(1.0);
            attachRemoteCommitListener(pmfReceiver, listenerAtReceiver);
            // Nothing committed while the receiver was down may have been counted.
            assertListenerTotalsAfterRounds(1, listenerAtReceiver);

            System.out.println("Now waiting a recoverytime so that the sender");
            System.out.println("will resume trying to connect to the receiver.");
            this.pause(15.1);

            // Round 4: receiver is back, so the totals advance by one round.
            this.performAddsModifiesDeletes(pmSender, NUM_OBJECTS);
            this.pause(1.0);
            assertListenerTotalsAfterRounds(2, listenerAtReceiver);

            pmSender.close();
            pmfSender.close();
            pmfReceiver.close();
        } finally {
            // Only does work when the test failed before the closes above.
            releaseManagerAfterTest(pmSender);
            releaseFactoryAfterTest(pmfSender);
            releaseFactoryAfterTest(pmfReceiver);
        }
    }

    /**
     * Closes and recreates the sending factory while the receiver stays up, and
     * verifies that the receiver's listener sees the commits made both before
     * the sender was closed and after it was recreated.
     */
    public void testSenderRecovers() {
        OpenJPAEntityManagerFactory pmfSender = null;
        OpenJPAEntityManagerFactory pmfReceiver = null;
        OpenJPAEntityManager pmSender = null;
        try {
            pmfSender = this.createDistinctFactory(TCPRemoteCommitProvider.class, SENDER_TEST_SENDER_PROPS);
            pmfReceiver = this.createDistinctFactory(TCPRemoteCommitProvider.class, SENDER_TEST_RECEIVER_PROPS);
            RemoteCommitListenerTestImpl listenerAtReceiver = new RemoteCommitListenerTestImpl();
            attachRemoteCommitListener(pmfReceiver, listenerAtReceiver);
            pmSender = pmfSender.createEntityManager();

            System.out.println("-------------------");
            System.out.println("2 PMFs created, acting as a cluster using ports 5637 and 6637");
            System.out.println("Testing scenario where sender fails and then later recovers.");
            System.out.println("All the while the receiving pm stays up and should receive");
            System.out.println("Events (both before and after the sender's failure).");

            this.performAddsModifiesDeletes(pmSender, NUM_OBJECTS);
            this.pause(2.1);
            System.out.println("Sender pmf closed.");
            pmSender.close();
            pmfSender.close();
            this.pause(4.1);
            System.out.println("Waited for a while.");

            System.out.println("Recovering the sender pmf.");
            pmfSender = this.createDistinctFactory(TCPRemoteCommitProvider.class, SENDER_TEST_SENDER_PROPS);
            pmSender = pmfSender.createEntityManager();
            this.performAddsModifiesDeletes(pmSender, NUM_OBJECTS);
            this.pause(4.1);
            assertListenerTotalsAfterRounds(2, listenerAtReceiver);

            pmSender.close();
            pmfSender.close();
            pmfReceiver.close();
        } finally {
            // Only does work when the test failed before the closes above.
            releaseManagerAfterTest(pmSender);
            releaseFactoryAfterTest(pmfSender);
            releaseFactoryAfterTest(pmfReceiver);
        }
    }

    /**
     * Persists {@code numObjects} new {@link RuntimeTest1} instances (one
     * transaction each), then modifies all of them in a single transaction and
     * finally removes all of them in another single transaction.
     *
     * @param pm entity manager used for all transactions
     * @param numObjects number of instances to create
     * @return the elapsed time reported by {@link Duration#getDurationAsSeconds()}
     */
    protected double performAddsModifiesDeletes(EntityManager pm, int numObjects) {
        Duration timeToAMD = new Duration("Adds, removes, and dletes for " + numObjects + " objects.");
        timeToAMD.start();

        RuntimeTest1[] persistables = new RuntimeTest1[numObjects];
        for (int i = 0; i < numObjects; i++) {
            persistables[i] = new RuntimeTest1("foo #" + i, i);
        }

        for (RuntimeTest1 persistable : persistables) {
            this.startTx(pm);
            pm.persist(persistable);
            this.endTx(pm);
        }

        this.startTx(pm);
        for (int i = 0; i < persistables.length; i++) {
            persistables[i].setStringField("bazzed" + i);
        }
        this.endTx(pm);

        this.startTx(pm);
        for (RuntimeTest1 persistable : persistables) {
            pm.remove(persistable);
        }
        this.endTx(pm);

        timeToAMD.stop();
        return timeToAMD.getDurationAsSeconds();
    }

    /**
     * Builds a factory whose property map differs from that of every earlier
     * call by using a fetch group name with a running serial number.
     *
     * @param providerClass remote commit provider class, or {@code null} to use
     *        the {@code "sjvm"} provider
     * @param classProps1 plugin properties for {@code providerClass}; unused
     *        when {@code providerClass} is {@code null}
     * @return the factory returned by {@code getEmf} for the assembled properties
     */
    protected OpenJPAEntityManagerFactory createDistinctFactory(Class<?> providerClass, String classProps1) {
        String providerSetting = providerClass == null
            ? "sjvm"
            : Configurations.getPlugin(providerClass.getName(), classProps1);

        HashMap<String, String> propsMap = new HashMap<String, String>();
        propsMap.put("openjpa.RemoteCommitProvider", providerSetting);
        propsMap.put("openjpa.FetchGroups", "differentiatingFetchGroup" + _fetchGroupSerial);
        ++_fetchGroupSerial;
        return this.getEmf(propsMap);
    }

    /** Registers {@code listener} with the remote commit event manager of {@code factory}. */
    private static void attachRemoteCommitListener(OpenJPAEntityManagerFactory factory, RemoteCommitListener listener) {
        ((OpenJPAEntityManagerFactorySPI) factory).getConfiguration().getRemoteCommitEventManager().addListener(listener);
    }

    /**
     * Asserts that the listener's running totals correspond to exactly
     * {@code rounds} received rounds of {@link #NUM_OBJECTS} objects each.
     */
    private static void assertListenerTotalsAfterRounds(int rounds, RemoteCommitListenerTestImpl listener) {
        int expected = rounds * NUM_OBJECTS;
        assertEquals(expected, listener.totalAddedClasses);
        assertEquals(expected, listener.totalDeleted);
        assertEquals(expected, listener.totalUpdated);
    }

    /**
     * Best-effort cleanup for a failed test: closes the entity manager if it is
     * still open and reports, rather than rethrows, any failure so the original
     * test failure is not masked.
     */
    private static void releaseManagerAfterTest(OpenJPAEntityManager manager) {
        if (manager == null) {
            return;
        }
        try {
            if (manager.isOpen()) {
                manager.close();
            }
        } catch (RuntimeException cleanupFailure) {
            System.out.println("Ignoring failure while closing entity manager during cleanup: " + cleanupFailure);
        }
    }

    /**
     * Best-effort cleanup for a failed test: closes the factory if it is still
     * open and reports, rather than rethrows, any failure so the original test
     * failure is not masked.
     */
    private static void releaseFactoryAfterTest(OpenJPAEntityManagerFactory factory) {
        if (factory == null) {
            return;
        }
        try {
            if (factory.isOpen()) {
                factory.close();
            }
        } catch (RuntimeException cleanupFailure) {
            System.out.println("Ignoring failure while closing entity manager factory during cleanup: " + cleanupFailure);
        }
    }

    /**
     * Listener that keeps running totals of what the remote commit events it
     * receives report as persisted, updated and deleted.
     */
    protected static class RemoteCommitListenerTestImpl implements RemoteCommitListener {
        /** Updated object ids of the most recent event. */
        Collection updated;
        /** Deleted object ids of the most recent event. */
        Collection deleted;

        // The counters are written under this object's lock in afterCommit but
        // read by the tests without it; volatile makes those reads see the
        // latest value. NOTE(review): presumably events arrive on a provider
        // thread rather than the test thread -- confirm.
        volatile int totalAddedClasses;
        volatile int totalUpdated;
        volatile int totalDeleted;

        protected RemoteCommitListenerTestImpl() {
        }

        /** Records the ids carried by {@code event} and adds their counts to the totals. */
        public synchronized void afterCommit(RemoteCommitEvent event) {
            this.updated = event.getUpdatedObjectIds();
            this.deleted = event.getDeletedObjectIds();
            this.totalAddedClasses += event.getPersistedTypeNames().size();
            this.totalUpdated += this.updated.size();
            this.totalDeleted += this.deleted.size();
            System.out.println("Aftercommit " + this);
        }

        public void close() {
        }

        /** Returns the three running totals in a single line. */
        @Override
        public String toString() {
            return "Added clsses " + this.totalAddedClasses + " Dels " + this.totalDeleted + " Ups " + this.totalUpdated;
        }
    }
}

