package co.cask.cdap.etl.batch.mapreduce;

import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.Resources;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSetArguments;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.mapreduce.MapReduceTaskContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.etl.api.InvalidEntry;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.TransformContext;
import co.cask.cdap.etl.api.batch.BatchConfigurable;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.api.batch.BatchSinkContext;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.cdap.etl.batch.LoggedBatchConfigurable;
import co.cask.cdap.etl.batch.LoggedBatchSink;
import co.cask.cdap.etl.batch.LoggedBatchSource;
import co.cask.cdap.etl.batch.config.ETLBatchConfig;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DatasetContextLookupProvider;
import co.cask.cdap.etl.common.DefaultEmitter;
import co.cask.cdap.etl.common.DefaultStageMetrics;
import co.cask.cdap.etl.common.Destroyables;
import co.cask.cdap.etl.common.LoggedTransform;
import co.cask.cdap.etl.common.Pipeline;
import co.cask.cdap.etl.common.PipelineRegisterer;
import co.cask.cdap.etl.common.SinkInfo;
import co.cask.cdap.etl.common.TransformDetail;
import co.cask.cdap.etl.common.TransformExecutor;
import co.cask.cdap.etl.common.TransformInfo;
import co.cask.cdap.etl.common.TransformResponse;
import co.cask.cdap.etl.log.LogStageInjector;
import co.cask.cdap.format.StructuredRecordStringConverter;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/etl/batch/mapreduce/ETLMapReduce.class */
public class ETLMapReduce extends AbstractMapReduce {
    // Hadoop configuration key under which beforeSubmit() stores the JSON list of SinkOutput entries
    // that ETLMapper.initialize() reads back.
    private static final String SINK_OUTPUTS_KEY = "cdap.etl.sink.outputs";
    // Prefix of the Hadoop configuration keys holding per-stage runtime arguments as JSON;
    // the stage id (source or sink id) is appended to form the full key.
    private static final String RUNTIME_ARGS_KEY_PREFIX = "cdap.etl.runtime.args.";
    // Source plugin (wrapped in LoggedBatchConfigurable) and its context; assigned in beforeSubmit()
    // and used again by onRunFinishSource().
    private BatchConfigurable<BatchSourceContext> batchSource;
    private MapReduceSourceContext sourceContext;
    // Sink plugins and their contexts keyed by sink id; populated in beforeSubmit()
    // and used again by onRunFinishSinks().
    private Map<String, BatchConfigurable<BatchSinkContext>> batchSinks;
    private Map<String, MapReduceSinkContext> sinkContexts;
    // NOTE(review): never assigned in this file - presumably injected by the CDAP runtime; confirm.
    private Metrics mrMetrics;
    // Application configuration handed to the constructor; read by configure().
    private final ETLBatchConfig config;
    // Program name registered in configure().
    public static final String NAME = ETLMapReduce.class.getSimpleName();
    private static final Logger LOG = LoggerFactory.getLogger(ETLMapReduce.class);
    // Gson type descriptor for List<SinkOutput>, used to deserialize the SINK_OUTPUTS_KEY value.
    private static final Type SINK_OUTPUTS_TYPE = new TypeToken<List<SinkOutput>>() {
    }.getType();
    // Gson type descriptor for Map<String, String>, used to (de)serialize per-stage runtime arguments.
    private static final Type RUNTIME_ARGS_TYPE = new TypeToken<Map<String, String>>() {
    }.getType();
    private static final Gson GSON = new Gson();
    // Avro form of Constants.ERROR_SCHEMA, used to build the records written to error datasets.
    private static final Schema AVRO_ERROR_SCHEMA = new Schema.Parser().parse(Constants.ERROR_SCHEMA.toString());

    /* loaded from: input_file:co/cask/cdap/etl/batch/mapreduce/ETLMapReduce$ETLMapper.class */
    public static class ETLMapper extends Mapper implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
        private static final Logger LOG = LoggerFactory.getLogger(ETLMapper.class);
        private static final Gson GSON = new Gson();
        // Ids of transforms that emitted error records but have no error dataset; used by map()
        // so that the corresponding warning is logged only once per transform.
        private Set<String> transformsWithoutErrorDataset;
        // Runs one input record through the source, transforms and sinks; built in initialize().
        private TransformExecutor<KeyValue> transformExecutor;
        // NOTE(review): never assigned in this file - presumably injected by the CDAP runtime; confirm.
        private Metrics mapperMetrics;
        // Output writers keyed by sink plugin id; built in initialize().
        private Map<String, WrappedSink<Object, Object, Object>> sinks;
        // Error writers keyed by transform id, present only for transforms with an error dataset.
        private Map<String, ErrorSink<Object, Object>> transformErrorSinkMap;

        /**
         * Rebuilds the pipeline inside the mapper task.
         *
         * <p>Reads the pipeline JSON from the program properties, instantiates and initializes the
         * source, every transform and every sink plugin, and wires them into a
         * {@link TransformExecutor} rooted at the source. Sink output names are read from the Hadoop
         * configuration entry {@code SINK_OUTPUTS_KEY}.
         *
         * @param mapReduceTaskContext the task context used to create plugins and write output
         * @throws Exception if a precondition fails or any plugin fails to initialize
         */
        public void initialize(MapReduceTaskContext<Object, Object> mapReduceTaskContext) throws Exception {
            Map<String, String> properties = mapReduceTaskContext.getSpecification().getProperties();
            if (Boolean.parseBoolean(properties.get(Constants.STAGE_LOGGING_ENABLED))) {
                LogStageInjector.start();
            }
            Mapper.Context context = (Mapper.Context) mapReduceTaskContext.getHadoopContext();
            Configuration configuration = context.getConfiguration();

            Pipeline pipeline = GSON.fromJson(properties.get(Constants.PIPELINEID), Pipeline.class);
            Preconditions.checkNotNull(pipeline, "Pipeline is null");
            Preconditions.checkNotNull(pipeline.getSinks(), "Sinks could not be found in program properties");
            Preconditions.checkNotNull(pipeline.getTransforms());
            Preconditions.checkNotNull(pipeline.getConnections(), "Connections could not be found in program properties");

            String source = pipeline.getSource();
            List<TransformInfo> transforms = pipeline.getTransforms();
            Map<String, List<String>> connections = pipeline.getConnections();
            Map<String, TransformDetail> transformDetails = new HashMap<>();

            // Source stage.
            LoggedBatchSource loggedBatchSource = new LoggedBatchSource(source, (BatchSource) mapReduceTaskContext.newPluginInstance(source));
            loggedBatchSource.initialize((BatchRuntimeContext) newStageContext(mapReduceTaskContext, configuration, source));
            transformDetails.put(source, new TransformDetail(loggedBatchSource, new DefaultStageMetrics(this.mapperMetrics, source), connections.get(source)));

            // Transform stages; the error bookkeeping must exist before addTransforms fills it.
            this.transformErrorSinkMap = new HashMap<>();
            this.transformsWithoutErrorDataset = new HashSet<>();
            addTransforms(transformDetails, connections, transforms, mapReduceTaskContext);

            // Sink stages.
            String sinkOutputsJson = configuration.get(ETLMapReduce.SINK_OUTPUTS_KEY);
            Preconditions.checkNotNull(sinkOutputsJson, "Sink outputs not found in Hadoop conf.");
            List<SinkOutput> sinkOutputList = GSON.fromJson(sinkOutputsJson, ETLMapReduce.SINK_OUTPUTS_TYPE);
            // Gson yields null for a literal "null" document; report it as a missing value instead of an NPE.
            Preconditions.checkArgument(sinkOutputList != null && !sinkOutputList.isEmpty(), "Sink outputs not found in Hadoop conf.");
            boolean hasOneOutput = hasOneOutput(transforms, sinkOutputList);
            this.sinks = new HashMap<>(sinkOutputList.size());
            for (SinkOutput sinkOutput : sinkOutputList) {
                String sinkPluginId = sinkOutput.getSinkPluginId();
                Set<String> sinkOutputs = sinkOutput.getSinkOutputs();
                LoggedBatchSink loggedBatchSink = new LoggedBatchSink(sinkPluginId, (BatchSink) mapReduceTaskContext.newPluginInstance(sinkPluginId));
                loggedBatchSink.initialize((BatchRuntimeContext) newStageContext(mapReduceTaskContext, configuration, sinkPluginId));
                if (hasOneOutput) {
                    this.sinks.put(sinkPluginId, new SingleOutputSink(loggedBatchSink, mapReduceTaskContext));
                } else {
                    this.sinks.put(sinkPluginId, new MultiOutputSink(loggedBatchSink, mapReduceTaskContext, sinkOutputs));
                }
                // Sinks are terminal stages, so they have no downstream connections.
                transformDetails.put(sinkPluginId, new TransformDetail(loggedBatchSink, new DefaultStageMetrics(this.mapperMetrics, sinkPluginId), new ArrayList<String>()));
            }
            this.transformExecutor = new TransformExecutor<>(transformDetails, ImmutableList.of(source));
        }

        /**
         * Creates the runtime context for a source or sink stage, using the runtime arguments stored
         * for that stage in the Hadoop configuration under {@code RUNTIME_ARGS_KEY_PREFIX + stageId}.
         */
        private MapReduceRuntimeContext newStageContext(MapReduceTaskContext<Object, Object> taskContext, Configuration configuration, String stageId) {
            Map<String, String> stageArguments = GSON.fromJson(configuration.get(ETLMapReduce.RUNTIME_ARGS_KEY_PREFIX + stageId), ETLMapReduce.RUNTIME_ARGS_TYPE);
            return new MapReduceRuntimeContext(taskContext, this.mapperMetrics, new DatasetContextLookupProvider(taskContext), stageId, stageArguments);
        }

        /**
         * Decides whether the job writes to exactly one output.
         *
         * @return {@code false} as soon as any transform or sink declares an error dataset;
         *         otherwise {@code true} only when all sinks together name exactly one distinct output
         */
        private boolean hasOneOutput(List<TransformInfo> transformInfos, List<SinkOutput> sinkOutputList) {
            for (TransformInfo transformInfo : transformInfos) {
                if (transformInfo.getErrorDatasetName() != null) {
                    return false;
                }
            }
            Set<String> distinctOutputNames = new HashSet<>();
            for (SinkOutput sinkOutput : sinkOutputList) {
                if (sinkOutput.getErrorDatasetName() != null) {
                    return false;
                }
                distinctOutputNames.addAll(sinkOutput.getSinkOutputs());
            }
            return distinctOutputNames.size() == 1;
        }

        /**
         * Instantiates and initializes every transform plugin, records it in {@code stageDetails}
         * together with its outgoing connections, and registers an {@link ErrorSink} for each
         * transform that declares an error dataset.
         *
         * @throws Exception if a transform fails to initialize
         */
        private void addTransforms(Map<String, TransformDetail> stageDetails, Map<String, List<String>> connections, List<TransformInfo> transformInfos, MapReduceTaskContext taskContext) throws Exception {
            for (TransformInfo info : transformInfos) {
                String id = info.getTransformId();
                LoggedTransform transform = new LoggedTransform(id, (Transform) taskContext.newPluginInstance(id));
                MapReduceRuntimeContext runtimeContext = new MapReduceRuntimeContext(taskContext, this.mapperMetrics, new DatasetContextLookupProvider(taskContext), id, taskContext.getRuntimeArguments());
                LOG.debug("Transform Class : {}", transform.getClass().getName());
                transform.initialize((TransformContext) runtimeContext);
                stageDetails.put(id, new TransformDetail(transform, new DefaultStageMetrics(this.mapperMetrics, id), connections.get(id)));

                String errorDataset = info.getErrorDatasetName();
                if (errorDataset == null) {
                    continue;
                }
                this.transformErrorSinkMap.put(id, new ErrorSink<>(taskContext, errorDataset));
            }
        }

        /**
         * Pushes one input key/value pair through the pipeline, writes everything emitted for each
         * sink, and routes invalid entries to the error dataset of the transform that produced them.
         * A transform that emits errors without an error dataset is warned about once and then ignored.
         * Any exception is logged and rethrown via {@link Throwables#propagate}.
         */
        public void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            try {
                TransformResponse response = this.transformExecutor.runOneIteration(new KeyValue(obj, obj2));

                for (Map.Entry<String, Collection<Object>> sinkResult : response.getSinksResults().entrySet()) {
                    WrappedSink<Object, Object, Object> target = this.sinks.get(sinkResult.getKey());
                    for (Object emitted : sinkResult.getValue()) {
                        target.write((KeyValue) emitted);
                    }
                }

                for (Map.Entry<String, Collection<InvalidEntry<Object>>> errorResult : response.getMapTransformIdToErrorEmitter().entrySet()) {
                    String transformId = errorResult.getKey();
                    if (this.transformsWithoutErrorDataset.contains(transformId) || errorResult.getValue().isEmpty()) {
                        continue;
                    }
                    if (!this.transformErrorSinkMap.containsKey(transformId)) {
                        LOG.warn("Transform : {} has error records, but does not have a error dataset configured.", transformId);
                        this.transformsWithoutErrorDataset.add(transformId);
                        continue;
                    }
                    this.transformErrorSinkMap.get(transformId).write(errorResult.getValue());
                }

                this.transformExecutor.resetEmitter();
            } catch (Exception e) {
                LOG.error("Exception thrown in BatchDriver Mapper.", (Throwable) e);
                throw Throwables.propagate(e);
            }
        }

        /**
         * Releases the transform executor and every sink plugin.
         *
         * <p>Tolerates a partially initialized mapper: if {@code initialize} failed before the
         * executor or the sink map was assigned, the missing parts are skipped so that cleanup does
         * not raise a {@link NullPointerException} that would hide the original failure.
         */
        public void destroy() {
            if (this.transformExecutor != null) {
                Destroyables.destroyQuietly(this.transformExecutor);
            }
            if (this.sinks == null) {
                // initialize() did not get far enough to create any sink.
                return;
            }
            LOG.debug("Number of sinks to destroy: {}", Integer.valueOf(this.sinks.size()));
            for (WrappedSink<Object, Object, Object> wrappedSink : this.sinks.values()) {
                LOG.trace("Destroying sink: {}", wrappedSink.sink);
                Destroyables.destroyQuietly(wrappedSink.sink);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/etl/batch/mapreduce/ETLMapReduce$ErrorSink.class */
    /**
     * Writes invalid entries to a named error dataset as Avro records.
     */
    public static class ErrorSink<KEY_OUT, VAL_OUT> {
        private final MapReduceTaskContext<KEY_OUT, VAL_OUT> context;
        private final String errorDatasetName;

        private ErrorSink(MapReduceTaskContext<KEY_OUT, VAL_OUT> taskContext, String datasetName) {
            this.context = taskContext;
            this.errorDatasetName = datasetName;
        }

        /**
         * Converts each invalid entry to a generic Avro record and writes it, keyed by the record
         * with a {@link NullWritable} value, to the error dataset.
         */
        public void write(Collection<InvalidEntry<Object>> collection) throws Exception {
            for (InvalidEntry<Object> invalidEntry : collection) {
                GenericRecord errorRecord = ETLMapReduce.getGenericRecordForInvalidEntry(invalidEntry);
                this.context.write(this.errorDatasetName, new AvroKey<GenericRecord>(errorRecord), NullWritable.get());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/etl/batch/mapreduce/ETLMapReduce$MultiOutputSink.class */
    /**
     * Sink wrapper that writes every key/value pair to each of its named outputs.
     */
    public static class MultiOutputSink<IN, KEY_OUT, VAL_OUT> extends WrappedSink<IN, KEY_OUT, VAL_OUT> {
        private final Set<String> outputNames;

        private MultiOutputSink(BatchSink<IN, KEY_OUT, VAL_OUT> wrapped, MapReduceTaskContext<KEY_OUT, VAL_OUT> taskContext, Set<String> names) {
            super(wrapped, taskContext);
            this.outputNames = names;
        }

        /** Writes the pair once per configured output name. */
        @Override
        public void write(KeyValue<KEY_OUT, VAL_OUT> keyValue) throws Exception {
            for (String outputName : this.outputNames) {
                this.context.write(outputName, keyValue.getKey(), keyValue.getValue());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/etl/batch/mapreduce/ETLMapReduce$SingleOutputSink.class */
    /**
     * Sink wrapper that writes through the context's two-argument (unnamed) {@code write} call.
     */
    public static class SingleOutputSink<IN, KEY_OUT, VAL_OUT> extends WrappedSink<IN, KEY_OUT, VAL_OUT> {
        protected SingleOutputSink(BatchSink<IN, KEY_OUT, VAL_OUT> batchSink, MapReduceTaskContext<KEY_OUT, VAL_OUT> mapReduceTaskContext) {
            super(batchSink, mapReduceTaskContext);
        }

        /** Writes the pair without naming an output. */
        @Override
        public void write(KeyValue<KEY_OUT, VAL_OUT> keyValue) throws Exception {
            KEY_OUT key = keyValue.getKey();
            VAL_OUT value = keyValue.getValue();
            this.context.write(key, value);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/etl/batch/mapreduce/ETLMapReduce$SinkOutput.class */
    /**
     * Describes one sink: its plugin id, the names of the outputs it writes to, and its optional
     * error dataset. Instances are serialized to JSON in {@code beforeSubmit} and read back by the
     * mapper.
     */
    public static class SinkOutput {
        private String sinkPluginId;
        private Set<String> sinkOutputs;
        private String errorDatasetName;

        private SinkOutput(String pluginId, Set<String> outputs, String errorDataset) {
            sinkPluginId = pluginId;
            sinkOutputs = outputs;
            errorDatasetName = errorDataset;
        }

        /** @return the id of the sink plugin */
        public String getSinkPluginId() {
            return sinkPluginId;
        }

        /** @return the names of the outputs the sink writes to */
        public Set<String> getSinkOutputs() {
            return sinkOutputs;
        }

        /** @return the sink's error dataset name, or {@code null} when none was given */
        public String getErrorDatasetName() {
            return errorDatasetName;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/etl/batch/mapreduce/ETLMapReduce$WrappedSink.class */
    /**
     * Pairs a sink plugin with the task context used to write its output; subclasses decide how
     * a key/value pair is written.
     */
    public static abstract class WrappedSink<IN, KEY_OUT, VAL_OUT> {
        protected final BatchSink<IN, KEY_OUT, VAL_OUT> sink;
        protected final MapReduceTaskContext<KEY_OUT, VAL_OUT> context;

        protected WrappedSink(BatchSink<IN, KEY_OUT, VAL_OUT> batchSink, MapReduceTaskContext<KEY_OUT, VAL_OUT> mapReduceTaskContext) {
            sink = batchSink;
            context = mapReduceTaskContext;
        }

        /** Writes a single key/value pair through the task context. */
        protected abstract void write(KeyValue<KEY_OUT, VAL_OUT> keyValue) throws Exception;
    }

    /**
     * Creates the driver for the given batch configuration, which is read later by
     * {@link #configure()}.
     */
    public ETLMapReduce(ETLBatchConfig eTLBatchConfig) {
        config = eTLBatchConfig;
    }

    /**
     * Declares the program: sets its name and description, registers the pipeline's plugins
     * (passing the properties for Avro-backed time-partitioned error datasets), applies the
     * configured mapper and driver resources when present, and stores the serialized pipeline and
     * the stage-logging flag as program properties.
     */
    public void configure() {
        setName(NAME);
        setDescription("MapReduce Driver for ETL Batch Applications");

        PipelineRegisterer registerer = new PipelineRegisterer(getConfigurer(), "batch");
        Pipeline pipeline = registerer.registerPlugins(
            this.config,
            TimePartitionedFileSet.class,
            FileSetProperties.builder()
                .setInputFormat(AvroKeyInputFormat.class)
                .setOutputFormat(AvroKeyOutputFormat.class)
                .setEnableExploreOnCreate(true)
                .setSerDe("org.apache.hadoop.hive.serde2.avro.AvroSerDe")
                .setExploreInputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat")
                .setExploreOutputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat")
                .setTableProperty("avro.schema.literal", Constants.ERROR_SCHEMA.toString())
                .build(),
            true);

        Resources mapperResources = this.config.getResources();
        if (mapperResources != null) {
            setMapperResources(mapperResources);
        }
        Resources driverResources = this.config.getDriverResources();
        if (driverResources != null) {
            setDriverResources(driverResources);
        }

        Map<String, String> programProperties = new HashMap<>();
        programProperties.put(Constants.PIPELINEID, GSON.toJson(pipeline));
        programProperties.put(Constants.STAGE_LOGGING_ENABLED, String.valueOf(this.config.isStageLoggingEnabled()));
        setProperties(programProperties);
    }

    /**
     * Prepares the Hadoop job before submission.
     *
     * <p>Instantiates the source and every sink, calls {@code prepareRun} on each, and stores each
     * stage's runtime arguments in the job configuration under {@code RUNTIME_ARGS_KEY_PREFIX + id}.
     * Error datasets declared by transforms or sinks are added as job outputs. The list of sink
     * outputs is stored under {@code SINK_OUTPUTS_KEY}, and the job is set up as map-only with
     * {@link ETLMapper}.
     *
     * @throws Exception if a precondition fails or a plugin's {@code prepareRun} fails
     */
    public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
        String stageLogging = mapReduceContext.getSpecification().getProperty(Constants.STAGE_LOGGING_ENABLED);
        if (Boolean.parseBoolean(stageLogging)) {
            LogStageInjector.start();
        }

        Job hadoopJob = (Job) mapReduceContext.getHadoopJob();
        Configuration hadoopConf = hadoopJob.getConfiguration();

        String pipelineJson = (String) mapReduceContext.getSpecification().getProperties().get(Constants.PIPELINEID);
        Pipeline pipeline = GSON.fromJson(pipelineJson, Pipeline.class);
        Preconditions.checkNotNull(pipeline, "Pipeline is null");
        Preconditions.checkNotNull(pipeline.getSinks(), "Sinks could not be found in program properties");
        Preconditions.checkNotNull(pipeline.getTransforms());
        Preconditions.checkNotNull(pipeline.getConnections(), "Connections could not be found in program properties");

        // Source: the runtime arguments are serialized only after prepareRun has been called.
        String sourceId = pipeline.getSource();
        this.batchSource = (BatchConfigurable) mapReduceContext.newPluginInstance(sourceId);
        this.batchSource = new LoggedBatchConfigurable(sourceId, this.batchSource);
        this.sourceContext = new MapReduceSourceContext(mapReduceContext, this.mrMetrics, new DatasetContextLookupProvider(mapReduceContext), sourceId, mapReduceContext.getRuntimeArguments());
        this.batchSource.prepareRun(this.sourceContext);
        hadoopConf.set(RUNTIME_ARGS_KEY_PREFIX + sourceId, GSON.toJson(this.sourceContext.getRuntimeArguments(), RUNTIME_ARGS_TYPE));

        // Transforms only contribute their error datasets here.
        for (TransformInfo transformInfo : pipeline.getTransforms()) {
            String transformErrorDataset = transformInfo.getErrorDatasetName();
            if (transformErrorDataset != null) {
                addPropertiesToErrorDataset(transformErrorDataset, mapReduceContext);
            }
        }

        // Sinks.
        List<SinkOutput> collectedSinkOutputs = new ArrayList<>();
        List<SinkInfo> sinkInfos = pipeline.getSinks();
        this.batchSinks = new HashMap<>(sinkInfos.size());
        this.sinkContexts = new HashMap<>(sinkInfos.size());
        for (SinkInfo sinkInfo : sinkInfos) {
            String sinkId = sinkInfo.getSinkId();
            LoggedBatchConfigurable loggedSink = new LoggedBatchConfigurable(sinkId, (BatchConfigurable) mapReduceContext.newPluginInstance(sinkId));
            MapReduceSinkContext sinkContext = new MapReduceSinkContext(mapReduceContext, this.mrMetrics, new DatasetContextLookupProvider(mapReduceContext), sinkId, mapReduceContext.getRuntimeArguments());
            this.sinkContexts.put(sinkId, sinkContext);
            this.batchSinks.put(sinkId, loggedSink);
            loggedSink.prepareRun(sinkContext);

            String sinkErrorDataset = sinkInfo.getErrorDatasetName();
            collectedSinkOutputs.add(new SinkOutput(sinkId, sinkContext.getOutputNames(), sinkErrorDataset));
            if (sinkErrorDataset != null) {
                addPropertiesToErrorDataset(sinkErrorDataset, mapReduceContext);
            }
            hadoopConf.set(RUNTIME_ARGS_KEY_PREFIX + sinkId, GSON.toJson(sinkContext.getRuntimeArguments(), RUNTIME_ARGS_TYPE));
        }
        hadoopConf.set(SINK_OUTPUTS_KEY, GSON.toJson(collectedSinkOutputs));

        hadoopJob.setMapperClass(ETLMapper.class);
        hadoopJob.setNumReduceTasks(0);
    }

    /**
     * Adds the named error dataset as a job output, passing the error schema as the Avro output
     * key schema and the run's logical start time as the output partition time.
     */
    private void addPropertiesToErrorDataset(String errorDatasetName, MapReduceContext context) {
        Map<String, String> outputArguments = new HashMap<>();
        String errorSchemaJson = Constants.ERROR_SCHEMA.toString();
        outputArguments.put("output.properties.avro.schema.output.key", errorSchemaJson);
        TimePartitionedFileSetArguments.setOutputPartitionTime(outputArguments, context.getLogicalStartTime());
        context.addOutput(errorDatasetName, outputArguments);
    }

    /**
     * Notifies the source and then the sinks that the run has ended, and logs the outcome.
     *
     * @param z whether the run succeeded
     * @param mapReduceContext the context of the finished run
     */
    public void onFinish(boolean z, MapReduceContext mapReduceContext) throws Exception {
        onRunFinishSource(z);
        onRunFinishSinks(mapReduceContext, z);
        Boolean succeeded = Boolean.valueOf(z);
        LOG.info("Batch Run finished : succeeded = {}", succeeded);
    }

    /**
     * Calls {@code onRunFinish} on the source plugin; any failure is logged as a warning and
     * not rethrown.
     */
    private void onRunFinishSource(boolean succeeded) {
        BatchConfigurable<BatchSourceContext> source = this.batchSource;
        String sourceClassName = source.getClass().getName();
        LOG.info("On RunFinish Source : {}", sourceClassName);
        try {
            source.onRunFinish(succeeded, this.sourceContext);
        } catch (Throwable failure) {
            LOG.warn("Exception when calling onRunFinish on {}", source, failure);
        }
    }

    /**
     * Calls {@code onRunFinish} on every sink listed in the pipeline stored in the program
     * properties; a failure in one sink is logged as a warning and does not stop the others.
     */
    private void onRunFinishSinks(MapReduceContext context, boolean succeeded) {
        String pipelineJson = context.getSpecification().getProperty(Constants.PIPELINEID);
        Preconditions.checkNotNull(pipelineJson, "pipeline could not be found in program properties.");
        Pipeline pipeline = GSON.fromJson(pipelineJson, Pipeline.class);
        for (SinkInfo sinkInfo : pipeline.getSinks()) {
            BatchConfigurable<BatchSinkContext> sink = this.batchSinks.get(sinkInfo.getSinkId());
            try {
                sink.onRunFinish(succeeded, this.sinkContexts.get(sinkInfo.getSinkId()));
            } catch (Throwable failure) {
                LOG.warn("Exception when calling onRunFinish on {}", sink, failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the Avro error record for an invalid entry.
     *
     * <p>The error code and message are copied as-is. The invalid record itself is stored as a
     * string: its JSON form when it is a {@link StructuredRecord}, otherwise a message naming the
     * unsupported type. A {@code null} invalid record is reported as type {@code null} rather than
     * failing with a {@link NullPointerException}.
     *
     * @param invalidEntry the entry emitted as invalid by a transform or sink
     * @return a record conforming to the error schema
     */
    public static GenericRecord getGenericRecordForInvalidEntry(InvalidEntry invalidEntry) {
        GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(AVRO_ERROR_SCHEMA);
        genericRecordBuilder.set(Constants.ErrorDataset.ERRCODE, Integer.valueOf(invalidEntry.getErrorCode()));
        genericRecordBuilder.set(Constants.ErrorDataset.ERRMSG, invalidEntry.getErrorMsg());

        Object invalidRecord = invalidEntry.getInvalidRecord();
        String format;
        if (invalidRecord instanceof StructuredRecord) {
            try {
                format = StructuredRecordStringConverter.toJsonString((StructuredRecord) invalidRecord);
            } catch (IOException e) {
                // Fall back to the exception itself when it has no cause, so the text never ends in "null".
                Throwable reason = e.getCause() != null ? e.getCause() : e;
                format = "Exception while converting StructuredRecord to String, " + reason;
            }
        } else {
            String actualType = invalidRecord == null ? "null" : invalidRecord.getClass().getName();
            format = String.format("Error Entry is of type %s, only a record of type %s is supported currently", actualType, StructuredRecord.class.getName());
        }
        genericRecordBuilder.set(Constants.ErrorDataset.INVALIDENTRY, format);
        return genericRecordBuilder.build();
    }
}
