package org.apache.hop.pipeline.transforms.kafka.consumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.hop.core.Const;
import org.apache.hop.core.Result;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.exception.HopTransformException;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.RowDataUtil;
import org.apache.hop.core.variables.IVariables;
import org.apache.hop.pipeline.Pipeline;
import org.apache.hop.pipeline.PipelineMeta;
import org.apache.hop.pipeline.SingleThreadedPipelineExecutor;
import org.apache.hop.pipeline.TransformWithMappingMeta;
import org.apache.hop.pipeline.engines.local.LocalPipelineEngine;
import org.apache.hop.pipeline.transform.BaseTransform;
import org.apache.hop.pipeline.transform.ITransform;
import org.apache.hop.pipeline.transform.RowAdapter;
import org.apache.hop.pipeline.transform.TransformMeta;
import org.apache.hop.pipeline.transforms.injector.InjectorMeta;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

/* loaded from: input_file:org/apache/hop/pipeline/transforms/kafka/consumer/KafkaConsumerInput.class */
/**
 * Transform that polls records from a Kafka consumer, injects them as rows into a single-threaded
 * sub-pipeline and runs one iteration of that sub-pipeline for every non-empty poll.
 *
 * <p>The sub-pipeline is loaded from the filename configured in the metadata and must contain
 * exactly one Injector transform, which receives the Kafka rows. Optionally the rows written by a
 * named sub-transform are forwarded to this transform's output.
 */
public class KafkaConsumerInput extends BaseTransform<KafkaConsumerInputMeta, KafkaConsumerInputData> {
    private static final Class<?> PKG = KafkaConsumerInputMeta.class;

    public KafkaConsumerInput(TransformMeta transformMeta, KafkaConsumerInputMeta kafkaConsumerInputMeta, KafkaConsumerInputData kafkaConsumerInputData, int i, PipelineMeta pipelineMeta, Pipeline pipeline) {
        super(transformMeta, kafkaConsumerInputMeta, kafkaConsumerInputData, i, pipelineMeta, pipeline);
    }

    /**
     * Prepares the transform: determines the output row layout, builds the Kafka consumer,
     * subscribes it to the (variable-resolved) topics and initializes the sub-pipeline.
     *
     * @return {@code true} when everything was set up, {@code false} when the parent
     *     initialization, the row metadata lookup or the sub-pipeline initialization failed
     */
    public boolean init() {
        if (!super.init()) {
            return false;
        }
        try {
            data.outputRowMeta = meta.getRowMeta(getTransformName(), this);
        } catch (HopTransformException e) {
            // Without the output row layout no record can be converted to a row later on,
            // so fail the initialization here instead of running into a null layout.
            logError("Error determining output row metadata", e);
            return false;
        }
        data.incomingRowsBuffer = new ArrayList<>();
        data.batchDuration = Const.toInt(resolve(meta.getBatchDuration()), 0);
        data.batchSize = Const.toInt(resolve(meta.getBatchSize()), 0);

        data.consumer = buildKafkaConsumer(this, meta);
        Set<String> topics = meta.getTopics().stream().map(this::resolve).collect(Collectors.toSet());
        data.consumer.subscribe(topics);

        try {
            initSubPipeline();
        } catch (Exception e) {
            logError("Error initializing sub-transformation", e);
            return false;
        }
        data.isKafkaConsumerClosing = false;
        return true;
    }

    /**
     * Loads the sub-pipeline, wires the injector row producer and the optional output row
     * listener, starts the sub-pipeline threads and creates the single-threaded executor.
     *
     * @throws HopException when loading or initializing the sub-pipeline fails for any reason;
     *     the original failure is attached as the cause
     */
    private void initSubPipeline() throws HopException {
        try {
            String filename = resolve(meta.getFilename());
            PipelineMeta subPipelineMeta = new PipelineMeta(filename, this.metadataProvider, this);
            subPipelineMeta.setMetadataProvider(this.metadataProvider);
            subPipelineMeta.setFilename(filename);
            subPipelineMeta.setPipelineType(PipelineMeta.PipelineType.SingleThreaded);
            logDetailed("Loaded sub-pipeline '" + filename + "'");

            LocalPipelineEngine subPipeline = new LocalPipelineEngine(subPipelineMeta, this, getPipeline());
            subPipeline.prepareExecution();
            subPipeline.setLogLevel(getPipeline().getLogLevel());
            subPipeline.setPreviousResult(new Result());
            TransformWithMappingMeta.replaceVariableValues(subPipeline, this);
            TransformWithMappingMeta.addMissingVariables(subPipeline, this);
            subPipeline.activateParameters(subPipeline);
            logDetailed("Initialized sub-pipeline '" + filename + "'");

            // Exactly one Injector transform must exist: it is the entry point for Kafka rows.
            for (TransformMeta candidate : subPipelineMeta.getTransforms()) {
                if (!(candidate.getTransform() instanceof InjectorMeta)) {
                    continue;
                }
                if (data.rowProducer != null) {
                    throw new HopException("You can only have one copy of the injector transform '" + candidate.getName() + "' to accept the Kafka messages");
                }
                data.rowProducer = subPipeline.addRowProducer(candidate.getName(), 0);
            }
            if (data.rowProducer == null) {
                throw new HopException("Unable to find an Injector transform in the Kafka pipeline. Such a transform is needed to accept data from this Kafka Consumer transform.");
            }

            // Optionally forward every row written by the configured sub-transform to our output.
            if (StringUtils.isNotEmpty(meta.getSubTransform())) {
                ITransform outputTransform = subPipeline.findRunThread(meta.getSubTransform());
                if (outputTransform == null) {
                    throw new HopException("Unable to find transform '" + meta.getSubTransform() + "' to retrieve rows from");
                }
                outputTransform.addRowListener(new RowAdapter() {
                    public void rowWrittenEvent(IRowMeta rowMeta, Object[] row) throws HopTransformException {
                        KafkaConsumerInput.this.putRow(rowMeta, row);
                    }
                });
            }

            subPipeline.setLogChannel(getLogChannel());
            subPipeline.startThreads();

            data.executor = errorHandlingConditionIsSatisfied()
                    ? new SingleThreadedPipelineExecutor(subPipeline, true)
                    : new SingleThreadedPipelineExecutor(subPipeline);
            if (!data.executor.init()) {
                throw new HopException("Initialization of sub-pipeline failed");
            }
            getPipeline().addActiveSubPipeline(getTransformName(), subPipeline);
        } catch (Exception e) {
            throw new HopException("Unable to load and initialize sub pipeline", e);
        }
    }

    /**
     * Unsubscribes and closes the Kafka consumer (when one was created) and then disposes the
     * parent transform. The consumer is closed and the parent is disposed even when an earlier
     * cleanup step throws.
     */
    public void dispose() {
        try {
            if (data.consumer != null) {
                try {
                    data.consumer.unsubscribe();
                } finally {
                    data.consumer.close();
                }
            }
        } finally {
            super.dispose();
        }
    }

    /**
     * Builds a Kafka consumer from the transform metadata.
     *
     * <p>All non-empty custom configuration options are applied first; group id, bootstrap
     * servers, auto-commit flag, batch duration/size (only when greater than zero) and the
     * key/value deserializers are put afterwards and therefore take precedence over custom
     * options with the same key. As a side effect the context class loader of the calling thread
     * is set to the class loader of the metadata class.
     *
     * @param iVariables variables used to resolve expressions in the configuration values
     * @param kafkaConsumerInputMeta the transform metadata holding the Kafka settings
     * @return a new, not yet subscribed consumer
     */
    public static Consumer buildKafkaConsumer(IVariables iVariables, KafkaConsumerInputMeta kafkaConsumerInputMeta) {
        Thread.currentThread().setContextClassLoader(kafkaConsumerInputMeta.getClass().getClassLoader());

        Properties config = new Properties();
        for (String option : kafkaConsumerInputMeta.getConfig().keySet()) {
            String value = iVariables.resolve(kafkaConsumerInputMeta.getConfig().get(option));
            if (StringUtils.isNotEmpty(value)) {
                // NOTE(review): the value is resolved a second time here; presumably to expand
                // variables nested in variable values - confirm whether this is still needed.
                config.put(option, iVariables.resolve(value));
            }
        }

        config.put("group.id", iVariables.resolve(Const.NVL(kafkaConsumerInputMeta.getConsumerGroup(), "Apache Hop")));
        config.put("bootstrap.servers", iVariables.resolve(kafkaConsumerInputMeta.getDirectBootstrapServers()));
        config.put("enable.auto.commit", Boolean.valueOf(kafkaConsumerInputMeta.isAutoCommit()));

        int batchDuration = Const.toInt(iVariables.resolve(kafkaConsumerInputMeta.getBatchDuration()), 0);
        if (batchDuration > 0) {
            config.put("fetch.max.wait.ms", Integer.valueOf(batchDuration));
        }
        int batchSize = Const.toInt(iVariables.resolve(kafkaConsumerInputMeta.getBatchSize()), 0);
        if (batchSize > 0) {
            config.put("max.poll.records", Integer.valueOf(batchSize));
        }

        config.put("key.deserializer", kafkaConsumerInputMeta.getKeyField().getOutputType().getKafkaDeserializerClass());
        config.put("value.deserializer", kafkaConsumerInputMeta.getMessageField().getOutputType().getKafkaDeserializerClass());
        return new KafkaConsumer<>(config);
    }

    /**
     * Flags the consumer as closing, wakes it up so that a blocking poll is interrupted and then
     * stops the parent transform. Safe to call when no consumer was created.
     *
     * @throws HopException when the parent transform fails to stop
     */
    public void stopRunning() throws HopException {
        data.isKafkaConsumerClosing = true;
        if (data.consumer != null) {
            data.consumer.wakeup();
        }
        super.stopRunning();
    }

    /**
     * Polls Kafka once and, when records were received and the consumer is not closing, pushes
     * them through one iteration of the sub-pipeline.
     *
     * <p>The poll waits for the configured batch duration, or for {@code Long.MAX_VALUE}
     * milliseconds when no positive batch duration is set. When the executor reported errors and
     * row-level error handling is active, the sub-pipeline is rebuilt before returning.
     *
     * @return {@code true} to be called again, {@code false} when the transform is done: the
     *     consumer was woken up, the sub-pipeline stopped or failed without error handling, or
     *     the sub-pipeline could not be re-initialized
     * @throws HopException when the sub-pipeline iteration or row handling fails
     */
    public boolean processRow() throws HopException {
        try {
            long timeoutMs = data.batchDuration > 0 ? data.batchDuration : Long.MAX_VALUE;
            ConsumerRecords<?, ?> records = data.consumer.poll(Duration.ofMillis(timeoutMs));
            if (!data.isKafkaConsumerClosing && !records.isEmpty() && !forwardRecords(records)) {
                return false;
            }
        } catch (WakeupException e) {
            // wakeup() is only triggered by stopRunning(): we are shutting down, so stop here
            // instead of falling through and possibly rebuilding the sub-pipeline.
            stopEverything();
            return false;
        }

        if (data.executor.getErrors() > 0 && errorHandlingConditionIsSatisfied()) {
            return restartSubPipeline();
        }
        return true;
    }

    /**
     * Converts the polled records to rows, injects them into the sub-pipeline, runs one
     * iteration and commits the offsets asynchronously.
     *
     * @return {@code false} when the sub-pipeline stopped or failed without row-level error
     *     handling (everything has been stopped in that case), {@code true} otherwise
     */
    private boolean forwardRecords(ConsumerRecords<?, ?> records) throws HopException {
        for (Object polled : records) {
            @SuppressWarnings("unchecked") // key/value are only ever read as Object
            ConsumerRecord<Object, Object> record = (ConsumerRecord<Object, Object>) polled;
            Object[] row = processMessageAsRow(record);
            data.rowProducer.putRow(data.outputRowMeta, row);
            if (errorHandlingConditionIsSatisfied()) {
                // Remember the row so it can be sent to error handling if the iteration fails.
                data.incomingRowsBuffer.add(row);
            }
            incrementLinesInput();
        }
        logBasic("Number of rows read: " + data.rowProducer.getRowSet().size());

        data.executor.oneIteration();

        if (data.executor.isStopped() || data.executor.getErrors() > 0) {
            logDebug("Executor's reported errors #: " + data.executor.getErrors());
            if (data.executor.getErrors() <= 0 || !errorHandlingConditionIsSatisfied()) {
                stopEverything();
                return false;
            }
            for (Object[] failedRow : data.incomingRowsBuffer) {
                putError(data.outputRowMeta, failedRow, 1L, "An error occurred while processing the subpipeline", null, "KAFKA001");
            }
        }

        data.consumer.commitAsync();
        data.executor.buildExecutionSummary();
        if (errorHandlingConditionIsSatisfied()) {
            data.incomingRowsBuffer.clear();
        }
        return true;
    }

    /** Stops the sub-pipeline, signals that no more output follows and stops this pipeline. */
    private void stopEverything() {
        data.executor.getPipeline().stopAll();
        setOutputDone();
        stopAll();
    }

    /**
     * Stops and disposes the current sub-pipeline executor and initializes a fresh sub-pipeline.
     *
     * @return {@code true} when the sub-pipeline was rebuilt, {@code false} when that failed
     */
    private boolean restartSubPipeline() {
        try {
            data.executor.getPipeline().stopAll();
            data.executor.dispose();
            // initSubPipeline() refuses a second injector while a row producer is still set.
            data.rowProducer = null;
            initSubPipeline();
            return true;
        } catch (Exception e) {
            logError("Error initializing sub-transformation", e);
            return false;
        }
    }

    /**
     * @return {@code true} when the transform has error handling enabled and the batch size is
     *     exactly one
     */
    private boolean errorHandlingConditionIsSatisfied() {
        return getTransformMeta().isDoingErrorHandling() && data.batchSize == 1;
    }

    /**
     * Converts a Kafka record to a row with the fields, in order: key, value, topic, partition,
     * offset and timestamp. Partition, offset and timestamp are stored as {@code Long}.
     *
     * @param consumerRecord the record to convert
     * @return a row allocated for the size of the output row metadata
     */
    public Object[] processMessageAsRow(ConsumerRecord<Object, Object> consumerRecord) {
        Object[] row = RowDataUtil.allocateRowData(data.outputRowMeta.size());
        row[0] = consumerRecord.key();
        row[1] = consumerRecord.value();
        row[2] = consumerRecord.topic();
        row[3] = Long.valueOf(consumerRecord.partition());
        row[4] = Long.valueOf(consumerRecord.offset());
        row[5] = Long.valueOf(consumerRecord.timestamp());
        return row;
    }
}
