package org.apache.gobblin.kafka.tool;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry;
import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryFactory;
import org.apache.gobblin.kafka.serialize.LiAvroDeserializer;
import org.apache.gobblin.kafka.serialize.MD5Digest;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/kafka/tool/SimpleKafkaConsumer.class */
/**
 * A small command-line tool that reads Avro records from a single Kafka topic through the
 * ZooKeeper-based high-level consumer and prints each record to standard output.
 *
 * <p>When a checkpoint file is supplied, the partition/offset of every consumed message is
 * recorded in a {@link KafkaCheckpoint} and written to that file after each message.
 */
public class SimpleKafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
    private final ConsumerConnector consumer;
    private final KafkaStream<byte[], byte[]> stream;
    private final ConsumerIterator<byte[], byte[]> iterator;
    private final String topic;
    private final KafkaSchemaRegistry<MD5Digest, Schema> schemaRegistry;
    private final Deserializer deserializer;

    /**
     * Creates a consumer with a single stream on the configured topic.
     *
     * @param properties must contain {@code topic} and {@code zookeeper.connect}; the same
     *     properties are also handed to {@link KafkaSchemaRegistryFactory} to build the schema registry
     * @param kafkaCheckpoint the checkpoint the caller intends to resume from; it is not consulted
     *     here (the consumer is always configured with {@code auto.offset.reset=smallest})
     */
    public SimpleKafkaConsumer(Properties properties, KafkaCheckpoint kafkaCheckpoint) {
        Config config = ConfigFactory.parseProperties(properties);
        this.topic = config.getString("topic");
        String zookeeperConnect = config.getString("zookeeper.connect");
        this.schemaRegistry = KafkaSchemaRegistryFactory.getSchemaRegistry(properties);
        this.deserializer = new LiAvroDeserializer(this.schemaRegistry);

        Properties consumerProps = new Properties();
        consumerProps.put("zookeeper.connect", zookeeperConnect);
        // A fresh group id per instance, so no two runs of the tool share a consumer group.
        consumerProps.put("group.id", "gobblin-tool-" + System.nanoTime());
        consumerProps.put("zookeeper.session.timeout.ms", "10000");
        consumerProps.put("zookeeper.sync.time.ms", "10000");
        consumerProps.put("auto.commit.interval.ms", "10000");
        consumerProps.put("auto.offset.reset", "smallest");
        consumerProps.put("auto.commit.enable", "false");

        this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
        // Exactly one stream is requested for the topic, so it is the first (and only) list entry.
        this.stream = this.consumer.createMessageStreams(ImmutableMap.of(this.topic, 1)).get(this.topic).get(0);
        this.iterator = this.stream.iterator();
    }

    /** Shuts down the underlying Kafka consumer connector. */
    public void close() {
        this.consumer.shutdown();
    }

    /** Alias for {@link #close()}. */
    public void shutdown() {
        close();
    }

    /**
     * Entry point.
     *
     * @param strArr {@code strArr[0]} is the path of the properties file (required);
     *     {@code strArr[1]} is the path of the checkpoint file (optional)
     * @throws IOException if the properties file cannot be read or a checkpoint cannot be written
     */
    public static void main(String[] strArr) throws IOException {
        Preconditions.checkArgument(strArr.length >= 1, "Usage: java " + SimpleKafkaConsumer.class.getName() + " <properties_file> <checkpoint_file>");

        Properties properties = new Properties();
        // try-with-resources so the properties stream is released even if load() fails.
        try (FileInputStream propertiesStream = new FileInputStream(new File(strArr[0]))) {
            properties.load(propertiesStream);
        }

        KafkaCheckpoint checkpoint = KafkaCheckpoint.emptyCheckpoint();
        File checkpointFile = null;
        if (strArr.length > 1) {
            checkpointFile = new File(strArr[1]);
            checkpoint = loadCheckpoint(checkpointFile);
        }

        // Must be final: the shutdown hook below is created in a static context and therefore
        // has no enclosing instance; it can only reach the consumer through this captured local.
        final SimpleKafkaConsumer simpleKafkaConsumer = new SimpleKafkaConsumer(properties, checkpoint);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                log.info("Shutting down...");
                simpleKafkaConsumer.shutdown();
            }
        });
        simpleKafkaConsumer.printLoop(checkpoint, checkpointFile);
    }

    /**
     * Reads a previously stored checkpoint, falling back to an empty one when the file does not
     * exist or cannot be read.
     */
    private static KafkaCheckpoint loadCheckpoint(File checkpointFile) {
        if (!checkpointFile.exists()) {
            log.info("Checkpoint doesn't exist, we will start with an empty one and store it here.");
            return KafkaCheckpoint.emptyCheckpoint();
        }
        try (FileInputStream checkpointStream = new FileInputStream(checkpointFile)) {
            return KafkaCheckpoint.deserialize(checkpointStream);
        } catch (IOException e) {
            log.warn("Could not deserialize the previous checkpoint. Starting with empty", e);
            return KafkaCheckpoint.emptyCheckpoint();
        }
    }

    /**
     * Consumes messages until the stream iterator reports no more, printing each decoded record
     * and updating the checkpoint with its partition and offset.
     *
     * @param kafkaCheckpoint checkpoint updated in place as messages are consumed
     * @param file checkpoint file to write after every message, or {@code null} to skip persistence
     * @throws IOException if the checkpoint file cannot be created or written
     */
    private void printLoop(KafkaCheckpoint kafkaCheckpoint, File file) throws IOException {
        boolean storeCheckpoints = file != null;
        if (storeCheckpoints && file.createNewFile()) {
            log.info("Created new checkpoint file: {}", file.getAbsolutePath());
        }
        while (this.iterator.hasNext()) {
            try {
                MessageAndMetadata<byte[], byte[]> next = this.iterator.next();
                if (next != null) {
                    byte[] payload = next.message();
                    System.out.println("Got a message of size " + payload.length + " bytes");
                    System.out.println(((GenericRecord) this.deserializer.deserialize(this.topic, payload)).toString());
                    kafkaCheckpoint.update(next.partition(), next.offset());
                }
            } catch (RuntimeException e) {
                // A bad message must not stop the tool; log it and move on to the next one.
                log.warn("Error detected", e);
            } finally {
                // Persist exactly once per iteration, whether the message was handled or failed.
                // NOTE(review): this is a reference comparison against whatever emptyCheckpoint()
                // returns; if that factory creates a new object per call the test is always true.
                // Confirm whether !kafkaCheckpoint.isEmpty() was intended.
                if (storeCheckpoints && kafkaCheckpoint != KafkaCheckpoint.emptyCheckpoint()) {
                    System.out.println("Storing checkpoint to file: " + file.getAbsolutePath());
                    KafkaCheckpoint.serialize(kafkaCheckpoint, file);
                }
            }
        }
    }
}
