package co.cask.cdap.api.data.stream;

import co.cask.cdap.api.annotation.Beta;
import co.cask.cdap.api.data.batch.BatchReadable;
import co.cask.cdap.api.data.batch.Split;
import co.cask.cdap.api.data.batch.SplitReader;
import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.stream.StreamEventDecoder;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.tephra.Transaction;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:co/cask/cdap/api/data/stream/StreamBatchReadable.class */
public class StreamBatchReadable implements BatchReadable<Long, String> {
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    private static final String START_TIME_KEY = "start";
    private static final String END_TIME_KEY = "end";
    private static final String DECODER_KEY = "decoder";
    private static final String BODY_FORMAT_KEY = "bodyFormat";
    private final URI uri;
    private final String streamName;
    private final long startTime;
    private final long endTime;
    private final String decoderType;
    private final FormatSpecification bodyFormatSpec;

    public static void useStreamInput(MapReduceContext mapReduceContext, String str) {
        useStreamInput(mapReduceContext, str, 0L, Transaction.NO_TX_IN_PROGRESS);
    }

    public static void useStreamInput(MapReduceContext mapReduceContext, String str, long j, long j2) {
        mapReduceContext.setInput(new StreamBatchReadable(str, j, j2).toURI().toString());
    }

    public static void useStreamInput(MapReduceContext mapReduceContext, String str, long j, long j2, Class<? extends StreamEventDecoder> cls) {
        mapReduceContext.setInput(new StreamBatchReadable(str, j, j2, cls).toURI().toString());
    }

    public static void useStreamInput(MapReduceContext mapReduceContext, String str, long j, long j2, FormatSpecification formatSpecification) {
        mapReduceContext.setInput(new StreamBatchReadable(str, j, j2, formatSpecification).toURI().toString());
    }

    private static URI createStreamURI(String str, Map<String, Object> map) {
        return URI.create(String.format("stream://%s?%s", str, Joiner.on('&').join(Iterables.transform(map.entrySet(), new Function<Map.Entry<String, Object>, String>() { // from class: co.cask.cdap.api.data.stream.StreamBatchReadable.1
            @Override // com.google.common.base.Function
            public String apply(Map.Entry<String, Object> entry) {
                try {
                    return String.format("%s=%s", URLEncoder.encode(entry.getKey(), Charsets.UTF_8.name()), URLEncoder.encode(entry.getValue().toString(), Charsets.UTF_8.name()));
                } catch (UnsupportedEncodingException e) {
                    throw Throwables.propagate(e);
                }
            }
        }))));
    }

    public StreamBatchReadable(URI uri) {
        Preconditions.checkArgument(Constants.Notification.Stream.STREAM_FEED_CATEGORY.equals(uri.getScheme()));
        this.uri = uri;
        this.streamName = uri.getAuthority();
        String query = uri.getQuery();
        if (query == null || query.isEmpty()) {
            this.startTime = 0L;
            this.endTime = Transaction.NO_TX_IN_PROGRESS;
            this.decoderType = null;
            this.bodyFormatSpec = null;
            return;
        }
        Map<String, String> split = Splitter.on('&').withKeyValueSeparator("=").split(query);
        this.startTime = split.containsKey("start") ? Long.parseLong(split.get("start")) : 0L;
        this.endTime = split.containsKey("end") ? Long.parseLong(split.get("end")) : Transaction.NO_TX_IN_PROGRESS;
        this.decoderType = split.get(DECODER_KEY);
        this.bodyFormatSpec = decodeFormatSpec(split.get(BODY_FORMAT_KEY));
    }

    public StreamBatchReadable(String str) {
        this(str, 0L, Transaction.NO_TX_IN_PROGRESS);
    }

    public StreamBatchReadable(String str, long j, long j2) {
        this(createStreamURI(str, ImmutableMap.of("start", Long.valueOf(j), "end", Long.valueOf(j2))));
    }

    public StreamBatchReadable(String str, long j, long j2, Class<? extends StreamEventDecoder> cls) {
        this(createStreamURI(str, ImmutableMap.of("start", (String) Long.valueOf(j), "end", (String) Long.valueOf(j2), DECODER_KEY, cls.getName())));
    }

    @Beta
    public StreamBatchReadable(String str, long j, long j2, FormatSpecification formatSpecification) {
        this(createStreamURI(str, ImmutableMap.of("start", (String) Long.valueOf(j), "end", (String) Long.valueOf(j2), BODY_FORMAT_KEY, encodeFormatSpec(formatSpecification))));
    }

    public String getStreamName() {
        return this.streamName;
    }

    public long getStartTime() {
        return this.startTime;
    }

    public long getEndTime() {
        return this.endTime;
    }

    public String getDecoderType() {
        return this.decoderType;
    }

    public FormatSpecification getFormatSpecification() {
        return this.bodyFormatSpec;
    }

    public URI toURI() {
        return this.uri;
    }

    @Override // co.cask.cdap.api.data.batch.BatchReadable
    public List<Split> getSplits() {
        return null;
    }

    @Override // co.cask.cdap.api.data.batch.BatchReadable
    public SplitReader<Long, String> createSplitReader(Split split) {
        return null;
    }

    private static String encodeFormatSpec(FormatSpecification formatSpecification) {
        try {
            return URLEncoder.encode(GSON.toJson(formatSpecification), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw Throwables.propagate(e);
        }
    }

    private static FormatSpecification decodeFormatSpec(String str) {
        if (str == null) {
            return null;
        }
        try {
            return (FormatSpecification) GSON.fromJson(URLDecoder.decode(str, "UTF-8"), FormatSpecification.class);
        } catch (UnsupportedEncodingException e) {
            throw Throwables.propagate(e);
        }
    }
}
