/*
 * Decompiled with CFR 0.152.
 */
package kafka.consumer;

import java.io.OutputStream;
import java.util.Hashtable;
import java.util.Properties;
import java.util.Random;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.consumer.Blacklist;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConfig$;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.DefaultMessageFormatter;
import kafka.consumer.KafkaStream;
import kafka.consumer.MessageFormatter;
import kafka.consumer.MessageFormatter$;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import kafka.message.MessageAndMetadata;
import kafka.metrics.KafkaMetricsReporter$;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.DefaultDecoder$;
import kafka.utils.CommandLineUtils$;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.Logging$class;
import kafka.utils.Utils$;
import kafka.utils.VerifiableProperties;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.Serializable;
import scala.collection.Iterable;
import scala.collection.JavaConversions$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;

/*
 * Duplicate member names - consider using --renamedupmembers true
 */
public final class ConsoleConsumer$
implements Logging {
    // Singleton instance; assigned by the private constructor at the bottom of this class.
    public static final ConsoleConsumer$ MODULE$;
    // Backing state for the Logging accessors below.
    private final String loggerName;
    // Populated on first use by logger$lzycompute().
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    // Set to true by logger$lzycompute() once `logger` has been assigned.
    private volatile boolean bitmap$0;

    // Instantiating the class runs the private constructor, which stores the new instance in MODULE$.
    static {
        new ConsoleConsumer$();
    }

    /**
     * Returns the value held in the {@code loggerName} field.
     *
     * @return the stored logger name
     */
    @Override
    public String loggerName() {
        return this.loggerName;
    }

    /**
     * Slow path of {@link #logger()}: while holding this object's monitor, creates the
     * logger through {@code Logging$class.logger(this)} unless it has already been created,
     * then marks it as initialised.
     *
     * @return the (now initialised) logger
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            if (this.bitmap$0) {
                // Already initialised by an earlier caller; nothing to do.
                return this.logger;
            }
            // Assign the logger before raising the flag so the flag never precedes the value.
            this.logger = Logging$class.logger(this);
            this.bitmap$0 = true;
            return this.logger;
        }
    }

    /**
     * Returns the logger, creating it on first access.
     *
     * @return the cached logger when the initialised flag is set, otherwise the result of
     *         {@code logger$lzycompute()}
     */
    @Override
    public Logger logger() {
        if (this.bitmap$0) {
            return this.logger;
        }
        return this.logger$lzycompute();
    }

    /**
     * Returns the current value of the {@code logIdent} field.
     *
     * @return the stored log identifier (may be {@code null} if never set)
     */
    @Override
    public String logIdent() {
        return this.logIdent;
    }

    /**
     * Replaces the value of the {@code logIdent} field.
     *
     * @param x$1 the new log identifier
     */
    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    /**
     * Returns the {@code Log4jController$} reference held by this object.
     *
     * @return the stored controller
     */
    @Override
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    /**
     * Stores the logger name.
     * NOTE(review): presumably called once from {@code Logging$class.$init$} during
     * construction — confirm against the Logging trait.
     *
     * @param x$1 the logger name to store
     */
    @Override
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String x$1) {
        this.loggerName = x$1;
    }

    /**
     * Stores the {@code Log4jController$} reference.
     * NOTE(review): presumably called once from {@code Logging$class.$init$} during
     * construction — confirm against the Logging trait.
     *
     * @param x$1 the controller to store
     */
    @Override
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ x$1) {
        this.kafka$utils$Logging$$log4jController = x$1;
    }

    /**
     * Forwards to {@code Logging$class.trace(this, msg)}.
     *
     * @param msg deferred message, passed through unchanged
     */
    @Override
    public void trace(Function0<String> msg) {
        Logging$class.trace(this, msg);
    }

    /**
     * Forwards to {@code Logging$class.trace(this, e)}.
     *
     * @param e deferred throwable, passed through unchanged
     * @return whatever the delegate returns
     */
    @Override
    public Object trace(Function0<Throwable> e) {
        return Logging$class.trace(this, e);
    }

    /**
     * Forwards to {@code Logging$class.trace(this, msg, e)}.
     *
     * @param msg deferred message, passed through unchanged
     * @param e   deferred throwable, passed through unchanged
     */
    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.trace(this, msg, e);
    }

    /**
     * Forwards to {@code Logging$class.swallowTrace(this, action)}.
     * NOTE(review): presumably runs {@code action} and logs, rather than propagates, any
     * failure — confirm in the Logging trait.
     *
     * @param action deferred action handed to the delegate
     */
    @Override
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging$class.swallowTrace(this, action);
    }

    /**
     * Forwards to {@code Logging$class.debug(this, msg)}.
     *
     * @param msg deferred message, passed through unchanged
     */
    @Override
    public void debug(Function0<String> msg) {
        Logging$class.debug(this, msg);
    }

    /**
     * Forwards to {@code Logging$class.debug(this, e)}.
     *
     * @param e deferred throwable, passed through unchanged
     * @return whatever the delegate returns
     */
    @Override
    public Object debug(Function0<Throwable> e) {
        return Logging$class.debug(this, e);
    }

    /**
     * Forwards to {@code Logging$class.debug(this, msg, e)}.
     *
     * @param msg deferred message, passed through unchanged
     * @param e   deferred throwable, passed through unchanged
     */
    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.debug(this, msg, e);
    }

    /**
     * Forwards to {@code Logging$class.swallowDebug(this, action)}.
     * NOTE(review): presumably runs {@code action} and logs, rather than propagates, any
     * failure — confirm in the Logging trait.
     *
     * @param action deferred action handed to the delegate
     */
    @Override
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging$class.swallowDebug(this, action);
    }

    /**
     * Forwards to {@code Logging$class.info(this, msg)}.
     *
     * @param msg deferred message, passed through unchanged
     */
    @Override
    public void info(Function0<String> msg) {
        Logging$class.info(this, msg);
    }

    /**
     * Forwards to {@code Logging$class.info(this, e)}.
     *
     * @param e deferred throwable, passed through unchanged
     * @return whatever the delegate returns
     */
    @Override
    public Object info(Function0<Throwable> e) {
        return Logging$class.info(this, e);
    }

    /**
     * Forwards to {@code Logging$class.info(this, msg, e)}.
     *
     * @param msg deferred message, passed through unchanged
     * @param e   deferred throwable, passed through unchanged
     */
    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.info(this, msg, e);
    }

    /**
     * Forwards to {@code Logging$class.swallowInfo(this, action)}.
     * NOTE(review): presumably runs {@code action} and logs, rather than propagates, any
     * failure — confirm in the Logging trait.
     *
     * @param action deferred action handed to the delegate
     */
    @Override
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging$class.swallowInfo(this, action);
    }

    /**
     * Forwards to {@code Logging$class.warn(this, msg)}.
     *
     * @param msg deferred message, passed through unchanged
     */
    @Override
    public void warn(Function0<String> msg) {
        Logging$class.warn(this, msg);
    }

    /**
     * Forwards to {@code Logging$class.warn(this, e)}.
     *
     * @param e deferred throwable, passed through unchanged
     * @return whatever the delegate returns
     */
    @Override
    public Object warn(Function0<Throwable> e) {
        return Logging$class.warn(this, e);
    }

    /**
     * Forwards to {@code Logging$class.warn(this, msg, e)}.
     *
     * @param msg deferred message, passed through unchanged
     * @param e   deferred throwable, passed through unchanged
     */
    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.warn(this, msg, e);
    }

    /**
     * Forwards to {@code Logging$class.swallowWarn(this, action)}.
     * NOTE(review): presumably runs {@code action} and logs, rather than propagates, any
     * failure — confirm in the Logging trait.
     *
     * @param action deferred action handed to the delegate
     */
    @Override
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging$class.swallowWarn(this, action);
    }

    /**
     * Forwards to {@code Logging$class.swallow(this, action)}.
     * NOTE(review): the log level used by the delegate is not visible here — confirm in
     * the Logging trait.
     *
     * @param action deferred action handed to the delegate
     */
    @Override
    public void swallow(Function0<BoxedUnit> action) {
        Logging$class.swallow(this, action);
    }

    /**
     * Forwards to {@code Logging$class.error(this, msg)}.
     *
     * @param msg deferred message, passed through unchanged
     */
    @Override
    public void error(Function0<String> msg) {
        Logging$class.error(this, msg);
    }

    /**
     * Forwards to {@code Logging$class.error(this, e)}.
     *
     * @param e deferred throwable, passed through unchanged
     * @return whatever the delegate returns
     */
    @Override
    public Object error(Function0<Throwable> e) {
        return Logging$class.error(this, e);
    }

    /**
     * Forwards to {@code Logging$class.error(this, msg, e)}.
     *
     * @param msg deferred message, passed through unchanged
     * @param e   deferred throwable, passed through unchanged
     */
    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.error(this, msg, e);
    }

    /**
     * Forwards to {@code Logging$class.swallowError(this, action)}.
     * NOTE(review): presumably runs {@code action} and logs, rather than propagates, any
     * failure — confirm in the Logging trait.
     *
     * @param action deferred action handed to the delegate
     */
    @Override
    public void swallowError(Function0<BoxedUnit> action) {
        Logging$class.swallowError(this, action);
    }

    /**
     * Forwards to {@code Logging$class.fatal(this, msg)}.
     *
     * @param msg deferred message, passed through unchanged
     */
    @Override
    public void fatal(Function0<String> msg) {
        Logging$class.fatal(this, msg);
    }

    /**
     * Forwards to {@code Logging$class.fatal(this, e)}.
     *
     * @param e deferred throwable, passed through unchanged
     * @return whatever the delegate returns
     */
    @Override
    public Object fatal(Function0<Throwable> e) {
        return Logging$class.fatal(this, e);
    }

    /**
     * Forwards to {@code Logging$class.fatal(this, msg, e)}.
     *
     * @param msg deferred message, passed through unchanged
     * @param e   deferred throwable, passed through unchanged
     */
    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.fatal(this, msg, e);
    }

    /**
     * Entry point of the console consumer: parses the command line, builds a consumer
     * configuration, and writes every consumed message to standard output through a
     * {@code MessageFormatter}.
     *
     * <p>Flow, in order:
     * <ol>
     *   <li>Declare and parse the options; {@code zookeeper} is mandatory and exactly one of
     *       {@code topic}/{@code whitelist}/{@code blacklist} must be present, otherwise an
     *       error is logged, help is printed to stderr and the process exits with status 1.</li>
     *   <li>Optionally start the CSV metrics reporter.</li>
     *   <li>Translate the options into consumer properties and create the connector.</li>
     *   <li>Register a shutdown hook that shuts the connector down.</li>
     *   <li>Open a single stream for the topic filter and format each message to stdout,
     *       counting successes.</li>
     *   <li>Print the consumed count to stderr, flush stdout, close the formatter and shut
     *       the connector down.</li>
     * </ol>
     *
     * @param args raw command-line arguments
     */
    public void main(String[] args) {
        // ---- Option declarations -------------------------------------------------------
        OptionParser parser = new OptionParser();
        ArgumentAcceptingOptionSpec topicIdOpt = parser.accepts("topic", "The topic id to consume on.").withRequiredArg().describedAs("topic").ofType(String.class);
        ArgumentAcceptingOptionSpec whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.").withRequiredArg().describedAs("whitelist").ofType(String.class);
        ArgumentAcceptingOptionSpec blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.").withRequiredArg().describedAs("blacklist").ofType(String.class);
        ArgumentAcceptingOptionSpec zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.").withRequiredArg().describedAs("urls").ofType(String.class);
        // Default group id is "console-consumer-" followed by a random integer in [0, 100000).
        ArgumentAcceptingOptionSpec groupIdOpt = parser.accepts("group", "The group id to consume on.").withRequiredArg().describedAs("gid").defaultsTo((Object)new StringBuilder().append((Object)"console-consumer-").append(BoxesRunTime.boxToInteger(new Random().nextInt(100000))).toString(), (Object[])new String[0]).ofType(String.class);
        // 0x100000 = 1 MiB default fetch size.
        ArgumentAcceptingOptionSpec fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.").withRequiredArg().describedAs("size").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0x100000), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec minFetchBytesOpt = parser.accepts("min-fetch-bytes", "The min number of bytes each fetch request waits for.").withRequiredArg().describedAs("bytes").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(100), (Object[])new Integer[0]);
        // 0x200000 = 2 MiB default socket receive buffer.
        ArgumentAcceptingOptionSpec socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.").withRequiredArg().describedAs("size").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0x200000), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec socketTimeoutMsOpt = parser.accepts("socket-timeout-ms", "The socket timeout used for the connection to the broker").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(ConsumerConfig$.MODULE$.SocketTimeout()), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec refreshMetadataBackoffMsOpt = parser.accepts("refresh-leader-backoff-ms", "Backoff time before refreshing metadata").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(ConsumerConfig$.MODULE$.RefreshMetadataBackoffMs()), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much of time without incoming messages").withRequiredArg().describedAs("prop").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(-1), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.").withRequiredArg().describedAs("class").ofType(String.class).defaultsTo((Object)DefaultMessageFormatter.class.getName(), (Object[])new String[0]);
        ArgumentAcceptingOptionSpec messageFormatterArgOpt = parser.accepts("property").withRequiredArg().describedAs("prop").ofType(String.class);
        OptionSpecBuilder resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.");
        ArgumentAcceptingOptionSpec autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(ConsumerConfig$.MODULE$.AutoCommitInterval()), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.").withRequiredArg().describedAs("num_messages").ofType(Integer.class);
        OptionSpecBuilder skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, skip it instead of halt.");
        OptionSpecBuilder csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled");
        ArgumentAcceptingOptionSpec metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter isset, the csv metrics will be outputed here").withRequiredArg().describedAs("metrics dictory").ofType(String.class);
        // ---- Parsing and validation ----------------------------------------------------
        OptionSet options = this.tryParse(parser, args);
        CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options, Predef$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{zkConnectOpt}));
        // Keep only those of topic/whitelist/blacklist that were actually supplied.
        List topicOrFilterOpt = (List)List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray((Object[])new ArgumentAcceptingOptionSpec[]{topicIdOpt, whitelistOpt, blacklistOpt})).filter(new Serializable(options){
            public static final long serialVersionUID = 0L;
            private final OptionSet options$1;

            public final boolean apply(OptionSpec<?> x$1) {
                return this.options$1.has(x$1);
            }
            {
                this.options$1 = options$1;
            }
        });
        if (topicOrFilterOpt.size() != 1) {
            this.error((Function0<String>)((Object)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Exactly one of whitelist/blacklist/topic is required.";
                }
            }));
            parser.printHelpOn((OutputStream)System.err);
            System.exit(1);
        }
        String topicArg = (String)options.valueOf((OptionSpec)topicOrFilterOpt.head());
        // A plain --topic value is treated the same way as a --whitelist value.
        TopicFilter filterSpec = options.has((OptionSpec)blacklistOpt) ? new Blacklist(topicArg) : new Whitelist(topicArg);
        // ---- Optional CSV metrics reporter ---------------------------------------------
        boolean csvMetricsReporterEnabled = options.has((OptionSpec)csvMetricsReporterEnabledOpt);
        if (csvMetricsReporterEnabled) {
            Properties csvReporterProps = new Properties();
            ((Hashtable)csvReporterProps).put("kafka.metrics.polling.interval.secs", "5");
            ((Hashtable)csvReporterProps).put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter");
            // Metrics directory falls back to "kafka_metrics"; `object` is an unused decompiler temporary.
            Object object = options.has((OptionSpec)metricsDirectoryOpt) ? ((Hashtable)csvReporterProps).put("kafka.csv.metrics.dir", options.valueOf((OptionSpec)metricsDirectoryOpt)) : ((Hashtable)csvReporterProps).put("kafka.csv.metrics.dir", "kafka_metrics");
            ((Hashtable)csvReporterProps).put("kafka.csv.metrics.reporter.enabled", "true");
            VerifiableProperties verifiableProps = new VerifiableProperties(csvReporterProps);
            KafkaMetricsReporter$.MODULE$.startReporters(verifiableProps);
        }
        // ---- Consumer configuration ----------------------------------------------------
        Properties props = new Properties();
        ((Hashtable)props).put("group.id", options.valueOf((OptionSpec)groupIdOpt));
        ((Hashtable)props).put("socket.receive.buffer.bytes", ((Integer)options.valueOf((OptionSpec)socketBufferSizeOpt)).toString());
        ((Hashtable)props).put("socket.timeout.ms", ((Integer)options.valueOf((OptionSpec)socketTimeoutMsOpt)).toString());
        ((Hashtable)props).put("fetch.message.max.bytes", ((Integer)options.valueOf((OptionSpec)fetchSizeOpt)).toString());
        ((Hashtable)props).put("fetch.min.bytes", ((Integer)options.valueOf((OptionSpec)minFetchBytesOpt)).toString());
        ((Hashtable)props).put("fetch.wait.max.ms", ((Integer)options.valueOf((OptionSpec)maxWaitMsOpt)).toString());
        ((Hashtable)props).put("auto.commit.enable", "true");
        ((Hashtable)props).put("auto.commit.interval.ms", ((Integer)options.valueOf((OptionSpec)autoCommitIntervalOpt)).toString());
        ((Hashtable)props).put("auto.offset.reset", options.has((OptionSpec)resetBeginningOpt) ? "smallest" : "largest");
        ((Hashtable)props).put("zookeeper.connect", options.valueOf((OptionSpec)zkConnectOpt));
        ((Hashtable)props).put("consumer.timeout.ms", ((Integer)options.valueOf((OptionSpec)consumerTimeoutMsOpt)).toString());
        ((Hashtable)props).put("refresh.leader.backoff.ms", ((Integer)options.valueOf((OptionSpec)refreshMetadataBackoffMsOpt)).toString());
        ConsumerConfig config = new ConsumerConfig(props);
        boolean skipMessageOnError = options.has((OptionSpec)skipMessageOnErrorOpt);
        Class<?> messageFormatterClass = Class.forName((String)options.valueOf((OptionSpec)messageFormatterOpt));
        Properties formatterArgs = MessageFormatter$.MODULE$.tryParseFormatterArgs(JavaConversions$.MODULE$.asScalaBuffer(options.valuesOf((OptionSpec)messageFormatterArgOpt)));
        // -1 means "no limit"; see the slice decision below.
        int maxMessages = options.has((OptionSpec)maxMessagesOpt) ? (Integer)options.valueOf((OptionSpec)maxMessagesOpt) : -1;
        ConsumerConnector connector = Consumer$.MODULE$.create(config);
        // With --from-beginning the group's /consumers/<group> path is handed to maybeDeletePath
        // (presumably to discard stored offsets — confirm in ZkUtils). Note this happens after
        // the connector has been created.
        if (options.has((OptionSpec)resetBeginningOpt)) {
            ZkUtils$.MODULE$.maybeDeletePath((String)options.valueOf((OptionSpec)zkConnectOpt), new StringBuilder().append((Object)"/consumers/").append(options.valueOf((OptionSpec)groupIdOpt)).toString());
        }
        // On JVM exit: shut the connector down, and when no --group was given on the command
        // line (so the generated default id is in use) pass the group's path to maybeDeletePath.
        Runtime.getRuntime().addShutdownHook(new Thread(zkConnectOpt, groupIdOpt, options, connector){
            private final ArgumentAcceptingOptionSpec zkConnectOpt$1;
            private final ArgumentAcceptingOptionSpec groupIdOpt$1;
            private final OptionSet options$1;
            private final ConsumerConnector connector$1;

            public void run() {
                this.connector$1.shutdown();
                if (!this.options$1.has((OptionSpec)this.groupIdOpt$1)) {
                    ZkUtils$.MODULE$.maybeDeletePath((String)this.options$1.valueOf((OptionSpec)this.zkConnectOpt$1), new StringBuilder().append((Object)"/consumers/").append(this.options$1.valueOf((OptionSpec)this.groupIdOpt$1)).toString());
                }
            }
            {
                this.zkConnectOpt$1 = zkConnectOpt$1;
                this.groupIdOpt$1 = groupIdOpt$1;
                this.options$1 = options$1;
                this.connector$1 = connector$1;
            }
        });
        // ---- Consumption loop ----------------------------------------------------------
        // Count of messages written successfully; shared with the per-message callback.
        LongRef numMessages = new LongRef(0L);
        MessageFormatter formatter = (MessageFormatter)messageFormatterClass.newInstance();
        formatter.init(formatterArgs);
        try {
            // Request exactly one stream for the filter and use it.
            KafkaStream<byte[], byte[]> stream = JavaConversions$.MODULE$.seqAsJavaList(connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(DefaultDecoder$.MODULE$.$lessinit$greater$default$1()), new DefaultDecoder(DefaultDecoder$.MODULE$.$lessinit$greater$default$1()))).get(0);
            // Cap the stream at maxMessages entries when a non-negative limit was given.
            KafkaStream<byte[], byte[]> iter2 = maxMessages >= 0 ? (Iterable)stream.slice(0, maxMessages) : stream;
            iter2.foreach(new Serializable(skipMessageOnError, connector, numMessages, formatter){
                public static final long serialVersionUID = 0L;
                private final boolean skipMessageOnError$1;
                private final ConsumerConnector connector$1;
                private final LongRef numMessages$1;
                private final MessageFormatter formatter$1;

                // Per-message callback: write the message, count it, and bail out of the
                // process if stdout has gone bad.
                public final void apply(MessageAndMetadata<byte[], byte[]> messageAndTopic) {
                    Throwable throwable2;
                    block3: {
                        try {
                            this.formatter$1.writeTo(messageAndTopic.key(), messageAndTopic.message(), System.out);
                            ++this.numMessages$1.elem;
                        }
                        catch (Throwable throwable2) {
                            // Without --skip-message-on-error the failure is rethrown at the bottom.
                            if (!this.skipMessageOnError$1) break block3;
                            ConsoleConsumer$.MODULE$.error((Function0<String>)((Object)new Serializable(this){
                                public static final long serialVersionUID = 0L;

                                public final String apply() {
                                    return "Error processing message, skipping this message: ";
                                }
                            }), (Function0<Throwable>)((Object)new Serializable(this, throwable2){
                                public static final long serialVersionUID = 0L;
                                private final Throwable e$1;

                                public final Throwable apply() {
                                    return this.e$1;
                                }
                                {
                                    this.e$1 = e$1;
                                }
                            }));
                        }
                        // A stdout error ends the process with status 1 after cleaning up.
                        if (System.out.checkError()) {
                            System.err.println("Unable to write to standard out, closing consumer.");
                            System.err.println(new StringOps(Predef$.MODULE$.augmentString("Consumed %d messages")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(this.numMessages$1.elem)})));
                            this.formatter$1.close();
                            this.connector$1.shutdown();
                            System.exit(1);
                        }
                        return;
                    }
                    throw throwable2;
                }
                {
                    this.skipMessageOnError$1 = skipMessageOnError$1;
                    this.connector$1 = connector$1;
                    this.numMessages$1 = numMessages$1;
                    this.formatter$1 = formatter$1;
                }
            });
        }
        catch (Throwable throwable) {
            // Any failure that escapes the loop is logged; execution then falls through to
            // the normal cleanup below instead of propagating.
            this.error((Function0<String>)((Object)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Error processing message, stopping consumer: ";
                }
            }), (Function0<Throwable>)((Object)new Serializable(throwable){
                public static final long serialVersionUID = 0L;
                private final Throwable e$2;

                public final Throwable apply() {
                    return this.e$2;
                }
                {
                    this.e$2 = e$2;
                }
            }));
        }
        // ---- Final report and cleanup --------------------------------------------------
        System.err.println(new StringOps(Predef$.MODULE$.augmentString("Consumed %d messages")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(numMessages.elem)})));
        System.out.flush();
        formatter.close();
        connector.shutdown();
    }

    /**
     * Parses {@code args} with {@code parser}, reporting a parse failure through
     * {@code Utils$.croak} instead of letting the {@link OptionException} escape.
     *
     * @param parser the configured option parser
     * @param args   raw command-line arguments
     * @return the parsed options, or {@code null} if parsing failed and {@code croak}
     *         returned (presumably {@code croak} terminates the process — confirm in Utils)
     */
    public OptionSet tryParse(OptionParser parser, String[] args) {
        try {
            return parser.parse(args);
        }
        catch (OptionException parseFailure) {
            Utils$.MODULE$.croak(parseFailure.getMessage());
            return null;
        }
    }

    /**
     * Best-effort removal of a consumer group's ZooKeeper data under
     * {@code /consumers/<groupId>}.
     *
     * <p>Logs the target path at info level, opens a {@code ZkClient} against
     * {@code zkUrl} (both timeout arguments are 30000), and recursively deletes the path.
     * Every failure is swallowed on purpose: this is opportunistic cleanup and must never
     * fail the caller. The client is closed in a {@code finally} block so the connection
     * is released even when the delete throws.
     *
     * @param zkUrl   ZooKeeper connection string
     * @param groupId consumer group whose subtree should be removed
     */
    public void tryCleanupZookeeper(String zkUrl, String groupId) {
        ZkClient zk = null;
        try {
            String dir = new StringBuilder().append((Object)"/consumers/").append((Object)groupId).toString();
            this.info((Function0<String>)((Object)new Serializable(dir){
                public static final long serialVersionUID = 0L;
                private final String dir$1;

                public final String apply() {
                    return new StringBuilder().append((Object)"Cleaning up temporary zookeeper data under ").append((Object)this.dir$1).append((Object)".").toString();
                }
                {
                    this.dir$1 = dir$1;
                }
            }));
            zk = new ZkClient(zkUrl, 30000, 30000, ZKStringSerializer$.MODULE$);
            zk.deleteRecursive(dir);
        }
        catch (Throwable ignored) {
            // Deliberately swallowed: cleanup is best-effort.
        }
        finally {
            // Release the client on both the success and the failure path.
            if (zk != null) {
                try {
                    zk.close();
                }
                catch (Throwable ignored) {
                    // A failed close is swallowed under the same best-effort contract.
                }
            }
        }
    }

    /**
     * Private constructor, invoked only from the static initializer.
     * Publishes the instance through {@code MODULE$} first, then lets
     * {@code Logging$class.$init$} set up the logging state on it.
     */
    private ConsoleConsumer$() {
        MODULE$ = this;
        Logging$class.$init$(this);
    }
}

