package org.apache.nifi.processors.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StringUtils;

/**
 * Processor that reads a CQL statement from the content of each incoming FlowFile, binds the
 * parameters described by the {@code cql.args.N.type} / {@code cql.args.N.value} attributes and
 * executes the statement through the Cassandra session held by the parent class.
 *
 * <p>Outcome routing, as implemented in {@link #onTrigger(ProcessContext, ProcessSession)}:
 * <ul>
 *   <li>successful execution: {@code REL_SUCCESS}, with a provenance SEND event;</li>
 *   <li>{@link QueryExecutionException} or {@link NoHostAvailableException}: penalized, {@code REL_RETRY};</li>
 *   <li>{@link QueryValidationException} or {@link ProcessException} (bad parameters, timeout):
 *       penalized, {@code REL_FAILURE}.</li>
 * </ul>
 */
@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement on a Cassandra 1.x, 2.x, or 3.0.x cluster. The content of an incoming FlowFile is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be a lowercase string indicating the Cassandra type.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@SupportsBatching
@ReadsAttributes({@ReadsAttribute(attribute = "cql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized CQL statements. The type of each parameter is specified as a lowercase string corresponding to the Cassandra data type (text, int, boolean, e.g.). In the case of collections, the primitive type(s) of the elements in the collection should be comma-delimited, follow the collection type, and be enclosed in angle brackets (< and >), for example set<text> or map<timestamp, int>."), @ReadsAttribute(attribute = "cql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized CQL statements. The value of the parameters are specified as cql.args.1.value, cql.args.2.value, cql.args.3.value, and so on. The  type of the cql.args.1.value parameter is specified by the cql.args.1.type attribute.")})
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"cassandra", "cql", "put", "insert", "update", "set"})
public class PutCassandraQL extends AbstractCassandraProcessor {

    /** Maximum time to wait for a statement to complete; a value of zero disables the limit. */
    public static final PropertyDescriptor STATEMENT_TIMEOUT = new PropertyDescriptor.Builder().name("Max Wait Time").displayName("Max Wait Time").description("The maximum amount of time allowed for a running CQL select query. Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ").defaultValue("0 seconds").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();

    /** Maximum number of prepared statements kept in {@link #statementCache}. */
    public static final PropertyDescriptor STATEMENT_CACHE_SIZE = new PropertyDescriptor.Builder().name("putcql-stmt-cache-size").displayName("Statement Cache Size").description("The maximum number of CQL Prepared Statements to cache. This can improve performance if many incoming flow files have the same CQL statement with different values for the parameters. If this property is set to zero, the cache is effectively disabled.").defaultValue("0").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).build();

    /** Matches attribute names of the form {@code cql.args.N.type}; group 1 captures N. */
    private static final Pattern CQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("cql\\.args\\.(\\d+)\\.type");

    /**
     * Splits a type declaration such as {@code text}, {@code set<text>} or {@code map<text,int>}.
     * Group 1 is the main type, group 3 the first element type and group 5 the last additional
     * element type; groups 3 and 5 are {@code null} when the corresponding part is absent.
     */
    private static final Pattern CQL_TYPE_PATTERN = Pattern.compile("([^<]+)(<([^,>]+)(,([^,>]+))*>)?");

    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final Set<Relationship> relationships;

    static {
        // Inherited descriptors first, followed by the two properties specific to this processor.
        final List<PropertyDescriptor> supported = new ArrayList<>();
        supported.addAll(descriptors);
        supported.add(STATEMENT_TIMEOUT);
        supported.add(STATEMENT_CACHE_SIZE);
        propertyDescriptors = Collections.unmodifiableList(supported);

        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        rels.add(REL_RETRY);
        relationships = Collections.unmodifiableSet(rels);
    }

    /** Prepared statements keyed by their CQL text; assigned in {@link #onScheduled(ProcessContext)}. */
    @VisibleForTesting
    private ConcurrentMap<String, PreparedStatement> statementCache;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Runs the parent scheduling logic and then creates the size-bounded prepared statement cache.
     *
     * @param processContext context used to read the {@link #STATEMENT_CACHE_SIZE} property
     */
    @Override // org.apache.nifi.processors.cassandra.AbstractCassandraProcessor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        super.onScheduled(processContext);
        final int cacheSize = processContext.getProperty(STATEMENT_CACHE_SIZE).evaluateAttributeExpressions().asInteger().intValue();
        // The explicit type witness is needed: without it build() yields a Cache<Object, Object>,
        // whose map view is not assignable to the typed field.
        this.statementCache = CacheBuilder.newBuilder().maximumSize(cacheSize).<String, PreparedStatement>build().asMap();
    }

    /**
     * Executes the CQL statement held in the next FlowFile's content and routes the FlowFile
     * according to the outcome (see the class description). Returns immediately when the session
     * has no FlowFile available.
     *
     * @param processContext supplies the timeout and charset properties
     * @param processSession session used to read, report on and transfer the FlowFile
     * @throws ProcessException declared by the signature; the ProcessExceptions raised while
     *         binding or executing are caught here and routed to failure
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final ComponentLog logger = getLogger();
        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }

        final long startNanos = System.nanoTime();
        final long timeoutMillis = processContext.getProperty(STATEMENT_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
        final Charset charset = Charset.forName(processContext.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
        final Session session = this.cassandraSession.get();
        final String cql = getCQL(processSession, flowFile, charset);

        try {
            final BoundStatement statement = prepareStatement(session, cql).bind();
            bindParameters(statement, flowFile.getAttributes());
            execute(session, statement, timeoutMillis);

            final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            processSession.getProvenanceReporter().send(flowFile, "cassandra://" + session.getCluster().getMetadata().getClusterName(), elapsedMillis, true);
            processSession.transfer(flowFile, REL_SUCCESS);
        } catch (QueryExecutionException e) {
            logger.error("Cannot execute the statement with the requested consistency level successfully", e);
            processSession.transfer(processSession.penalize(flowFile), REL_RETRY);
        } catch (NoHostAvailableException e) {
            logger.error("No host in the Cassandra cluster can be contacted successfully to execute this statement", e);
            logger.error(e.getCustomMessage(10, true, false));
            processSession.transfer(processSession.penalize(flowFile), REL_RETRY);
        } catch (QueryValidationException e) {
            logger.error("The CQL statement {} is invalid due to syntax error, authorization issue, or another validation problem; routing {} to failure", new Object[]{cql, flowFile, e});
            processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
        } catch (ProcessException e) {
            logger.error("Unable to execute CQL select statement {} for {} due to {}; routing to failure", new Object[]{cql, flowFile, e});
            processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
        }
    }

    /** Returns the cached prepared statement for {@code cql}, preparing and caching it on a miss. */
    private PreparedStatement prepareStatement(Session session, String cql) {
        PreparedStatement prepared = this.statementCache.get(cql);
        if (prepared == null) {
            prepared = session.prepare(cql);
            this.statementCache.put(cql, prepared);
        }
        return prepared;
    }

    /**
     * Binds every parameter announced by a {@code cql.args.N.type} attribute to position N-1 of
     * the statement, taking the value from the matching {@code cql.args.N.value} attribute.
     *
     * @throws ProcessException if a position is not a usable positive integer, a type attribute is
     *         empty, or a value cannot be bound
     */
    private void bindParameters(BoundStatement statement, Map<String, String> attributes) {
        for (Map.Entry<String, String> attribute : attributes.entrySet()) {
            final String typeAttrName = attribute.getKey();
            final Matcher typeAttrMatcher = CQL_TYPE_ATTRIBUTE_PATTERN.matcher(typeAttrName);
            if (!typeAttrMatcher.matches()) {
                continue;
            }

            final int position;
            try {
                position = Integer.parseInt(typeAttrMatcher.group(1));
            } catch (NumberFormatException e) {
                // \d+ also accepts digit strings too long for an int; fail the FlowFile instead of
                // letting the exception escape onTrigger.
                throw new ProcessException("The parameter position in the " + typeAttrName + " attribute is not a valid integer", e);
            }
            if (position < 1) {
                throw new ProcessException("The parameter position in the " + typeAttrName + " attribute must be a positive integer");
            }

            final String rawType = attribute.getValue();
            if (StringUtils.isEmpty(rawType)) {
                throw new ProcessException("Value of the " + typeAttrName + " attribute is null or empty, it must contain a valid value");
            }
            final String type = rawType.trim();

            // Built from the parsed number, so "cql.args.01.type" pairs with "cql.args.1.value".
            final String valueAttrName = "cql.args." + position + ".value";
            final String value = attributes.get(valueAttrName);
            try {
                setStatementObject(statement, position - 1, valueAttrName, value, type);
            } catch (InvalidTypeException | IllegalArgumentException e) {
                throw new ProcessException("The value of the " + valueAttrName + " is '" + value + "', which cannot be converted into the necessary data type: " + type, e);
            } catch (IndexOutOfBoundsException e) {
                throw new ProcessException("The statement has no bind parameter at position " + position + ", which is referenced by the " + typeAttrName + " attribute", e);
            }
        }
    }

    /**
     * Executes the bound statement asynchronously and waits for completion, bounded by
     * {@code timeoutMillis} when it is positive.
     *
     * @throws ProcessException if the wait times out
     */
    private void execute(Session session, BoundStatement statement, long timeoutMillis) {
        final ResultSetFuture future = session.executeAsync(statement);
        try {
            if (timeoutMillis > 0) {
                future.getUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS);
            } else {
                future.getUninterruptibly();
            }
        } catch (TimeoutException e) {
            throw new ProcessException("Timed out after " + timeoutMillis + " ms waiting for the CQL statement to complete", e);
        }
    }

    /** Reads the complete FlowFile content and decodes it with the given charset. */
    private String getCQL(ProcessSession processSession, FlowFile flowFile, Charset charset) {
        final byte[] content = new byte[(int) flowFile.getSize()];
        processSession.read(flowFile, new InputStreamCallback() {
            public void process(InputStream inputStream) throws IOException {
                StreamUtils.fillBuffer(inputStream, content);
            }
        });
        return new String(content, charset);
    }

    /**
     * Converts {@code paramValue} according to {@code paramType} and binds it at {@code paramIndex}.
     * A {@code null} value is bound as CQL null regardless of the declared type.
     *
     * @param statement  the statement to bind into
     * @param paramIndex zero-based position of the bind variable
     * @param attrName   name of the attribute that supplied the value, used in error messages
     * @param paramValue textual value to convert, or {@code null}
     * @param paramType  declared type, e.g. {@code int}, {@code set<text>} or {@code map<text, int>}
     * @throws IllegalArgumentException if the type is missing, malformed or unsupported
     */
    protected void setStatementObject(BoundStatement statement, int paramIndex, String attrName, String paramValue, String paramType) throws IllegalArgumentException {
        if (paramValue == null) {
            statement.setToNull(paramIndex);
            return;
        }
        if (paramType == null) {
            throw new IllegalArgumentException("Parameter type for " + attrName + " cannot be null");
        }

        final Matcher matcher = CQL_TYPE_PATTERN.matcher(paramType);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Cannot create object of type " + paramType + " using input " + paramValue);
        }

        final String mainTypeName = normalizeTypeName(matcher.group(1));
        final DataType mainType = getPrimitiveDataTypeFromString(mainTypeName);
        if (mainType != null) {
            bindPrimitive(statement, paramIndex, mainType, paramValue);
            return;
        }

        // Not a primitive, so it has to be a collection. Matcher.groupCount() reports the number of
        // groups in the pattern rather than the groups that matched, so test the groups themselves.
        final String firstElementName = normalizeTypeName(matcher.group(3));
        if (firstElementName == null) {
            throw new IllegalArgumentException("Collection type " + mainTypeName + " needs parameterized type(s), such as set<text>");
        }
        final DataType firstElementType = getPrimitiveDataTypeFromString(firstElementName);
        if (firstElementType == null) {
            throw new IllegalArgumentException("Nested collections are not supported");
        }

        if (DataType.Name.MAP.toString().equalsIgnoreCase(mainTypeName)) {
            final String valueTypeName = normalizeTypeName(matcher.group(5));
            if (valueTypeName == null) {
                throw new IllegalArgumentException("Map type needs both a key type and a value type, such as map<text, int>");
            }
            final DataType valueType = getPrimitiveDataTypeFromString(valueTypeName);
            if (valueType == null) {
                throw new IllegalArgumentException("Nested collections are not supported");
            }
            statement.setMap(paramIndex, (Map<?, ?>) codecRegistry.codecFor(DataType.map(firstElementType, valueType)).parse(paramValue));
            return;
        }
        if (DataType.Name.SET.toString().equalsIgnoreCase(mainTypeName)) {
            statement.setSet(paramIndex, (Set<?>) codecRegistry.codecFor(DataType.set(firstElementType)).parse(paramValue));
            return;
        }
        if (DataType.Name.LIST.toString().equalsIgnoreCase(mainTypeName)) {
            statement.setList(paramIndex, (List<?>) codecRegistry.codecFor(DataType.list(firstElementType)).parse(paramValue));
            return;
        }
        throw new IllegalArgumentException("Cannot create object of type " + paramType + " using input " + paramValue);
    }

    /**
     * Binds a value of a primitive CQL type. Types with a dedicated setter use it; every other
     * primitive type is bound through the generic codec-based setter so that it is never left
     * silently unbound.
     */
    private void bindPrimitive(BoundStatement statement, int paramIndex, DataType type, String paramValue) {
        final TypeCodec<Object> codec = codecRegistry.codecFor(type);
        if (type.equals(DataType.ascii()) || type.equals(DataType.text()) || type.equals(DataType.varchar())
                || type.equals(DataType.inet()) || type.equals(DataType.varint())) {
            // NOTE(review): inet and varint are bound as plain strings here, as before - confirm the
            // driver accepts a String for columns of those types.
            statement.setString(paramIndex, paramValue);
        } else if (type.equals(DataType.cboolean())) {
            statement.setBool(paramIndex, ((Boolean) codec.parse(paramValue)).booleanValue());
        } else if (type.equals(DataType.cint())) {
            statement.setInt(paramIndex, ((Integer) codec.parse(paramValue)).intValue());
        } else if (type.equals(DataType.bigint()) || type.equals(DataType.counter())) {
            statement.setLong(paramIndex, ((Long) codec.parse(paramValue)).longValue());
        } else if (type.equals(DataType.cfloat())) {
            statement.setFloat(paramIndex, ((Float) codec.parse(paramValue)).floatValue());
        } else if (type.equals(DataType.cdouble())) {
            statement.setDouble(paramIndex, ((Double) codec.parse(paramValue)).doubleValue());
        } else if (type.equals(DataType.blob())) {
            statement.setBytes(paramIndex, (ByteBuffer) codec.parse(paramValue));
        } else if (type.equals(DataType.timestamp())) {
            statement.setTimestamp(paramIndex, (Date) codec.parse(paramValue));
        } else if (type.equals(DataType.timeuuid()) || type.equals(DataType.uuid())) {
            statement.setUUID(paramIndex, (UUID) codec.parse(paramValue));
        } else {
            // Remaining primitive types have no dedicated branch above.
            statement.set(paramIndex, codec.parse(paramValue), codec);
        }
    }

    /** Trims and lower-cases a type name independently of the default locale; {@code null} stays {@code null}. */
    private static String normalizeTypeName(String typeName) {
        return typeName == null ? null : typeName.trim().toLowerCase(Locale.ROOT);
    }

    /**
     * Runs the parent stop logic and empties the prepared statement cache.
     *
     * @param processContext context passed through to the parent implementation
     */
    @Override // org.apache.nifi.processors.cassandra.AbstractCassandraProcessor
    @OnStopped
    public void stop(ProcessContext processContext) {
        super.stop(processContext);
        // The cache is only assigned at the end of onScheduled, so it is still null if that
        // method never completed.
        if (this.statementCache != null) {
            this.statementCache.clear();
        }
    }
}
