package org.apache.nifi.record.sink.kafka;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;

@CapabilityDescription("Provides a service to write records to a Kafka 2.6+ topic.")
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties. In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged. For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ", expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@Tags({"kafka", "record", "sink"})
/* loaded from: input_file:org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.class */
public class KafkaRecordSink_2_6 extends AbstractControllerService implements KafkaClientComponent, RecordSinkService {
    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "Records are considered 'transmitted unsuccessfully' unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration.");
    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "Records are considered 'transmitted successfully' if the message is received by a single Kafka node, whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes.");
    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "Records are considered 'transmitted successfully' after successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result in data loss.");
    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder().name("topic").displayName("Topic Name").description("The name of the Kafka Topic to publish to.").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder().name("acks").displayName("Delivery Guarantee").description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new AllowableValue[]{DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED}).defaultValue(DELIVERY_BEST_EFFORT.getValue()).build();
    static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder().name("max.block.ms").displayName("Max Metadata Wait Time").description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the entire 'send' call. Corresponds to Kafka's 'max.block.ms' property").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("5 sec").build();
    static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder().name("ack.wait.time").displayName("Acknowledgment Wait Time").description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).defaultValue("5 secs").build();
    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder().name("max.request.size").displayName("Max Request Size").description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).").required(true).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").build();
    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("compression.type").displayName("Compression Type").description("This parameter allows you to specify the compression codec for all data generated by this producer.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).allowableValues(new String[]{"none", "gzip", "snappy", "lz4"}).defaultValue("none").build();
    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder().name("message-header-encoding").displayName("Message Header Encoding").description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, this property indicates the Character Encoding to use for serializing the headers.").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).defaultValue("UTF-8").required(false).build();
    private List<PropertyDescriptor> properties;
    private volatile RecordSetWriterFactory writerFactory;
    private volatile int maxMessageSize;
    private volatile long maxAckWaitMillis;
    private volatile String topic;
    private volatile Producer<byte[], byte[]> producer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6$KafkaSendException.class */
    public static class KafkaSendException extends RuntimeException {
        KafkaSendException(Throwable th) {
            super(th);
        }
    }

    protected void init(ControllerServiceInitializationContext controllerServiceInitializationContext) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(BOOTSTRAP_SERVERS);
        arrayList.add(TOPIC);
        arrayList.add(RecordSinkService.RECORD_WRITER_FACTORY);
        arrayList.add(DELIVERY_GUARANTEE);
        arrayList.add(MESSAGE_HEADER_ENCODING);
        arrayList.add(SECURITY_PROTOCOL);
        arrayList.add(KERBEROS_CREDENTIALS_SERVICE);
        arrayList.add(KERBEROS_SERVICE_NAME);
        arrayList.add(SSL_CONTEXT_SERVICE);
        arrayList.add(MAX_REQUEST_SIZE);
        arrayList.add(ACK_WAIT_TIME);
        arrayList.add(METADATA_WAIT_TIME);
        arrayList.add(COMPRESSION_CODEC);
        this.properties = Collections.unmodifiableList(arrayList);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder().description("Specifies the value for '" + str + "' Kafka Configuration.").name(str).addValidator(new DynamicPropertyValidator(ProducerConfig.class)).dynamic(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        return new KafkaClientCustomValidationFunction().apply(validationContext);
    }

    @OnEnabled
    public void onEnabled(ConfigurationContext configurationContext) throws InitializationException {
        this.topic = configurationContext.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
        this.writerFactory = configurationContext.getProperty(RecordSinkService.RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        this.maxMessageSize = configurationContext.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
        this.maxAckWaitMillis = configurationContext.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
        Map<String, Object> properties = new StandardKafkaPropertyProvider(ProducerConfig.class).getProperties(configurationContext);
        properties.put("key.serializer", ByteArraySerializer.class.getName());
        properties.put("value.serializer", ByteArraySerializer.class.getName());
        properties.put("max.request.size", String.valueOf(this.maxMessageSize));
        try {
            this.producer = createProducer(properties);
        } catch (Exception e) {
            getLogger().error("Could not create Kafka producer due to {}", new Object[]{e.getMessage()}, e);
            throw new InitializationException(e);
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r16v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r16v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r17v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r17v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 16, insn: 0x0138: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r16 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:56:0x0138 */
    /* JADX WARN: Not initialized variable reg: 17, insn: 0x013d: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r17 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:58:0x013d */
    /* JADX WARN: Type inference failed for: r16v0, types: [org.apache.nifi.serialization.RecordSetWriter] */
    /* JADX WARN: Type inference failed for: r17v0, types: [java.lang.Throwable] */
    public WriteResult sendData(RecordSet recordSet, Map<String, String> map, boolean z) throws IOException {
        ?? r16;
        ?? r17;
        try {
            try {
                RecordSchema schema = getWriterFactory().getSchema((Map) null, recordSet.getSchema());
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                ByteCountingOutputStream byteCountingOutputStream = new ByteCountingOutputStream(byteArrayOutputStream);
                int i = 0;
                RecordSetWriter createWriter = getWriterFactory().createWriter(getLogger(), schema, byteCountingOutputStream, map);
                Throwable th = null;
                createWriter.beginRecordSet();
                do {
                    Record next = recordSet.next();
                    if (next == null) {
                        WriteResult finishRecordSet = createWriter.finishRecordSet();
                        if (byteCountingOutputStream.getBytesWritten() > this.maxMessageSize) {
                            throw new TokenTooLargeException("The query's result set size exceeds the maximum allowed message size of " + this.maxMessageSize + " bytes.");
                        }
                        int recordCount = finishRecordSet.getRecordCount();
                        map.put(CoreAttributes.MIME_TYPE.key(), createWriter.getMimeType());
                        map.put("record.count", Integer.toString(recordCount));
                        map.putAll(finishRecordSet.getAttributes());
                        if (createWriter != null) {
                            if (0 != 0) {
                                try {
                                    createWriter.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                createWriter.close();
                            }
                        }
                        if (recordCount > 0 || z) {
                            try {
                                this.producer.send(new ProducerRecord(this.topic, (Integer) null, (Object) null, byteArrayOutputStream.toByteArray()), (recordMetadata, exc) -> {
                                    if (exc != null) {
                                        throw new KafkaSendException(exc);
                                    }
                                }).get(this.maxAckWaitMillis, TimeUnit.MILLISECONDS);
                            } catch (InterruptedException e) {
                                getLogger().warn("Interrupted while waiting for an acknowledgement from Kafka");
                                Thread.currentThread().interrupt();
                            } catch (TimeoutException e2) {
                                getLogger().warn("Timed out while waiting for an acknowledgement from Kafka");
                            } catch (KafkaSendException e3) {
                                Throwable cause = e3.getCause();
                                if (cause instanceof IOException) {
                                    throw ((IOException) cause);
                                }
                                throw new IOException(cause);
                            }
                        } else {
                            finishRecordSet = WriteResult.EMPTY;
                        }
                        return finishRecordSet;
                    }
                    createWriter.write(next);
                    i++;
                } while (byteCountingOutputStream.getBytesWritten() <= this.maxMessageSize);
                throw new TokenTooLargeException("The query's result set size exceeds the maximum allowed message size of " + this.maxMessageSize + " bytes.");
            } catch (Throwable th3) {
                if (r16 != 0) {
                    if (r17 != 0) {
                        try {
                            r16.close();
                        } catch (Throwable th4) {
                            r17.addSuppressed(th4);
                        }
                    } else {
                        r16.close();
                    }
                }
                throw th3;
            }
        } catch (IOException e4) {
            throw e4;
        } catch (Exception e5) {
            throw new IOException("Failed to write metrics using record writer: " + e5.getMessage(), e5);
        }
    }

    @OnDisabled
    public void stop() {
        if (this.producer != null) {
            this.producer.close(Duration.ofMillis(this.maxAckWaitMillis));
        }
    }

    protected RecordSetWriterFactory getWriterFactory() {
        return this.writerFactory;
    }

    protected Producer<byte[], byte[]> createProducer(Map<String, Object> map) {
        return new KafkaProducer(map);
    }
}
