package org.apache.kylin.source.kafka;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
import org.apache.kylin.source.kafka.job.MergeOffsetStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/source/kafka/KafkaMRInput.class */
/**
 * {@link IMRInput} implementation for cubes whose fact data is read from Kafka.
 *
 * <p>It supplies the job-building hooks for cubing ({@link BatchCubingInputSide}) and merging
 * ({@link KafkaMRBatchMergeInputSide}), plus the mapper-side input format
 * ({@link KafkaTableInputFormat}) that parses the stored Kafka messages into column values.
 *
 * <p>NOTE(review): this class keeps the segment of the latest cubing request in a mutable
 * field, so one instance should not be shared between concurrent job builds - confirm with callers.
 */
public class KafkaMRInput implements IMRInput {
    /**
     * Segment remembered by the most recent {@link #getBatchCubingInputSide(IJoinedFlatTableDesc)}
     * call. It is {@code null} until that method has run, and
     * {@link #getTableInputFormat(TableDesc)} forwards whatever value is current.
     */
    CubeSegment cubeSegment;

    /**
     * Contributes the Kafka-specific steps of a cubing job: a MapReduce step running
     * {@link KafkaFlatTableJob} and a final step that deletes that step's output directory.
     */
    public static class BatchCubingInputSide implements IMRInput.IMRBatchCubingInputSide {
        final JobEngineConfig conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
        final CubeSegment seg;

        /**
         * Directory the save step writes to. It is only assigned while the phase-1 step is being
         * created, so it is {@code null} if phase 4 is requested before phase 1.
         */
        private String outputPath;

        /**
         * @param cubeSegment segment the cubing job is being built for
         */
        public BatchCubingInputSide(CubeSegment cubeSegment) {
            this.seg = cubeSegment;
        }

        /**
         * Appends the "Save data from Kafka" MapReduce step to the given job.
         *
         * @param defaultChainedExecutable job being assembled; its id determines the working directory
         */
        @Override
        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable defaultChainedExecutable) {
            defaultChainedExecutable.addTask(createSaveKafkaDataStep(defaultChainedExecutable.getId()));
        }

        /**
         * Builds the MapReduce step that runs {@link KafkaFlatTableJob} and records the step's
         * output directory in {@link #outputPath} for the later cleanup step.
         *
         * @param jobId id of the enclosing job, used to derive the job working directory
         * @return the configured, not yet scheduled, step
         */
        private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
            MapReduceExecutable saveStep = new MapReduceExecutable();

            String jobWorkingDir = JobBuilderSupport.getJobWorkingDir(this.conf, jobId);
            this.outputPath = JoinedFlatTable.getTableDir(new CubeJoinedFlatTableDesc(this.seg), jobWorkingDir);

            saveStep.setName("Save data from Kafka");
            saveStep.setMapReduceJobClass(KafkaFlatTableJob.class);

            JobBuilderSupport jobSupport = new JobBuilderSupport(this.seg, "system");
            String cubeName = this.seg.getRealization().getName();

            StringBuilder cmd = new StringBuilder();
            jobSupport.appendMapReduceParameters(cmd);
            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, cubeName);
            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, this.outputPath);
            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, this.seg.getUuid());
            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
                    "Kylin_Save_Kafka_Data_" + cubeName + "_Step");
            saveStep.setMapReduceParams(cmd.toString());
            return saveStep;
        }

        /**
         * Appends a {@link GarbageCollectionStep} that removes the directory written by the save step.
         *
         * @param defaultChainedExecutable job being assembled
         */
        @Override
        public void addStepPhase4_Cleanup(DefaultChainedExecutable defaultChainedExecutable) {
            GarbageCollectionStep cleanupStep = new GarbageCollectionStep();
            cleanupStep.setName(ExecutableConstants.STEP_NAME_KAFKA_CLEANUP);
            cleanupStep.setDataPath(this.outputPath);
            defaultChainedExecutable.addTask(cleanupStep);
        }

        /**
         * Creates the input format used to read this segment's saved Kafka data, configured with
         * the Kafka settings registered for the cube's fact table and all flat-table columns.
         *
         * @return a new {@link KafkaTableInputFormat} for this segment
         */
        @Override
        public IMRInput.IMRTableInputFormat getFlatTableInputFormat() {
            String factTable = this.seg.getRealization().getFactTable();
            KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
                    .getKafkaConfig(factTable);
            List<TblColRef> allColumns = new CubeJoinedFlatTableDesc(this.seg).getAllColumns();
            return new KafkaTableInputFormat(this.seg, allColumns, kafkaConfig, this.conf);
        }
    }

    /**
     * Job step that recursively deletes the path stored under its {@code dataPath} parameter.
     */
    public static class GarbageCollectionStep extends AbstractExecutable {
        private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);

        /** Name of the step parameter that carries the path to delete. */
        private static final String DATA_PATH_PARAM = "dataPath";

        /**
         * Deletes the configured path.
         *
         * @param executableContext execution context (not used by this step)
         * @return {@code SUCCEED} when the delete completed or the path did not exist;
         *         {@code ERROR} carrying the exception message when an {@link IOException} occurred
         * @throws ExecuteException declared by the parent contract; not thrown directly by this method
         */
        @Override
        protected ExecuteResult doWork(ExecutableContext executableContext) throws ExecuteException {
            String dataPath = getDataPath();
            try {
                rmdirOnHDFS(dataPath);
                return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + dataPath + " is dropped.\n");
            } catch (IOException e) {
                logger.error("job:" + getId() + " execute finished with exception", e);
                return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
            }
        }

        /**
         * Recursively deletes {@code location} if it exists on the file system resolved from its URI.
         *
         * @param location path to remove
         * @throws IOException if the file system cannot be obtained or the delete fails
         */
        private void rmdirOnHDFS(String location) throws IOException {
            Path target = new Path(location);
            FileSystem fs = FileSystem.get(target.toUri(), HadoopUtil.getCurrentConfiguration());
            if (fs.exists(target)) {
                fs.delete(target, true);
            }
        }

        /**
         * Stores the path this step should delete.
         *
         * @param str path to delete when the step runs
         */
        public void setDataPath(String str) {
            setParam(DATA_PATH_PARAM, str);
        }

        /**
         * @return the path previously stored with {@link #setDataPath(String)}
         */
        private String getDataPath() {
            return getParam(DATA_PATH_PARAM);
        }
    }

    /**
     * Contributes the Kafka-specific step of a merge job: a {@link MergeOffsetStep} tagged with
     * the cube name, the segment id and the id of the enclosing job.
     */
    class KafkaMRBatchMergeInputSide implements IMRInput.IMRBatchMergeInputSide {
        private final CubeSegment cubeSegment;

        /**
         * @param cubeSegment segment the merge job is being built for
         */
        KafkaMRBatchMergeInputSide(CubeSegment cubeSegment) {
            this.cubeSegment = cubeSegment;
        }

        /**
         * Appends the "Merge offset step" to the given job.
         *
         * @param defaultChainedExecutable job being assembled; its id is recorded as the cubing job id
         */
        @Override
        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable defaultChainedExecutable) {
            MergeOffsetStep offsetStep = new MergeOffsetStep();
            offsetStep.setName("Merge offset step");
            CubingExecutableUtil.setCubeName(this.cubeSegment.getRealization().getName(), offsetStep.getParams());
            CubingExecutableUtil.setSegmentId(this.cubeSegment.getUuid(), offsetStep.getParams());
            CubingExecutableUtil.setCubingJobId(defaultChainedExecutable.getId(), offsetStep.getParams());
            defaultChainedExecutable.addTask(offsetStep);
        }
    }

    /**
     * Input format over the sequence files written by the save step. Each record value is a
     * {@link Text} holding one raw message, which is decoded by the configured {@link StreamingParser}.
     */
    public static class KafkaTableInputFormat implements IMRInput.IMRTableInputFormat {
        private final CubeSegment cubeSegment;
        private final StreamingParser streamingParser;
        private final JobEngineConfig conf;

        /**
         * @param cubeSegment     segment whose saved data will be read
         * @param list            columns handed to the streaming parser
         * @param kafkaConfig     source of the parser name and parser properties
         * @param jobEngineConfig engine configuration used by {@link #configureJob(Job)} to locate
         *                        the job working directory
         * @throws IllegalArgumentException if the parser cannot be created reflectively
         */
        public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> list, KafkaConfig kafkaConfig, JobEngineConfig jobEngineConfig) {
            this.cubeSegment = cubeSegment;
            this.conf = jobEngineConfig;
            try {
                this.streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), list);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException(e);
            }
        }

        /**
         * Configures {@code job} to read sequence files with {@link Text} map output values from
         * the flat-table directory of the cubing job named by the {@code cubingJobId} setting.
         *
         * @param job job to configure
         * @throws IllegalStateException if the input path cannot be added
         */
        @Override
        public void configureJob(Job job) {
            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setMapOutputValueClass(Text.class);

            CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(this.cubeSegment);
            String cubingJobId = job.getConfiguration().get("cubingJobId");
            String jobWorkingDir = JobBuilderSupport.getJobWorkingDir(this.conf, cubingJobId);
            Path inputPath = new Path(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir));
            try {
                FileInputFormat.addInputPath(job, inputPath);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }

        /**
         * Parses one mapper input value into its column values.
         *
         * @param obj the mapper input value; must be a {@link Text}
         * @return the parsed message's data as an array, in the order the parser produced it
         */
        @Override
        public String[] parseMapperInput(Object obj) {
            Text record = (Text) obj;
            // Text's backing array may be longer than the payload, so wrap only the valid range.
            ByteBuffer payload = ByteBuffer.wrap(record.getBytes(), 0, record.getLength());
            StreamingMessage message = this.streamingParser.parse(payload);
            List<String> fields = message.getData();
            return fields.toArray(new String[fields.size()]);
        }
    }

    /**
     * Remembers the segment of {@code iJoinedFlatTableDesc} and returns the cubing-side hooks for it.
     *
     * @param iJoinedFlatTableDesc flat table description whose segment must be a {@link CubeSegment}
     * @return a new {@link BatchCubingInputSide} for that segment
     */
    @Override
    public IMRInput.IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc iJoinedFlatTableDesc) {
        this.cubeSegment = (CubeSegment) iJoinedFlatTableDesc.getSegment();
        return new BatchCubingInputSide(this.cubeSegment);
    }

    /**
     * Creates an input format for {@code tableDesc}, using the Kafka settings registered under
     * the table's identity and every column of the table.
     *
     * <p>The format receives the segment last seen by
     * {@link #getBatchCubingInputSide(IJoinedFlatTableDesc)}, which is {@code null} if that method
     * has not been called on this instance.
     *
     * @param tableDesc table to read
     * @return a new {@link KafkaTableInputFormat}
     */
    @Override
    public IMRInput.IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(tableDesc.getIdentity());
        List<TblColRef> columns = Lists.transform(Arrays.asList(tableDesc.getColumns()), new Function<ColumnDesc, TblColRef>() {
            @Override
            @Nullable
            public TblColRef apply(ColumnDesc columnDesc) {
                return columnDesc.getRef();
            }
        });
        // A null engine config used to be passed here, which configureJob() would then hand to
        // JobBuilderSupport.getJobWorkingDir(); supply a real one, as BatchCubingInputSide does.
        return new KafkaTableInputFormat(this.cubeSegment, columns, kafkaConfig, new JobEngineConfig(kylinConfig));
    }

    /**
     * Returns the merge-side hooks for the given segment.
     *
     * @param iSegment segment being merged; must be a {@link CubeSegment}
     * @return a new {@link KafkaMRBatchMergeInputSide} for that segment
     */
    @Override
    public IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(ISegment iSegment) {
        return new KafkaMRBatchMergeInputSide((CubeSegment) iSegment);
    }
}
