package co.cask.cdap.etl.batch.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.cdap.etl.common.BatchFileFilter;
import co.cask.cdap.etl.common.Properties;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch source plugin that reads text records from files under a configured path and emits one
 * {@link StructuredRecord} (using {@link #DEFAULT_SCHEMA}) per input record.
 *
 * <p>During {@link #prepareRun} the Hadoop job is configured with the input path, an optional file
 * name regex, a cutoff hour derived from the logical start time, and (when a time table is
 * configured) the JSON list of dates stored under {@link #LAST_TIME_READ}. The path filter class
 * installed on the job is {@code BatchFileFilter}; how it interprets these settings is defined
 * outside this file.
 */
@Name("File")
@Description("Batch source for File Systems")
@Plugin(type = "batchsource")
public class FileBatchSource extends BatchSource<LongWritable, Object, StructuredRecord> {
    /** Job configuration key under which the configured input path is stored. */
    public static final String INPUT_NAME_CONFIG = "input.path.name";
    /** Job configuration key under which the configured file name regex is stored. */
    public static final String INPUT_REGEX_CONFIG = "input.path.regex";
    /** Key used both in the time table and in the job configuration for the JSON list of dates. */
    public static final String LAST_TIME_READ = "last.time.read";
    /** Job configuration key holding the cutoff hour, formatted as {@code yyyy-MM-dd-HH}. */
    public static final String CUTOFF_READ_TIME = "cutoff.read.time";
    /** Value of the {@code fileRegex} property that selects the time filter. */
    public static final String USE_TIMEFILTER = "timefilter";

    protected static final String MAX_SPLIT_SIZE_DESCRIPTION = "Maximum split-size for each mapper in the MapReduce Job. Defaults to 128MB.";
    protected static final String PATH_DESCRIPTION = "Path to file(s) to be read. If a directory is specified, terminate the path name with a '/'.";
    protected static final String TABLE_DESCRIPTION = "Name of the Table that keeps track of the last time files were read in.";
    protected static final String INPUT_FORMAT_CLASS_DESCRIPTION = "Name of the input format class, which must be a subclass of FileInputFormat. Defaults to CombineTextInputFormat.";
    protected static final String REGEX_DESCRIPTION = "Regex to filter out filenames in the path. To use the TimeFilter, input \"timefilter\". The TimeFilter assumes that it is reading in files with the File log naming convention of 'YYYY-MM-DD-HH-mm-SS-Tag'. The TimeFilter reads in files from the previous hour if the field 'timeTable' is left blank. If it's currently 2015-06-16-15 (June 16th 2015, 3pm), it will read in files that contain '2015-06-16-14' in the filename. If the field 'timeTable' is present, then it will read in files that have not yet been read.";
    private static final String FILESYSTEM_PROPERTIES_DESCRIPTION = "JSON of the properties needed for the distributed file system. The formatting needs to be as follows:\n{\n\t\"<property name>\" : \"<property value>\", ...\n}. For example, the property names needed for S3 are \"fs.s3n.awsSecretAccessKey\" and \"fs.s3n.awsAccessKeyId\".";
    private static final String FILESYSTEM_DESCRIPTION = "Distributed file system to read in from.";

    /** Default maximum split size in bytes (128 MB), used when none is configured or it is invalid. */
    private static final int DEFAULT_SPLIT_SIZE = 134217728;

    private final FileBatchConfig config;
    /** Time table dataset; only set when {@code timeTable} is configured and prepareRun reached it. */
    private KeyValueTable table;
    /** Start of the hour preceding the logical start time of the current run. */
    private Date prevHour;
    /** JSON list of dates read from the time table (or the epoch default) at prepareRun time. */
    private String datesToRead;

    /** Schema of emitted records: a LONG {@code ts} field and a STRING body field. */
    public static final Schema DEFAULT_SCHEMA = Schema.recordOf("event", new Schema.Field[]{Schema.Field.of("ts", Schema.of(Schema.Type.LONG)), Schema.Field.of(Properties.Stream.DEFAULT_BODY_FIELD, Schema.of(Schema.Type.STRING))});
    private static final Gson GSON = new Gson();
    private static final Logger LOG = LoggerFactory.getLogger(FileBatchSource.class);
    private static final Type ARRAYLIST_DATE_TYPE = new TypeToken<ArrayList<Date>>() {
    }.getType();
    private static final Type MAP_STRING_STRING_TYPE = new TypeToken<Map<String, String>>() {
    }.getType();

    /**
     * Plugin configuration for {@link FileBatchSource}.
     */
    public static class FileBatchConfig extends PluginConfig {

        @Name(Properties.File.FILESYSTEM)
        @Description(FileBatchSource.FILESYSTEM_DESCRIPTION)
        private String fileSystem;

        @Name(Properties.File.FILESYSTEM_PROPERTIES)
        @Description(FileBatchSource.FILESYSTEM_PROPERTIES_DESCRIPTION)
        @Nullable
        private String fileSystemProperties;

        @Name("path")
        @Description(FileBatchSource.PATH_DESCRIPTION)
        private String path;

        @Name("fileRegex")
        @Description(FileBatchSource.REGEX_DESCRIPTION)
        @Nullable
        private String fileRegex;

        @Name("timeTable")
        @Description(FileBatchSource.TABLE_DESCRIPTION)
        @Nullable
        private String timeTable;

        @Name("inputFormatClass")
        @Description(FileBatchSource.INPUT_FORMAT_CLASS_DESCRIPTION)
        @Nullable
        private String inputFormatClass;

        @Name("maxSplitSize")
        @Description(FileBatchSource.MAX_SPLIT_SIZE_DESCRIPTION)
        @Nullable
        private String maxSplitSize;

        /**
         * Creates a configuration. Note the positional order: {@code fileSystemProperties} is the
         * sixth argument, after {@code inputFormatClass}.
         *
         * @param fileSystem           file system to read from
         * @param path                 path to the file(s) to read
         * @param fileRegex            file name regex, or {@code "timefilter"}; may be null
         * @param timeTable            name of the table tracking read times; may be null
         * @param inputFormatClass     input format class name; may be null
         * @param fileSystemProperties JSON object of file system properties; may be null
         * @param maxSplitSize         maximum split size in bytes, as a string; may be null
         */
        public FileBatchConfig(String fileSystem, String path, @Nullable String fileRegex, @Nullable String timeTable, @Nullable String inputFormatClass, @Nullable String fileSystemProperties, @Nullable String maxSplitSize) {
            this.fileSystem = fileSystem;
            this.fileSystemProperties = fileSystemProperties;
            this.path = path;
            this.fileRegex = fileRegex;
            this.timeTable = timeTable;
            this.inputFormatClass = inputFormatClass;
            this.maxSplitSize = maxSplitSize;
        }
    }

    /**
     * Creates the source with the given plugin configuration.
     *
     * @param fileBatchConfig configuration read by every lifecycle method of this source
     */
    public FileBatchSource(FileBatchConfig fileBatchConfig) {
        this.config = fileBatchConfig;
    }

    /**
     * Registers the time table as a {@link KeyValueTable} dataset when one is configured.
     *
     * @param pipelineConfigurer configurer used to create the dataset
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        if (this.config.timeTable == null) {
            return;
        }
        pipelineConfigurer.createDataset(this.config.timeTable, KeyValueTable.class, DatasetProperties.EMPTY);
    }

    /**
     * Configures the Hadoop job for this run: file system properties, input path and regex, time
     * table bookkeeping, cutoff hour, input format class, path filter and maximum split size.
     *
     * @param batchSourceContext context giving access to the logical start time, the Hadoop job and
     *                           datasets
     * @throws ClassNotFoundException if the configured input format class cannot be loaded
     * @throws ClassCastException     if the configured input format class is not a
     *                                {@link FileInputFormat} subclass
     * @throws Exception              any other failure raised while configuring the job
     */
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        this.prevHour = startOfPreviousHour(batchSourceContext.getLogicalStartTime());

        Job job = (Job) batchSourceContext.getHadoopJob();
        Configuration configuration = job.getConfiguration();

        if (this.config.fileSystemProperties != null) {
            Map<String, String> fsProperties = GSON.fromJson(this.config.fileSystemProperties, MAP_STRING_STRING_TYPE);
            // Gson yields null for an empty JSON document; treat that as "no properties".
            if (fsProperties != null) {
                for (Map.Entry<String, String> property : fsProperties.entrySet()) {
                    configuration.set(property.getKey(), property.getValue());
                }
            }
        }

        if (this.config.fileRegex != null) {
            configuration.set(INPUT_REGEX_CONFIG, this.config.fileRegex);
        }
        configuration.set(INPUT_NAME_CONFIG, this.config.path);

        if (this.config.timeTable != null) {
            this.table = batchSourceContext.getDataset(this.config.timeTable);
            this.datesToRead = Bytes.toString(this.table.read(LAST_TIME_READ));
            if (this.datesToRead == null) {
                // Nothing recorded yet: start from the epoch.
                this.datesToRead = GSON.toJson(Lists.newArrayList(new Date(0L)), ARRAYLIST_DATE_TYPE);
            }
            // Record the current cutoff up front; onRunFinish restores the earlier dates if the run fails.
            String currentCutoffJson = GSON.toJson(Lists.newArrayList(this.prevHour), ARRAYLIST_DATE_TYPE);
            if (!currentCutoffJson.equals(this.datesToRead)) {
                this.table.write(LAST_TIME_READ, currentCutoffJson);
            }
            configuration.set(LAST_TIME_READ, this.datesToRead);
        }

        configuration.set(CUTOFF_READ_TIME, new SimpleDateFormat("yyyy-MM-dd-HH").format(this.prevHour));

        if (Strings.isNullOrEmpty(this.config.inputFormatClass)) {
            job.setInputFormatClass(CombineTextInputFormat.class);
        } else {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            // asSubclass enforces the documented "must be a subclass of FileInputFormat" contract
            // here instead of passing an unchecked Class<?> on to the job.
            job.setInputFormatClass(classLoader.loadClass(this.config.inputFormatClass).asSubclass(FileInputFormat.class));
        }

        FileInputFormat.setInputPathFilter(job, BatchFileFilter.class);
        FileInputFormat.addInputPath(job, new Path(this.config.path));
        CombineTextInputFormat.setMaxInputSplitSize(job, resolveMaxSplitSize(this.config.maxSplitSize));
    }

    /**
     * Emits one record containing the current wall-clock time and the string form of the input value.
     *
     * @param keyValue input key/value pair; only the value is used
     * @param emitter  receiver of the generated record
     * @throws Exception if building or emitting the record fails
     */
    public void transform(KeyValue<LongWritable, Object> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        StructuredRecord record = StructuredRecord.builder(DEFAULT_SCHEMA)
            .set("ts", Long.valueOf(System.currentTimeMillis()))
            .set(Properties.Stream.DEFAULT_BODY_FIELD, keyValue.getValue().toString())
            .build();
        emitter.emit(record);
    }

    /**
     * After a failed run with a time table and the time filter enabled, rewrites the
     * {@link #LAST_TIME_READ} entry so that it contains the dates that were to be read in this run,
     * this run's cutoff hour, and whatever is currently stored in the table. Does nothing when the
     * run succeeded, no time table is in use, or the regex is not {@link #USE_TIMEFILTER}.
     *
     * @param z                  whether the run succeeded
     * @param batchSourceContext context of the finished run (unused)
     */
    public void onRunFinish(boolean z, BatchSourceContext batchSourceContext) {
        if (z || this.table == null || !USE_TIMEFILTER.equals(this.config.fileRegex)) {
            return;
        }
        List<Date> recorded = GSON.fromJson(Bytes.toString(this.table.read(LAST_TIME_READ)), ARRAYLIST_DATE_TYPE);
        List<Date> pending = GSON.fromJson(this.datesToRead, ARRAYLIST_DATE_TYPE);
        // Gson returns null for null/empty JSON; guard so a missing entry cannot cause an NPE.
        if (pending == null) {
            pending = new ArrayList<Date>();
        }
        pending.add(this.prevHour);
        if (recorded != null) {
            pending.addAll(recorded);
        }
        this.table.write(LAST_TIME_READ, GSON.toJson(pending, ARRAYLIST_DATE_TYPE));
    }

    // Decompiler-recovered overload (marked bridge/synthetic in the class file) that forwards to the
    // typed transform above. NOTE(review): a compiler normally generates this itself; confirm whether
    // it must be dropped if this file is rebuilt from source.
    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((KeyValue<LongWritable, Object>) obj, (Emitter<StructuredRecord>) emitter);
    }

    /** Returns the start (minute, second and millisecond zeroed) of the hour one hour before the given time. */
    private static Date startOfPreviousHour(long logicalStartTimeMillis) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(logicalStartTimeMillis - TimeUnit.HOURS.toMillis(1L));
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        return calendar.getTime();
    }

    /** Parses the configured split size, falling back to {@link #DEFAULT_SPLIT_SIZE} when absent or unparsable. */
    private static long resolveMaxSplitSize(@Nullable String configuredSize) {
        if (Strings.isNullOrEmpty(configuredSize)) {
            return DEFAULT_SPLIT_SIZE;
        }
        try {
            return Long.parseLong(configuredSize);
        } catch (NumberFormatException e) {
            // Keep the lenient fallback, but make the misconfiguration visible.
            LOG.warn("Invalid maxSplitSize '{}'; falling back to the default of {} bytes.", configuredSize, DEFAULT_SPLIT_SIZE);
            return DEFAULT_SPLIT_SIZE;
        }
    }
}
