package org.apache.hop.beam.core.transform;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.hop.beam.core.BeamHop;
import org.apache.hop.beam.core.HopRow;
import org.apache.hop.beam.core.fn.HopRowToKVStringStringFn;
import org.apache.hop.beam.transforms.kafka.ConfigOption;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.IValueMeta;
import org.apache.hop.core.row.JsonRowMeta;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hop/beam/core/transform/BeamKafkaOutputTransform.class */
public class BeamKafkaOutputTransform extends PTransform<PCollection<HopRow>, PDone> {
    private String transformName;
    private String bootstrapServers;
    private String topic;
    private String keyField;
    private String messageField;
    private List<ConfigOption> configOptions;
    private String rowMetaJson;
    private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaOutputTransform.class);
    private static final Counter numErrors = Metrics.counter("main", "BeamKafkaOutputError");

    /* loaded from: input_file:org/apache/hop/beam/core/transform/BeamKafkaOutputTransform$GenericRecordCoder.class */
    private static final class GenericRecordCoder extends AtomicCoder<GenericRecord> {
        private GenericRecordCoder() {
        }

        public static GenericRecordCoder of() {
            return new GenericRecordCoder();
        }

        public void encode(GenericRecord genericRecord, OutputStream outputStream) throws IOException {
            StringUtf8Coder.of().encode(genericRecord.getSchema().toString(), outputStream);
            AvroCoder.of(genericRecord.getSchema()).encode(genericRecord, outputStream);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public GenericRecord m25decode(InputStream inputStream) throws IOException {
            return (GenericRecord) AvroCoder.of(new Schema.Parser().parse(StringUtf8Coder.of().decode(inputStream))).decode(inputStream);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hop/beam/core/transform/BeamKafkaOutputTransform$HopRowToKVStringGenericRecordFn.class */
    public static final class HopRowToKVStringGenericRecordFn extends DoFn<HopRow, KV<String, GenericRecord>> {
        private String rowMetaJson;
        private String transformName;
        private int keyIndex;
        private int valueIndex;
        private static final Logger LOG = LoggerFactory.getLogger(HopRowToKVStringGenericRecordFn.class);
        private final Counter numErrors = Metrics.counter("main", "BeamKafkaProducerTransformErrors");
        private IRowMeta rowMeta;
        private transient Counter inputCounter;
        private transient Counter writtenCounter;

        public HopRowToKVStringGenericRecordFn(String str, int i, int i2, String str2) {
            this.transformName = str;
            this.keyIndex = i;
            this.valueIndex = i2;
            this.rowMetaJson = str2;
        }

        @DoFn.Setup
        public void setUp() {
            try {
                this.inputCounter = Metrics.counter("input", this.transformName);
                this.writtenCounter = Metrics.counter("written", this.transformName);
                BeamHop.init();
                this.rowMeta = JsonRowMeta.fromJson(this.rowMetaJson);
                Metrics.counter("init", this.transformName).inc();
            } catch (Exception e) {
                this.numErrors.inc();
                LOG.error("Error in setup of HopRow to KV<String,GenericRecord> function", e);
                throw new RuntimeException("Error in setup of HopRow to KV<String,GenericRecord> function", e);
            }
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<HopRow, KV<String, GenericRecord>>.ProcessContext processContext) {
            try {
                HopRow hopRow = (HopRow) processContext.element();
                this.inputCounter.inc();
                processContext.output(KV.of(this.rowMeta.getString(hopRow.getRow(), this.keyIndex), (GenericRecord) hopRow.getRow()[this.valueIndex]));
                this.writtenCounter.inc();
            } catch (Exception e) {
                this.numErrors.inc();
                LOG.error("Error in HopRow to KV<String,GenericRecord> function", e);
                throw new RuntimeException("Error in HopRow to KV<String,GenericRecord> function", e);
            }
        }
    }

    public BeamKafkaOutputTransform() {
    }

    public BeamKafkaOutputTransform(String str, String str2, String str3, String str4, String str5, String[] strArr, String[] strArr2, String[] strArr3, String str6) {
        super(str);
        this.transformName = str;
        this.bootstrapServers = str2;
        this.topic = str3;
        this.keyField = str4;
        this.messageField = str5;
        this.rowMetaJson = str6;
        this.configOptions = new ArrayList();
        for (int i = 0; i < strArr.length; i++) {
            this.configOptions.add(new ConfigOption(strArr[i], strArr2[i], ConfigOption.Type.getTypeFromName(strArr3[i])));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v60, types: [java.lang.Boolean] */
    /* JADX WARN: Type inference failed for: r0v62, types: [java.lang.Double] */
    /* JADX WARN: Type inference failed for: r0v64, types: [java.lang.Long] */
    /* JADX WARN: Type inference failed for: r0v66, types: [java.lang.Integer] */
    /* JADX WARN: Type inference failed for: r0v68, types: [java.lang.Short] */
    public PDone expand(PCollection<HopRow> pCollection) {
        String valueOf;
        try {
            BeamHop.init();
            IRowMeta fromJson = JsonRowMeta.fromJson(this.rowMetaJson);
            int indexOfValue = fromJson.indexOfValue(this.keyField);
            if (indexOfValue < 0) {
                throw new HopException("Unable to find key field " + this.keyField + " in input row: " + fromJson.toString());
            }
            int indexOfValue2 = fromJson.indexOfValue(this.messageField);
            if (indexOfValue2 < 0) {
                throw new HopException("Unable to find message field " + this.messageField + " in input row: " + fromJson.toString());
            }
            HashMap hashMap = new HashMap();
            for (ConfigOption configOption : this.configOptions) {
                String value = configOption.getValue();
                switch (configOption.getType()) {
                    case String:
                        valueOf = value;
                        break;
                    case Short:
                        valueOf = Short.valueOf(value);
                        break;
                    case Int:
                        valueOf = Integer.valueOf(value);
                        break;
                    case Long:
                        valueOf = Long.valueOf(value);
                        break;
                    case Double:
                        valueOf = Double.valueOf(value);
                        break;
                    case Boolean:
                        valueOf = Boolean.valueOf(value);
                        break;
                    default:
                        throw new RuntimeException("Config option parameter " + configOption.getParameter() + " uses unsupported type " + configOption.getType().name());
                }
                hashMap.put(configOption.getParameter(), valueOf);
            }
            IValueMeta valueMeta = fromJson.getValueMeta(indexOfValue2);
            if (valueMeta.getType() == 2) {
                return pCollection.apply(ParDo.of(new HopRowToKVStringStringFn(this.transformName, indexOfValue, indexOfValue2, this.rowMetaJson))).apply(KafkaIO.write().withBootstrapServers(this.bootstrapServers).withTopic(this.topic).withKeySerializer(StringSerializer.class).withValueSerializer(StringSerializer.class).withProducerConfigUpdates(hashMap));
            }
            if (valueMeta.getType() != 20) {
                throw new HopException("Hop only supports sending String or Avro Record values as Kafka messages");
            }
            return pCollection.apply(ParDo.of(new HopRowToKVStringGenericRecordFn(this.transformName, indexOfValue, indexOfValue2, this.rowMetaJson))).apply(KafkaIO.write().withBootstrapServers(this.bootstrapServers).withTopic(this.topic).withKeySerializer(StringSerializer.class).withValueSerializer(KafkaAvroSerializer.class).withProducerConfigUpdates(hashMap));
        } catch (Exception e) {
            numErrors.inc();
            LOG.error("Error in Beam Kafka output transform", e);
            throw new RuntimeException("Error in Beam Kafka output transform", e);
        }
    }
}
