package co.cask.cdap.data.stream;

import co.cask.cdap.api.stream.StreamEventDecoder;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.KeyValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/* loaded from: input_file:co/cask/cdap/data/stream/StreamInputFormat.class */
public abstract class StreamInputFormat<K, V> extends InputFormat<K, V> {
    private static final String EVENT_START_TIME = "input.streaminputformat.event.starttime";
    private static final String EVENT_END_TIME = "input.streaminputformat.event.endtime";
    private static final String STREAM_PATH = "input.streaminputformat.stream.path";
    private static final String STREAM_TTL = "input.streaminputformat.stream.event.ttl";
    private static final String MAX_SPLIT_SIZE = "input.streaminputformat.max.splits.size";
    private static final String MIN_SPLIT_SIZE = "input.streaminputformat.min.splits.size";

    public static void setTTL(Job job, long j) {
        Preconditions.checkArgument(j >= 0, "TTL must be >= 0");
        job.getConfiguration().setLong(STREAM_TTL, j);
    }

    public static void setTimeRange(Job job, long j, long j2) {
        Preconditions.checkArgument(j >= 0, "Start time must be >= 0");
        Preconditions.checkArgument(j2 >= 0, "End time must be >= 0");
        job.getConfiguration().setLong(EVENT_START_TIME, j);
        job.getConfiguration().setLong(EVENT_END_TIME, j2);
    }

    public static void setStreamPath(Job job, URI uri) {
        job.getConfiguration().set(STREAM_PATH, uri.toString());
    }

    public static void setMaxSplitSize(Job job, long j) {
        job.getConfiguration().setLong(MAX_SPLIT_SIZE, j);
    }

    public static void setMinSplitSize(Job job, long j) {
        job.getConfiguration().setLong(MIN_SPLIT_SIZE, j);
    }

    protected abstract StreamEventDecoder<K, V> createStreamEventDecoder();

    public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
        Configuration configuration = jobContext.getConfiguration();
        long j = configuration.getLong(STREAM_TTL, KeyValue.LATEST_TIMESTAMP);
        long j2 = configuration.getLong(EVENT_END_TIME, KeyValue.LATEST_TIMESTAMP);
        long max = Math.max(configuration.getLong(EVENT_START_TIME, 0L), getCurrentTime() - j);
        Path path = new Path(URI.create(configuration.get(STREAM_PATH)));
        long j3 = configuration.getLong(MAX_SPLIT_SIZE, KeyValue.LATEST_TIMESTAMP);
        long min = Math.min(configuration.getLong(MIN_SPLIT_SIZE, 1L), j3);
        Preconditions.checkArgument(max >= 0, "Invalid start time %s", new Object[]{Long.valueOf(max)});
        Preconditions.checkArgument(j2 >= 0, "Invalid end time %s", new Object[]{Long.valueOf(j2)});
        ArrayList newArrayList = Lists.newArrayList();
        FileSystem fileSystem = path.getFileSystem(configuration);
        for (FileStatus fileStatus : fileSystem.listStatus(path)) {
            if (fileStatus.isDirectory()) {
                long partitionStartTime = StreamUtils.getPartitionStartTime(fileStatus.getPath().getName());
                long partitionEndTime = StreamUtils.getPartitionEndTime(fileStatus.getPath().getName());
                if (partitionStartTime <= j2 && partitionEndTime > max) {
                    Iterator<StreamDataFileSplitter> it = collectBuckets(fileSystem, fileStatus.getPath()).iterator();
                    while (it.hasNext()) {
                        it.next().computeSplits(fileSystem, min, j3, max, j2, newArrayList);
                    }
                }
            }
        }
        return newArrayList;
    }

    public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new StreamRecordReader(createStreamEventDecoder());
    }

    protected long getCurrentTime() {
        return System.currentTimeMillis();
    }

    private Collection<StreamDataFileSplitter> collectBuckets(FileSystem fileSystem, Path path) throws IOException {
        ImmutableList.Builder builder = ImmutableList.builder();
        for (FileStatus fileStatus : fileSystem.listStatus(path)) {
            if (StreamFileType.EVENT.isMatched(fileStatus.getPath().getName())) {
                builder.add(new StreamDataFileSplitter(fileStatus));
            }
        }
        return builder.build();
    }
}
