/*
 * Decompiled with CFR 0.152.
 */
package kafka.etl.impl;

import java.net.URI;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Random;
import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLRequest;
import kafka.etl.Props;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.kafka.common.utils.Utils;

/**
 * Publishes randomly generated events to a Kafka topic and then writes a single
 * {@link KafkaETLRequest} record into a Hadoop sequence file named {@code 1.dat}
 * under the configured offsets directory.
 *
 * <p>Configuration is read from a {@link Props} instance using the keys
 * {@code kafka.etl.topic}, {@code event.count}, {@code input},
 * {@code kafka.server.uri} and {@code hadoop.job.ugi}.
 */
public class DataGenerator {
    /** Source of the random event payloads; seeded with the wall-clock time at class load. */
    protected static final Random RANDOM = new Random(System.currentTimeMillis());
    /** Configuration this generator was created with. */
    protected Props _props;
    /** Producer used to publish the generated events; closed at the end of {@link #run()}. */
    protected Producer _producer = null;
    /** Parsed value of the {@code kafka.server.uri} property. */
    protected URI _uri = null;
    /** Value of the {@code kafka.etl.topic} property. */
    protected String _topic;
    /** Value of the {@code event.count} property: how many events {@link #run()} creates. */
    protected int _count;
    /** Value of the {@code input} property: directory that receives {@code 1.dat}. */
    protected String _offsetsDir;
    /** Value passed to the producer as {@code send.buffer.bytes}. */
    protected final int TCP_BUFFER_SIZE = 300000;
    /** Value passed to the producer as {@code connect.timeout.ms}. */
    protected final int CONNECT_TIMEOUT = 20000;
    /** Value passed to the producer as {@code reconnect.interval}. */
    protected final int RECONNECT_INTERVAL = Integer.MAX_VALUE;

    /**
     * Reads the generator settings from {@code props} and creates the Kafka producer.
     *
     * @param id    identifier supplied by the caller; not used by this class
     * @param props configuration holding the keys listed in the class description
     * @throws IllegalArgumentException if {@code kafka.server.uri} has no parseable host or port
     * @throws Exception if a property lookup, the URI parsing or the producer creation fails
     */
    public DataGenerator(String id, Props props) throws Exception {
        this._props = props;
        this._topic = props.getProperty("kafka.etl.topic");
        System.out.println("topics=" + this._topic);
        this._count = props.getInt("event.count");
        this._offsetsDir = this._props.getProperty("input");
        String serverUri = this._props.getProperty("kafka.server.uri");
        this._uri = new URI(serverUri);
        System.out.println("server uri:" + this._uri.toString());

        // URI.getHost() is null and URI.getPort() is -1 when the component is absent
        // (e.g. "localhost:9092" without a scheme). Fail fast with a clear message
        // instead of handing a null host or "host:-1" to the producer.
        String host = this._uri.getHost();
        int port = this._uri.getPort();
        if (host == null || port < 0) {
            throw new IllegalArgumentException(
                    "kafka.server.uri must contain a host and a port: " + serverUri);
        }

        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", Utils.formatAddress(host, port));
        producerProps.put("send.buffer.bytes", String.valueOf(this.TCP_BUFFER_SIZE));
        producerProps.put("connect.timeout.ms", String.valueOf(this.CONNECT_TIMEOUT));
        producerProps.put("reconnect.interval", String.valueOf(this.RECONNECT_INTERVAL));
        this._producer = new Producer(new ProducerConfig(producerProps));
    }

    /**
     * Creates {@code _count} events, sends them to the topic in one batch, closes the
     * producer and finally writes the offsets file via {@link #generateOffsets()}.
     *
     * <p>Each event payload is the decimal text of a random non-negative {@code long}.
     * The producer is closed even when sending fails; in that case the offsets file
     * is not written and the failure propagates to the caller.
     *
     * @throws Exception if sending the events or writing the offsets file fails
     */
    public void run() throws Exception {
        ArrayList<KeyedMessage<Integer, byte[]>> list =
                new ArrayList<KeyedMessage<Integer, byte[]>>(Math.max(this._count, 0));
        for (int i = 0; i < this._count; ++i) {
            // Clearing the sign bit always yields a non-negative value. Plain negation
            // does not: -Long.MIN_VALUE overflows back to Long.MIN_VALUE.
            long timestamp = RANDOM.nextLong() & Long.MAX_VALUE;
            byte[] bytes = Long.toString(timestamp).getBytes("UTF8");
            list.add(new KeyedMessage<Integer, byte[]>(this._topic, null, bytes));
        }
        System.out.println(" send " + list.size() + " " + this._topic + " count events to " + this._uri);
        try {
            this._producer.send(list);
        } finally {
            // Release the producer even when the send fails.
            this._producer.close();
        }
        this.generateOffsets();
    }

    /**
     * Writes one record to {@code <_offsetsDir>/1.dat}, replacing the file if it exists.
     * The record has an empty {@link KafkaETLKey} as key and the UTF-8 bytes of a
     * {@link KafkaETLRequest}'s string form as value; the sequence file is uncompressed.
     *
     * @throws Exception if the file system cannot be accessed or the record cannot be written
     */
    protected void generateOffsets() throws Exception {
        JobConf conf = new JobConf();
        conf.set("hadoop.job.ugi", this._props.getProperty("hadoop.job.ugi"));
        conf.setCompressMapOutput(false);
        Path outPath = new Path(this._offsetsDir + "/" + "1.dat");
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            // Two-argument form replaces the deprecated delete(Path), which delegates
            // to a recursive delete.
            fs.delete(outPath, true);
        }
        String brokerUri = "tcp://" + Utils.formatAddress(this._uri.getHost(), this._uri.getPort());
        // NOTE(review): the trailing 0 is presumably the partition id - confirm against KafkaETLRequest.
        KafkaETLRequest request = new KafkaETLRequest(this._topic, brokerUri, 0);
        System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString());
        byte[] bytes = request.toString().getBytes("UTF-8");
        KafkaETLKey dummyKey = new KafkaETLKey();
        SequenceFile.setCompressionType(conf, SequenceFile.CompressionType.NONE);
        SequenceFile.Writer writer =
                SequenceFile.createWriter(fs, conf, outPath, KafkaETLKey.class, BytesWritable.class);
        try {
            writer.append((Writable) dummyKey, (Writable) new BytesWritable(bytes));
        } finally {
            // Close the writer even when the append fails so the file handle is not leaked.
            writer.close();
        }
    }

    /**
     * Command-line entry point: loads the configuration file named by the first
     * argument and runs a generator with it.
     *
     * @param args {@code args[0]} is the path of the configuration file
     * @throws Exception if no argument is given or the generator fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new Exception("Usage: - config_file");
        }
        Props props = new Props(args[0]);
        DataGenerator job = new DataGenerator("DataGenerator", props);
        job.run();
    }
}

