package org.apache.nifi.kafka.connect;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.class */
public class StatelessNiFiSinkTask extends SinkTask {
    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSinkTask.class);
    private StatelessDataflow dataflow;
    private String inputPortName;
    private Set<String> failurePortNames;
    private long timeoutMillis;
    private Pattern headerNameRegex;
    private String headerNamePrefix;
    private QueueSize queueSize;
    private String dataflowName;
    private long backoffMillis = 0;

    public String version() {
        return StatelessKafkaConnectorUtil.getVersion();
    }

    public void start(Map<String, String> map) {
        logger.info("Starting Sink Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(map));
        this.timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(map.getOrDefault("dataflow.timeout", "60 sec"), TimeUnit.MILLISECONDS);
        this.dataflowName = map.get("name");
        String str = map.get("headers.as.attributes.regex");
        this.headerNameRegex = str == null ? null : Pattern.compile(str);
        this.headerNamePrefix = map.getOrDefault("attribute.prefix", "");
        this.dataflow = StatelessKafkaConnectorUtil.createDataflow(map);
        String str2 = map.get("name");
        this.inputPortName = map.get("input.port");
        if (this.inputPortName == null) {
            Set inputPortNames = this.dataflow.getInputPortNames();
            if (inputPortNames.isEmpty()) {
                throw new ConfigException("The dataflow specified for <" + str2 + "> does not have an Input Port at the root level. Dataflows used for a Kafka Connect Sink Task must have at least one Input Port at the root level.");
            }
            if (inputPortNames.size() > 1) {
                throw new ConfigException("The dataflow specified for <" + str2 + "> has multiple Input Ports at the root level (" + inputPortNames.toString() + "). The input.port property must be set to indicate which of these Ports Kafka records should be sent to.");
            }
            this.inputPortName = (String) inputPortNames.iterator().next();
        }
        if (!this.dataflow.getInputPortNames().contains(this.inputPortName)) {
            throw new ConfigException("The dataflow specified for <" + str2 + "> does not have Input Port with name <" + this.inputPortName + "> at the root level. Existing Input Port names are " + this.dataflow.getInputPortNames());
        }
        String str3 = map.get("failure.ports");
        if (str3 == null || str3.trim().isEmpty()) {
            this.failurePortNames = Collections.emptySet();
        } else {
            this.failurePortNames = new HashSet();
            for (String str4 : str3.split(",")) {
                this.failurePortNames.add(str4.trim());
            }
        }
        Set outputPortNames = this.dataflow.getOutputPortNames();
        for (String str5 : this.failurePortNames) {
            if (!outputPortNames.contains(str5)) {
                throw new ConfigException("Dataflow was configured with a Failure Port of " + str5 + " but there is no Port with that name in the dataflow. Valid Port names are " + outputPortNames);
            }
        }
    }

    public void put(Collection<SinkRecord> collection) {
        logger.debug("Enqueuing {} Kafka messages", Integer.valueOf(collection.size()));
        for (SinkRecord sinkRecord : collection) {
            Map<String, String> createAttributes = createAttributes(sinkRecord);
            this.queueSize = this.dataflow.enqueue(getContents(sinkRecord.value()), createAttributes, this.inputPortName);
        }
    }

    private void backoff() {
        if (this.backoffMillis == 0) {
            this.backoffMillis = 1000L;
        }
        this.backoffMillis = Math.min(this.backoffMillis * 2, 10000L);
        this.context.timeout(this.backoffMillis);
    }

    private void resetBackoff() {
        this.backoffMillis = 0L;
    }

    private synchronized void triggerDataflow() {
        long nanoTime = System.nanoTime();
        while (this.dataflow.isFlowFileQueued()) {
            DataflowTrigger trigger = this.dataflow.trigger();
            try {
                Optional result = trigger.getResult(this.timeoutMillis, TimeUnit.MILLISECONDS);
                if (result.isPresent()) {
                    TriggerResult triggerResult = (TriggerResult) result.get();
                    if (triggerResult.isSuccessful()) {
                        verifyOutputPortContents(trigger, triggerResult);
                        triggerResult.acknowledge();
                        resetBackoff();
                    } else {
                        retry(trigger, "Dataflow " + this.dataflowName + " failed to execute properly", (Throwable) triggerResult.getFailureCause().orElse(null));
                    }
                } else {
                    retry(trigger, "Timed out waiting for dataflow " + this.dataflowName + " to complete", null);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.dataflow.purge();
                throw new RuntimeException("Interrupted while waiting for dataflow to complete", e);
            }
        }
        this.context.requestCommit();
        long nanoTime2 = System.nanoTime() - nanoTime;
        if (this.queueSize != null) {
            logger.debug("Ran dataflow with {} messages ({}) in {} nanos", new Object[]{Integer.valueOf(this.queueSize.getObjectCount()), FormatUtils.formatDataSize(this.queueSize.getByteCount()), Long.valueOf(nanoTime2)});
        }
    }

    private void retry(DataflowTrigger dataflowTrigger, String str, Throwable th) {
        logger.error(str, th);
        dataflowTrigger.cancel();
        backoff();
        this.dataflow.purge();
        throw new RetriableException(str, th);
    }

    private void verifyOutputPortContents(DataflowTrigger dataflowTrigger, TriggerResult triggerResult) {
        for (String str : this.failurePortNames) {
            List outputFlowFiles = triggerResult.getOutputFlowFiles(str);
            if (outputFlowFiles != null && !outputFlowFiles.isEmpty()) {
                logger.error("Dataflow transferred FlowFiles to Port {}, which is configured as a Failure Port. Rolling back session.", str);
                dataflowTrigger.cancel();
                throw new RetriableException("Data was transferred to Failure Port " + str);
            }
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        super.flush(map);
        triggerDataflow();
    }

    private byte[] getContents(Object obj) {
        if (obj == null) {
            return new byte[0];
        }
        if (obj instanceof String) {
            return ((String) obj).getBytes(StandardCharsets.UTF_8);
        }
        if (obj instanceof byte[]) {
            return (byte[]) obj;
        }
        throw new IllegalArgumentException("Unsupported message type: the Message value was " + obj + " but was expected to be a byte array or a String");
    }

    private Map<String, String> createAttributes(SinkRecord sinkRecord) {
        HashMap hashMap = new HashMap();
        hashMap.put("kafka.topic", sinkRecord.topic());
        hashMap.put("kafka.offset", String.valueOf(sinkRecord.kafkaOffset()));
        hashMap.put("kafka.partition", String.valueOf(sinkRecord.kafkaPartition()));
        hashMap.put("kafka.timestamp", String.valueOf(sinkRecord.timestamp()));
        Object key = sinkRecord.key();
        if (key instanceof String) {
            hashMap.put("kafka.key", (String) key);
        }
        if (this.headerNameRegex != null) {
            for (Header header : sinkRecord.headers()) {
                if (this.headerNameRegex.matcher(header.key()).matches()) {
                    hashMap.put(this.headerNamePrefix + header.key(), String.valueOf(header.value()));
                }
            }
        }
        return hashMap;
    }

    public void stop() {
        logger.info("Shutting down Sink Task");
        if (this.dataflow != null) {
            this.dataflow.shutdown();
        }
    }
}
