package org.apache.james.backends.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.james.backends.cassandra.Scenario;
import org.apache.james.backends.cassandra.StatementRecorder;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.backends.cassandra.versions.SchemaVersion;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.scheduler.Schedulers;

/**
 * Exercises the instrumentation hooks reachable through {@code CassandraCluster#getConf()}:
 * scenario registration (failure injection, empty results, barriers) and statement recording.
 *
 * <p>Each test drives a {@link CassandraSchemaVersionDAO} built on that session and checks how
 * the registered {@link Scenario} or {@link StatementRecorder} affects the queries it issues.
 */
class TestingSessionTest {

    /** Query text the tests match on when reading the schema version. */
    private static final String SELECT_VERSION_QUERY = "SELECT value FROM schemaVersion;";

    /** Prefix the tests match on when writing the schema version. */
    private static final String INSERT_VERSION_PREFIX = "INSERT INTO schemaVersion";

    @RegisterExtension
    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraSchemaVersionModule.MODULE);

    private CassandraSchemaVersionDAO dao;

    @BeforeEach
    void setUp(CassandraCluster cassandra) {
        this.dao = new CassandraSchemaVersionDAO(cassandra.getConf());
    }

    @Test
    void daoOperationShouldNotBeInstrumentedByDefault() {
        Assertions.assertThatCode(() -> this.dao.getCurrentSchemaVersion().block())
            .doesNotThrowAnyException();
    }

    @Test
    void daoOperationShouldNotBeInstrumentedWhenExecuteNormally(CassandraCluster cassandra) {
        cassandra.getConf().registerScenario(
            Scenario.Builder.executeNormally().times(1).whenQueryStartsWith(SELECT_VERSION_QUERY));

        Assertions.assertThatCode(() -> this.dao.getCurrentSchemaVersion().block())
            .doesNotThrowAnyException();
    }

    @Test
    void daoOperationShouldNotBeInstrumentedWhenReturnEmpty(CassandraCluster cassandra) {
        cassandra.getConf().registerScenario(
            Scenario.Builder.returnEmpty().times(1).whenQueryStartsWith(SELECT_VERSION_QUERY));

        Assertions.assertThat(this.dao.getCurrentSchemaVersion().block()).isEmpty();
    }

    @Test
    void recordStatementsShouldKeepTraceOfExecutedStatement(CassandraCluster cassandra) {
        StatementRecorder statementRecorder = new StatementRecorder(StatementRecorder.Selector.ALL);
        cassandra.getConf().recordStatements(statementRecorder);

        this.dao.getCurrentSchemaVersion().block();

        Assertions.assertThat(statementRecorder.listExecutedStatements(
                StatementRecorder.Selector.preparedStatement(SELECT_VERSION_QUERY)))
            .hasSize(1);
    }

    @Test
    void recordStatementsShouldKeepTraceOfExecutedStatements(CassandraCluster cassandra) {
        StatementRecorder statementRecorder = new StatementRecorder(StatementRecorder.Selector.ALL);
        cassandra.getConf().recordStatements(statementRecorder);

        this.dao.updateVersion(new SchemaVersion(36)).block();
        this.dao.getCurrentSchemaVersion().block();

        // Only bound statements expose the prepared query string, so narrow to them before extracting.
        Assertions.assertThat(statementRecorder.listExecutedStatements(StatementRecorder.Selector.ALL))
            .filteredOn(statement -> statement instanceof BoundStatement)
            .extracting(BoundStatement.class::cast)
            .extracting(boundStatement -> boundStatement.preparedStatement().getQueryString())
            .containsExactly(
                "INSERT INTO schemaVersion (key,value) VALUES (:key,:value);",
                "SELECT value FROM schemaVersion;");
    }

    @Test
    void recordStatementsShouldNotKeepTraceOfExecutedStatementsBeforeRecording(CassandraCluster cassandra) {
        // The query runs before the recorder is attached, so it must not show up.
        this.dao.getCurrentSchemaVersion().block();

        StatementRecorder statementRecorder = new StatementRecorder(StatementRecorder.Selector.ALL);
        cassandra.getConf().recordStatements(statementRecorder);

        Assertions.assertThat(statementRecorder.listExecutedStatements()).isEmpty();
    }

    @Test
    void daoOperationShouldNotBeInstrumentedWhenNotMatching(CassandraCluster cassandra) {
        cassandra.getConf().registerScenario(
            Scenario.Builder.fail().times(1).whenQueryStartsWith("non matching"));

        Assertions.assertThatCode(() -> this.dao.getCurrentSchemaVersion().block())
            .doesNotThrowAnyException();
    }

    @Test
    void daoOperationShouldFailWhenInstrumented(CassandraCluster cassandra) {
        cassandra.getConf().registerScenario(
            Scenario.Builder.fail().times(1).whenQueryStartsWith(SELECT_VERSION_QUERY));

        Assertions.assertThatThrownBy(() -> this.dao.getCurrentSchemaVersion().block())
            .isInstanceOf(Scenario.InjectedFailureException.class);
    }

    @Test
    void regularStatementsShouldBeInstrumented(CassandraCluster cassandra) {
        cassandra.getConf().registerScenario(
            Scenario.Builder.fail().times(1).whenQueryStartsWith(SELECT_VERSION_QUERY));

        // Goes through a query-builder statement rather than the DAO's own statements.
        Assertions.assertThatThrownBy(() ->
                new CassandraAsyncExecutor(cassandra.getConf())
                    .execute(QueryBuilder.select("value").from("schemaVersion"))
                    .block())
            .isInstanceOf(Scenario.InjectedFailureException.class);
    }

    @Test
    void forAllQueriesShouldMatchAllStatements(CassandraCluster cassandra) {
        cassandra.getConf().registerScenario(Scenario.Builder.fail().times(1).forAllQueries());

        Assertions.assertThatThrownBy(() -> this.dao.getCurrentSchemaVersion().block())
            .isInstanceOf(Scenario.InjectedFailureException.class);
    }

    @Test
    void daoShouldNotBeInstrumentedWhenTimesIsExceeded(CassandraCluster cassandra) {
        cassandra.getConf().registerScenario(
            Scenario.Builder.fail().times(1).whenQueryStartsWith(SELECT_VERSION_QUERY));

        try {
            this.dao.getCurrentSchemaVersion().block();
        } catch (Exception ignored) {
            // Deliberately discarded: this first call only exists to consume the single
            // registered failure; the assertion below covers the behaviour under test.
        }

        Assertions.assertThatCode(() -> this.dao.getCurrentSchemaVersion().block())
            .doesNotThrowAnyException();
    }

    @Test
    void timesShouldSpecifyExactlyTheFailureCount(CassandraCluster cassandra) {
        cassandra.getConf().registerScenario(
            Scenario.Builder.fail().times(2).whenQueryStartsWith(SELECT_VERSION_QUERY));

        // Assertions go through the soft-assertion instance so every call is evaluated and reported.
        SoftAssertions.assertSoftly(softly -> {
            softly.assertThatThrownBy(() -> this.dao.getCurrentSchemaVersion().block())
                .isInstanceOf(Scenario.InjectedFailureException.class);
            softly.assertThatThrownBy(() -> this.dao.getCurrentSchemaVersion().block())
                .isInstanceOf(Scenario.InjectedFailureException.class);
            softly.assertThatCode(() -> this.dao.getCurrentSchemaVersion().block())
                .doesNotThrowAnyException();
        });
    }

    @Test
    void scenarioShouldDefiningSeveralHooks(CassandraCluster cassandra) {
        cassandra.getConf().registerScenario(
            Scenario.Builder.executeNormally().times(1).whenQueryStartsWith(SELECT_VERSION_QUERY),
            Scenario.Builder.fail().times(1).whenQueryStartsWith(SELECT_VERSION_QUERY));

        // Expected sequence: first hook (normal), second hook (failure), then no hook left.
        SoftAssertions.assertSoftly(softly -> {
            softly.assertThatCode(() -> this.dao.getCurrentSchemaVersion().block())
                .doesNotThrowAnyException();
            softly.assertThatThrownBy(() -> this.dao.getCurrentSchemaVersion().block())
                .isInstanceOf(Scenario.InjectedFailureException.class);
            softly.assertThatCode(() -> this.dao.getCurrentSchemaVersion().block())
                .doesNotThrowAnyException();
        });
    }

    @Test
    void foreverShouldAlwaysApplyBehaviour(CassandraCluster cassandra) {
        cassandra.getConf().registerScenario(
            Scenario.Builder.fail().forever().whenQueryStartsWith(SELECT_VERSION_QUERY));

        SoftAssertions.assertSoftly(softly -> {
            softly.assertThatThrownBy(() -> this.dao.getCurrentSchemaVersion().block())
                .isInstanceOf(Scenario.InjectedFailureException.class);
            softly.assertThatThrownBy(() -> this.dao.getCurrentSchemaVersion().block())
                .isInstanceOf(Scenario.InjectedFailureException.class);
            softly.assertThatThrownBy(() -> this.dao.getCurrentSchemaVersion().block())
                .isInstanceOf(Scenario.InjectedFailureException.class);
        });
    }

    @Test
    void timesShouldBeTakenIntoAccountOnlyForMatchingStatements(CassandraCluster cassandra) {
        cassandra.getConf().registerScenario(
            Scenario.Builder.fail().times(1).whenQueryStartsWith(SELECT_VERSION_QUERY));

        // A non-matching statement must not consume the single registered failure.
        this.dao.updateVersion(new SchemaVersion(36)).block();

        Assertions.assertThatThrownBy(() -> this.dao.getCurrentSchemaVersion().block())
            .isInstanceOf(Scenario.InjectedFailureException.class);
    }

    @Test
    void statementShouldNotBeAppliedBeforeBarrierIsReleased(CassandraCluster cassandra) throws Exception {
        SchemaVersion originalVersion = new SchemaVersion(32);
        this.dao.updateVersion(originalVersion).block();

        // The barrier is never released in this test, so the second update is expected to stay pending.
        cassandra.getConf().registerScenario(
            Scenario.Builder.awaitOn(new Scenario.Barrier())
                .thenExecuteNormally()
                .times(1)
                .whenQueryStartsWith(INSERT_VERSION_PREFIX));

        this.dao.updateVersion(new SchemaVersion(36)).subscribeOn(Schedulers.elastic()).subscribe();

        // Leave the asynchronous update some time to run before checking it had no effect.
        Thread.sleep(100L);

        Assertions.assertThat(this.dao.getCurrentSchemaVersion().block()).contains(originalVersion);
    }

    @Test
    void statementShouldBeAppliedWhenBarrierIsReleased(CassandraCluster cassandra) throws Exception {
        SchemaVersion originalVersion = new SchemaVersion(32);
        SchemaVersion newVersion = new SchemaVersion(36);
        this.dao.updateVersion(originalVersion).block();

        Scenario.Barrier barrier = new Scenario.Barrier();
        cassandra.getConf().registerScenario(
            Scenario.Builder.awaitOn(barrier)
                .thenExecuteNormally()
                .times(1)
                .whenQueryStartsWith(INSERT_VERSION_PREFIX));

        CompletableFuture<?> pendingUpdate = this.dao.updateVersion(newVersion)
            .subscribeOn(Schedulers.elastic())
            .toFuture();

        barrier.releaseCaller();
        pendingUpdate.get();

        Assertions.assertThat(this.dao.getCurrentSchemaVersion().block()).contains(newVersion);
    }

    @Test
    void testShouldBeAbleToAwaitCaller(CassandraCluster cassandra) throws Exception {
        SchemaVersion originalVersion = new SchemaVersion(32);
        SchemaVersion newVersion = new SchemaVersion(36);
        this.dao.updateVersion(originalVersion).block();

        Scenario.Barrier barrier = new Scenario.Barrier();
        cassandra.getConf().registerScenario(
            Scenario.Builder.awaitOn(barrier)
                .thenExecuteNormally()
                .times(1)
                .whenQueryStartsWith(INSERT_VERSION_PREFIX));

        CompletableFuture<?> pendingUpdate = this.dao.updateVersion(newVersion)
            .subscribeOn(Schedulers.elastic())
            .toFuture();

        barrier.awaitCaller();
        barrier.releaseCaller();
        pendingUpdate.get();

        Assertions.assertThat(this.dao.getCurrentSchemaVersion().block()).contains(newVersion);
    }

    @Test
    void awaitOnShouldBeAbleToInjectFailure(CassandraCluster cassandra) throws Exception {
        SchemaVersion originalVersion = new SchemaVersion(32);
        SchemaVersion newVersion = new SchemaVersion(36);
        this.dao.updateVersion(originalVersion).block();

        Scenario.Barrier barrier = new Scenario.Barrier();
        cassandra.getConf().registerScenario(
            Scenario.Builder.awaitOn(barrier)
                .thenFail()
                .times(1)
                .whenQueryStartsWith(INSERT_VERSION_PREFIX));

        CompletableFuture<?> pendingUpdate = this.dao.updateVersion(newVersion)
            .subscribeOn(Schedulers.elastic())
            .toFuture();

        barrier.awaitCaller();
        barrier.releaseCaller();

        // The assertion checks the cause because the failure is read back through Future#get.
        Assertions.assertThatThrownBy(pendingUpdate::get)
            .hasCauseInstanceOf(Scenario.InjectedFailureException.class);
    }
}
