/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.CCMConfig;
import com.datastax.driver.core.CCMTestsSupport;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.CreateCCM;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.exceptions.OperationTimedOutException;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

@CreateCCM(value=CreateCCM.TestMode.PER_METHOD)
public class ReusedStreamIdTest
extends CCMTestsSupport {
    static Logger logger = LoggerFactory.getLogger(ReusedStreamIdTest.class);

    public Cluster.Builder createClusterLowReadTimeout() {
        return Cluster.builder().withSocketOptions(new SocketOptions().setReadTimeoutMillis(1000));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"long"})
    @CCMConfig(numberOfNodes={2}, clusterProvider="createClusterLowReadTimeout")
    public void should_not_receive_wrong_response_when_callbacks_block_io_thread() {
        int concurrency = 10;
        final Semaphore semaphore = new Semaphore(concurrency);
        final Random random = new Random();
        try {
            List columnsToGrab = this.cluster().getMetadata().getKeyspace("system").getTable("local").getColumns();
            Assertions.assertThat((int)columnsToGrab.size()).isGreaterThan(1);
            final CountDownLatch errorTrigger = new CountDownLatch(1);
            long start = System.currentTimeMillis();
            int iterations = 500;
            final AtomicInteger completed = new AtomicInteger(0);
            for (int i = 1; i <= iterations; ++i) {
                try {
                    if (errorTrigger.getCount() == 0L) {
                        Assert.fail((String)String.format("Error triggered at or before %d of %d requests after %dms.", i, iterations, System.currentTimeMillis() - start));
                    }
                    semaphore.acquire();
                    final String column = ((ColumnMetadata)columnsToGrab.get(i % columnsToGrab.size())).getName();
                    String query = String.format("select %s from system.local", column);
                    ResultSetFuture future = this.session().executeAsync(query);
                    Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<ResultSet>(){

                        public void onSuccess(ResultSet result) {
                            semaphore.release();
                            int columnIndex = result.getColumnDefinitions().getIndexOf(column);
                            if (columnIndex == -1) {
                                logger.error("Got response without column {}, got columns {} from Host {}.", new Object[]{column, result.getColumnDefinitions(), result.getExecutionInfo().getQueriedHost()});
                                errorTrigger.countDown();
                                return;
                            }
                            completed.incrementAndGet();
                            int num = random.nextInt(1);
                            if (num == 0) {
                                Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.SECONDS);
                            }
                        }

                        public void onFailure(Throwable t) {
                            semaphore.release();
                            if (!(t instanceof OperationTimedOutException)) {
                                logger.error("Unexpected error encountered.", t);
                                errorTrigger.countDown();
                            }
                        }
                    });
                }
                catch (InterruptedException e) {
                    Assert.fail((String)"Test interrupted", (Throwable)e);
                }
                if (i % (iterations / 10) != 0) continue;
                logger.info("Submitted {} of {} requests. ({} completed successfully)", new Object[]{i, iterations, completed.get()});
            }
            Uninterruptibles.awaitUninterruptibly((CountDownLatch)errorTrigger, (long)10L, (TimeUnit)TimeUnit.SECONDS);
            if (errorTrigger.getCount() == 0L) {
                Assert.fail((String)String.format("Error triggered after %dms.", System.currentTimeMillis() - start));
            }
            Assertions.assertThat((int)completed.get()).isGreaterThan(0);
        }
        finally {
            try {
                if (!semaphore.tryAcquire(concurrency, 10L, TimeUnit.SECONDS)) {
                    Assert.fail((String)"Could not acquire all permits within 10 seconds of completion.");
                }
            }
            catch (InterruptedException e) {
                Assert.fail((String)"Interrupted.", (Throwable)e);
            }
        }
    }
}

