package org.apache.beam.it.cassandra;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.utils.ExceptionUtils;
import org.apache.beam.it.testcontainers.TestContainerResourceManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/apache/beam/it/cassandra/CassandraResourceManager.class */
public class CassandraResourceManager extends TestContainerResourceManager<GenericContainer<?>> implements ResourceManager {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraResourceManager.class);
    private static final String DEFAULT_CASSANDRA_CONTAINER_NAME = "cassandra";
    private static final String DEFAULT_CASSANDRA_CONTAINER_TAG = "4.1.0";
    private static final int CASSANDRA_INTERNAL_PORT = 9042;
    private final CqlSession cassandraClient;
    private final String keyspaceName;
    private final boolean usingStaticDatabase;

    /* loaded from: input_file:org/apache/beam/it/cassandra/CassandraResourceManager$Builder.class */
    public static final class Builder extends TestContainerResourceManager.Builder<CassandraResourceManager> {

        @Nullable
        private String keyspaceName;

        private Builder(String str) {
            super(str, CassandraResourceManager.DEFAULT_CASSANDRA_CONTAINER_NAME, CassandraResourceManager.DEFAULT_CASSANDRA_CONTAINER_TAG);
            this.keyspaceName = null;
        }

        public Builder setKeyspaceName(String str) {
            this.keyspaceName = str;
            return this;
        }

        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        public CassandraResourceManager m1build() {
            return new CassandraResourceManager(this);
        }
    }

    private CassandraResourceManager(Builder builder) {
        this(null, new CassandraContainer(DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)), builder);
    }

    @VisibleForTesting
    CassandraResourceManager(@Nullable CqlSession cqlSession, CassandraContainer<?> cassandraContainer, Builder builder) {
        super(cassandraContainer, builder);
        this.usingStaticDatabase = builder.keyspaceName != null;
        this.keyspaceName = this.usingStaticDatabase ? builder.keyspaceName : CassandraResourceManagerUtils.generateKeyspaceName(builder.testId);
        this.cassandraClient = cqlSession == null ? (CqlSession) CqlSession.builder().addContactPoint(new InetSocketAddress(getHost(), getPort(CASSANDRA_INTERNAL_PORT))).withLocalDatacenter("datacenter1").build() : cqlSession;
        if (this.usingStaticDatabase) {
            return;
        }
        Failsafe.with(buildRetryPolicy(), new RetryPolicy[0]).run(() -> {
            this.cassandraClient.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}", this.keyspaceName));
        });
    }

    public static Builder builder(String str) {
        return new Builder(str);
    }

    public int getPort() {
        return super.getPort(CASSANDRA_INTERNAL_PORT);
    }

    public synchronized String getKeyspaceName() {
        return this.keyspaceName;
    }

    public synchronized ResultSet executeStatement(String str) {
        LOG.info("Executing statement: {}", str);
        try {
            return (ResultSet) Failsafe.with(buildRetryPolicy(), new RetryPolicy[0]).get(() -> {
                return this.cassandraClient.execute(SimpleStatement.newInstance(str).setKeyspace(this.keyspaceName));
            });
        } catch (Exception e) {
            throw new CassandraResourceManagerException("Error reading collection.", e);
        }
    }

    public synchronized boolean insertDocument(String str, Map<String, Object> map) {
        return insertDocuments(str, ImmutableList.of(map));
    }

    public synchronized boolean insertDocuments(String str, List<Map<String, Object>> list) throws CassandraResourceManagerException {
        LOG.info("Attempting to write {} documents to {}.{}.", new Object[]{Integer.valueOf(list.size()), this.keyspaceName, str});
        try {
            Iterator<Map<String, Object>> it = list.iterator();
            while (it.hasNext()) {
                executeStatement(createInsertStatement(str, it.next()));
            }
            LOG.info("Successfully wrote {} documents to {}.{}", new Object[]{Integer.valueOf(list.size()), this.keyspaceName, str});
            return true;
        } catch (Exception e) {
            throw new CassandraResourceManagerException("Error inserting documents.", e);
        }
    }

    public synchronized Iterable<Row> readTable(String str) throws CassandraResourceManagerException {
        LOG.info("Reading all documents from {}.{}", this.keyspaceName, str);
        try {
            List all = executeStatement(String.format("SELECT * FROM %s", str)).all();
            LOG.info("Successfully loaded documents from {}.{}", this.keyspaceName, str);
            return all;
        } catch (Exception e) {
            throw new CassandraResourceManagerException("Error reading table.", e);
        }
    }

    public synchronized void cleanupAll() {
        LOG.info("Attempting to cleanup Cassandra manager.");
        boolean z = false;
        if (!this.usingStaticDatabase) {
            try {
                executeStatement(String.format("DROP KEYSPACE IF EXISTS %s", this.keyspaceName));
            } catch (Exception e) {
                LOG.error("Failed to drop Cassandra keyspace {}.", this.keyspaceName, e);
                if (!ExceptionUtils.containsType(e, DriverTimeoutException.class) && !ExceptionUtils.containsMessage(e, "does not exist")) {
                    z = true;
                }
            }
        }
        try {
            this.cassandraClient.close();
        } catch (Exception e2) {
            LOG.error("Failed to delete Cassandra client.", e2);
            z = true;
        }
        if (z) {
            throw new CassandraResourceManagerException("Failed to delete resources. Check above for errors.");
        }
        super.cleanupAll();
        LOG.info("Cassandra manager successfully cleaned up.");
    }

    private String createInsertStatement(String str, Map<String, Object> map) {
        StringBuilder sb = new StringBuilder();
        StringBuilder sb2 = new StringBuilder();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            sb.append(entry.getKey()).append(", ");
            if (entry.getValue() instanceof String) {
                sb2.append("'").append(entry.getValue()).append("'");
            } else {
                sb2.append(entry.getValue());
            }
            sb2.append(", ");
        }
        if (!map.isEmpty()) {
            sb.delete(sb.length() - 2, sb.length());
            sb2.delete(sb2.length() - 2, sb2.length());
        }
        return String.format("INSERT INTO %s (%s) VALUES (%s)", str, sb, sb2);
    }

    private static RetryPolicy<Object> buildRetryPolicy() {
        return ((RetryPolicyBuilder) RetryPolicy.builder().withMaxRetries(5).withDelay(Duration.ofSeconds(1L)).handle(DriverTimeoutException.class)).build();
    }
}
