package datahub.client.kafka;

import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.Callback;
import datahub.client.Emitter;
import datahub.client.MetadataWriteResponse;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.event.UpsertAspectRequest;
import datahub.shaded.io.confluent.kafka.serializers.KafkaAvroSerializer;
import datahub.shaded.org.apache.kafka.clients.admin.AdminClient;
import datahub.shaded.org.apache.kafka.clients.admin.ListTopicsOptions;
import datahub.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import datahub.shaded.org.apache.kafka.clients.producer.ProducerConfig;
import datahub.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import datahub.shaded.org.apache.kafka.clients.producer.RecordMetadata;
import datahub.shaded.org.apache.kafka.common.serialization.StringSerializer;
import datahub.shaded.org.slf4j.Logger;
import datahub.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Generated;

/* loaded from: input_file:datahub/client/kafka/KafkaEmitter.class */
/**
 * {@link Emitter} implementation that publishes {@link MetadataChangeProposal} events to a Kafka
 * topic, serializing them through {@link AvroSerializer} and the Confluent
 * {@link KafkaAvroSerializer}.
 *
 * <p>The instance owns a {@link KafkaProducer} that is created in the constructor and released by
 * {@link #close()}.
 */
public class KafkaEmitter implements Emitter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaEmitter.class);

    /** Topic used when the caller does not supply one explicitly. */
    public static final String DEFAULT_MCP_KAFKA_TOPIC = "MetadataChangeProposal_v1";

    /** Timeout, in milliseconds, applied to the topic listing done by {@link #testConnection()}. */
    private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;

    private final KafkaEmitterConfig config;
    private final KafkaProducer<Object, Object> producer;
    private final Properties kafkaConfigProperties;
    private final AvroSerializer _avroSerializer;
    private final String mcpKafkaTopic;

    /**
     * Creates an emitter that writes to {@link #DEFAULT_MCP_KAFKA_TOPIC}.
     *
     * @param kafkaEmitterConfig connection and serialization settings
     * @throws IOException declared for callers; propagated from the two-argument constructor
     */
    public KafkaEmitter(KafkaEmitterConfig kafkaEmitterConfig) throws IOException {
        this(kafkaEmitterConfig, DEFAULT_MCP_KAFKA_TOPIC);
    }

    /**
     * Creates an emitter that writes to the given topic.
     *
     * <p>The producer properties are assembled in a fixed order: bootstrap servers, key/value
     * serializers and schema registry URL first, then the schema-registry config map, then the
     * producer config map. Later entries overwrite earlier ones with the same key.
     *
     * @param kafkaEmitterConfig connection and serialization settings
     * @param str name of the Kafka topic records are sent to
     * @throws IOException declared for callers of this constructor
     */
    public KafkaEmitter(KafkaEmitterConfig kafkaEmitterConfig, String str) throws IOException {
        this.config = kafkaEmitterConfig;
        this.kafkaConfigProperties = new Properties();
        this.kafkaConfigProperties.put("bootstrap.servers", this.config.getBootstrap());
        this.kafkaConfigProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        this.kafkaConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        this.kafkaConfigProperties.put("schema.registry.url", this.config.getSchemaRegistryUrl());
        this.kafkaConfigProperties.putAll(kafkaEmitterConfig.getSchemaRegistryConfig());
        this.kafkaConfigProperties.putAll(kafkaEmitterConfig.getProducerConfig());
        this.producer = new KafkaProducer<>(this.kafkaConfigProperties);
        this._avroSerializer = new AvroSerializer();
        this.mcpKafkaTopic = str;
    }

    /**
     * Closes the underlying Kafka producer.
     *
     * @throws IOException declared by the {@code Closeable} contract
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.producer.close();
    }

    /**
     * Converts the wrapper with the configured event formatter and emits the resulting proposal.
     *
     * @param metadataChangeProposalWrapper the wrapped proposal to convert and send
     * @param callback invoked with the mapped result once the send completes; may be {@code null}
     * @return a future that resolves to the write response
     * @throws IOException if conversion or serialization fails
     */
    @Override // datahub.client.Emitter
    public Future<MetadataWriteResponse> emit(MetadataChangeProposalWrapper metadataChangeProposalWrapper, Callback callback) throws IOException {
        return emit(this.config.getEventFormatter().convert(metadataChangeProposalWrapper), callback);
    }

    /**
     * Serializes the proposal and sends it to the configured topic, keyed by the entity URN.
     *
     * @param metadataChangeProposal the proposal to send
     * @param callback invoked with the mapped result once the send completes; may be {@code null},
     *     in which case no completion callback is registered with the producer
     * @return a future that resolves to the write response
     * @throws IOException if serialization fails
     */
    @Override // datahub.client.Emitter
    public Future<MetadataWriteResponse> emit(MetadataChangeProposal metadataChangeProposal, final Callback callback) throws IOException {
        ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(
                this.mcpKafkaTopic,
                metadataChangeProposal.getEntityUrn().toString(),
                this._avroSerializer.serialize(metadataChangeProposal));

        // Only bridge to the caller's callback when one was supplied; dereferencing a null
        // callback inside the producer's completion handler would raise a NullPointerException.
        datahub.shaded.org.apache.kafka.clients.producer.Callback kafkaCallback = null;
        if (callback != null) {
            kafkaCallback = new datahub.shaded.org.apache.kafka.clients.producer.Callback() {
                @Override // datahub.shaded.org.apache.kafka.clients.producer.Callback
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    callback.onCompletion(KafkaEmitter.mapResponse(recordMetadata, exc));
                }
            };
        }

        // Log the topic actually used, which may differ from DEFAULT_MCP_KAFKA_TOPIC.
        log.debug("Emit: topic: {} \n record: {}", this.mcpKafkaTopic, producerRecord);
        return mapFuture(this.producer.send(producerRecord, kafkaCallback));
    }

    /**
     * Adapts the producer's {@code Future<RecordMetadata>} to a {@code Future<MetadataWriteResponse>}.
     *
     * <p>Cancellation and status queries are delegated unchanged. A successful {@code get} is
     * mapped through {@link #mapResponse(RecordMetadata, Exception)} with a {@code null}
     * exception; exceptions thrown by the delegate's {@code get} propagate to the caller.
     *
     * @param future the producer future to wrap
     * @return the adapting future
     */
    private Future<MetadataWriteResponse> mapFuture(final Future<RecordMetadata> future) {
        return new Future<MetadataWriteResponse>() {
            @Override // java.util.concurrent.Future
            public boolean cancel(boolean z) {
                return future.cancel(z);
            }

            @Override // java.util.concurrent.Future
            public MetadataWriteResponse get() throws InterruptedException, ExecutionException {
                return KafkaEmitter.mapResponse(future.get(), null);
            }

            @Override // java.util.concurrent.Future
            public MetadataWriteResponse get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                return KafkaEmitter.mapResponse(future.get(j, timeUnit), null);
            }

            @Override // java.util.concurrent.Future
            public boolean isCancelled() {
                return future.isCancelled();
            }

            @Override // java.util.concurrent.Future
            public boolean isDone() {
                return future.isDone();
            }
        };
    }

    /**
     * Checks connectivity by listing topics through a short-lived {@link AdminClient}.
     *
     * @return {@code true} if the topic listing succeeded, {@code false} if it failed with an
     *     {@link ExecutionException}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException declared by the {@link Emitter} contract
     * @throws ExecutionException declared by the {@link Emitter} contract; failures of the topic
     *     listing itself are caught and reported as {@code false}
     */
    @Override // datahub.client.Emitter
    public boolean testConnection() throws IOException, ExecutionException, InterruptedException {
        // try-with-resources guarantees the admin client is closed on the failure path as well.
        try (AdminClient adminClient = AdminClient.create(this.kafkaConfigProperties)) {
            ListTopicsOptions options = new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS);
            log.info("Available topics:{}", adminClient.listTopics(options).listings().get());
            return true;
        } catch (ExecutionException e) {
            // Trailing throwable is logged with its stack trace; the message text is unchanged.
            log.error("Kafka is not available, timed out after {} ms", ADMIN_CLIENT_TIMEOUT_MS, e);
            return false;
        }
    }

    /**
     * Not supported by this emitter.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // datahub.client.Emitter
    public Future<MetadataWriteResponse> emit(List<UpsertAspectRequest> list, Callback callback) throws IOException {
        throw new UnsupportedOperationException("UpsertAspectRequest cannot be sent over Kafka");
    }

    /**
     * Builds a {@link MetadataWriteResponse} from the outcome of a producer send.
     *
     * @param recordMetadata metadata of the written record; used when {@code exc} is {@code null}
     * @param exc the failure, or {@code null} on success
     * @return a response flagged successful iff {@code exc} is {@code null}, carrying the
     *     metadata or the exception as its underlying response and string content
     */
    private static MetadataWriteResponse mapResponse(RecordMetadata recordMetadata, Exception exc) {
        MetadataWriteResponse.MetadataWriteResponseBuilder builder = MetadataWriteResponse.builder();
        if (exc == null) {
            builder.success(true);
            builder.underlyingResponse(recordMetadata);
            builder.responseContent(recordMetadata.toString());
        } else {
            builder.success(false);
            builder.underlyingResponse(exc);
            builder.responseContent(exc.toString());
        }
        return builder.build();
    }

    /**
     * Returns the properties used to configure the producer and the admin client.
     *
     * <p>This is the live internal object, not a copy: changes made by the caller are visible to
     * later {@link #testConnection()} calls.
     *
     * @return the Kafka configuration properties
     */
    public Properties getKafkaConfigProperties() {
        return this.kafkaConfigProperties;
    }
}
