package org.apache.hop.beam.transforms.kinesis;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang.StringUtils;
import org.apache.hop.beam.core.BeamHop;
import org.apache.hop.beam.core.HopRow;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.JsonRowMeta;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeTransform.class */
public class BeamKinesisConsumeTransform extends PTransform<PBegin, PCollection<HopRow>> {
    private String transformName;
    private String rowMetaJson;
    private String accessKey;
    private String secretKey;
    private Regions regions;
    private String streamName;
    private String uniqueIdField;
    private String dataField;
    private String dataType;
    private String partitionKeyField;
    private String sequenceNumberField;
    private String subSequenceNumberField;
    private String shardIdField;
    private String streamNameField;
    private String maxNumRecords;
    private String maxReadTimeMs;
    private String upToDateThresholdMs;
    private String requestRecordsLimit;
    private boolean arrivalTimeWatermarkPolicy;
    private String arrivalTimeWatermarkPolicyMs;
    private boolean processingTimeWatermarkPolicy;
    private boolean fixedDelayRatePolicy;
    private String fixedDelayRatePolicyMs;
    private String maxCapacityPerShard;
    private static final Logger LOG = LoggerFactory.getLogger(BeamKinesisConsumeTransform.class);
    private static final Counter numErrors = Metrics.counter("main", "BeamKinesisConsumerError");

    /* loaded from: input_file:org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeTransform$KVStringGenericRecordToHopRowFn.class */
    public static final class KVStringGenericRecordToHopRowFn extends DoFn<KV<String, GenericRecord>, HopRow> {
        private final String rowMetaJson;
        private final String transformName;
        private final Logger LOG = LoggerFactory.getLogger(KVStringGenericRecordToHopRowFn.class);
        private final Counter numErrors = Metrics.counter("main", "BeamSubscribeTransformErrors");
        private IRowMeta rowMeta;
        private transient Counter inputCounter;
        private transient Counter writtenCounter;

        public KVStringGenericRecordToHopRowFn(String str, String str2) {
            this.transformName = str;
            this.rowMetaJson = str2;
        }

        @DoFn.Setup
        public void setUp() {
            try {
                this.inputCounter = Metrics.counter("input", this.transformName);
                this.writtenCounter = Metrics.counter("written", this.transformName);
                BeamHop.init();
                this.rowMeta = JsonRowMeta.fromJson(this.rowMetaJson);
                Metrics.counter("init", this.transformName).inc();
            } catch (Exception e) {
                this.numErrors.inc();
                this.LOG.error("Error in setup of KV<String,GenericRecord> to Hop Row conversion function", e);
                throw new RuntimeException("Error in setup of KV<String,GenericRecord> to Hop Row conversion function", e);
            }
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, GenericRecord>, HopRow>.ProcessContext processContext) {
            try {
                KV kv = (KV) processContext.element();
                this.inputCounter.inc();
                Object[] objArr = new Object[this.rowMeta.size()];
                objArr[0] = kv.getKey();
                objArr[1] = kv.getValue();
                processContext.output(new HopRow(objArr));
                this.writtenCounter.inc();
            } catch (Exception e) {
                this.numErrors.inc();
                this.LOG.error("Error in KV<String,GenericRecord> to Hop Row conversion function", e);
                throw new RuntimeException("Error in KV<String,GenericRecord> to Hop Row conversion function", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeTransform$KVStringStringToHopRowFn.class */
    public static final class KVStringStringToHopRowFn extends DoFn<KV<String, String>, HopRow> {
        private final String rowMetaJson;
        private final String transformName;
        private final Logger LOG = LoggerFactory.getLogger(KVStringStringToHopRowFn.class);
        private final Counter numErrors = Metrics.counter("main", "BeamSubscribeTransformErrors");
        private IRowMeta rowMeta;
        private transient Counter inputCounter;
        private transient Counter writtenCounter;

        public KVStringStringToHopRowFn(String str, String str2) {
            this.transformName = str;
            this.rowMetaJson = str2;
        }

        @DoFn.Setup
        public void setUp() {
            try {
                this.inputCounter = Metrics.counter("input", this.transformName);
                this.writtenCounter = Metrics.counter("written", this.transformName);
                BeamHop.init();
                this.rowMeta = JsonRowMeta.fromJson(this.rowMetaJson);
                Metrics.counter("init", this.transformName).inc();
            } catch (Exception e) {
                this.numErrors.inc();
                this.LOG.error("Error in setup of KV<String,String> to Hop Row conversion function", e);
                throw new RuntimeException("Error in setup of KV<String,String> to Hop Row conversion function", e);
            }
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, String>, HopRow>.ProcessContext processContext) {
            try {
                KV kv = (KV) processContext.element();
                this.inputCounter.inc();
                Object[] objArr = new Object[this.rowMeta.size()];
                objArr[0] = kv.getKey();
                objArr[1] = kv.getValue();
                processContext.output(new HopRow(objArr));
                this.writtenCounter.inc();
            } catch (Exception e) {
                this.numErrors.inc();
                this.LOG.error("Error in KV<String,String> to Hop Row conversion function", e);
                throw new RuntimeException("Error in KV<String,String> to Hop Row conversion function", e);
            }
        }
    }

    public BeamKinesisConsumeTransform() {
    }

    public BeamKinesisConsumeTransform(String str, String str2, String str3, Regions regions, String str4, String str5, String str6, String str7, String str8, String str9, String str10, String str11, String str12, String str13, String str14, String str15, String str16, String str17, boolean z, String str18, boolean z2, boolean z3, String str19, String str20) {
        super(str);
        this.transformName = str;
        this.rowMetaJson = str4;
        this.streamName = str5;
        this.accessKey = str2;
        this.secretKey = str3;
        this.regions = regions;
        this.uniqueIdField = str6;
        this.dataField = str7;
        this.dataType = str8;
        this.partitionKeyField = str9;
        this.sequenceNumberField = str10;
        this.subSequenceNumberField = str11;
        this.shardIdField = str12;
        this.streamNameField = str13;
        this.maxNumRecords = str14;
        this.maxReadTimeMs = str15;
        this.upToDateThresholdMs = str16;
        this.requestRecordsLimit = str17;
        this.arrivalTimeWatermarkPolicy = z;
        this.arrivalTimeWatermarkPolicyMs = str18;
        this.processingTimeWatermarkPolicy = z2;
        this.fixedDelayRatePolicy = z3;
        this.fixedDelayRatePolicyMs = str19;
        this.maxCapacityPerShard = str20;
        if (!"String".equalsIgnoreCase(str8)) {
            throw new RuntimeException("Only String messages are supported at this time");
        }
    }

    public PCollection<HopRow> expand(PBegin pBegin) {
        try {
            BeamHop.init();
            KinesisIO.Read withInitialPositionInStream = KinesisIO.read().withAWSClientsProvider(this.accessKey, this.secretKey, this.regions).withStreamName(this.streamName).withInitialPositionInStream(InitialPositionInStream.LATEST);
            if (StringUtils.isNotEmpty(this.maxNumRecords)) {
                withInitialPositionInStream = withInitialPositionInStream.withMaxNumRecords(Long.parseLong(this.maxNumRecords));
            }
            if (StringUtils.isNotEmpty(this.maxReadTimeMs)) {
                withInitialPositionInStream = withInitialPositionInStream.withMaxReadTime(Duration.millis(Long.parseLong(this.maxReadTimeMs)));
            }
            if (StringUtils.isNotEmpty(this.upToDateThresholdMs)) {
                withInitialPositionInStream = withInitialPositionInStream.withUpToDateThreshold(Duration.millis(Long.parseLong(this.upToDateThresholdMs)));
            }
            if (StringUtils.isNotEmpty(this.requestRecordsLimit)) {
                withInitialPositionInStream = withInitialPositionInStream.withRequestRecordsLimit(Integer.parseInt(this.requestRecordsLimit));
            }
            if (this.arrivalTimeWatermarkPolicy) {
                withInitialPositionInStream = withInitialPositionInStream.withArrivalTimeWatermarkPolicy();
                if (StringUtils.isNotEmpty(this.arrivalTimeWatermarkPolicyMs)) {
                    withInitialPositionInStream = withInitialPositionInStream.withArrivalTimeWatermarkPolicy(Duration.millis(Long.parseLong(this.arrivalTimeWatermarkPolicyMs)));
                }
            }
            if (this.processingTimeWatermarkPolicy) {
                withInitialPositionInStream = withInitialPositionInStream.withProcessingTimeWatermarkPolicy();
            }
            if (this.fixedDelayRatePolicy) {
                withInitialPositionInStream = withInitialPositionInStream.withFixedDelayRateLimitPolicy();
                if (StringUtils.isNotEmpty(this.fixedDelayRatePolicyMs)) {
                    withInitialPositionInStream = withInitialPositionInStream.withFixedDelayRateLimitPolicy(Duration.millis(Long.parseLong(this.fixedDelayRatePolicyMs)));
                }
            }
            if (StringUtils.isNotEmpty(this.maxCapacityPerShard)) {
                withInitialPositionInStream = withInitialPositionInStream.withMaxCapacityPerShard(Integer.valueOf(Integer.parseInt(this.maxCapacityPerShard)));
            }
            return pBegin.apply(withInitialPositionInStream).apply(ParDo.of(new KinesisRecordToHopRowFn(this.transformName, this.rowMetaJson, this.uniqueIdField, this.partitionKeyField, this.sequenceNumberField, this.subSequenceNumberField, this.shardIdField, this.streamNameField)));
        } catch (Exception e) {
            numErrors.inc();
            LOG.error("Error in Kafka input transform", e);
            throw new RuntimeException("Error in Kafka input transform", e);
        }
    }
}
