package datahub.spark;

import com.linkedin.common.UrnArray;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.data.template.StringMap;
import com.linkedin.dataprocess.DataProcessInstanceRelationships;
import com.linkedin.dataprocess.RunResultType;
import com.linkedin.metadata.Constants;
import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.Emitter;
import datahub.client.MetadataWriteResponse;
import datahub.client.file.FileEmitter;
import datahub.client.kafka.KafkaEmitter;
import datahub.client.rest.RestEmitter;
import datahub.client.s3.S3Emitter;
import datahub.event.EventFormatter;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.spark.conf.FileDatahubEmitterConfig;
import datahub.spark.conf.KafkaDatahubEmitterConfig;
import datahub.spark.conf.RestDatahubEmitterConfig;
import datahub.spark.conf.S3DatahubEmitterConfig;
import datahub.spark.conf.SparkLineageConf;
import io.acryl.shaded.jackson.annotation.JsonInclude;
import io.acryl.shaded.jackson.core.StreamReadConstraints;
import io.acryl.shaded.jackson.databind.ObjectMapper;
import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
import io.datahubproject.openlineage.dataset.DatahubDataset;
import io.datahubproject.openlineage.dataset.DatahubJob;
import io.datahubproject.openlineage.utils.DatahubUtils;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.api.SparkOpenLineageConfig;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:datahub/spark/DatahubEventEmitter.class */
/**
 * {@link EventEmitter} that converts OpenLineage run events into {@link DatahubJob}s and emits the
 * resulting metadata change proposals (MCPs) through the transport configured in
 * {@link SparkLineageConf}.
 *
 * <p>Every converted job is remembered so that {@link #emitCoalesced()} can later merge all of them
 * into a single job. While the streaming flag is set, conversion and emission are skipped.
 *
 * <p>{@link #setConfig(SparkLineageConf)} must be called before any event is emitted; the
 * configuration is dereferenced without a null check.
 */
public class DatahubEventEmitter extends EventEmitter {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DatahubEventEmitter.class);

    /** Seconds to wait for each asynchronous write response in {@link #emitMcps(List)}. */
    private static final int DEFAULT_TIMEOUT_SEC = 10;

    private final AtomicBoolean streaming;
    private final List<DatahubJob> _datahubJobs;
    private final Map<String, MetadataChangeProposalWrapper> schemaMap;
    private SparkLineageConf datahubConf;
    private final ObjectMapper objectMapper;
    private final JacksonDataTemplateCodec dataTemplateCodec;
    private final EventFormatter eventFormatter;

    /**
     * Creates the emitter and the JSON codec used to log MCPs.
     *
     * <p>The maximum JSON string length accepted by the object mapper is read from the environment
     * variable named by {@link Constants#INGESTION_MAX_SERIALIZED_STRING_LENGTH}, falling back to
     * {@link Constants#MAX_JACKSON_STRING_SIZE}.
     *
     * @param sparkOpenLineageConfig passed unchanged to the {@link EventEmitter} constructor
     * @param str passed unchanged to the {@link EventEmitter} constructor
     * @throws URISyntaxException if the superclass constructor throws it
     * @throws NumberFormatException if the configured maximum string length is not an integer
     */
    public DatahubEventEmitter(SparkOpenLineageConfig sparkOpenLineageConfig, String str) throws URISyntaxException {
        super(sparkOpenLineageConfig, str);
        this.streaming = new AtomicBoolean(false);
        this._datahubJobs = new LinkedList<>();
        this.schemaMap = new HashMap<>();
        this.eventFormatter = new EventFormatter();
        this.objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
        int maxStringLength = Integer.parseInt(
                System.getenv().getOrDefault(
                        Constants.INGESTION_MAX_SERIALIZED_STRING_LENGTH, Constants.MAX_JACKSON_STRING_SIZE));
        this.objectMapper.getFactory().setStreamReadConstraints(
                StreamReadConstraints.builder().maxStringLength(maxStringLength).build());
        this.dataTemplateCodec = new JacksonDataTemplateCodec(this.objectMapper.getFactory());
    }

    /**
     * Builds a new emitter for the transport selected in the current configuration.
     *
     * @return a freshly created emitter, or empty when no transport is configured or the configured
     *     transport type is not one of REST, Kafka, file or S3
     * @throws RuntimeException wrapping the {@link IOException} raised while creating a Kafka or S3
     *     emitter
     */
    private Optional<Emitter> getEmitter() {
        // Held as Object: only instanceof checks and casts to the concrete config types are needed.
        Object emitterConfig = this.datahubConf.getDatahubEmitterConfig();
        if (emitterConfig == null) {
            log.error("No Transport set. DataHub Lineage emission will not work");
            return Optional.empty();
        }
        if (emitterConfig instanceof RestDatahubEmitterConfig) {
            RestDatahubEmitterConfig restConfig = (RestDatahubEmitterConfig) emitterConfig;
            return Optional.of(new RestEmitter(restConfig.getRestEmitterConfig()));
        }
        if (emitterConfig instanceof KafkaDatahubEmitterConfig) {
            KafkaDatahubEmitterConfig kafkaConfig = (KafkaDatahubEmitterConfig) emitterConfig;
            try {
                return Optional.of(new KafkaEmitter(kafkaConfig.getKafkaEmitterConfig(), kafkaConfig.getMcpTopic()));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        if (emitterConfig instanceof FileDatahubEmitterConfig) {
            FileDatahubEmitterConfig fileConfig = (FileDatahubEmitterConfig) emitterConfig;
            return Optional.of(new FileEmitter(fileConfig.getFileEmitterConfig()));
        }
        if (emitterConfig instanceof S3DatahubEmitterConfig) {
            S3DatahubEmitterConfig s3Config = (S3DatahubEmitterConfig) emitterConfig;
            try {
                return Optional.of(new S3Emitter(s3Config.getS3EmitterConfig()));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        // Report the class that was actually configured, not a fixed, unrelated class name.
        log.error(
                "DataHub Transport {} not recognized. DataHub Lineage emission will not work",
                emitterConfig.getClass().getName());
        return Optional.empty();
    }

    /**
     * Converts an OpenLineage run event into a {@link DatahubJob} and stores it for later coalescing.
     *
     * @param runEvent the event to convert
     * @return the converted job; empty when streaming mode is on or the converter returned
     *     {@code null}
     * @throws RuntimeException wrapping an {@link IOException} or {@link URISyntaxException} raised
     *     by the conversion
     */
    public Optional<DatahubJob> convertOpenLineageRunEventToDatahubJob(OpenLineage.RunEvent runEvent) {
        try {
            // Serializing the event is only worth it when the debug line will actually be written.
            if (log.isDebugEnabled()) {
                log.debug("Emitting lineage: {}", OpenLineageClientUtils.toJson(runEvent));
            }
            if (isStreaming()) {
                return Optional.empty();
            }
            Optional<DatahubJob> converted = Optional.ofNullable(
                    OpenLineageToDataHub.convertRunEventToJob(runEvent, this.datahubConf.getOpenLineageConf()));
            if (converted.isPresent()) {
                DatahubJob job = converted.get();
                log.info("Converted Job: {}, from {}", job, OpenLineageClientUtils.toJson(runEvent));
                this._datahubJobs.add(job);
            }
            return converted;
        } catch (IOException | URISyntaxException e) {
            throw new RuntimeException("Error: " + e.getMessage(), e);
        }
    }

    /**
     * Converts the event and, depending on configuration, emits it immediately or as part of the
     * coalesced job.
     *
     * <p>The event is first round-tripped through JSON and the resulting copy is what gets
     * converted. Nothing is emitted when the conversion yields no job or streaming mode is on. With
     * coalescing disabled the job's MCPs are emitted right away; with coalescing and periodic
     * emission both enabled, {@link #emitCoalesced()} is invoked.
     *
     * @param runEvent the OpenLineage run event to process
     * @throws RuntimeException wrapping an {@link IOException} raised while generating MCPs
     */
    @Override // io.openlineage.spark.agent.EventEmitter
    public void emit(OpenLineage.RunEvent runEvent) {
        long startedAtMillis = System.currentTimeMillis();
        OpenLineage.RunEvent eventCopy =
                OpenLineageClientUtils.runEventFromJson(OpenLineageClientUtils.toJson(runEvent));
        Optional<DatahubJob> converted = convertOpenLineageRunEventToDatahubJob(eventCopy);
        if (!converted.isPresent()) {
            return;
        }
        DatahubJob job = converted.get();
        if (!this.datahubConf.getTags().isEmpty()) {
            job.setFlowGlobalTags(OpenLineageToDataHub.generateTags(this.datahubConf.getTags()));
        }
        if (!this.datahubConf.getDomains().isEmpty()) {
            job.setFlowDomains(OpenLineageToDataHub.generateDomains(this.datahubConf.getDomains()));
        }
        if (isStreaming()) {
            log.info("Streaming mode is enabled. Skipping lineage emission.");
            return;
        }
        if (!this.datahubConf.isCoalesceEnabled()) {
            log.info("Emitting lineage");
            try {
                emitMcps(job.toMcps(this.datahubConf.getOpenLineageConf()));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            if (log.isDebugEnabled()) {
                log.debug(
                        "Emitting non-coalesced lineage completed successfully: {}",
                        OpenLineageClientUtils.toJson(eventCopy));
            }
        } else if (this.datahubConf.isEmitCoalescePeriodically()) {
            log.info("Emitting coalesced lineage periodically");
            emitCoalesced();
            if (log.isDebugEnabled()) {
                log.debug(
                        "Collecting coalesced lineage periodically completed successfully: {}",
                        OpenLineageClientUtils.toJson(eventCopy));
            }
        }
        log.info("Collecting lineage successfully in {} ms", System.currentTimeMillis() - startedAtMillis);
    }

    /**
     * Emits the MCPs of the job obtained by merging every stored job, provided coalescing is enabled
     * and streaming mode is off.
     */
    public void emitCoalesced() {
        long startedAtMillis = System.currentTimeMillis();
        if (isStreaming()) {
            log.info("Streaming mode is enabled. Skipping lineage emission.");
            return;
        }
        if (this.datahubConf.isCoalesceEnabled()) {
            List<MetadataChangeProposal> coalescedMcps = generateCoalescedMcps();
            emitMcps(coalescedMcps);
            // Logged only after emitMcps has returned, so the message is not premature.
            log.info("Emitting Coalesced lineage completed successfully");
        }
        log.info("Emitting coalesced lineage completed in {} ms", System.currentTimeMillis() - startedAtMillis);
    }

    /**
     * Merges all stored jobs into one job and converts it to MCPs.
     *
     * <p>The merged job takes the flow URN and platform instance of the last stored job, the
     * smallest start time and the largest end time across all stored jobs (the end time starts
     * from 0), and the union of their input and output datasets. Configured tags, domains and the
     * parent job URN are applied afterwards.
     *
     * @return the MCPs of the merged job, or an empty mutable list when no job has been stored
     * @throws RuntimeException wrapping an {@link IOException} raised while generating MCPs
     */
    public List<MetadataChangeProposal> generateCoalescedMcps() {
        if (this._datahubJobs.isEmpty()) {
            log.warn("No lineage events to emit. Maybe the spark job finished prematurely?");
            return new ArrayList<>();
        }
        DatahubJob coalesced = DatahubJob.builder().build();
        long earliestStart = Long.MAX_VALUE;
        long latestEnd = 0L;
        for (DatahubJob storedJob : this._datahubJobs) {
            log.info("Merging job stored job {} with {}", storedJob, coalesced);
            coalesced.setJobUrn(
                    DatahubUtils.jobUrn(storedJob.getFlowUrn(), storedJob.getFlowUrn().getFlowIdEntity()));
            coalesced.setFlowUrn(storedJob.getFlowUrn());
            coalesced.setFlowPlatformInstance(storedJob.getFlowPlatformInstance());
            mergeJobInfo(coalesced, storedJob);
            if (coalesced.getDataFlowInfo() == null) {
                coalesced.setDataFlowInfo(storedJob.getDataFlowInfo());
            }
            earliestStart = Math.min(earliestStart, storedJob.getStartTime());
            latestEnd = Math.max(latestEnd, storedJob.getEndTime());
            mergeDatasets(storedJob.getOutSet(), coalesced.getOutSet());
            mergeDatasets(storedJob.getInSet(), coalesced.getInSet());
            mergeDataProcessInstance(coalesced, storedJob);
            mergeCustomProperties(coalesced, storedJob);
        }
        coalesced.setStartTime(earliestStart);
        coalesced.setEndTime(latestEnd);
        if (!this.datahubConf.getTags().isEmpty()) {
            coalesced.setFlowGlobalTags(OpenLineageToDataHub.generateTags(this.datahubConf.getTags()));
        }
        if (!this.datahubConf.getDomains().isEmpty()) {
            coalesced.setFlowDomains(OpenLineageToDataHub.generateDomains(this.datahubConf.getDomains()));
        }
        try {
            if (this.datahubConf.getOpenLineageConf().getParentJobUrn() != null) {
                coalesced.getParentJobs().add(this.datahubConf.getOpenLineageConf().getParentJobUrn());
            }
        } catch (ClassCastException e) {
            log.warn(
                    "{} is not a valid Datajob URN. Skipping setting up upstream job.",
                    this.datahubConf.getOpenLineageConf().getParentJobUrn());
        }
        log.info("Generating MCPs for job: {}", coalesced);
        try {
            return coalesced.toMcps(this.datahubConf.getOpenLineageConf());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Folds the job info of {@code source} into {@code target}: the first non-null job info is
     * adopted (renamed to the flow id), and custom properties are merged with the values already on
     * {@code target} taking precedence.
     */
    private static void mergeJobInfo(DatahubJob target, DatahubJob source) {
        if (source.getJobInfo() == null) {
            return;
        }
        if (target.getJobInfo() == null) {
            target.setJobInfo(source.getJobInfo());
            target.getJobInfo().setName(source.getFlowUrn().getFlowIdEntity());
        }
        StringMap sourceProperties = source.getJobInfo().getCustomProperties();
        if (sourceProperties == null) {
            return;
        }
        StringMap targetProperties = target.getJobInfo().getCustomProperties();
        if (targetProperties == null) {
            target.getJobInfo().setCustomProperties(sourceProperties);
        } else {
            target.getJobInfo().setCustomProperties(
                    new StringMap(mergePreferringFirst(targetProperties, sourceProperties)));
        }
    }

    /** Returns a new map holding the entries of both maps; on a key clash the {@code first} value wins. */
    private static Map<String, String> mergePreferringFirst(StringMap first, StringMap second) {
        return Stream.of(first, second)
                .flatMap(properties -> properties.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (kept, dropped) -> kept));
    }

    /**
     * Merges the datasets of {@code set} into {@code set2}.
     *
     * <p>A dataset whose URN is not yet in {@code set2} is added. For a URN already present, the
     * existing entry's schema metadata and lineage are overwritten by the incoming non-null values.
     *
     * @param set the incoming datasets
     * @param set2 the accumulated datasets, modified in place
     */
    private static void mergeDatasets(Set<DatahubDataset> set, Set<DatahubDataset> set2) {
        for (DatahubDataset incoming : set) {
            Optional<DatahubDataset> match = set2.stream()
                    .filter(existing -> existing.getUrn().equals(incoming.getUrn()))
                    .findFirst();
            if (!match.isPresent()) {
                set2.add(incoming);
                continue;
            }
            DatahubDataset existing = match.get();
            if (incoming.getSchemaMetadata() != null) {
                existing.setSchemaMetadata(incoming.getSchemaMetadata());
            }
            if (incoming.getLineage() != null) {
                existing.setLineage(incoming.getLineage());
            }
        }
    }

    /**
     * Merges the data process instance details of {@code datahubJob2} into {@code datahubJob}.
     *
     * <p>The instance URN is taken only if the target has none. A newer event time replaces the
     * target's event time and properties and resets its relationships to point at the target's job
     * URN with no upstream instances. A run event carrying a result is adopted when the target has
     * no run event yet, or when the incoming result type is {@link RunResultType#FAILURE}.
     *
     * @param datahubJob the accumulated job, modified in place
     * @param datahubJob2 the job being merged in
     */
    private static void mergeDataProcessInstance(DatahubJob datahubJob, DatahubJob datahubJob2) {
        if (datahubJob.getDataProcessInstanceUrn() == null) {
            datahubJob.setDataProcessInstanceUrn(datahubJob2.getDataProcessInstanceUrn());
        }
        if (datahubJob2.getEventTime() > datahubJob.getEventTime()) {
            datahubJob.setEventTime(datahubJob2.getEventTime());
            datahubJob.setDataProcessInstanceProperties(datahubJob2.getDataProcessInstanceProperties());
            DataProcessInstanceRelationships relationships = new DataProcessInstanceRelationships();
            relationships.setParentTemplate(datahubJob.getJobUrn());
            relationships.setUpstreamInstances(new UrnArray());
            datahubJob.setDataProcessInstanceRelationships(relationships);
        }
        log.info("DataProcessInstanceRunEvent: {}", datahubJob2.getDataProcessInstanceRunEvent());
        if (datahubJob2.getDataProcessInstanceRunEvent() != null
                && datahubJob2.getDataProcessInstanceRunEvent().getResult() != null) {
            RunResultType incomingResult = datahubJob2.getDataProcessInstanceRunEvent().getResult().getType();
            boolean targetHasNoRunEvent = datahubJob.getDataProcessInstanceRunEvent() == null;
            if (targetHasNoRunEvent || incomingResult == RunResultType.FAILURE) {
                datahubJob.setDataProcessInstanceRunEvent(datahubJob2.getDataProcessInstanceRunEvent());
            }
        }
        log.info("DataProcessInstanceRunEvent: {}", datahubJob.getDataProcessInstanceRunEvent());
    }

    /**
     * Merges the data flow custom properties of {@code datahubJob2} into {@code datahubJob}.
     *
     * <p>If the target has no custom properties, the incoming ones are adopted as they are.
     * Otherwise both maps are combined (existing target values win), {@code finishedAt} is set to
     * the current UTC time, and the Spark application details available in the configuration
     * ({@code startedAt}, {@code attemptId}, {@code sparkUser}, {@code appId} and Databricks tags)
     * are written on top. Nothing happens when either job has no data flow info.
     *
     * @param datahubJob the accumulated job, modified in place
     * @param datahubJob2 the job being merged in
     */
    private void mergeCustomProperties(DatahubJob datahubJob, DatahubJob datahubJob2) {
        // Without data flow info on both sides there is nothing to read from or write to.
        if (datahubJob.getDataFlowInfo() == null || datahubJob2.getDataFlowInfo() == null) {
            return;
        }
        StringMap incomingProperties = datahubJob2.getDataFlowInfo().getCustomProperties();
        if (incomingProperties == null) {
            return;
        }
        StringMap existingProperties = datahubJob.getDataFlowInfo().getCustomProperties();
        if (existingProperties == null) {
            datahubJob.getDataFlowInfo().setCustomProperties(incomingProperties);
            return;
        }
        Map<String, String> merged = mergePreferringFirst(existingProperties, incomingProperties);
        merged.put("finishedAt", ZonedDateTime.now(ZoneOffset.UTC).toString());
        if (this.datahubConf.getSparkAppContext() != null) {
            if (this.datahubConf.getSparkAppContext().getStartTime() != null) {
                long startMillis = this.datahubConf.getSparkAppContext().getStartTime().longValue();
                merged.put(
                        "startedAt",
                        ZonedDateTime.ofInstant(Instant.ofEpochMilli(startMillis), ZoneOffset.UTC).toString());
            }
            if (this.datahubConf.getSparkAppContext().getAppAttemptId() != null) {
                merged.put("attemptId", this.datahubConf.getSparkAppContext().getAppAttemptId());
            }
            if (this.datahubConf.getSparkAppContext().getSparkUser() != null) {
                merged.put("sparkUser", this.datahubConf.getSparkAppContext().getSparkUser());
            }
            if (this.datahubConf.getSparkAppContext().getAppId() != null) {
                merged.put("appId", this.datahubConf.getSparkAppContext().getAppId());
            }
            if (this.datahubConf.getSparkAppContext().getDatabricksTags() != null) {
                merged.putAll(this.datahubConf.getSparkAppContext().getDatabricksTags());
            }
        }
        datahubJob.getDataFlowInfo().setCustomProperties(new StringMap(merged));
    }

    /**
     * Emits every MCP through a newly created emitter, waits up to {@link #DEFAULT_TIMEOUT_SEC}
     * seconds for each response, and closes the emitter.
     *
     * <p>All proposals are submitted before any response is awaited. Failures to submit, to obtain a
     * response, or to close the emitter are logged and not rethrown. Does nothing when no emitter
     * can be created.
     *
     * @param list the proposals to emit
     */
    protected void emitMcps(List<MetadataChangeProposal> list) {
        Optional<Emitter> maybeEmitter = getEmitter();
        if (!maybeEmitter.isPresent()) {
            return;
        }
        Emitter emitter = maybeEmitter.get();
        try {
            list.stream()
                    .map(mcp -> {
                        try {
                            if (this.datahubConf.isLogMcps()) {
                                log.info("emitting mcpw: {}", this.dataTemplateCodec.mapToString(mcp.data()));
                            } else {
                                log.info(
                                        "emitting aspect: {} for urn: {}",
                                        mcp.getAspectName(),
                                        mcp.getEntityUrn());
                            }
                            return emitter.emit(mcp);
                        } catch (IOException e) {
                            log.error("Failed to emit metadata to DataHub", e);
                            return null;
                        }
                    })
                    .filter(Objects::nonNull)
                    // Materialize first so that every proposal is submitted before blocking on any.
                    .collect(Collectors.toList())
                    .forEach(future -> {
                        try {
                            log.info(future.get(DEFAULT_TIMEOUT_SEC, TimeUnit.SECONDS).toString());
                        } catch (InterruptedException e) {
                            // Restore the flag so code further up the stack can see the interruption.
                            Thread.currentThread().interrupt();
                            log.error("Failed to emit metadata to DataHub", e);
                        } catch (ExecutionException | TimeoutException e) {
                            log.error("Failed to emit metadata to DataHub", e);
                        }
                    });
        } finally {
            // Close even when the pipeline above throws, so the emitter is never leaked.
            try {
                emitter.close();
            } catch (IOException e) {
                log.error("Issue while closing emitter", e);
            }
        }
    }

    /**
     * Sets the configuration used by all subsequent conversions and emissions.
     *
     * @param sparkLineageConf the configuration to use
     */
    public void setConfig(SparkLineageConf sparkLineageConf) {
        this.datahubConf = sparkLineageConf;
    }

    /**
     * @return {@code true} when the streaming flag is set, in which case events are neither
     *     converted nor emitted
     */
    public boolean isStreaming() {
        return this.streaming.get();
    }

    /**
     * Sets or clears the streaming flag.
     *
     * @param z the new value of the flag
     */
    public void setStreaming(boolean z) {
        this.streaming.set(z);
    }
}
