package org.apache.hop.beam.core.transform;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hop.beam.core.BeamHop;
import org.apache.hop.beam.core.HopRow;
import org.apache.hop.beam.transforms.kafka.ConfigOption;
import org.apache.hop.core.Const;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.JsonRowMeta;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hop/beam/core/transform/BeamKafkaInputTransform.class */
public class BeamKafkaInputTransform extends PTransform<PBegin, PCollection<HopRow>> {
    private String transformName;
    private String bootstrapServers;
    private String topics;
    private String groupId;
    private boolean usingProcessingTime;
    private boolean usingLogAppendTime;
    private boolean usingCreateTime;
    private boolean restrictedToCommitted;
    private boolean allowingCommitOnConsumedOffset;
    private List<ConfigOption> configOptions;
    private String messageType;
    private String schemaRegistryUrl;
    private String schemaRegistrySubject;
    private String rowMetaJson;
    private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaInputTransform.class);
    private static final Counter numErrors = Metrics.counter("main", "BeamKafkaInputError");
    private transient IRowMeta rowMeta;

    /* loaded from: input_file:org/apache/hop/beam/core/transform/BeamKafkaInputTransform$KVStringGenericRecordToHopRowFn.class */
    public static final class KVStringGenericRecordToHopRowFn extends DoFn<KV<String, GenericRecord>, HopRow> {
        private final String rowMetaJson;
        private final String transformName;
        private final Logger LOG = LoggerFactory.getLogger(KVStringGenericRecordToHopRowFn.class);
        private final Counter numErrors = Metrics.counter("main", "BeamSubscribeTransformErrors");
        private IRowMeta rowMeta;
        private transient Counter inputCounter;
        private transient Counter writtenCounter;

        public KVStringGenericRecordToHopRowFn(String str, String str2) {
            this.transformName = str;
            this.rowMetaJson = str2;
        }

        @DoFn.Setup
        public void setUp() {
            try {
                this.inputCounter = Metrics.counter("input", this.transformName);
                this.writtenCounter = Metrics.counter("written", this.transformName);
                BeamHop.init();
                this.rowMeta = JsonRowMeta.fromJson(this.rowMetaJson);
                Metrics.counter("init", this.transformName).inc();
            } catch (Exception e) {
                this.numErrors.inc();
                this.LOG.error("Error in setup of KV<String,GenericRecord> to Hop Row conversion function", e);
                throw new RuntimeException("Error in setup of KV<String,GenericRecord> to Hop Row conversion function", e);
            }
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, GenericRecord>, HopRow>.ProcessContext processContext) {
            try {
                KV kv = (KV) processContext.element();
                this.inputCounter.inc();
                Object[] objArr = new Object[this.rowMeta.size()];
                objArr[0] = kv.getKey();
                objArr[1] = kv.getValue();
                processContext.output(new HopRow(objArr));
                this.writtenCounter.inc();
            } catch (Exception e) {
                this.numErrors.inc();
                this.LOG.error("Error in KV<String,GenericRecord> to Hop Row conversion function", e);
                throw new RuntimeException("Error in KV<String,GenericRecord> to Hop Row conversion function", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/hop/beam/core/transform/BeamKafkaInputTransform$KVStringStringToHopRowFn.class */
    public static final class KVStringStringToHopRowFn extends DoFn<KV<String, String>, HopRow> {
        private final String rowMetaJson;
        private final String transformName;
        private final Logger LOG = LoggerFactory.getLogger(KVStringStringToHopRowFn.class);
        private final Counter numErrors = Metrics.counter("main", "BeamSubscribeTransformErrors");
        private IRowMeta rowMeta;
        private transient Counter inputCounter;
        private transient Counter writtenCounter;

        public KVStringStringToHopRowFn(String str, String str2) {
            this.transformName = str;
            this.rowMetaJson = str2;
        }

        @DoFn.Setup
        public void setUp() {
            try {
                this.inputCounter = Metrics.counter("input", this.transformName);
                this.writtenCounter = Metrics.counter("written", this.transformName);
                BeamHop.init();
                this.rowMeta = JsonRowMeta.fromJson(this.rowMetaJson);
                Metrics.counter("init", this.transformName).inc();
            } catch (Exception e) {
                this.numErrors.inc();
                this.LOG.error("Error in setup of KV<String,String> to Hop Row conversion function", e);
                throw new RuntimeException("Error in setup of KV<String,String> to Hop Row conversion function", e);
            }
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, String>, HopRow>.ProcessContext processContext) {
            try {
                KV kv = (KV) processContext.element();
                this.inputCounter.inc();
                Object[] objArr = new Object[this.rowMeta.size()];
                objArr[0] = kv.getKey();
                objArr[1] = kv.getValue();
                processContext.output(new HopRow(objArr));
                this.writtenCounter.inc();
            } catch (Exception e) {
                this.numErrors.inc();
                this.LOG.error("Error in KV<String,String> to Hop Row conversion function", e);
                throw new RuntimeException("Error in KV<String,String> to Hop Row conversion function", e);
            }
        }
    }

    public BeamKafkaInputTransform() {
    }

    public BeamKafkaInputTransform(@Nullable String str, String str2, String str3, String str4, String str5, boolean z, boolean z2, boolean z3, boolean z4, boolean z5, String[] strArr, String[] strArr2, String[] strArr3, String str6, String str7, String str8, String str9) {
        super(str);
        this.transformName = str2;
        this.bootstrapServers = str3;
        this.topics = str4;
        this.groupId = str5;
        this.usingProcessingTime = z;
        this.usingLogAppendTime = z2;
        this.usingCreateTime = z3;
        this.restrictedToCommitted = z4;
        this.allowingCommitOnConsumedOffset = z5;
        this.configOptions = new ArrayList();
        for (int i = 0; i < strArr.length; i++) {
            this.configOptions.add(new ConfigOption(strArr[i], strArr2[i], ConfigOption.Type.getTypeFromName(strArr3[i])));
        }
        this.messageType = str6;
        this.schemaRegistryUrl = str7;
        this.schemaRegistrySubject = str8;
        this.rowMetaJson = str9;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v102, types: [java.lang.Boolean] */
    /* JADX WARN: Type inference failed for: r0v104, types: [java.lang.Double] */
    /* JADX WARN: Type inference failed for: r0v106, types: [java.lang.Long] */
    /* JADX WARN: Type inference failed for: r0v108, types: [java.lang.Integer] */
    /* JADX WARN: Type inference failed for: r0v110, types: [java.lang.Short] */
    public PCollection<HopRow> expand(PBegin pBegin) {
        String valueOf;
        try {
            BeamHop.init();
            ArrayList arrayList = new ArrayList();
            for (String str : this.topics.split(",")) {
                arrayList.add(Const.trim(str));
            }
            HashMap hashMap = new HashMap();
            hashMap.put("group.id", this.groupId);
            for (ConfigOption configOption : this.configOptions) {
                String value = configOption.getValue();
                switch (configOption.getType()) {
                    case String:
                        valueOf = value;
                        break;
                    case Short:
                        valueOf = Short.valueOf(value);
                        break;
                    case Int:
                        valueOf = Integer.valueOf(value);
                        break;
                    case Long:
                        valueOf = Long.valueOf(value);
                        break;
                    case Double:
                        valueOf = Double.valueOf(value);
                        break;
                    case Boolean:
                        valueOf = Boolean.valueOf(value);
                        break;
                    default:
                        throw new RuntimeException("Config option parameter " + configOption.getParameter() + " uses unsupported type " + configOption.getType().name());
                }
                hashMap.put(configOption.getParameter(), valueOf);
            }
            if ("Avro Record".equalsIgnoreCase(this.messageType)) {
                KafkaIO.Read withValueDeserializer = KafkaIO.read().withBootstrapServers(this.bootstrapServers).withConsumerConfigUpdates(hashMap).withTopics(arrayList).withKeyDeserializer(StringDeserializer.class).withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(this.schemaRegistryUrl, this.schemaRegistrySubject, (Integer) null, hashMap));
                if (this.usingProcessingTime) {
                    withValueDeserializer = withValueDeserializer.withProcessingTime();
                }
                if (this.usingLogAppendTime) {
                    withValueDeserializer = withValueDeserializer.withLogAppendTime();
                }
                if (this.usingCreateTime) {
                    withValueDeserializer = withValueDeserializer.withCreateTime(Duration.ZERO);
                }
                if (this.restrictedToCommitted) {
                    withValueDeserializer = withValueDeserializer.withReadCommitted();
                }
                if (this.allowingCommitOnConsumedOffset) {
                    withValueDeserializer = withValueDeserializer.commitOffsetsInFinalize();
                }
                return pBegin.apply(withValueDeserializer.withoutMetadata()).apply(ParDo.of(new KVStringGenericRecordToHopRowFn(this.transformName, this.rowMetaJson)));
            }
            if (!"String".equalsIgnoreCase(this.messageType)) {
                throw new HopException("Only parsing String or Avro Record messages is supported at this time");
            }
            KafkaIO.Read withValueDeserializer2 = KafkaIO.read().withBootstrapServers(this.bootstrapServers).withConsumerConfigUpdates(hashMap).withTopics(arrayList).withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class);
            if (this.usingProcessingTime) {
                withValueDeserializer2 = withValueDeserializer2.withProcessingTime();
            }
            if (this.usingLogAppendTime) {
                withValueDeserializer2 = withValueDeserializer2.withLogAppendTime();
            }
            if (this.usingCreateTime) {
                withValueDeserializer2 = withValueDeserializer2.withCreateTime(Duration.ZERO);
            }
            if (this.restrictedToCommitted) {
                withValueDeserializer2 = withValueDeserializer2.withReadCommitted();
            }
            if (this.allowingCommitOnConsumedOffset) {
                withValueDeserializer2 = withValueDeserializer2.commitOffsetsInFinalize();
            }
            return pBegin.apply(withValueDeserializer2.withoutMetadata()).apply(ParDo.of(new KVStringStringToHopRowFn(this.transformName, this.rowMetaJson)));
        } catch (Exception e) {
            numErrors.inc();
            LOG.error("Error in Kafka input transform", e);
            throw new RuntimeException("Error in Kafka input transform", e);
        }
    }
}
