/*
 * Decompiled with CFR 0.152.
 */
package kafka.bridge.hadoop;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * A {@link RecordWriter} that buffers values as Kafka {@link KeyedMessage}s and
 * publishes them in batches through a {@link Producer}.
 *
 * <p>Messages accumulate in {@link #msgList} and are sent when adding the next
 * message would push the buffered payload past {@link #queueBytes}, when the
 * buffer already holds {@code Short.MAX_VALUE} messages, or when the writer is
 * closed.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <K> key type; passed through unchanged as the message key
 * @param <V> value type; must be {@code byte[]} or {@link BytesWritable} at runtime
 */
public class KafkaRecordWriter<K, V>
extends RecordWriter<K, V> {
    /** Producer used to publish batches; closed by {@link #close(TaskAttemptContext)}. */
    protected Producer<Object, byte[]> producer;
    /** Topic every message is addressed to. */
    protected String topic;
    /** Messages buffered since the last successful send. */
    protected List<KeyedMessage<Object, byte[]>> msgList = new LinkedList<KeyedMessage<Object, byte[]>>();
    /** Sum of the payload lengths currently held in {@link #msgList}. */
    protected int totalBytes = 0;
    /** Payload-size threshold (in bytes) above which the buffer is flushed before adding more. */
    protected int queueBytes;

    /**
     * Creates a writer that publishes to {@code topic} via {@code producer}.
     *
     * @param producer   producer used for sending; this writer closes it in {@link #close}
     * @param topic      destination topic for every message
     * @param queueBytes buffered payload size, in bytes, that triggers a flush
     */
    public KafkaRecordWriter(Producer<Object, byte[]> producer, String topic, int queueBytes) {
        this.producer = producer;
        this.topic = topic;
        this.queueBytes = queueBytes;
    }

    /**
     * Sends all buffered messages in one producer call and resets the buffer.
     * Does nothing when the buffer is empty.
     *
     * <p>If the send fails, the buffer and byte counter are left untouched, so
     * the same messages are still pending.
     *
     * @throws IOException wrapping any exception raised by the producer
     */
    protected void sendMsgList() throws IOException {
        if (this.msgList.isEmpty()) {
            return;
        }
        try {
            this.producer.send(this.msgList);
        }
        catch (Exception e) {
            // Surface producer failures through the RecordWriter contract, keeping the cause.
            throw new IOException(e);
        }
        this.msgList.clear();
        this.totalBytes = 0;
    }

    /**
     * Buffers one message, flushing the existing buffer first if this message
     * would exceed the size threshold or the buffer has reached
     * {@code Short.MAX_VALUE} entries.
     *
     * @param key   message key, forwarded as-is
     * @param value payload; a {@code byte[]} is used directly (not copied), a
     *              {@link BytesWritable} is copied up to its {@code getLength()}
     * @throws IllegalArgumentException if {@code value} is neither of the supported types
     *                                  (including {@code null})
     * @throws IOException              if flushing the buffer fails
     * @throws InterruptedException     declared by the overridden method; not thrown here
     */
    public void write(K key, V value) throws IOException, InterruptedException {
        byte[] valBytes;
        if (value instanceof byte[]) {
            valBytes = (byte[])value;
        } else if (value instanceof BytesWritable) {
            BytesWritable writable = (BytesWritable)value;
            // Copy only getLength() bytes: the array from getBytes() may be longer
            // than the valid data it holds.
            valBytes = Arrays.copyOf(writable.getBytes(), writable.getLength());
        } else {
            throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");
        }
        // Widen to long so the sum cannot wrap negative and silently skip the flush
        // when queueBytes is close to Integer.MAX_VALUE.
        boolean overBudget = (long)this.totalBytes + valBytes.length > this.queueBytes;
        if (overBudget || this.msgList.size() >= Short.MAX_VALUE) {
            this.sendMsgList();
        }
        this.msgList.add(new KeyedMessage<Object, byte[]>(this.topic, key, valBytes));
        this.totalBytes += valBytes.length;
    }

    /**
     * Flushes any buffered messages and closes the producer.
     *
     * <p>The producer is closed even when the final flush fails, so a send
     * error does not leak the producer's resources.
     *
     * @param taskAttemptContext unused
     * @throws IOException          if the final flush fails
     * @throws InterruptedException declared by the overridden method; not thrown here
     */
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        try {
            this.sendMsgList();
        }
        finally {
            this.producer.close();
        }
    }
}

