package co.cask.cdap.etl.batch;

import co.cask.cdap.etl.api.Alert;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.ErrorRecord;
import co.cask.cdap.etl.api.InvalidEntry;
import co.cask.cdap.etl.api.MultiOutputEmitter;
import co.cask.cdap.etl.common.BasicErrorRecord;
import co.cask.cdap.etl.common.RecordInfo;
import co.cask.cdap.etl.common.RecordType;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/cdap-etl-batch-5.0.0.jar:co/cask/cdap/etl/batch/PipeEmitter.class */
/**
 * Emitter that hands every output record, error record and alert emitted by a stage to the
 * {@link PipeStage} consumers registered for that kind of output.
 *
 * <p>Each emitted value is wrapped in a {@link RecordInfo} tagged with the emitting stage name and a
 * {@link RecordType} before it is passed to the consumers. The consumer collections supplied at
 * construction time are copied into immutable collections.
 */
public class PipeEmitter implements Emitter<Object>, MultiOutputEmitter<Object> {
    private static final Logger LOG = LoggerFactory.getLogger(PipeEmitter.class);
    protected final String stageName;
    private final Set<PipeStage<RecordInfo>> outputConsumers;
    private final Multimap<String, PipeStage<RecordInfo>> outputPortConsumers;
    private final Set<PipeStage<RecordInfo<ErrorRecord<Object>>>> errorConsumers;
    private final Set<PipeStage<RecordInfo<Alert>>> alertConsumers;
    // True until the "no error consumer" warning has been logged, so it is logged at most once.
    private boolean logWarning = true;

    /** Collects the consumers of a stage's outputs and creates the {@link PipeEmitter} for them. */
    public static class Builder {
        protected final String stageName;
        protected final Multimap<String, PipeStage<RecordInfo>> outputPortConsumers = HashMultimap.create();
        protected final Set<PipeStage<RecordInfo>> outputConsumers = new HashSet<>();
        protected final Set<PipeStage<RecordInfo<ErrorRecord<Object>>>> errorConsumers = new HashSet<>();
        protected final Set<PipeStage<RecordInfo<Alert>>> alertConsumers = new HashSet<>();

        /**
         * Creates a builder for the emitter of the given stage.
         *
         * <p>NOTE: the decompiler reported that this constructor's access modifier was changed from
         * {@code protected}; it is kept {@code public} here so existing callers keep compiling.
         *
         * @param stageName name of the stage whose output the emitter will forward
         */
        public Builder(String stageName) {
            this.stageName = stageName;
        }

        /**
         * Registers a consumer for records emitted without a port.
         *
         * @param pipeStage the consumer to add
         * @return this builder
         */
        public Builder addOutputConsumer(PipeStage<RecordInfo> pipeStage) {
            this.outputConsumers.add(pipeStage);
            return this;
        }

        /**
         * Registers a consumer for records emitted to a specific port.
         *
         * @param pipeStage the consumer to add
         * @param port the port whose records the consumer should receive
         * @return this builder
         */
        public Builder addOutputConsumer(PipeStage<RecordInfo> pipeStage, String port) {
            this.outputPortConsumers.put(port, pipeStage);
            return this;
        }

        /**
         * Registers a consumer for error records.
         *
         * @param pipeStage the consumer to add
         * @return this builder
         */
        public Builder addErrorConsumer(PipeStage<RecordInfo<ErrorRecord<Object>>> pipeStage) {
            this.errorConsumers.add(pipeStage);
            return this;
        }

        /**
         * Registers a consumer for alerts.
         *
         * @param pipeStage the consumer to add
         * @return this builder
         */
        public Builder addAlertConsumer(PipeStage<RecordInfo<Alert>> pipeStage) {
            this.alertConsumers.add(pipeStage);
            return this;
        }

        /**
         * Creates the emitter from the consumers registered so far.
         *
         * @return a new {@link PipeEmitter}
         */
        public PipeEmitter build() {
            return new PipeEmitter(this.stageName, this.outputConsumers, this.outputPortConsumers,
                                   this.errorConsumers, this.alertConsumers);
        }
    }

    /**
     * Creates an emitter for the given stage. Every consumer collection is copied, so later changes
     * to the arguments do not affect this emitter.
     *
     * @param stageName name of the emitting stage, recorded on every emitted record
     * @param outputConsumers consumers of records emitted without a port
     * @param outputPortConsumers consumers of records emitted to a port, keyed by port name
     * @param errorConsumers consumers of error records
     * @param alertConsumers consumers of alerts
     */
    public PipeEmitter(String stageName,
                       Set<PipeStage<RecordInfo>> outputConsumers,
                       Multimap<String, PipeStage<RecordInfo>> outputPortConsumers,
                       Set<PipeStage<RecordInfo<ErrorRecord<Object>>>> errorConsumers,
                       Set<PipeStage<RecordInfo<Alert>>> alertConsumers) {
        this.stageName = stageName;
        this.outputConsumers = ImmutableSet.copyOf(outputConsumers);
        this.outputPortConsumers = ImmutableMultimap.copyOf(outputPortConsumers);
        this.errorConsumers = ImmutableSet.copyOf(errorConsumers);
        this.alertConsumers = ImmutableSet.copyOf(alertConsumers);
    }

    /**
     * Emits a record to every consumer registered for the given port.
     *
     * @param port the port to emit to
     * @param value the record to emit
     * @throws IllegalArgumentException if {@code port} is null
     */
    @Override // co.cask.cdap.etl.api.MultiOutputEmitter
    public void emit(String port, Object value) {
        if (port == null) {
            throw new IllegalArgumentException("Port cannot be null.");
        }
        RecordInfo record = getPipeRecord(value);
        for (PipeStage<RecordInfo> consumer : this.outputPortConsumers.get(port)) {
            consumer.consume(record);
        }
    }

    /**
     * Emits a record to every consumer registered without a port.
     *
     * @param value the record to emit
     */
    @Override // co.cask.cdap.etl.api.Emitter
    public void emit(Object value) {
        RecordInfo record = getPipeRecord(value);
        for (PipeStage<RecordInfo> consumer : this.outputConsumers) {
            consumer.consume(record);
        }
    }

    /**
     * Emits an error record to every error consumer.
     *
     * <p>When no error consumer is registered the entry is dropped; a warning is logged the first
     * time that happens and nothing is allocated for the dropped entry.
     *
     * @param invalidEntry the invalid record together with its error code and message
     */
    @Override // co.cask.cdap.etl.api.ErrorEmitter
    public void emitError(InvalidEntry<Object> invalidEntry) {
        if (this.errorConsumers.isEmpty()) {
            if (this.logWarning) {
                this.logWarning = false;
                LOG.warn("Stage {} emits error records, but has no error consumer. Error records will be dropped.", this.stageName);
            }
            // Nobody would receive the record, so skip building it.
            return;
        }
        // Typed local so the RecordInfo is built as RecordInfo<ErrorRecord<Object>>.
        ErrorRecord<Object> errorRecord = new BasicErrorRecord<>(
            invalidEntry.getInvalidRecord(), this.stageName, invalidEntry.getErrorCode(), invalidEntry.getErrorMsg());
        RecordInfo<ErrorRecord<Object>> record =
            RecordInfo.builder(errorRecord, this.stageName, RecordType.ERROR).build();
        for (PipeStage<RecordInfo<ErrorRecord<Object>>> consumer : this.errorConsumers) {
            consumer.consume(record);
        }
    }

    /**
     * Emits an alert carrying a copy of the given payload to every alert consumer.
     *
     * @param payload the alert payload
     */
    @Override // co.cask.cdap.etl.api.AlertEmitter
    public void emitAlert(Map<String, String> payload) {
        Alert alert = new Alert(this.stageName, ImmutableMap.copyOf(payload));
        RecordInfo<Alert> record = RecordInfo.builder(alert, this.stageName, RecordType.ALERT).build();
        for (PipeStage<RecordInfo<Alert>> consumer : this.alertConsumers) {
            consumer.consume(record);
        }
    }

    /**
     * Wraps an output value in a {@link RecordInfo} of type {@link RecordType#OUTPUT} that is tagged
     * with this emitter's stage name.
     *
     * @param value the emitted value
     * @return the wrapped record
     */
    protected RecordInfo getPipeRecord(Object value) {
        return RecordInfo.builder(value, this.stageName, RecordType.OUTPUT).build();
    }

    /**
     * Creates a {@link Builder} for the emitter of the given stage.
     *
     * @param stageName name of the emitting stage
     * @return a new builder
     */
    public static Builder builder(String stageName) {
        return new Builder(stageName);
    }
}
