package org.apache.streams.cassandra;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.GuidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/cassandra/CassandraPersistWriter.class */
public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable {
    public static final String STREAMS_ID = "CassandraPersistWriter";
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraPersistWriter.class);
    private static final long MAX_WRITE_LATENCY = 1000;
    protected volatile Queue<StreamsDatum> persistQueue;
    private ObjectMapper mapper;
    private volatile AtomicLong lastWrite;
    private ScheduledExecutorService backgroundFlushTask;
    private CassandraConfiguration config;
    private CassandraClient client;
    private Session session;
    protected PreparedStatement insertStatement;
    protected List<BoundStatement> insertBatch;
    protected final ReadWriteLock lock;

    public CassandraPersistWriter() {
        this((CassandraConfiguration) new ComponentConfigurator(CassandraConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra")));
    }

    public CassandraPersistWriter(CassandraConfiguration cassandraConfiguration) {
        this.mapper = StreamsJacksonMapper.getInstance();
        this.lastWrite = new AtomicLong(System.currentTimeMillis());
        this.backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
        this.insertBatch = new ArrayList();
        this.lock = new ReentrantReadWriteLock();
        this.config = cassandraConfiguration;
    }

    public void setPersistQueue(Queue<StreamsDatum> queue) {
        this.persistQueue = queue;
    }

    public Queue<StreamsDatum> getPersistQueue() {
        return this.persistQueue;
    }

    public String getId() {
        return STREAMS_ID;
    }

    public void write(StreamsDatum streamsDatum) {
        if (streamsDatum.getDocument() instanceof String) {
            try {
                ObjectNode objectNode = (ObjectNode) this.mapper.readValue((String) streamsDatum.getDocument(), ObjectNode.class);
                byte[] bytes = objectNode.toString().getBytes();
                String generateGuid = GuidUtils.generateGuid(new String[]{objectNode.toString()});
                if (!Objects.isNull(streamsDatum.getMetadata().get("id"))) {
                    generateGuid = streamsDatum.getMetadata().get("id").toString();
                }
                this.insertBatch.add(this.insertStatement.bind(new Object[]{generateGuid, ByteBuffer.wrap(bytes)}));
            } catch (IOException e) {
                LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString());
                return;
            }
        } else {
            try {
                ObjectNode valueToTree = this.mapper.valueToTree(streamsDatum.getDocument());
                byte[] bytes2 = valueToTree.toString().getBytes();
                String generateGuid2 = GuidUtils.generateGuid(new String[]{valueToTree.toString()});
                if (!Objects.isNull(streamsDatum.getId())) {
                    generateGuid2 = streamsDatum.getId();
                }
                this.insertBatch.add(this.insertStatement.bind(new Object[]{generateGuid2, ByteBuffer.wrap(bytes2)}));
            } catch (Exception e2) {
                LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString());
                return;
            }
        }
        flushIfNecessary();
    }

    @Override // java.io.Flushable
    public void flush() throws IOException {
        try {
            LOGGER.debug("Attempting to flush {} items to cassandra", Integer.valueOf(this.insertBatch.size()));
            this.lock.writeLock().lock();
            BatchStatement batchStatement = new BatchStatement();
            batchStatement.addAll(this.insertBatch);
            this.session.execute(batchStatement);
            this.lastWrite.set(System.currentTimeMillis());
            this.insertBatch = new ArrayList();
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        this.session.close();
        this.client.cluster().close();
        this.backgroundFlushTask.shutdownNow();
    }

    public void start() {
        try {
            connectToCassandra();
            this.client.start();
            createKeyspaceAndTable();
            createInsertStatement();
            this.backgroundFlushTask.scheduleAtFixedRate(new Runnable() { // from class: org.apache.streams.cassandra.CassandraPersistWriter.1
                @Override // java.lang.Runnable
                public void run() {
                    CassandraPersistWriter.this.flushIfNecessary();
                }
            }, 0L, 2000L, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            LOGGER.error("Exception", e);
        }
    }

    public void stop() {
        try {
            flush();
        } catch (IOException e) {
            LOGGER.error("Error flushing", e);
        }
        try {
            close();
        } catch (IOException e2) {
            LOGGER.error("Error closing", e2);
        }
        try {
            this.backgroundFlushTask.shutdown();
            if (!this.backgroundFlushTask.awaitTermination(15L, TimeUnit.SECONDS)) {
                this.backgroundFlushTask.shutdownNow();
                if (!this.backgroundFlushTask.awaitTermination(15L, TimeUnit.SECONDS)) {
                    LOGGER.error("Stream did not terminate");
                }
            }
        } catch (InterruptedException e3) {
            this.backgroundFlushTask.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            if (this.persistQueue.peek() != null) {
                try {
                    write(this.persistQueue.remove());
                } catch (Exception e) {
                    LOGGER.warn("Failure writing entry from Queue: {}", e.getMessage());
                }
            }
            try {
                Thread.sleep(new Random().nextInt(1));
            } catch (InterruptedException e2) {
                LOGGER.trace("Interrupt", e2);
            }
        }
    }

    public void prepare(Object obj) {
        this.persistQueue = new ConcurrentLinkedQueue();
        start();
    }

    public void cleanUp() {
        stop();
    }

    protected void flushIfNecessary() {
        long currentTimeMillis = System.currentTimeMillis() - this.lastWrite.get();
        if (this.insertBatch.size() > 0) {
            if (this.insertBatch.size() % 100 == 0 || currentTimeMillis > MAX_WRITE_LATENCY) {
                try {
                    flush();
                } catch (IOException e) {
                    LOGGER.error("Error writing to Cassandra", e);
                }
            }
        }
    }

    private synchronized void connectToCassandra() throws Exception {
        this.client = new CassandraClient(this.config);
    }

    private void createKeyspaceAndTable() {
        Metadata metadata = this.client.cluster().getMetadata();
        if (Objects.isNull(metadata.getKeyspace(this.config.getKeyspace()))) {
            LOGGER.info("Keyspace {} does not exist. Creating Keyspace", this.config.getKeyspace());
            HashMap hashMap = new HashMap();
            hashMap.put("class", "SimpleStrategy");
            hashMap.put("replication_factor", 1);
            this.client.cluster().connect().execute(SchemaBuilder.createKeyspace(this.config.getKeyspace()).with().replication(hashMap).getQueryString());
        }
        this.session = this.client.cluster().connect(this.config.getKeyspace());
        if (Objects.isNull(metadata.getKeyspace(this.config.getKeyspace()).getTable(this.config.getTable()))) {
            LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", this.config.getTable(), this.config.getKeyspace());
            this.session.execute(SchemaBuilder.createTable(this.config.getTable()).addPartitionKey(this.config.getPartitionKeyColumn(), DataType.varchar()).addColumn(this.config.getColumn(), DataType.blob()).getQueryString());
        }
    }

    private void createInsertStatement() {
        Insert insertInto = QueryBuilder.insertInto(this.config.getTable());
        insertInto.value(this.config.getPartitionKeyColumn(), new Object());
        insertInto.value(this.config.getColumn(), new Object());
        this.insertStatement = this.session.prepare(insertInto.getQueryString());
    }
}
