/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.near;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import javax.cache.CacheException;
import junit.framework.TestCase;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryAbstractDistributedJoinSelfTest;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.X;

public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest
extends IgniteCacheQueryAbstractDistributedJoinSelfTest {
    private int totalNodes = 6;

    @Override
    protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();
        if (this.totalNodes > 2) {
            for (int i = 2; i < this.totalNodes; ++i) {
                this.startGrid(i);
            }
        } else {
            this.totalNodes = 2;
        }
    }

    public void testRestarts() throws Exception {
        this.restarts(false);
    }

    public void testRestartsBroadcast() throws Exception {
        this.restarts(true);
    }

    private void restarts(final boolean broadcastQry) throws Exception {
        int duration = 90000;
        int qryThreadNum = 4;
        int restartThreadsNum = 2;
        int nodeLifeTime = 4000;
        int logFreq = 100;
        final AtomicIntegerArray locks = new AtomicIntegerArray(this.totalNodes);
        SqlFieldsQuery qry0 = broadcastQry ? new SqlFieldsQuery("select co._key, count(*) cnt\nfrom \"co\".Company co, \"pr\".Product pr, \"pu\".Purchase pu, \"pe\".Person pe \nwhere pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \ngroup by co._key order by cnt desc, co._key").setDistributedJoins(true).setEnforceJoinOrder(true) : new SqlFieldsQuery("select co._key, count(*) cnt\nfrom \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\nwhere pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \ngroup by co._key order by cnt desc, co._key").setDistributedJoins(true);
        String plan = this.queryPlan(this.grid(0).cache("pu"), qry0);
        X.println((String)("Plan1: " + plan), (Object[])new Object[0]);
        IgniteCacheQueryNodeRestartDistributedJoinSelfTest.assertEquals((boolean)broadcastQry, (boolean)plan.contains("batched:broadcast"));
        final List pRes = this.grid(0).cache("pu").query((Query)qry0).getAll();
        Thread.sleep(3000L);
        IgniteCacheQueryNodeRestartDistributedJoinSelfTest.assertEquals((Object)pRes, (Object)this.grid(0).cache("pu").query((Query)qry0).getAll());
        final SqlFieldsQuery qry1 = broadcastQry ? new SqlFieldsQuery("select pr._key, co._key\nfrom \"co\".Company co, \"pr\".Product pr \nwhere pr.companyId = co._key\norder by co._key, pr._key ").setDistributedJoins(true).setEnforceJoinOrder(true) : new SqlFieldsQuery("select pr._key, co._key\nfrom \"pr\".Product pr, \"co\".Company co\nwhere pr.companyId = co._key\norder by co._key, pr._key ").setDistributedJoins(true);
        plan = this.queryPlan(this.grid(0).cache("co"), qry1);
        X.println((String)("Plan2: " + plan), (Object[])new Object[0]);
        IgniteCacheQueryNodeRestartDistributedJoinSelfTest.assertEquals((boolean)broadcastQry, (boolean)plan.contains("batched:broadcast"));
        final List rRes = this.grid(0).cache("co").query((Query)qry1).getAll();
        IgniteCacheQueryNodeRestartDistributedJoinSelfTest.assertFalse((boolean)pRes.isEmpty());
        IgniteCacheQueryNodeRestartDistributedJoinSelfTest.assertFalse((boolean)rRes.isEmpty());
        final AtomicInteger qryCnt = new AtomicInteger();
        final AtomicBoolean qrysDone = new AtomicBoolean();
        final AtomicBoolean fail = new AtomicBoolean();
        IgniteInternalFuture fut1 = this.multithreadedAsync((Runnable)new CAX(){

            public void applyx() throws IgniteCheckedException {
                GridRandom rnd = new GridRandom();
                try {
                    while (!qrysDone.get()) {
                        IgniteCache cache;
                        int g;
                        do {
                            g = rnd.nextInt(locks.length());
                            if (!fail.get()) continue;
                            return;
                        } while (!locks.compareAndSet(g, 0, 1));
                        if (rnd.nextBoolean()) {
                            cache = IgniteCacheQueryNodeRestartDistributedJoinSelfTest.this.grid(g).cache("pu");
                            SqlFieldsQuery qry = broadcastQry ? new SqlFieldsQuery("select co._key, count(*) cnt\nfrom \"co\".Company co, \"pr\".Product pr, \"pu\".Purchase pu, \"pe\".Person pe \nwhere pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \ngroup by co._key order by cnt desc, co._key").setDistributedJoins(true).setEnforceJoinOrder(true) : new SqlFieldsQuery("select co._key, count(*) cnt\nfrom \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\nwhere pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \ngroup by co._key order by cnt desc, co._key").setDistributedJoins(true);
                            boolean smallPageSize = rnd.nextBoolean();
                            qry.setPageSize(smallPageSize ? 30 : 1000);
                            try {
                                TestCase.assertEquals((Object)pRes, (Object)cache.query((Query)qry).getAll());
                            }
                            catch (CacheException e) {
                                TestCase.assertTrue((String)"On large page size must retry.", (boolean)smallPageSize);
                                boolean failedOnRemoteFetch = false;
                                for (Throwable th = e; th != null; th = th.getCause()) {
                                    if (!(th instanceof CacheException) || th.getMessage() == null || !th.getMessage().startsWith("Failed to fetch data from node:")) continue;
                                    failedOnRemoteFetch = true;
                                    break;
                                }
                                if (!failedOnRemoteFetch) {
                                    e.printStackTrace();
                                    TestCase.fail((String)"Must fail inside of GridResultPage.fetchNextPage or subclass.");
                                }
                            }
                        } else {
                            cache = IgniteCacheQueryNodeRestartDistributedJoinSelfTest.this.grid(g).cache("co");
                            TestCase.assertEquals((Object)rRes, (Object)cache.query((Query)qry1).getAll());
                        }
                        locks.set(g, 0);
                        int c = qryCnt.incrementAndGet();
                        if (c % 100 != 0) continue;
                        IgniteCacheQueryNodeRestartDistributedJoinSelfTest.this.info("Executed queries: " + c);
                    }
                }
                catch (Throwable e) {
                    e.printStackTrace();
                    IgniteCacheQueryNodeRestartDistributedJoinSelfTest.this.error("Got exception: " + e.getMessage());
                    fail.set(true);
                }
            }
        }, qryThreadNum, "query-thread");
        final AtomicInteger restartCnt = new AtomicInteger();
        final AtomicBoolean restartsDone = new AtomicBoolean();
        IgniteInternalFuture fut2 = this.multithreadedAsync(new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                try {
                    GridRandom rnd = new GridRandom();
                    while (!restartsDone.get()) {
                        int g;
                        do {
                            g = rnd.nextInt(locks.length());
                            if (!fail.get()) continue;
                            return null;
                        } while (!locks.compareAndSet(g, 0, -1));
                        IgniteCacheQueryNodeRestartDistributedJoinSelfTest.this.log.info("Stop node: " + g);
                        IgniteCacheQueryNodeRestartDistributedJoinSelfTest.this.stopGrid(g);
                        Thread.sleep(rnd.nextInt(4000));
                        IgniteCacheQueryNodeRestartDistributedJoinSelfTest.this.log.info("Start node: " + g);
                        IgniteCacheQueryNodeRestartDistributedJoinSelfTest.this.startGrid(g);
                        Thread.sleep(rnd.nextInt(4000));
                        locks.set(g, 0);
                        int c = restartCnt.incrementAndGet();
                        if (c % 100 != 0) continue;
                        IgniteCacheQueryNodeRestartDistributedJoinSelfTest.this.info("Node restarts: " + c);
                    }
                    return true;
                }
                catch (Throwable e) {
                    e.printStackTrace();
                    return true;
                }
            }
        }, restartThreadsNum, "restart-thread");
        Thread.sleep(duration);
        this.info("Stopping...");
        restartsDone.set(true);
        qrysDone.set(true);
        fut2.get();
        fut1.get();
        if (fail.get()) {
            IgniteCacheQueryNodeRestartDistributedJoinSelfTest.fail((String)"See message above");
        }
        this.info("Stopped.");
    }
}

