package org.apache.hop.beam.core.transform;

import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang.StringUtils;
import org.apache.hop.beam.core.BeamHop;
import org.apache.hop.beam.core.HopRow;
import org.apache.hop.beam.core.fn.PubsubMessageToHopRowFn;
import org.apache.hop.beam.core.fn.StringToHopRowFn;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.JsonRowMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hop/beam/core/transform/BeamSubscribeTransform.class */
public class BeamSubscribeTransform extends PTransform<PBegin, PCollection<HopRow>> {
    private String transformName;
    private String subscription;
    private String topic;
    private String messageType;
    private String rowMetaJson;
    private static final Logger LOG = LoggerFactory.getLogger(BeamSubscribeTransform.class);
    private transient IRowMeta rowMeta;
    private transient Counter initCounter;
    private transient Counter inputCounter;
    private transient Counter writtenCounter;
    private transient Counter errorCounter;

    public BeamSubscribeTransform() {
    }

    public BeamSubscribeTransform(@Nullable String str, String str2, String str3, String str4, String str5, String str6) {
        super(str);
        this.transformName = str2;
        this.subscription = str3;
        this.topic = str4;
        this.messageType = str5;
        this.rowMetaJson = str6;
    }

    public PCollection<HopRow> expand(PBegin pBegin) {
        PCollection<HopRow> apply;
        try {
            if (this.rowMeta == null) {
                BeamHop.init();
                this.rowMeta = JsonRowMeta.fromJson(this.rowMetaJson);
                this.inputCounter = Metrics.counter("input", this.transformName);
                this.writtenCounter = Metrics.counter("written", this.transformName);
                Metrics.counter("init", this.transformName).inc();
            }
            if ("String".equalsIgnoreCase(this.messageType)) {
                PubsubIO.Read readStrings = PubsubIO.readStrings();
                apply = (PCollection) (StringUtils.isNotEmpty(this.subscription) ? readStrings.fromSubscription(this.subscription) : readStrings.fromTopic(this.topic)).expand(pBegin).apply(this.transformName, ParDo.of(new StringToHopRowFn(this.transformName, this.rowMetaJson)));
            } else {
                if (!"PubsubMessage".equalsIgnoreCase(this.messageType)) {
                    throw new RuntimeException("Unsupported message type: " + this.messageType);
                }
                PubsubIO.Read readMessages = PubsubIO.readMessages();
                apply = (StringUtils.isNotEmpty(this.subscription) ? readMessages.fromSubscription(this.subscription) : readMessages.fromTopic(this.topic)).expand(pBegin).apply(this.transformName, ParDo.of(new PubsubMessageToHopRowFn(this.transformName, this.rowMetaJson)));
            }
            return apply;
        } catch (Exception e) {
            Metrics.counter("error", this.transformName).inc();
            LOG.error("Error in beam subscribe transform", e);
            throw new RuntimeException("Error in beam subscribe transform", e);
        }
    }

    public String getTransformName() {
        return this.transformName;
    }

    public void setTransformName(String str) {
        this.transformName = str;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    public String getMessageType() {
        return this.messageType;
    }

    public void setMessageType(String str) {
        this.messageType = str;
    }
}
