/*
 * Decompiled with CFR 0.152.
 */
package kafka.etl;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLRequest;
import kafka.etl.Props;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

public class KafkaETLContext {
    protected static int MAX_RETRY_TIME = 1;
    static final String CLIENT_BUFFER_SIZE = "client.buffer.size";
    static final String CLIENT_TIMEOUT = "client.so.timeout";
    static final int DEFAULT_BUFFER_SIZE = 0x100000;
    static final int DEFAULT_TIMEOUT = 60000;
    static final KafkaETLKey DUMMY_KEY = new KafkaETLKey();
    protected int _index;
    protected String _input = null;
    protected KafkaETLRequest _request = null;
    protected SimpleConsumer _consumer = null;
    protected long[] _offsetRange = new long[]{0L, 0L};
    protected long _offset = Long.MAX_VALUE;
    protected long _count;
    protected FetchResponse _response = null;
    protected Iterator<MessageAndOffset> _messageIt = null;
    protected Iterator<ByteBufferMessageSet> _respIterator = null;
    protected int _retry = 0;
    protected long _requestTime = 0L;
    protected long _startTime = -1L;
    protected int _bufferSize;
    protected int _timeout;
    protected Reporter _reporter;
    protected MultipleOutputs _mos;
    protected OutputCollector<KafkaETLKey, BytesWritable> _offsetOut = null;
    protected FetchRequestBuilder builder = new FetchRequestBuilder();

    public long getTotalBytes() {
        return this._offsetRange[1] > this._offsetRange[0] ? this._offsetRange[1] - this._offsetRange[0] : 0L;
    }

    public long getReadBytes() {
        return this._offset - this._offsetRange[0];
    }

    public long getCount() {
        return this._count;
    }

    public KafkaETLContext(JobConf job, Props props, Reporter reporter, MultipleOutputs mos, int index, String input) throws Exception {
        this._bufferSize = KafkaETLContext.getClientBufferSize(props);
        this._timeout = KafkaETLContext.getClientTimeout(props);
        System.out.println("bufferSize=" + this._bufferSize);
        System.out.println("timeout=" + this._timeout);
        this._reporter = reporter;
        this._mos = mos;
        this._index = index;
        this._input = input;
        this._request = new KafkaETLRequest(input.trim());
        URI uri = this._request.getURI();
        this._consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), this._timeout, this._bufferSize, "KafkaETLContext");
        this._offsetRange = this.getOffsetRange();
        System.out.println("Connected to node " + uri + " beginning reading at offset " + this._offsetRange[0] + " latest offset=" + this._offsetRange[1]);
        this._offset = this._offsetRange[0];
        this._count = 0L;
        this._requestTime = 0L;
        this._retry = 0;
        this._startTime = System.currentTimeMillis();
    }

    public boolean hasMore() {
        return this._messageIt != null && this._messageIt.hasNext() || this._response != null && this._respIterator.hasNext() || this._offset < this._offsetRange[1];
    }

    public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException {
        if (!this.hasMore()) {
            return false;
        }
        boolean gotNext = this.get(key, value);
        if (this._response != null) {
            while (!gotNext && this._respIterator.hasNext()) {
                ByteBufferMessageSet msgSet = this._respIterator.next();
                this._messageIt = msgSet.iterator();
                gotNext = this.get(key, value);
            }
        }
        return gotNext;
    }

    public boolean fetchMore() throws IOException {
        if (!this.hasMore()) {
            return false;
        }
        FetchRequest fetchRequest = this.builder.clientId(this._request.clientId()).addFetch(this._request.getTopic(), this._request.getPartition(), this._offset, this._bufferSize).build();
        long tempTime = System.currentTimeMillis();
        this._response = this._consumer.fetch(fetchRequest);
        if (this._response != null) {
            this._respIterator = new ArrayList<ByteBufferMessageSet>(){
                {
                    this.add(KafkaETLContext.this._response.messageSet(KafkaETLContext.this._request.getTopic(), KafkaETLContext.this._request.getPartition()));
                }
            }.iterator();
        }
        this._requestTime += System.currentTimeMillis() - tempTime;
        return true;
    }

    public void output(String fileprefix) throws IOException {
        String offsetString = this._request.toString(this._offset);
        if (this._offsetOut == null) {
            this._offsetOut = this._mos.getCollector("offsets", fileprefix + this._index, this._reporter);
        }
        this._offsetOut.collect((Object)DUMMY_KEY, (Object)new BytesWritable(offsetString.getBytes("UTF-8")));
    }

    public void close() throws IOException {
        if (this._consumer != null) {
            this._consumer.close();
        }
        String topic = this._request.getTopic();
        long endTime = System.currentTimeMillis();
        this._reporter.incrCounter(topic, "read-time(ms)", endTime - this._startTime);
        this._reporter.incrCounter(topic, "request-time(ms)", this._requestTime);
        long bytesRead = this._offset - this._offsetRange[0];
        double megaRead = (double)bytesRead / 1048576.0;
        this._reporter.incrCounter(topic, "data-read(mb)", (long)megaRead);
        this._reporter.incrCounter(topic, "event-count", this._count);
    }

    protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
        if (this._messageIt != null && this._messageIt.hasNext()) {
            MessageAndOffset messageAndOffset = this._messageIt.next();
            ByteBuffer buf = messageAndOffset.message().buffer();
            int origSize = buf.remaining();
            byte[] bytes = new byte[origSize];
            buf.get(bytes, buf.position(), origSize);
            value.set(bytes, 0, origSize);
            key.set(this._index, this._offset, messageAndOffset.message().checksum());
            this._offset = messageAndOffset.nextOffset();
            ++this._count;
            return true;
        }
        return false;
    }

    protected long[] getOffsetRange() throws IOException {
        long[] range = new long[2];
        TopicAndPartition topicAndPartition = new TopicAndPartition(this._request.getTopic(), this._request.getPartition());
        HashMap<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
        long[] startOffsets = this._consumer.getOffsetsBefore(request).offsets(this._request.getTopic(), this._request.getPartition());
        if (startOffsets.length != 1) {
            throw new IOException("input:" + this._input + " Expect one smallest offset but get " + startOffsets.length);
        }
        range[0] = startOffsets[0];
        requestInfo.clear();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
        request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
        long[] endOffsets = this._consumer.getOffsetsBefore(request).offsets(this._request.getTopic(), this._request.getPartition());
        if (endOffsets.length != 1) {
            throw new IOException("input:" + this._input + " Expect one latest offset but get " + endOffsets.length);
        }
        range[1] = endOffsets[0];
        if (this._request.isValidOffset()) {
            long startOffset = this._request.getOffset();
            if (startOffset > range[0]) {
                System.out.println("Update starting offset with " + startOffset);
                range[0] = startOffset;
            } else {
                System.out.println("WARNING: given starting offset " + startOffset + " is smaller than the smallest one " + range[0] + ". Will ignore it.");
            }
        }
        System.out.println("Using offset range [" + range[0] + ", " + range[1] + "]");
        return range;
    }

    public static int getClientBufferSize(Props props) throws Exception {
        return props.getInt(CLIENT_BUFFER_SIZE, 0x100000);
    }

    public static int getClientTimeout(Props props) throws Exception {
        return props.getInt(CLIENT_TIMEOUT, 60000);
    }
}

