package co.cask.cdap.etl.batch.mapreduce;

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.mapreduce.MapReduceTaskContext;
import co.cask.cdap.etl.api.InvalidEntry;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.format.StructuredRecordStringConverter;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.io.NullWritable;

/* loaded from: input_file:lib/cdap-etl-batch-4.3.5.jar:co/cask/cdap/etl/batch/mapreduce/ErrorOutputWriter.class */
public class ErrorOutputWriter<KEY_OUT, VAL_OUT> {
    private static final Schema AVRO_ERROR_SCHEMA = new Schema.Parser().parse(Constants.ERROR_SCHEMA.toString());
    private final MapReduceTaskContext<KEY_OUT, VAL_OUT> context;
    private final String errorDatasetName;

    public ErrorOutputWriter(MapReduceTaskContext<KEY_OUT, VAL_OUT> mapReduceTaskContext, String str) {
        this.context = mapReduceTaskContext;
        this.errorDatasetName = str;
    }

    public void write(Collection<InvalidEntry<Object>> collection) throws Exception {
        Iterator<InvalidEntry<Object>> it = collection.iterator();
        while (it.hasNext()) {
            write(it.next());
        }
    }

    public void write(InvalidEntry<Object> invalidEntry) throws Exception {
        this.context.write(this.errorDatasetName, new AvroKey(getGenericRecordForInvalidEntry(invalidEntry)), NullWritable.get());
    }

    private GenericRecord getGenericRecordForInvalidEntry(InvalidEntry invalidEntry) {
        String format;
        GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(AVRO_ERROR_SCHEMA);
        genericRecordBuilder.set(Constants.ErrorDataset.ERRCODE, Integer.valueOf(invalidEntry.getErrorCode()));
        genericRecordBuilder.set(Constants.ErrorDataset.ERRMSG, invalidEntry.getErrorMsg());
        if (invalidEntry.getInvalidRecord() instanceof KeyValue) {
            try {
                format = StructuredRecordStringConverter.toJsonString((StructuredRecord) ((KeyValue) invalidEntry.getInvalidRecord()).getValue());
            } catch (IOException e) {
                format = "Exception while converting StructuredRecord to String, " + e.getCause();
            }
        } else if (invalidEntry.getInvalidRecord() instanceof StructuredRecord) {
            try {
                format = StructuredRecordStringConverter.toJsonString((StructuredRecord) invalidEntry.getInvalidRecord());
            } catch (IOException e2) {
                format = "Exception while converting StructuredRecord to String, " + e2.getCause();
            }
        } else {
            format = String.format("Error Entry is of type %s, only a record of type %s is supported currently", invalidEntry.getInvalidRecord().getClass().getName(), StructuredRecord.class.getName());
        }
        genericRecordBuilder.set(Constants.ErrorDataset.INVALIDENTRY, format);
        return genericRecordBuilder.build();
    }
}
