package com.datastax.dse.driver.api.core.cql.continuous.reactive;

import com.datastax.dse.driver.api.core.cql.continuous.ContinuousPagingITBase;
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
import com.datastax.dse.driver.api.core.metrics.DseSessionMetric;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import com.datastax.oss.driver.api.testinfra.DseRequirement;
import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.tngtech.java.junit.dataprovider.DataProviderRunner;
import com.tngtech.java.junit.dataprovider.UseDataProvider;
import io.reactivex.Flowable;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;

@RunWith(DataProviderRunner.class)
@DseRequirement(min = "5.1.0", description = "Continuous paging is only available from 5.1.0 onwards")
@Category({ParallelizableTests.class})
/* loaded from: input_file:com/datastax/dse/driver/api/core/cql/continuous/reactive/ContinuousPagingReactiveIT.class */
public class ContinuousPagingReactiveIT extends ContinuousPagingITBase {
    private static CcmRule ccmRule = CcmRule.getInstance();
    private static SessionRule<CqlSession> sessionRule = SessionRule.builder(ccmRule).withConfigLoader(SessionUtils.configLoaderBuilder().withStringList(DefaultDriverOption.METRICS_SESSION_ENABLED, Collections.singletonList(DseSessionMetric.CONTINUOUS_CQL_REQUESTS.getPath())).withStringList(DefaultDriverOption.METRICS_NODE_ENABLED, Collections.singletonList(DefaultNodeMetric.CQL_MESSAGES.getPath())).build()).build();

    @ClassRule
    public static TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule);

    @BeforeClass
    public static void setUp() {
        initialize(sessionRule.session(), sessionRule.slowProfile());
    }

    @Test
    @UseDataProvider("pagingOptions")
    public void should_execute_reactively(ContinuousPagingITBase.Options options) {
        CqlSession cqlSession = (CqlSession) sessionRule.session();
        ContinuousReactiveResultSet executeContinuouslyReactive = cqlSession.executeContinuouslyReactive(SimpleStatement.newInstance("SELECT v from test where k=?", new Object[]{"k"}).setExecutionProfile(options.asProfile(cqlSession)));
        List list = (List) Flowable.fromPublisher(executeContinuouslyReactive).toList().blockingGet();
        Assertions.assertThat(list).hasSize(options.expectedRows);
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        for (int i = 0; i < list.size(); i++) {
            ReactiveRow reactiveRow = (ReactiveRow) list.get(i);
            Assertions.assertThat(reactiveRow.getInt("v")).isEqualTo(i);
            linkedHashSet.add(reactiveRow.getExecutionInfo());
        }
        Assertions.assertThat((List) Flowable.fromPublisher(executeContinuouslyReactive.getExecutionInfos()).toList().blockingGet()).containsAll(linkedHashSet);
        List list2 = (List) Flowable.fromPublisher(executeContinuouslyReactive.getColumnDefinitions()).toList().blockingGet();
        ReactiveRow reactiveRow2 = (ReactiveRow) list.get(0);
        Assertions.assertThat(list2).hasSize(1).containsExactly(new ColumnDefinitions[]{reactiveRow2.getColumnDefinitions()});
        Assertions.assertThat((List) Flowable.fromPublisher(executeContinuouslyReactive.wasApplied()).toList().blockingGet()).hasSize(1).containsExactly(new Boolean[]{Boolean.valueOf(reactiveRow2.wasApplied())});
        validateMetrics(cqlSession);
    }
}
