package co.cask.cdap.internal.app.runtime.batch;

import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.app.metrics.MapReduceMetrics;
import co.cask.cdap.common.lang.PropertyFieldSetter;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.internal.app.runtime.DataSetFieldSetter;
import co.cask.cdap.internal.app.runtime.MetricsFieldSetter;
import co.cask.cdap.internal.lang.Reflections;
import co.cask.cdap.internal.lang.Visitor;
import com.google.common.base.Throwables;
import com.google.common.reflect.TypeToken;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/MapperWrapper.class */
public class MapperWrapper extends Mapper {
    public static final String ATTR_MAPPER_CLASS = "c.mapper.class";
    private static final Logger LOG = LoggerFactory.getLogger(MapperWrapper.class);

    public void run(Mapper.Context context) throws IOException, InterruptedException {
        BasicMapReduceContext basicMapReduceContext = new MapReduceContextProvider(context, MapReduceMetrics.TaskType.Mapper).get();
        context.getConfiguration().setClassLoader(basicMapReduceContext.getProgram().getClassLoader());
        basicMapReduceContext.getMetricsCollectionService().startAndWait();
        try {
            ProgramLifecycle createMapperInstance = createMapperInstance(basicMapReduceContext.getProgram().getClassLoader(), context.getConfiguration().get(ATTR_MAPPER_CLASS));
            try {
                Reflections.visit(createMapperInstance, TypeToken.of(createMapperInstance.getClass()), new PropertyFieldSetter(basicMapReduceContext.getSpecification().getProperties()), new Visitor[]{new MetricsFieldSetter(basicMapReduceContext.getMetrics()), new DataSetFieldSetter(basicMapReduceContext)});
                LoggingContextAccessor.setLoggingContext(basicMapReduceContext.getLoggingContext());
                WrappedMapper.Context createAutoFlushingContext = createAutoFlushingContext(context, basicMapReduceContext);
                if (createMapperInstance instanceof ProgramLifecycle) {
                    try {
                        createMapperInstance.initialize(basicMapReduceContext);
                    } catch (Exception e) {
                        LOG.error("Failed to initialize mapper with {}", basicMapReduceContext, e);
                        throw Throwables.propagate(e);
                    }
                }
                createMapperInstance.run(createAutoFlushingContext);
                TimeUnit.SECONDS.sleep(2L);
                try {
                    basicMapReduceContext.flushOperations();
                    if (createMapperInstance instanceof ProgramLifecycle) {
                        try {
                            createMapperInstance.destroy();
                        } catch (Exception e2) {
                            LOG.error("Error during destroy of mapper {}", basicMapReduceContext, e2);
                        }
                    }
                } catch (Exception e3) {
                    LOG.error("Failed to flush operations at the end of mapper of {}", basicMapReduceContext, e3);
                    throw Throwables.propagate(e3);
                }
            } catch (Throwable th) {
                LOG.error("Failed to inject fields to {}.", createMapperInstance.getClass(), th);
                throw Throwables.propagate(th);
            }
        } finally {
            basicMapReduceContext.close();
            basicMapReduceContext.getMetricsCollectionService().stop();
        }
    }

    private WrappedMapper.Context createAutoFlushingContext(Mapper.Context context, BasicMapReduceContext basicMapReduceContext) {
        int i = context.getConfiguration().getInt("c.mapper.flush.freq", 10000);
        WrappedMapper wrappedMapper = new WrappedMapper();
        wrappedMapper.getClass();
        return new WrappedMapper.Context(wrappedMapper, context, i, basicMapReduceContext) { // from class: co.cask.cdap.internal.app.runtime.batch.MapperWrapper.1
            private int processedRecords;
            final /* synthetic */ int val$flushFreq;
            final /* synthetic */ BasicMapReduceContext val$basicMapReduceContext;

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(wrappedMapper, context);
                this.val$flushFreq = i;
                this.val$basicMapReduceContext = basicMapReduceContext;
                wrappedMapper.getClass();
                this.processedRecords = 0;
            }

            public boolean nextKeyValue() throws IOException, InterruptedException {
                boolean nextKeyValue = super.nextKeyValue();
                int i2 = this.processedRecords + 1;
                this.processedRecords = i2;
                if (i2 > this.val$flushFreq) {
                    try {
                        MapperWrapper.LOG.info("Flushing dataset operations...");
                        this.val$basicMapReduceContext.flushOperations();
                        this.processedRecords = 0;
                    } catch (Exception e) {
                        MapperWrapper.LOG.error("Failed to persist changes", e);
                        throw Throwables.propagate(e);
                    }
                }
                return nextKeyValue;
            }
        };
    }

    private Mapper createMapperInstance(ClassLoader classLoader, String str) {
        try {
            return (Mapper) classLoader.loadClass(str).newInstance();
        } catch (Exception e) {
            LOG.error("Failed to create instance of the user-defined Mapper class: " + str);
            throw Throwables.propagate(e);
        }
    }
}
