/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.AsyncContinuousPagingResult;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ContinuousPagingOptions;
import com.datastax.driver.core.ContinuousPagingResult;
import com.datastax.driver.core.ContinuousPagingSession;
import com.datastax.driver.core.GuavaCompatibility;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TestUtils;
import com.datastax.driver.core.VersionNumber;
import com.datastax.driver.core.exceptions.ClientWriteException;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.OperationTimedOutException;
import com.datastax.driver.core.utils.DseVersion;
import com.datastax.driver.dse.CCMDseTestsSupport;
import com.datastax.driver.dse.DseCluster;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@DseVersion(value="5.1.0")
public class ContinuousPagingTest
extends CCMDseTestsSupport {
    /**
     * Value stored in the {@code k} column of every fixture row inserted into {@code test}
     * and {@code test2}; the tests also pass it as the column name when reading {@code k}
     * back from a row.
     */
    public static final String KEY = "k";
    /** Statement prepared in {@link #onTestContextInitialized()} that selects {@code v} from {@code test} for a bound key. */
    private PreparedStatement prepared;
    /** Additional session on the test keyspace, used to run ALTER TABLE statements while a paging query is being consumed. */
    private Session schemaChangeSession;

    /**
     * Builds the fixtures used by every test in this class.
     *
     * <p>Creates tables {@code test} and {@code test2}, inserts 100 rows (v = 0..99) into
     * {@code test} and 20,000 rows (v = 0..19999) into {@code test2} as 200 batches of 100
     * inserts, all with {@code k} set to {@link #KEY}. Finally prepares the shared SELECT
     * statement and opens the session used for schema changes.
     */
    @Override
    public void onTestContextInitialized() {
        execute("CREATE TABLE test (k text, v int, PRIMARY KEY (k, v))");
        execute("CREATE TABLE test2 (k text, v int, v0 uuid, v1 uuid, PRIMARY KEY (k, v, v0))");

        int value = 0;
        while (value < 100) {
            execute(String.format("INSERT INTO test (k, v) VALUES ('%s', %d)", KEY, value));
            value++;
        }

        final String insertIntoTest2 = "INSERT INTO test2 (k, v, v0, v1) VALUES (?, ?, ?, ?)";
        for (int batchIndex = 0; batchIndex < 200; batchIndex++) {
            BatchStatement batch = new BatchStatement();
            // Each batch covers a contiguous run of 100 values of v.
            int firstValue = batchIndex * 100;
            for (int offset = 0; offset < 100; offset++) {
                batch.add(new SimpleStatement(insertIntoTest2, KEY, firstValue + offset, UUID.randomUUID(), UUID.randomUUID()));
            }
            session().execute(batch);
        }

        prepared = session().prepare("SELECT V from test where k=?");
        schemaChangeSession = cluster().connect(keyspace);
    }

    /**
     * Returns the session supplied by the parent test support class, viewed as a
     * {@link ContinuousPagingSession}.
     *
     * @return the inherited session, cast to {@code ContinuousPagingSession}
     * @throws ClassCastException if the inherited session is not a {@code ContinuousPagingSession}
     */
    private ContinuousPagingSession cSession() {
        return ContinuousPagingSession.class.cast(super.session());
    }

    /**
     * Supplies the paging configurations exercised by the data-driven tests.
     *
     * <p>Each entry is {@code {options, expectedRows, expectedPages}}: the options to page
     * with, the number of rows the query is expected to return, and the number of pages the
     * asynchronous test expects to receive.
     *
     * @return one array per test case
     */
    @DataProvider
    Object[][] pagingOptions() {
        ContinuousPagingOptions.PageUnit rows = ContinuousPagingOptions.PageUnit.ROWS;
        ContinuousPagingOptions.PageUnit bytes = ContinuousPagingOptions.PageUnit.BYTES;
        return new Object[][]{
            // Page size expressed in rows.
            {ContinuousPagingOptions.builder().withPageSize(100, rows).build(), 100, 1},
            {ContinuousPagingOptions.builder().withPageSize(99, rows).build(), 100, 2},
            {ContinuousPagingOptions.builder().withPageSize(50, rows).build(), 100, 2},
            {ContinuousPagingOptions.builder().withPageSize(1, rows).build(), 100, 100},
            // Page size in rows combined with a page limit and/or a rate limit.
            {ContinuousPagingOptions.builder().withPageSize(10, rows).withMaxPages(10).build(), 100, 10},
            {ContinuousPagingOptions.builder().withPageSize(10, rows).withMaxPages(9).build(), 90, 9},
            {ContinuousPagingOptions.builder().withPageSize(10, rows).withMaxPages(0).withMaxPagesPerSecond(2).build(), 100, 10},
            // Page size expressed in bytes.
            {ContinuousPagingOptions.builder().withPageSize(8, bytes).build(), 100, 100},
            {ContinuousPagingOptions.builder().withPageSize(16, bytes).build(), 100, 50},
            {ContinuousPagingOptions.builder().withPageSize(32, bytes).build(), 100, 25},
        };
    }

    /**
     * Checks that the synchronous continuous-paging call rejects {@code null} options with a
     * {@link NullPointerException} whose message is "Options must not be null".
     */
    @Test(groups={"short"}, expectedExceptions={NullPointerException.class}, expectedExceptionsMessageRegExp="Options must not be null")
    public void should_throw_exception_when_given_null_options_sync() {
        Statement select = new SimpleStatement("SELECT v from test where k=?", KEY);
        cSession().executeContinuously(select, null);
    }

    /**
     * Checks that the asynchronous continuous-paging call rejects {@code null} options with a
     * {@link NullPointerException} whose message is "Options must not be null".
     */
    @Test(groups={"short"}, expectedExceptions={NullPointerException.class}, expectedExceptionsMessageRegExp="Options must not be null")
    public void should_throw_exception_when_given_null_options_async() {
        Statement select = new SimpleStatement("SELECT v from test where k=?", KEY);
        cSession().executeContinuouslyAsync(select, null);
    }

    /**
     * Pages synchronously through the {@code test} fixture with a simple statement and checks
     * that the rows come back with {@code v} counting up from 0 and that the total matches
     * {@code expectedRows}.
     *
     * @param options paging options supplied by {@link #pagingOptions()}
     * @param expectedRows number of rows the query should yield
     * @param expectedPages supplied by the data provider but not checked by this test
     */
    @Test(groups={"short"}, dataProvider="pagingOptions")
    public void synchronous_paging_with_options(ContinuousPagingOptions options, int expectedRows, int expectedPages) {
        Statement select = new SimpleStatement("SELECT v from test where k=?", KEY);
        Iterator<Row> rows = cSession().executeContinuously(select, options).iterator();
        int seen = 0;
        while (rows.hasNext()) {
            Assertions.assertThat(rows.next().getInt("v")).isEqualTo(seen);
            seen++;
        }
        Assertions.assertThat(seen).isEqualTo(expectedRows);
    }

    /**
     * Same check as the simple-statement variant, but pages through the {@code test} fixture
     * using the statement prepared in {@link #onTestContextInitialized()} bound to {@link #KEY}.
     *
     * @param options paging options supplied by {@link #pagingOptions()}
     * @param expectedRows number of rows the query should yield
     * @param expectedPages supplied by the data provider but not checked by this test
     */
    @Test(groups={"short"}, dataProvider="pagingOptions")
    public void prepared_statement_paging_with_options(ContinuousPagingOptions options, int expectedRows, int expectedPages) {
        Statement bound = prepared.bind(KEY);
        Iterator<Row> rows = cSession().executeContinuously(bound, options).iterator();
        int seen = 0;
        while (rows.hasNext()) {
            Assertions.assertThat(rows.next().getInt("v")).isEqualTo(seen);
            seen++;
        }
        Assertions.assertThat(seen).isEqualTo(expectedRows);
    }

    /**
     * Drops a column while a prepared-statement paging query is being consumed.
     *
     * <p>The in-flight query must keep reporting column {@code v} in its column definitions
     * (and, on DSE 6.0.0 or later, return it as null) after the column is dropped. A fresh
     * execution of the same prepared statement must no longer report {@code v}.
     */
    @Test(groups={"short"})
    public void prepared_statement_paging_should_be_resilient_to_schema_change() {
        execute("CREATE TABLE test_prep (k text PRIMARY KEY, v int)");
        int n = 0;
        while (n < 100) {
            execute(String.format("INSERT INTO test_prep (k, v) VALUES ('%d', %d)", n, n));
            n++;
        }

        PreparedStatement selectAll = cSession().prepare("select * from test_prep");
        // One row per page and at most one enqueued page.
        ContinuousPagingOptions singleRowPages = ContinuousPagingOptions.builder()
            .withMaxEnqueuedPages(1)
            .withPageSize(1, ContinuousPagingOptions.PageUnit.ROWS)
            .build();

        Iterator<Row> inFlight = cSession().executeContinuously(selectAll.bind(), singleRowPages).iterator();
        Row first = inFlight.next();
        Assertions.assertThat(first.getString(KEY)).isNotNull();
        Assertions.assertThat(first.isNull("v")).isFalse();

        schemaChangeSession.execute("ALTER TABLE test_prep drop v;");

        // Rows of the query that started before the ALTER.
        while (inFlight.hasNext()) {
            Row current = inFlight.next();
            Assertions.assertThat(current.getString(KEY)).isNotNull();
            boolean olderThanDse6 = ccm().getDSEVersion().compareTo(VersionNumber.parse("6.0.0")) < 0;
            if (!olderThanDse6) {
                Assertions.assertThat(current.isNull("v")).isTrue();
            }
            Assertions.assertThat(current.getColumnDefinitions().contains("v")).isTrue();
        }

        // A new execution after the ALTER.
        Iterator<Row> afterChange = cSession().executeContinuously(selectAll.bind(), singleRowPages).iterator();
        while (afterChange.hasNext()) {
            Row current = afterChange.next();
            Assertions.assertThat(current.getString(KEY)).isNotNull();
            Assertions.assertThat(current.getColumnDefinitions().contains("v")).isFalse();
        }
    }

    /**
     * Adds a column while a simple-statement paging query is being consumed.
     *
     * <p>The in-flight query must not report the new column {@code b}; a fresh execution of the
     * same statement must report it, with a null value for every row.
     *
     * <p>The test raises the read timeout on the session's cluster-level {@link SocketOptions}
     * while it runs. The previous value is restored in a {@code finally} block so the very large
     * timeout does not leak into other tests that use the same cluster configuration.
     */
    @Test(groups={"short"})
    public void simple_statement_paging_should_be_resilient_to_schema_change() {
        execute("CREATE TABLE test_simple (k text PRIMARY KEY, v int)");
        for (int i = 0; i < 100; ++i) {
            execute(String.format("INSERT INTO test_simple (k, v) VALUES ('%d', %d)", i, i));
        }
        SimpleStatement simple = new SimpleStatement("select * from test_simple");
        // One row per page and at most one enqueued page.
        ContinuousPagingOptions options = ContinuousPagingOptions.builder()
            .withMaxEnqueuedPages(1)
            .withPageSize(1, ContinuousPagingOptions.PageUnit.ROWS)
            .build();

        SocketOptions socketOptions = cSession().getCluster().getConfiguration().getSocketOptions();
        int previousReadTimeoutMillis = socketOptions.getReadTimeoutMillis();

        ContinuousPagingResult result = cSession().executeContinuously(simple, options);
        socketOptions.setReadTimeoutMillis(120000000);
        try {
            Iterator<Row> it = result.iterator();
            Row row0 = it.next();
            Assertions.assertThat(row0.getString(KEY)).isNotNull();
            Assertions.assertThat(row0.isNull("v")).isFalse();

            schemaChangeSession.execute("ALTER TABLE test_simple add b int");

            // Rows of the query that started before the ALTER must not expose the new column.
            while (it.hasNext()) {
                Row row = it.next();
                Assertions.assertThat(row.getString(KEY)).isNotNull();
                Assertions.assertThat(row.isNull("v")).isFalse();
                Assertions.assertThat(row.getColumnDefinitions().contains("b")).isFalse();
            }

            // A new execution after the ALTER sees the new column, which is null for every row.
            result = cSession().executeContinuously(simple, options);
            for (Row row : result) {
                Assertions.assertThat(row.getString(KEY)).isNotNull();
                Assertions.assertThat(row.isNull("v")).isFalse();
                Assertions.assertThat(row.isNull("b")).isTrue();
                Assertions.assertThat(row.getColumnDefinitions().contains("b")).isTrue();
            }
        } finally {
            // Undo the cluster-wide configuration change made above.
            socketOptions.setReadTimeoutMillis(previousReadTimeoutMillis);
        }
    }

    /**
     * Cancels a rate-limited synchronous paging query immediately after starting it and checks
     * that iterating the result afterwards yields exactly 10 rows, with {@code v} counting up
     * from 0 (the configured page size is 10 rows).
     */
    @Test(groups={"short"})
    public void should_cancel_with_synchronous_paging() throws Exception {
        Statement select = new SimpleStatement("SELECT v from test where k=?", KEY);
        // 10 rows per page, no page limit, at most one page per second.
        ContinuousPagingOptions throttled = ContinuousPagingOptions.builder()
            .withPageSize(10, ContinuousPagingOptions.PageUnit.ROWS)
            .withMaxPages(0)
            .withMaxPagesPerSecond(1)
            .build();

        ContinuousPagingResult pagingResult = cSession().executeContinuously(select, throttled);
        pagingResult.cancel();

        Iterator<Row> remaining = pagingResult.iterator();
        int seen = 0;
        while (remaining.hasNext()) {
            Assertions.assertThat(remaining.next().getInt("v")).isEqualTo(seen);
            seen++;
        }
        Assertions.assertThat(seen).isEqualTo(10);
    }

    /**
     * Pages asynchronously through the {@code test} fixture, chaining page callbacks through
     * {@code AsyncContinuousPagingFunction}, and checks both the total row count and the number
     * of pages received.
     *
     * @param options paging options supplied by {@link #pagingOptions()}
     * @param expectedRows number of rows the query should yield
     * @param expectedPages number of pages the query should yield
     * @throws Exception if the chained future fails or does not complete within 30 seconds
     */
    @Test(groups={"short"}, dataProvider="pagingOptions")
    public void asynchronous_paging_with_options(ContinuousPagingOptions options, int expectedRows, int expectedPages) throws Exception {
        Statement select = new SimpleStatement("SELECT v from test where k=?", KEY);
        ListenableFuture<AsyncContinuousPagingResult> firstPage = cSession().executeContinuouslyAsync(select, options);
        ListenableFuture<PageStatistics> totals =
            GuavaCompatibility.INSTANCE.transformAsync(firstPage, new AsyncContinuousPagingFunction());

        PageStatistics stats = Uninterruptibles.getUninterruptibly(totals, 30L, TimeUnit.SECONDS);
        Assertions.assertThat(stats.rows).isEqualTo(expectedRows);
        Assertions.assertThat(stats.pages).isEqualTo(expectedPages);
    }

    /**
     * Cancels an asynchronous paging result and checks that requesting the next page afterwards
     * fails with an {@link ExecutionException} whose message mentions the cancellation.
     *
     * <p>The message check asserts on the message string itself. Wrapping a boolean in
     * {@code assertThat(...)} without a terminal assertion would verify nothing.
     *
     * @throws Exception if the first page cannot be obtained within 30 seconds, or if waiting
     *     for the next page times out instead of failing
     */
    @Test(groups={"short"})
    public void should_cancel_with_asynchronous_paging() throws Exception {
        Statement statement = new SimpleStatement("SELECT v from test where k=?", KEY);
        // 10 rows per page, no page limit, at most one page per second.
        ContinuousPagingOptions options = ContinuousPagingOptions.builder()
            .withPageSize(10, ContinuousPagingOptions.PageUnit.ROWS)
            .withMaxPages(0)
            .withMaxPagesPerSecond(1)
            .build();
        ListenableFuture<AsyncContinuousPagingResult> future = cSession().executeContinuouslyAsync(statement, options);
        AsyncContinuousPagingResult pagingResult = Uninterruptibles.getUninterruptibly(future, 30L, TimeUnit.SECONDS);

        pagingResult.cancel();

        ListenableFuture<AsyncContinuousPagingResult> nextPageFuture = pagingResult.nextPage();
        try {
            Uninterruptibles.getUninterruptibly(nextPageFuture, 1L, TimeUnit.SECONDS);
            Assert.fail("Expected an execution exception since paging was cancelled.");
        } catch (ExecutionException te) {
            Assertions.assertThat(te.getMessage()).contains("was cancelled");
        }
    }

    /**
     * Requests the next page, then cancels the paging result it was requested from, and checks
     * that the pending next-page future is reported as cancelled and throws
     * {@link CancellationException} when waited on.
     *
     * @throws Exception if the first page cannot be obtained within 30 seconds, or if waiting
     *     on the pending future fails in a way other than cancellation
     */
    @Test(groups={"short"})
    public void should_cancel_future_when_cancelling_previous_result() throws Exception {
        Statement select = new SimpleStatement("SELECT v from test where k=?", KEY);
        // 10 rows per page, no page limit, at most one page per second.
        ContinuousPagingOptions throttled = ContinuousPagingOptions.builder()
            .withPageSize(10, ContinuousPagingOptions.PageUnit.ROWS)
            .withMaxPages(0)
            .withMaxPagesPerSecond(1)
            .build();
        ListenableFuture<AsyncContinuousPagingResult> firstPage = cSession().executeContinuouslyAsync(select, throttled);
        AsyncContinuousPagingResult pagingResult = Uninterruptibles.getUninterruptibly(firstPage, 30L, TimeUnit.SECONDS);

        ListenableFuture<AsyncContinuousPagingResult> pending = pagingResult.nextPage();
        pagingResult.cancel();

        Assertions.assertThat(pending.isCancelled()).isTrue();
        try {
            Uninterruptibles.getUninterruptibly(pending, 1L, TimeUnit.SECONDS);
            Assert.fail("Expected a cancellation exception since previous result was cancelled.");
        } catch (CancellationException expected) {
            // Intentionally empty: the cancellation is the outcome under test.
        }
    }

    /**
     * Requests the next page, cancels the returned future directly, and checks that the future
     * is reported as cancelled and throws {@link CancellationException} when waited on.
     *
     * @throws Exception if the first page cannot be obtained within 30 seconds, or if waiting
     *     on the cancelled future fails in a way other than cancellation
     */
    @Test(groups={"short"})
    public void should_cancel_when_future_is_cancelled() throws Exception {
        Statement statement = new SimpleStatement("SELECT v from test where k=?", KEY);
        // 10 rows per page, no page limit, at most one page per second.
        ContinuousPagingOptions options = ContinuousPagingOptions.builder()
            .withPageSize(10, ContinuousPagingOptions.PageUnit.ROWS)
            .withMaxPages(0)
            .withMaxPagesPerSecond(1)
            .build();
        ListenableFuture<AsyncContinuousPagingResult> future = cSession().executeContinuouslyAsync(statement, options);
        AsyncContinuousPagingResult pagingResult = Uninterruptibles.getUninterruptibly(future, 30L, TimeUnit.SECONDS);

        ListenableFuture<AsyncContinuousPagingResult> nextPageFuture = pagingResult.nextPage();
        nextPageFuture.cancel(false);

        Assertions.assertThat(nextPageFuture.isCancelled()).isTrue();
        try {
            Uninterruptibles.getUninterruptibly(nextPageFuture, 1L, TimeUnit.SECONDS);
            Assert.fail("Expected a cancellation exception since future was cancelled.");
        } catch (CancellationException expected) {
            // Intentionally empty: the cancellation is the outcome under test.
        }
    }

    /**
     * Limits the server to one page per second while giving the statement a 100 ms read
     * timeout, then checks that waiting for the page after the first one fails with an
     * {@link OperationTimedOutException} mentioning "Timed out waiting for page 2".
     *
     * @throws Exception if the first page cannot be obtained within 30 seconds, or if the wait
     *     for the next page is interrupted
     */
    @Test(groups={"short"})
    public void should_time_out_when_server_does_not_produce_pages_fast_enough() throws Exception {
        SimpleStatement select = new SimpleStatement("SELECT v from test where k=?", KEY);
        ContinuousPagingOptions onePagePerSecond = ContinuousPagingOptions.builder()
            .withPageSize(10, ContinuousPagingOptions.PageUnit.ROWS)
            .withMaxPagesPerSecond(1)
            .build();
        select.setReadTimeoutMillis(100);

        ListenableFuture<AsyncContinuousPagingResult> firstPage = cSession().executeContinuouslyAsync(select, onePagePerSecond);
        AsyncContinuousPagingResult pagingResult = Uninterruptibles.getUninterruptibly(firstPage, 30L, TimeUnit.SECONDS);
        try {
            pagingResult.nextPage().get();
            Assert.fail("Expected a timeout");
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            Assertions.assertThat(cause).isInstanceOf(OperationTimedOutException.class);
            Assertions.assertThat(cause).hasMessageContaining("Timed out waiting for page 2");
        }
    }

    /**
     * Starts an asynchronous paging query over {@code test2}, waits one second before consuming
     * anything, and then checks that all 20,000 rows still arrive, in 200 pages of 100 rows.
     *
     * @throws Exception if the chained future fails or does not complete within 30 seconds
     */
    @Test(groups={"short"})
    public void should_resume_reading_when_client_catches_up() throws Exception {
        Statement select = new SimpleStatement("SELECT * from test2 where k=?", KEY);
        ContinuousPagingOptions hundredRowPages = ContinuousPagingOptions.builder()
            .withPageSize(100, ContinuousPagingOptions.PageUnit.ROWS)
            .build();
        ListenableFuture<AsyncContinuousPagingResult> firstPage = cSession().executeContinuouslyAsync(select, hundredRowPages);

        // Let the client fall behind before it starts reading pages.
        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);

        ListenableFuture<PageStatistics> totals =
            GuavaCompatibility.INSTANCE.transformAsync(firstPage, new AsyncContinuousPagingFunction());
        PageStatistics stats = Uninterruptibles.getUninterruptibly(totals, 30L, TimeUnit.SECONDS);
        Assertions.assertThat(stats.rows).isEqualTo(20000);
        Assertions.assertThat(stats.pages).isEqualTo(200);
    }

    /**
     * Starts a paging query over {@code test2} on a dedicated cluster with a 65535-byte receive
     * buffer, deliberately waits 25 seconds before consuming it, and then reads pages until
     * either the last page arrives or the read fails.
     *
     * <p>If the read fails, the failure must be a {@link DriverException} caused by a
     * {@link ClientWriteException} whose message starts with "Timed out adding page to output
     * queue", and at least 5 pages must have been read before it. If every row is read without
     * an error the test is skipped, because the condition under test was not triggered.
     *
     * <p>On every path the cleanup runs exactly once: if the host was resolved, one more query
     * is executed and the session must report zero in-flight queries for that host; the
     * dedicated cluster is then closed.
     *
     * @throws Exception if closing the cluster does not complete within 3 seconds, or on any
     *     unexpected failure
     */
    @Test(groups={"short"})
    public void should_release_stream_id_when_server_side_error_is_thrown_as_result_of_not_consuming_fast_enough() throws Exception {
        DseCluster cluster = this.createClusterBuilder().addContactPointsWithPorts(new InetSocketAddress[]{this.ccm().addressOfNode(1)}).withSocketOptions(new SocketOptions().setReceiveBufferSize(65535)).build();
        ContinuousPagingSession session = null;
        Host host = null;
        int pageNumber = 0;
        try {
            session = (ContinuousPagingSession)cluster.connect(this.keyspace);
            Statement statement = new SimpleStatement("SELECT * from test2 where k=?", KEY);
            ContinuousPagingOptions options = ContinuousPagingOptions.builder().withPageSize(100, ContinuousPagingOptions.PageUnit.ROWS).build();
            ListenableFuture<AsyncContinuousPagingResult> future = session.executeContinuouslyAsync(statement, options);
            host = TestUtils.findHost((Cluster)cluster, 1);
            Assertions.assertThat(session.getState().getInFlightQueries(host)).isEqualTo(1);

            // Do not consume anything for a while so that pages pile up on the server side.
            Uninterruptibles.sleepUninterruptibly(25L, TimeUnit.SECONDS);

            int rowCount = 0;
            AsyncContinuousPagingResult result = Uninterruptibles.getUninterruptibly(future);
            while (true) {
                // Remember the last page reached so the failure path can assert on it.
                pageNumber = result.pageNumber();
                rowCount += Iterables.size(result.currentPage());
                if (result.isLast()) {
                    break;
                }
                result = Uninterruptibles.getUninterruptibly(result.nextPage());
            }
            Assertions.assertThat(rowCount).isEqualTo(20000);
            throw new SkipException("ClientWriteException was not raised, TCP window scaling is enabled or window size is larger than default.");
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            Assertions.assertThat(cause).isInstanceOf(DriverException.class);
            Throwable clientWriteException = cause.getCause();
            Assertions.assertThat(clientWriteException).isInstanceOf(ClientWriteException.class);
            Assertions.assertThat(clientWriteException.getMessage()).startsWith("Timed out adding page to output queue");
            Assertions.assertThat(pageNumber).isGreaterThanOrEqualTo(5);
        } finally {
            boolean closed = false;
            try {
                // host is only set after the session was created, so session is non-null here.
                if (host != null) {
                    session.execute("select * from system.local");
                    Assertions.assertThat(session.getState().getInFlightQueries(host)).isEqualTo(0);
                }
                closed = true;
                Uninterruptibles.getUninterruptibly(cluster.closeAsync(), 3L, TimeUnit.SECONDS);
            } finally {
                // If the post-condition check failed, still make sure the cluster gets closed.
                if (!closed) {
                    cluster.closeAsync();
                }
            }
        }
    }

    /**
     * Pages through {@code test2} with 1000-row pages on a dedicated cluster with a 65535-byte
     * receive buffer, advancing until the page number reaches 17, then waits 5 seconds and
     * checks that an ordinary query on the same session still completes.
     *
     * <p>The dedicated cluster is closed asynchronously when the test ends, whatever the outcome.
     *
     * @throws Exception if any page future fails
     */
    @Test(groups={"short"})
    public void should_not_disable_autoread_after_last_page_received() throws Exception {
        SocketOptions smallReceiveBuffer = new SocketOptions().setReceiveBufferSize(65535);
        DseCluster cluster = createClusterBuilder()
            .addContactPointsWithPorts(new InetSocketAddress[]{ccm().addressOfNode(1)})
            .withSocketOptions(smallReceiveBuffer)
            .build();
        try {
            ContinuousPagingSession session = (ContinuousPagingSession)cluster.connect(keyspace);
            Statement select = new SimpleStatement("SELECT * from test2 where k=?", KEY);
            ContinuousPagingOptions thousandRowPages = ContinuousPagingOptions.builder()
                .withPageSize(1000, ContinuousPagingOptions.PageUnit.ROWS)
                .build();

            AsyncContinuousPagingResult page =
                Uninterruptibles.getUninterruptibly(session.executeContinuouslyAsync(select, thousandRowPages));
            // Always fetch at least one further page, then keep going until page 17 is reached.
            do {
                page = Uninterruptibles.getUninterruptibly(page.nextPage());
            } while (page.pageNumber() < 17);

            Uninterruptibles.sleepUninterruptibly(5L, TimeUnit.SECONDS);
            session.execute("select * from system.local");
        } finally {
            cluster.closeAsync();
        }
    }

    private static class AsyncContinuousPagingFunction
    implements AsyncFunction<AsyncContinuousPagingResult, PageStatistics> {
        private final int rowsSoFar;

        AsyncContinuousPagingFunction() {
            this(0);
        }

        AsyncContinuousPagingFunction(int rowsSoFar) {
            this.rowsSoFar = rowsSoFar;
        }

        public ListenableFuture<PageStatistics> apply(AsyncContinuousPagingResult input) throws Exception {
            int rows = this.rowsSoFar;
            for (Row row : input.currentPage()) {
                int v = row.getInt("v");
                if (v != rows) {
                    throw new Exception(String.format("Expected v == %d, got %d.", rows, v));
                }
                ++rows;
            }
            if (input.isLast()) {
                int pages = rows == this.rowsSoFar ? input.pageNumber() - 1 : input.pageNumber();
                return Futures.immediateFuture((Object)new PageStatistics(rows, pages));
            }
            return GuavaCompatibility.INSTANCE.transformAsync(input.nextPage(), (AsyncFunction)new AsyncContinuousPagingFunction(rows));
        }
    }

    /** Totals gathered while paging through a result: how many rows and how many pages were seen. */
    private static class PageStatistics {
        /** Total number of rows read. */
        final int rows;
        /** Total number of pages counted. */
        final int pages;

        /**
         * @param rows total number of rows read
         * @param pages total number of pages counted
         */
        PageStatistics(int rows, int pages) {
            this.pages = pages;
            this.rows = rows;
        }
    }
}

