package org.apache.hop.beam.transforms.kinesis;

import com.amazonaws.regions.Regions;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.hop.beam.core.BeamHop;
import org.apache.hop.beam.core.HopRow;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.IValueMeta;
import org.apache.hop.core.row.JsonRowMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceTransform.class */
public class BeamKinesisProduceTransform extends PTransform<PCollection<HopRow>, PDone> {
    private String transformName;
    private String rowMetaJson;
    private String accessKey;
    private String secretKey;
    private Regions regions;
    private String streamName;
    private String dataField;
    private String dataType;
    private String partitionKey;
    private List<KinesisConfigOption> configOptions;
    private static final Logger LOG = LoggerFactory.getLogger(BeamKinesisProduceTransform.class);
    private static final Counter numErrors = Metrics.counter("main", "BeamKafkaOutputError");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceTransform$HopRowToMessage.class */
    public static class HopRowToMessage extends DoFn<HopRow, byte[]> {
        private final int messageIndex;
        private final String transformName;
        private final String rowMetaJson;
        private transient IValueMeta valueMeta;
        private transient Counter outputCounter;
        private transient Counter readCounter;
        static final /* synthetic */ boolean $assertionsDisabled;

        public HopRowToMessage(String str, String str2, int i) {
            this.transformName = str;
            this.rowMetaJson = str2;
            this.messageIndex = i;
        }

        @DoFn.Setup
        public void setUp() {
            try {
                this.outputCounter = Metrics.counter("output", this.transformName);
                this.readCounter = Metrics.counter("read", this.transformName);
                BeamHop.init();
                this.valueMeta = JsonRowMeta.fromJson(this.rowMetaJson).getValueMeta(this.messageIndex);
                Metrics.counter("init", this.transformName).inc();
            } catch (Exception e) {
                BeamKinesisProduceTransform.LOG.error("Error in setup of HopRow to kinesis message conversion function", e);
                throw new RuntimeException("Error in setup of HopRow to kinesis message conversion function", e);
            }
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<HopRow, byte[]>.ProcessContext processContext) {
            HopRow hopRow = (HopRow) processContext.element();
            this.readCounter.inc();
            if (!$assertionsDisabled && hopRow == null) {
                throw new AssertionError();
            }
            try {
                processContext.output(this.valueMeta.getBinary(hopRow.getRow()[this.messageIndex]));
                this.outputCounter.inc();
            } catch (Exception e) {
                throw new RuntimeException("Error converting message to a binary form, value nr " + this.messageIndex + " (" + this.valueMeta.getName() + ") in the input row", e);
            }
        }

        static {
            $assertionsDisabled = !BeamKinesisProduceTransform.class.desiredAssertionStatus();
        }
    }

    public BeamKinesisProduceTransform() {
        this.configOptions = new ArrayList();
    }

    public BeamKinesisProduceTransform(String str, String str2, String str3, String str4, Regions regions, String str5, String str6, String str7, String str8, String[] strArr, String[] strArr2) {
        super(str);
        this.transformName = str;
        this.rowMetaJson = str2;
        this.accessKey = str3;
        this.secretKey = str4;
        this.regions = regions;
        this.streamName = str5;
        this.dataField = str6;
        this.dataType = str7;
        this.partitionKey = str8;
        this.configOptions = new ArrayList();
        for (int i = 0; i < strArr.length; i++) {
            this.configOptions.add(new KinesisConfigOption(strArr[i], strArr2[i]));
        }
    }

    public PDone expand(PCollection<HopRow> pCollection) {
        try {
            BeamHop.init();
            IRowMeta fromJson = JsonRowMeta.fromJson(this.rowMetaJson);
            int indexOfValue = fromJson.indexOfValue(this.dataField);
            if (indexOfValue < 0) {
                throw new HopException("Unable to find message field " + this.dataField + " in input row: " + fromJson.toString());
            }
            if (!"String".equals(this.dataType)) {
                throw new HopException("For now, only Strings are supported as Kinesis data messages");
            }
            Properties properties = new Properties();
            for (KinesisConfigOption kinesisConfigOption : this.configOptions) {
                properties.put(kinesisConfigOption.getParameter(), kinesisConfigOption.getValue());
            }
            return pCollection.apply(ParDo.of(new HopRowToMessage(this.transformName, this.rowMetaJson, indexOfValue))).apply(KinesisIO.write().withAWSClientsProvider(this.accessKey, this.secretKey, this.regions).withStreamName(this.streamName).withPartitionKey(this.partitionKey).withProducerProperties(properties));
        } catch (Exception e) {
            numErrors.inc();
            LOG.error("Error in Beam Kinesis Produce transform", e);
            throw new RuntimeException("Error in Beam Kinesis Produce transform", e);
        }
    }
}
