package datahub.spark.consumer.impl;

import datahub.client.Emitter;
import datahub.client.MetadataWriteResponse;
import datahub.client.kafka.KafkaEmitter;
import datahub.client.kafka.KafkaEmitterConfig;
import datahub.client.rest.RestEmitter;
import datahub.client.rest.RestEmitterConfig;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.shaded.org.slf4j.Logger;
import datahub.shaded.org.slf4j.LoggerFactory;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import datahub.spark2.shaded.typesafe.config.Config;
import datahub.spark2.shaded.typesafe.config.ConfigValue;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import lombok.Generated;

/* loaded from: input_file:datahub/spark/consumer/impl/McpEmitter.class */
/**
 * {@link LineageConsumer} that turns each {@link LineageEvent} into metadata change proposals and
 * sends them through a DataHub {@link Emitter}.
 *
 * <p>The transport is chosen by the {@code transport} configuration key ({@code "rest"} when the
 * key is absent). Supported values are {@code "rest"} and {@code "kafka"}; any other value is
 * logged as an error and nothing is emitted.
 *
 * <p>A fresh emitter is created for every {@link #emit(List)} call and closed before that call
 * returns.
 */
public class McpEmitter implements LineageConsumer {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(McpEmitter.class);

    private static final String TRANSPORT_KEY = "transport";
    private static final String GMS_URL_KEY = "rest.server";
    private static final String GMS_AUTH_TOKEN = "rest.token";
    private static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
    private static final String KAFKA_BOOTSTRAP = "kafka.connection.bootstrap";
    private static final String KAFKA_PRODUCER_CONFIG = "kafka.connection.producer_config";
    private static final String KAFKA_SCHEMA_REGISTRY_URL = "kafka.connection.schema_registry_url";
    private static final String KAFKA_SCHEMA_REGISTRY_CONFIG = "kafka.connection.schema_registry_config";

    /** Transport name read from configuration ("rest" when not configured). */
    private final String emitterType;

    /** Present only when the configured transport is "rest". */
    private final Optional<RestEmitterConfig> restEmitterConfig;

    /** Present only when the configured transport is "kafka". */
    private final Optional<KafkaEmitterConfig> kafkaEmitterConfig;

    /**
     * Reads the transport and its connection settings from {@code config}.
     *
     * <p>Missing keys fall back to {@code "rest"} for the transport,
     * {@code "http://localhost:8080"} for the REST server and {@code "localhost:9092"} for the
     * Kafka bootstrap address. An unrecognized transport is logged as an error and leaves both
     * emitter configurations empty.
     *
     * @param config configuration holding the {@code transport}, {@code rest.*} and
     *     {@code kafka.connection.*} keys
     */
    public McpEmitter(Config config) {
        this.emitterType = config.hasPath(TRANSPORT_KEY) ? config.getString(TRANSPORT_KEY) : "rest";

        // Both start out empty so that neither field is ever null, whichever transport is chosen.
        Optional<RestEmitterConfig> restConfig = Optional.empty();
        Optional<KafkaEmitterConfig> kafkaConfig = Optional.empty();
        switch (this.emitterType) {
            case "rest":
                restConfig = Optional.of(buildRestConfig(config));
                break;
            case "kafka":
                kafkaConfig = Optional.of(buildKafkaConfig(config));
                // Must not fall through: "kafka" is a recognized transport.
                break;
            default:
                log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", this.emitterType);
                break;
        }
        this.restEmitterConfig = restConfig;
        this.kafkaEmitterConfig = kafkaConfig;
    }

    /** Builds the REST emitter configuration from the {@code rest.*} keys, logging the effective settings. */
    private static RestEmitterConfig buildRestConfig(Config config) {
        boolean urlConfigured = config.hasPath(GMS_URL_KEY);
        String gmsUrl = urlConfigured ? config.getString(GMS_URL_KEY) : "http://localhost:8080";
        String token = config.hasPath(GMS_AUTH_TOKEN) ? config.getString(GMS_AUTH_TOKEN) : null;
        boolean disableSslVerification =
                config.hasPath(DISABLE_SSL_VERIFICATION_KEY) && config.getBoolean(DISABLE_SSL_VERIFICATION_KEY);

        log.info("REST Emitter Configuration: GMS url {}{}", gmsUrl, urlConfigured ? "" : "(default)");
        if (token != null) {
            // Only record that a token is set; the value itself is never logged.
            log.info("REST Emitter Configuration: Token {}", "XXXXX");
        }
        if (disableSslVerification) {
            log.warn("REST Emitter Configuration: ssl verification will be disabled.");
        }
        return RestEmitterConfig.builder()
                .server(gmsUrl)
                .token(token)
                .disableSslVerification(disableSslVerification)
                .build();
    }

    /** Builds the Kafka emitter configuration from the {@code kafka.connection.*} keys, logging the bootstrap address. */
    private static KafkaEmitterConfig buildKafkaConfig(Config config) {
        boolean bootstrapConfigured = config.hasPath(KAFKA_BOOTSTRAP);
        String bootstrap = bootstrapConfigured ? config.getString(KAFKA_BOOTSTRAP) : "localhost:9092";
        Map<String, String> producerConfig = readStringMap(config, KAFKA_PRODUCER_CONFIG);
        String schemaRegistryUrl =
                config.hasPath(KAFKA_SCHEMA_REGISTRY_URL) ? config.getString(KAFKA_SCHEMA_REGISTRY_URL) : null;
        Map<String, String> schemaRegistryConfig = readStringMap(config, KAFKA_SCHEMA_REGISTRY_CONFIG);

        log.info("Kafka Emitter Configuration: Kafka bootstrap {}{}", bootstrap, bootstrapConfigured ? "" : "(default)");
        return KafkaEmitterConfig.builder()
                .bootstrap(bootstrap)
                .producerConfig(producerConfig)
                .schemaRegistryUrl(schemaRegistryUrl)
                .schemaRegistryConfig(schemaRegistryConfig)
                .build();
    }

    /**
     * Flattens the sub-configuration at {@code path} into a map of key to stringified value.
     *
     * @return the flattened entries, or {@code null} when {@code path} is not set
     */
    private static Map<String, String> readStringMap(Config config, String path) {
        if (!config.hasPath(path)) {
            return null;
        }
        return config.getConfig(path).entrySet().stream()
                .collect(Collectors.toMap(
                        entry -> entry.getKey(),
                        entry -> entry.getValue().unwrapped().toString()));
    }

    /**
     * Creates a new emitter for the configured transport.
     *
     * @return the emitter, or empty when the transport is unrecognized, its configuration is
     *     absent, or the Kafka emitter could not be created
     */
    private Optional<Emitter> getEmitter() {
        switch (this.emitterType) {
            case "rest":
                if (this.restEmitterConfig.isPresent()) {
                    return Optional.of(new RestEmitter(this.restEmitterConfig.get()));
                }
                return Optional.empty();
            case "kafka":
                if (this.kafkaEmitterConfig.isPresent()) {
                    try {
                        return Optional.of(new KafkaEmitter(this.kafkaEmitterConfig.get()));
                    } catch (IOException e) {
                        log.error("Failed to create KafkaEmitter", e);
                    }
                }
                return Optional.empty();
            default:
                log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", this.emitterType);
                return Optional.empty();
        }
    }

    /**
     * Sends every proposal in {@code list} through a newly created emitter, then waits for and
     * logs each response.
     *
     * <p>Failures are logged rather than propagated. All proposals are submitted before any
     * response is awaited. The emitter is always closed before this method returns. If the
     * calling thread is interrupted while waiting, the interrupt flag is restored and the
     * remaining responses are not awaited.
     *
     * @param list proposals to emit
     */
    public void emit(List<MetadataChangeProposalWrapper> list) {
        Optional<Emitter> maybeEmitter = getEmitter();
        if (!maybeEmitter.isPresent()) {
            return;
        }
        Emitter emitter = maybeEmitter.get();
        try {
            List<Future<MetadataWriteResponse>> pending = list.stream()
                    .map(mcpw -> submit(emitter, mcpw))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
            for (Future<MetadataWriteResponse> future : pending) {
                try {
                    log.info(future.get().toString());
                } catch (ExecutionException e) {
                    log.error("Failed to emit metadata to DataHub", e);
                } catch (InterruptedException e) {
                    // Restore the flag for callers further up and stop blocking on this thread.
                    Thread.currentThread().interrupt();
                    log.error("Failed to emit metadata to DataHub", e);
                    break;
                }
            }
        } finally {
            // Close in finally so the emitter is released even if submission or waiting throws.
            try {
                emitter.close();
            } catch (IOException e) {
                log.error("Issue while closing emitter", e);
            }
        }
    }

    /**
     * Hands one proposal to the emitter.
     *
     * @return the pending response, or {@code null} when the emitter rejected it with an
     *     {@link IOException} (which is logged)
     */
    private static Future<MetadataWriteResponse> submit(Emitter emitter, MetadataChangeProposalWrapper mcpw) {
        try {
            log.debug("emitting mcpw: {}", mcpw);
            return emitter.emit(mcpw);
        } catch (IOException e) {
            log.error("Failed to emit metadata to DataHub", e);
            return null;
        }
    }

    /**
     * Emits the metadata events derived from {@code lineageEvent}.
     *
     * @param lineageEvent event whose {@code asMetadataEvents()} output is emitted
     */
    @Override // java.util.function.Consumer
    public void accept(LineageEvent lineageEvent) {
        emit(lineageEvent.asMetadataEvents());
    }

    /**
     * Does nothing: this class holds no long-lived emitter, since each {@link #emit(List)} call
     * closes the emitter it created.
     */
    public void close() throws IOException {
    }
}
