package co.cask.cdap.data.stream;

import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.RecordFormat;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.stream.StreamEventDecoder;
import co.cask.cdap.data.stream.decoder.BytesStreamEventDecoder;
import co.cask.cdap.data.stream.decoder.FormatStreamEventDecoder;
import co.cask.cdap.data.stream.decoder.IdentityStreamEventDecoder;
import co.cask.cdap.data.stream.decoder.StringStreamEventDecoder;
import co.cask.cdap.data.stream.decoder.TextStreamEventDecoder;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.KeyValue;
import co.cask.cdap.format.RecordFormats;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/* loaded from: input_file:co/cask/cdap/data/stream/StreamInputFormat.class */
public class StreamInputFormat<K, V> extends InputFormat<K, V> {
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    private static final StreamInputSplitFactory<InputSplit> splitFactory = new StreamInputSplitFactory<InputSplit>() { // from class: co.cask.cdap.data.stream.StreamInputFormat.1
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // co.cask.cdap.data.stream.StreamInputSplitFactory
        public InputSplit createSplit(Path path, Path path2, long j, long j2, long j3, long j4, @Nullable String[] strArr) {
            return new StreamInputSplit(path, path2, j, j2, j3, j4, strArr);
        }
    };
    private static final String EVENT_START_TIME = "input.streaminputformat.event.starttime";
    private static final String EVENT_END_TIME = "input.streaminputformat.event.endtime";
    private static final String STREAM_PATH = "input.streaminputformat.stream.path";
    private static final String STREAM_TTL = "input.streaminputformat.stream.event.ttl";
    private static final String MAX_SPLIT_SIZE = "input.streaminputformat.max.splits.size";
    private static final String MIN_SPLIT_SIZE = "input.streaminputformat.min.splits.size";
    private static final String DECODER_TYPE = "input.streaminputformat.decoder.type";
    private static final String BODY_FORMAT = "input.streaminputformat.stream.body.format";

    public static void setTTL(Configuration configuration, long j) {
        Preconditions.checkArgument(j >= 0, "TTL must be >= 0");
        configuration.setLong(STREAM_TTL, j);
    }

    public static void setTimeRange(Configuration configuration, long j, long j2) {
        Preconditions.checkArgument(j >= 0, "Start time must be >= 0");
        Preconditions.checkArgument(j2 >= 0, "End time must be >= 0");
        configuration.setLong(EVENT_START_TIME, j);
        configuration.setLong(EVENT_END_TIME, j2);
    }

    public static void setStreamPath(Configuration configuration, URI uri) {
        configuration.set(STREAM_PATH, uri.toString());
    }

    public static void setMaxSplitSize(Configuration configuration, long j) {
        configuration.setLong(MAX_SPLIT_SIZE, j);
    }

    public static void setMinSplitSize(Configuration configuration, long j) {
        configuration.setLong(MIN_SPLIT_SIZE, j);
    }

    public static void setDecoderClassName(Configuration configuration, String str) {
        configuration.set(DECODER_TYPE, str);
    }

    public static Class<? extends StreamEventDecoder> getDecoderClass(Configuration configuration) {
        return configuration.getClass(DECODER_TYPE, (Class) null, StreamEventDecoder.class);
    }

    public static void setBodyFormatSpecification(Configuration configuration, FormatSpecification formatSpecification) {
        configuration.set(BODY_FORMAT, GSON.toJson(formatSpecification));
        setDecoderClassName(configuration, FormatStreamEventDecoder.class.getName());
    }

    public static void inferDecoderClass(Configuration configuration, Type type) {
        if (Text.class.equals(type)) {
            setDecoderClassName(configuration, TextStreamEventDecoder.class.getName());
            return;
        }
        if (String.class.equals(type)) {
            setDecoderClassName(configuration, StringStreamEventDecoder.class.getName());
            return;
        }
        if (BytesWritable.class.equals(type)) {
            setDecoderClassName(configuration, BytesStreamEventDecoder.class.getName());
        } else {
            if (!(type instanceof Class) || !((Class) type).isAssignableFrom(StreamEvent.class)) {
                throw new IllegalArgumentException("The value class must be of type BytesWritable, Text, StreamEvent or StreamEventData if no decoder type is provided");
            }
            setDecoderClassName(configuration, IdentityStreamEventDecoder.class.getName());
        }
    }

    public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
        Configuration configuration = jobContext.getConfiguration();
        long j = configuration.getLong(STREAM_TTL, KeyValue.LATEST_TIMESTAMP);
        long j2 = configuration.getLong(EVENT_END_TIME, KeyValue.LATEST_TIMESTAMP);
        long max = Math.max(configuration.getLong(EVENT_START_TIME, 0L), getCurrentTime() - j);
        long j3 = configuration.getLong(MAX_SPLIT_SIZE, KeyValue.LATEST_TIMESTAMP);
        return StreamInputSplitFinder.builder(URI.create(configuration.get(STREAM_PATH))).setStartTime(max).setEndTime(j2).setMinSplitSize(Math.min(configuration.getLong(MIN_SPLIT_SIZE, 1L), j3)).setMaxSplitSize(j3).build(splitFactory).getSplits(configuration);
    }

    public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new StreamRecordReader(createStreamEventDecoder(taskAttemptContext.getConfiguration()));
    }

    protected long getCurrentTime() {
        return System.currentTimeMillis();
    }

    protected StreamEventDecoder<K, V> createStreamEventDecoder(Configuration configuration) {
        Class<? extends StreamEventDecoder> decoderClass = getDecoderClass(configuration);
        Preconditions.checkNotNull(decoderClass, "Failed to load stream event decoder %s", new Object[]{configuration.get(DECODER_TYPE)});
        try {
            if (!decoderClass.isAssignableFrom(FormatStreamEventDecoder.class)) {
                return decoderClass.newInstance();
            }
            try {
                return new FormatStreamEventDecoder(getInitializedFormat(configuration));
            } catch (Exception e) {
                throw new IllegalArgumentException("Unable to get the stream body format.");
            }
        } catch (Exception e2) {
            throw Throwables.propagate(e2);
        }
    }

    private RecordFormat<StreamEvent, V> getInitializedFormat(Configuration configuration) throws UnsupportedTypeException, IllegalAccessException, ClassNotFoundException, InstantiationException {
        String str = configuration.get(BODY_FORMAT);
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("input.streaminputformat.stream.body.format must be set in the configuration in order to use a format for the stream body.");
        }
        return RecordFormats.createInitializedFormat((FormatSpecification) GSON.fromJson(str, FormatSpecification.class));
    }
}
