package co.cask.cdap.template.etl.realtime.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.templates.plugins.PluginConfig;
import co.cask.cdap.format.RecordFormats;
import co.cask.cdap.template.etl.api.Emitter;
import co.cask.cdap.template.etl.api.PipelineConfigurer;
import co.cask.cdap.template.etl.api.realtime.RealtimeContext;
import co.cask.cdap.template.etl.api.realtime.RealtimeSource;
import co.cask.cdap.template.etl.api.realtime.SourceState;
import co.cask.cdap.template.etl.realtime.kafka.Kafka08SimpleApiConsumer;
import co.cask.cdap.template.etl.realtime.kafka.KafkaSimpleApiConsumer;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("Kafka")
@Description("Kafka Real-time Source: Emits a record with two fields: 'key' (nullable string) and 'message' (bytes).")
@Plugin(type = "source")
/* loaded from: input_file:co/cask/cdap/template/etl/realtime/source/KafkaSource.class */
public class KafkaSource extends RealtimeSource<StructuredRecord> {
    public static final String MESSAGE = "message";
    public static final String KEY = "key";
    public static final String KAFKA_PARTITIONS = "kafka.partitions";
    public static final String KAFKA_TOPIC = "kafka.topic";
    public static final String KAFKA_ZOOKEEPER = "kafka.zookeeper";
    public static final String KAFKA_BROKERS = "kafka.brokers";
    public static final String KAFKA_DEFAULT_OFFSET = "kafka.default.offset";
    public static final String SCHEMA = "schema";
    public static final String FORMAT = "format";
    private static final String FORMAT_SETTING_PREFIX = "format.setting.";
    private KafkaSimpleApiConsumer kafkaConsumer;
    private KafkaPluginConfig config;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
    private static final Schema DEFAULT_SCHEMA = Schema.recordOf("Kafka Message", new Schema.Field[]{Schema.Field.of("message", Schema.of(Schema.Type.BYTES)), Schema.Field.of("key", Schema.nullableOf(Schema.of(Schema.Type.STRING)))});

    /* loaded from: input_file:co/cask/cdap/template/etl/realtime/source/KafkaSource$KafkaPluginConfig.class */
    public static class KafkaPluginConfig extends PluginConfig {

        @Name(KafkaSource.KAFKA_PARTITIONS)
        @Description("Number of partitions.")
        private final Integer partitions;

        @Name(KafkaSource.KAFKA_TOPIC)
        @Description("Topic of the messages.")
        private final String topic;

        @Name(KafkaSource.KAFKA_ZOOKEEPER)
        @Description("The connect string location of Zookeeper. Either this or the list of brokers is required.")
        @Nullable
        private final String zkConnect;

        @Name(KafkaSource.KAFKA_BROKERS)
        @Description("Comma-separated list of Kafka brokers. Either this or the Zookeeper connect info is required.")
        @Nullable
        private final String kafkaBrokers;

        @Name(KafkaSource.KAFKA_DEFAULT_OFFSET)
        @Description("The default offset for the partition. Default value is kafka.api.OffsetRequest.EarliestTime.")
        @Nullable
        private final Long defaultOffset;

        @Name("schema")
        @Description("Optional schema for the body of Kafka events. The schema is used in conjunction with the format to parse Kafka payloads. Some formats (such as the 'avro' format) require schema while others do not. The schema given is for the body of the Kafka event.")
        @Nullable
        private final String schema;

        @Name("format")
        @Description("Optional format of the Kafka event. Any format supported by CDAP is supported. For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values. If no format is given, Kafka message payloads will be treated as bytes, resulting in a two-field schema: 'key' of type string (which is nullable) and 'payload' of type bytes.")
        @Nullable
        private final String format;

        public KafkaPluginConfig(String str, String str2, Integer num, String str3, Long l, @Nullable String str4, @Nullable String str5) {
            this.zkConnect = str;
            this.kafkaBrokers = str2;
            this.partitions = num;
            this.topic = str3;
            this.defaultOffset = l;
            this.schema = str5;
            this.format = str4;
        }

        public Integer getPartitions() {
            return this.partitions;
        }

        public String getTopic() {
            return this.topic;
        }

        @Nullable
        public String getZkConnect() {
            return this.zkConnect;
        }

        @Nullable
        public String getKafkaBrokers() {
            return this.kafkaBrokers;
        }

        @Nullable
        public Long getDefaultOffset() {
            return this.defaultOffset;
        }

        /* JADX INFO: Access modifiers changed from: private */
        @Nullable
        public FormatSpecification getFormatSpec() {
            FormatSpecification formatSpecification = null;
            if (!Strings.isNullOrEmpty(this.format)) {
                Schema parseSchema = parseSchema();
                if (getProperties() != null) {
                    ImmutableMap.Builder builder = ImmutableMap.builder();
                    for (Map.Entry entry : getProperties().getProperties().entrySet()) {
                        if (((String) entry.getKey()).startsWith(KafkaSource.FORMAT_SETTING_PREFIX)) {
                            String str = (String) entry.getKey();
                            builder.put(str.substring(KafkaSource.FORMAT_SETTING_PREFIX.length(), str.length()), entry.getValue());
                        }
                    }
                    formatSpecification = new FormatSpecification(this.format, parseSchema, builder.build());
                } else {
                    formatSpecification = new FormatSpecification(this.format, parseSchema, (Map) null);
                }
            }
            return formatSpecification;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Schema parseSchema() {
            try {
                if (Strings.isNullOrEmpty(this.schema)) {
                    return null;
                }
                return Schema.parseJson(this.schema);
            } catch (IOException e) {
                throw new IllegalArgumentException("Invalid schema: " + e.getMessage());
            }
        }
    }

    public KafkaSource(KafkaPluginConfig kafkaPluginConfig) {
        this.config = kafkaPluginConfig;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        if (Strings.isNullOrEmpty(this.config.schema)) {
            return;
        }
        this.config.parseSchema();
    }

    public void initialize(RealtimeContext realtimeContext) throws Exception {
        super.initialize(realtimeContext);
        this.kafkaConsumer = new Kafka08SimpleApiConsumer(this);
        this.kafkaConsumer.initialize(realtimeContext);
        if (Strings.isNullOrEmpty(this.config.format)) {
            return;
        }
        FormatSpecification formatSpec = this.config.getFormatSpec();
        RecordFormats.createInitializedFormat(formatSpec).initialize(formatSpec);
    }

    @Nullable
    public SourceState poll(Emitter<StructuredRecord> emitter, SourceState sourceState) {
        try {
            this.kafkaConsumer.saveState(sourceState);
            this.kafkaConsumer.pollMessages(emitter);
            return new SourceState(this.kafkaConsumer.getSavedState());
        } catch (Throwable th) {
            LOG.error("Error encountered during poll to get message for Kafka source.", th);
            return sourceState;
        }
    }

    public StructuredRecord byteBufferToStructuredRecord(@Nullable String str, ByteBuffer byteBuffer) {
        FormatSpecification formatSpec = this.config.getFormatSpec();
        if (Strings.isNullOrEmpty(this.config.format)) {
            return byteBufferToSchemalessByteRecord(str, byteBuffer);
        }
        try {
            return (StructuredRecord) RecordFormats.createInitializedFormat(formatSpec).read(new StreamEvent(byteBuffer));
        } catch (Exception e) {
            LOG.debug("Could not parse Kafka payload into schema. Using default structured record instead.");
            return byteBufferToSchemalessByteRecord(str, byteBuffer);
        }
    }

    private StructuredRecord byteBufferToSchemalessByteRecord(@Nullable String str, ByteBuffer byteBuffer) {
        StructuredRecord.Builder builder = StructuredRecord.builder(DEFAULT_SCHEMA);
        if (str != null) {
            builder.set("key", str);
        }
        builder.set("message", byteBuffer);
        return builder.build();
    }

    @Nullable
    public KafkaPluginConfig getConfig() {
        return this.config;
    }
}
