/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kudu;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.field.FieldConverter;
import org.apache.nifi.serialization.record.field.StandardFieldConverterRegistry;
import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

public abstract class AbstractKuduProcessor
extends AbstractProcessor {
    /** Required property: comma separated Kudu master addresses; read by {@code buildClient}. */
    static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder().name("Kudu Masters").description("Comma separated addresses of the Kudu masters to connect to.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    /** Optional controller service used by {@code createKerberosUserAndOrKuduClient} to create and log in a Kerberos user. */
    static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder().name("kerberos-user-service").displayName("Kerberos User Service").description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos").identifiesControllerService(KerberosUserService.class).required(false).build();
    /** Time period that {@code buildClient} converts to milliseconds and passes to {@code defaultOperationTimeoutMs}. */
    static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new PropertyDescriptor.Builder().name("kudu-operations-timeout-ms").displayName("Kudu Operation Timeout").description("Default timeout used for user operations (using sessions and scanners)").required(false).defaultValue("30000ms").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    /**
     * Time period that {@code buildClient} converts to milliseconds and passes to
     * {@code defaultAdminOperationTimeoutMs}.
     * NOTE(review): the property name says "keep alive" but the value is applied as the admin operation
     * timeout - confirm this is intended.
     */
    static final PropertyDescriptor KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS = new PropertyDescriptor.Builder().name("kudu-keep-alive-period-timeout-ms").displayName("Kudu Keep Alive Period Timeout").description("Default timeout used for user operations").required(false).defaultValue("15000ms").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    /** Default for {@link #WORKER_COUNT}: the processor count reported by the runtime when this class is initialized. */
    private static final int DEFAULT_WORKER_COUNT = Runtime.getRuntime().availableProcessors();
    /** Positive integer used by {@code buildClient} both as the Kudu worker count and as the maximum size of the client thread pool. */
    static final PropertyDescriptor WORKER_COUNT = new PropertyDescriptor.Builder().name("worker-count").displayName("Kudu Client Worker Count").description("The maximum number of worker threads handling Kudu client read and write operations. Defaults to the number of available processors.").required(true).defaultValue(Integer.toString(DEFAULT_WORKER_COUNT)).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    /** SASL protocol name handed to the Kudu client builder by {@code buildClient}. */
    static final PropertyDescriptor KUDU_SASL_PROTOCOL_NAME = new PropertyDescriptor.Builder().name("kudu-sasl-protocol-name").displayName("Kudu SASL Protocol Name").description("The SASL protocol name to use for authenticating via Kerberos. Must match the service principal name.").required(false).defaultValue("kudu").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    /** Shared converter used by {@code buildPartialRow} to turn record values into {@link Timestamp} for UNIXTIME_MICROS columns. */
    private static final FieldConverter<Object, Timestamp> TIMESTAMP_FIELD_CONVERTER = StandardFieldConverterRegistry.getRegistry().getFieldConverter(Timestamp.class);
    /** Pattern that {@code getTimestampPattern} substitutes for the field's own format when the record field type is TIMESTAMP. */
    private static final String MICROSECOND_TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss[.SSSSSS]";
    /** Current Kudu client; replaced by {@code createKuduClient} and cleared by {@code shutdown}. */
    private volatile KuduClient kuduClient;
    /** Guards {@link #kuduClient}: {@code createKuduClient} holds the write lock, {@code executeOnKuduClient} holds the read lock. */
    private final ReadWriteLock kuduClientReadWriteLock = new ReentrantReadWriteLock();
    private final Lock kuduClientReadLock = this.kuduClientReadWriteLock.readLock();
    private final Lock kuduClientWriteLock = this.kuduClientReadWriteLock.writeLock();
    /** Logged-in Kerberos user, or {@code null} when none was created; logged out and cleared by {@code shutdown}. */
    private volatile KerberosUser kerberosUser;

    /**
     * Exposes the Kerberos user currently held by this processor.
     *
     * @return the stored Kerberos user, or {@code null} if none has been set
     */
    protected KerberosUser getKerberosUser() {
        return kerberosUser;
    }

    /**
     * Asks the current Kudu client whether it supports "ignore" operations.
     *
     * @return the answer reported by the Kudu client
     * @throws RuntimeException wrapping the {@link KuduException} if the client query fails
     */
    protected boolean supportsIgnoreOperations() {
        final boolean supported;
        try {
            supported = kuduClient.supportsIgnoreOperations();
        } catch (KuduException e) {
            // Callers are not expected to handle the checked exception; keep the cause attached.
            throw new RuntimeException(e);
        }
        return supported;
    }

    /**
     * Logs in a Kerberos user when a Kerberos User Service is configured, then (re)creates the Kudu client.
     *
     * <p>The Kerberos User Service property is optional. The Kudu client is created in both cases;
     * without a configured service it is simply built without a Kerberos login.
     *
     * @param context process context used to resolve the Kerberos service and the client properties
     */
    protected void createKerberosUserAndOrKuduClient(ProcessContext context) {
        KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
        if (kerberosUserService != null) {
            kerberosUser = kerberosUserService.createKerberosUser();
            kerberosUser.login();
        }
        // Must run regardless of Kerberos configuration, otherwise the processor is left without a client.
        createKuduClient(context);
    }

    /**
     * Replaces the current Kudu client with a freshly built one while holding the client write lock.
     *
     * <p>An existing client is closed first. A failure to close it is logged (with its cause) and does
     * not prevent the new client from being created. When a Kerberos user is present, the client is
     * built inside a {@link KerberosAction} for that user; otherwise it is built directly.
     *
     * @param context process context supplying the client configuration
     */
    protected void createKuduClient(ProcessContext context) {
        kuduClientWriteLock.lock();
        try {
            if (kuduClient != null) {
                try {
                    kuduClient.close();
                } catch (KuduException e) {
                    // Best effort: the old client is being discarded anyway, but keep the cause in the log.
                    getLogger().error("Couldn't close Kudu client.", e);
                }
            }
            if (kerberosUser != null) {
                KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(context), getLogger());
                kuduClient = kerberosAction.execute();
            } else {
                kuduClient = buildClient(context);
            }
        } finally {
            kuduClientWriteLock.unlock();
        }
    }

    /**
     * Builds a new Kudu client from the processor properties.
     *
     * <p>The client is given its own thread pool, capped at the configured worker count, whose threads
     * are created by {@code ClientThreadFactory}.
     *
     * @param context process context supplying masters, timeouts, SASL protocol name and worker count
     * @return a newly built Kudu client
     */
    protected KuduClient buildClient(ProcessContext context) {
        final String masters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
        final int operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        // The keep-alive property is what feeds the admin operation timeout of the client.
        final int adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        final String saslProtocolName = context.getProperty(KUDU_SASL_PROTOCOL_NAME).evaluateAttributeExpressions().getValue();
        final int workerCount = context.getProperty(WORKER_COUNT).asInteger();

        // Same shape as Executors.newCachedThreadPool() (no core threads, 60s idle timeout, direct
        // hand-off queue) but with the worker count as the maximum pool size.
        final int corePoolSize = 0;
        final long threadKeepAliveSeconds = 60L;
        final Executor nioExecutor = new ThreadPoolExecutor(
                corePoolSize,
                workerCount,
                threadKeepAliveSeconds,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ClientThreadFactory(getIdentifier()));

        return new KuduClient.KuduClientBuilder(masters)
                .defaultAdminOperationTimeoutMs(adminOperationTimeout)
                .defaultOperationTimeoutMs(operationTimeout)
                .saslProtocolName(saslProtocolName)
                .workerCount(workerCount)
                .nioExecutor(nioExecutor)
                .build();
    }

    /**
     * Runs the given action against the current Kudu client while holding the client read lock.
     *
     * @param actionOnKuduClient callback that receives the current Kudu client
     */
    protected void executeOnKuduClient(Consumer<KuduClient> actionOnKuduClient) {
        final Lock readLock = kuduClientReadLock;
        readLock.lock();
        try {
            actionOnKuduClient.accept(kuduClient);
        } finally {
            // Always release, even if the callback throws.
            readLock.unlock();
        }
    }

    /**
     * Flushes (or closes) the session and appends any resulting row errors to {@code rowErrors}.
     *
     * <p>In {@code AUTO_FLUSH_BACKGROUND} mode the errors are taken from the session's pending errors;
     * in every other flush mode they are taken from the responses returned by the flush/close call.
     *
     * @param kuduSession session to flush or close
     * @param close       {@code true} to close the session, {@code false} to only flush it
     * @param rowErrors   list that collected row errors are added to
     * @throws KuduException if flushing or closing the session fails
     */
    protected void flushKuduSession(KuduSession kuduSession, boolean close, List<RowError> rowErrors) throws KuduException {
        final List<OperationResponse> responses = close ? kuduSession.close() : kuduSession.flush();
        if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
            rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
        } else {
            responses.stream()
                    .filter(OperationResponse::hasRowError)
                    .map(OperationResponse::getRowError)
                    .forEach(rowErrors::add);
        }
    }

    /**
     * Releases the Kudu client and the Kerberos user when the processor is stopped.
     *
     * <p>The Kerberos logout runs in a {@code finally} block, so it happens whether or not a client
     * exists and whether or not closing the client throws.
     *
     * @throws Exception if closing the client or logging out the Kerberos user fails
     */
    @OnStopped
    public void shutdown() throws Exception {
        try {
            if (kuduClient == null) {
                // Nothing to close; the finally block below still performs the Kerberos logout.
                return;
            }
            getLogger().debug("Closing KuduClient");
            kuduClient.close();
            kuduClient = null;
        } finally {
            if (kerberosUser != null) {
                kerberosUser.logout();
                kerberosUser = null;
            }
        }
    }

    /**
     * Copies the named record fields into the given Kudu row, converting each value to the type of the
     * matching Kudu column.
     *
     * <p>Fields without a matching column in {@code schema} are skipped. A {@code null} record value is
     * rejected for primary key and non-nullable columns; for nullable columns it is either skipped
     * ({@code ignoreNull}) or written as an explicit null.
     *
     * @param schema          Kudu table schema used to resolve columns and their types
     * @param row             row that receives the converted values
     * @param record          source record
     * @param fieldNames      record field names to copy
     * @param ignoreNull      when {@code true}, null values for nullable columns are left unset
     * @param lowercaseFields when {@code true}, field names are lower-cased to obtain the column name
     * @throws IllegalArgumentException if a null value targets a primary key or non-nullable column
     * @throws IllegalStateException    if the column has a Kudu type this method does not handle
     */
    protected void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames, boolean ignoreNull, boolean lowercaseFields) {
        for (String recordFieldName : fieldNames) {
            // Locale.ROOT keeps the field-to-column mapping independent of the JVM default locale.
            final String colName = lowercaseFields ? recordFieldName.toLowerCase(Locale.ROOT) : recordFieldName;
            if (!schema.hasColumn(colName)) {
                continue;
            }

            final int columnIndex = schema.getColumnIndex(colName);
            final ColumnSchema colSchema = schema.getColumnByIndex(columnIndex);
            final Type colType = colSchema.getType();
            final Object value = record.getValue(recordFieldName);

            if (value == null) {
                if (colSchema.isKey()) {
                    throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
                }
                if (!colSchema.isNullable()) {
                    throw new IllegalArgumentException(String.format("Can't set column %s to null ", colName));
                }
                if (!ignoreNull) {
                    row.setNull(colName);
                }
                continue;
            }

            final Optional<DataType> fieldDataType = record.getSchema().getDataType(recordFieldName);
            // May be null: either the field has no declared type or the type carries no format.
            final String dataTypeFormat = fieldDataType.map(DataType::getFormat).orElse(null);

            switch (colType) {
                case BOOL -> row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName));
                case INT8 -> row.addByte(columnIndex, DataTypeUtils.toByte(value, recordFieldName));
                case INT16 -> row.addShort(columnIndex, DataTypeUtils.toShort(value, recordFieldName));
                case INT32 -> row.addInt(columnIndex, DataTypeUtils.toInteger(value, recordFieldName));
                case INT64 -> row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName));
                case UNIXTIME_MICROS -> {
                    final Optional<String> timestampPattern = getTimestampPattern(fieldDataType.orElse(null));
                    final Timestamp timestamp = TIMESTAMP_FIELD_CONVERTER.convertField(value, timestampPattern, recordFieldName);
                    row.addTimestamp(columnIndex, timestamp);
                }
                case STRING -> row.addString(columnIndex, DataTypeUtils.toString(value, dataTypeFormat));
                // Explicit charset so the stored bytes do not depend on the platform default encoding.
                case BINARY -> row.addBinary(columnIndex, DataTypeUtils.toString(value, dataTypeFormat).getBytes(StandardCharsets.UTF_8));
                case FLOAT -> row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName));
                case DOUBLE -> row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName));
                case DECIMAL -> row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, dataTypeFormat)));
                case VARCHAR -> row.addVarchar(columnIndex, DataTypeUtils.toString(value, dataTypeFormat));
                case DATE -> {
                    final String dateFormat = dataTypeFormat == null ? RecordFieldType.DATE.getDefaultFormat() : dataTypeFormat;
                    row.addDate(columnIndex, getDate(value, recordFieldName, dateFormat));
                }
                default -> throw new IllegalStateException(String.format("unknown column type %s", colType));
            }
        }
    }

    /**
     * Chooses the pattern used to parse a value destined for a timestamp column.
     *
     * <p>TIMESTAMP record fields always use the microsecond pattern instead of their own format; any
     * other field type uses its declared format, which may be absent.
     *
     * @param dataType record field data type, or {@code null} when the field has no declared type
     * @return the pattern to parse with, or an empty Optional when there is no type or no format
     */
    private Optional<String> getTimestampPattern(DataType dataType) {
        if (dataType == null) {
            return Optional.empty();
        }
        if (RecordFieldType.TIMESTAMP == dataType.getFieldType()) {
            return Optional.of(MICROSECOND_TIMESTAMP_PATTERN);
        }
        // getFormat() can be null for types without a format; Optional.of would throw here.
        return Optional.ofNullable(dataType.getFormat());
    }

    /**
     * Converts a record value to a {@link Date} by way of the registry's {@link LocalDate} converter.
     *
     * @param value           record value to convert
     * @param recordFieldName field name passed through to the converter
     * @param format          date pattern, may be {@code null}
     * @return the SQL date for the converted local date
     */
    private Date getDate(Object value, String recordFieldName, String format) {
        final FieldConverter<Object, LocalDate> localDateConverter =
                StandardFieldConverterRegistry.getRegistry().getFieldConverter(LocalDate.class);
        final Optional<String> pattern = Optional.ofNullable(format);
        return Date.valueOf(localDateConverter.convertField(value, pattern, recordFieldName));
    }

    /**
     * Maps a NiFi record data type to the corresponding Kudu column type.
     *
     * @param nifiType NiFi data type to translate
     * @return the matching Kudu type
     * @throws IllegalArgumentException if the NiFi type has no mapping here
     */
    private Type toKuduType(DataType nifiType) {
        final RecordFieldType fieldType = nifiType.getFieldType();
        return switch (fieldType) {
            case BOOLEAN -> Type.BOOL;
            case BYTE -> Type.INT8;
            case SHORT -> Type.INT16;
            case INT -> Type.INT32;
            case LONG -> Type.INT64;
            case FLOAT -> Type.FLOAT;
            case DOUBLE -> Type.DOUBLE;
            case DECIMAL -> Type.DECIMAL;
            case TIMESTAMP -> Type.UNIXTIME_MICROS;
            // Both textual NiFi types are stored as Kudu strings.
            case CHAR, STRING -> Type.STRING;
            case DATE -> Type.DATE;
            default -> throw new IllegalArgumentException(String.format("unsupported type %s", nifiType));
        };
    }

    /**
     * Builds the Kudu column type attributes required for a NiFi data type.
     *
     * @param nifiType NiFi data type of the column
     * @return precision/scale attributes for DECIMAL types, {@code null} for every other type
     */
    private ColumnTypeAttributes getKuduTypeAttributes(DataType nifiType) {
        if (!nifiType.getFieldType().equals(RecordFieldType.DECIMAL)) {
            // Only decimals carry extra attributes.
            return null;
        }
        final DecimalDataType decimalType = (DecimalDataType) nifiType;
        final ColumnTypeAttributes.ColumnTypeAttributesBuilder attributes = new ColumnTypeAttributes.ColumnTypeAttributesBuilder();
        return attributes
                .precision(decimalType.getPrecision())
                .scale(decimalType.getScale())
                .build();
    }

    /**
     * Creates an alter-table request that adds one nullable column of the Kudu type matching the
     * given NiFi type.
     *
     * @param columnName name of the column to add
     * @param nifiType   NiFi data type that determines the Kudu type and type attributes
     * @return alter-table options containing the single add-column step
     * @throws IllegalArgumentException if the NiFi type cannot be mapped to a Kudu type
     */
    protected AlterTableOptions getAddNullableColumnStatement(String columnName, DataType nifiType) {
        final AlterTableOptions options = new AlterTableOptions();
        final ColumnSchema.ColumnSchemaBuilder columnBuilder = new ColumnSchema.ColumnSchemaBuilder(columnName, toKuduType(nifiType));
        columnBuilder.nullable(true);
        columnBuilder.typeAttributes(getKuduTypeAttributes(nifiType));
        options.addColumn(columnBuilder.build());
        return options;
    }

    /**
     * Thread factory for the Kudu client executor: produces daemon threads whose names carry the
     * owning component identifier and a running sequence number.
     */
    private static class ClientThreadFactory implements ThreadFactory {
        private final ThreadFactory delegate = Executors.defaultThreadFactory();
        private final AtomicInteger sequence = new AtomicInteger();
        private final String identifier;

        /**
         * @param identifier identifier embedded in every thread name created by this factory
         */
        private ClientThreadFactory(String identifier) {
            this.identifier = identifier;
        }

        /**
         * Creates a daemon thread via the default thread factory and assigns it the next numbered name.
         *
         * @param runnable task the new thread will run
         * @return the configured, not yet started thread
         */
        @Override
        public Thread newThread(Runnable runnable) {
            final Thread worker = delegate.newThread(runnable);
            worker.setDaemon(true);
            // Numbering starts at 0 and increases by one per created thread.
            worker.setName(String.format("PutKudu[%s]-client-%d", identifier, sequence.getAndIncrement()));
            return worker;
        }
    }
}

