package org.apache.hop.beam.core.transform;

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.hop.beam.core.BeamHop;
import org.apache.hop.beam.core.HopRow;
import org.apache.hop.beam.core.fn.PublishMessagesFn;
import org.apache.hop.beam.core.fn.PublishStringsFn;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.JsonRowMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hop/beam/core/transform/BeamPublishTransform.class */
public class BeamPublishTransform extends PTransform<PCollection<HopRow>, PDone> {
    private String transformName;
    private String topic;
    private String messageType;
    private String messageField;
    private String rowMetaJson;
    private static final Logger LOG = LoggerFactory.getLogger(BeamPublishTransform.class);
    private transient IRowMeta rowMeta;
    private transient Counter errorCounter;
    private transient int fieldIndex;

    public BeamPublishTransform() {
    }

    public BeamPublishTransform(String str, String str2, String str3, String str4, String str5) {
        this.transformName = str;
        this.topic = str2;
        this.messageType = str3;
        this.messageField = str4;
        this.rowMetaJson = str5;
    }

    public PDone expand(PCollection<HopRow> pCollection) {
        try {
            if (this.rowMeta == null) {
                BeamHop.init();
                JsonRowMeta.fromJson(this.rowMetaJson);
                IRowMeta fromJson = JsonRowMeta.fromJson(this.rowMetaJson);
                Counter counter = Metrics.counter("init", this.transformName);
                Metrics.counter("read", this.transformName);
                Metrics.counter("output", this.transformName);
                this.errorCounter = Metrics.counter("error", this.transformName);
                this.fieldIndex = fromJson.indexOfValue(this.messageField);
                if (this.fieldIndex < 0) {
                    throw new RuntimeException("Field '" + this.messageField + "' couldn't be found in the input row: " + fromJson.toString());
                }
                counter.inc();
            }
            if ("String".equalsIgnoreCase(this.messageType)) {
                return PubsubIO.writeStrings().to(this.topic).expand(pCollection.apply(this.transformName, ParDo.of(new PublishStringsFn(this.transformName, this.fieldIndex, this.rowMetaJson))));
            }
            if (!"PubsubMessage".equalsIgnoreCase(this.messageType)) {
                throw new RuntimeException("Message type '" + this.messageType + "' is not yet supported");
            }
            return PubsubIO.writeMessages().to(this.topic).expand(pCollection.apply(ParDo.of(new PublishMessagesFn(this.transformName, this.fieldIndex, this.rowMetaJson))));
        } catch (Exception e) {
            this.errorCounter.inc();
            LOG.error("Error in beam publish transform", e);
            throw new RuntimeException("Error in beam publish transform", e);
        }
    }

    public String getTransformName() {
        return this.transformName;
    }

    public void setTransformName(String str) {
        this.transformName = str;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    public String getMessageType() {
        return this.messageType;
    }

    public void setMessageType(String str) {
        this.messageType = str;
    }

    public String getMessageField() {
        return this.messageField;
    }

    public void setMessageField(String str) {
        this.messageField = str;
    }
}
