package io.openlineage.spark.agent.lifecycle.plan;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.QueryPlanVisitor;
import java.util.List;
import lombok.NonNull;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openlineage/spark/agent/lifecycle/plan/StreamingDataSourceV2RelationVisitor.class */
public class StreamingDataSourceV2RelationVisitor extends QueryPlanVisitor<StreamingDataSourceV2Relation, OpenLineage.InputDataset> {
    private static final Logger log = LoggerFactory.getLogger(StreamingDataSourceV2RelationVisitor.class);
    private static final String KAFKA_MICRO_BATCH_STREAM_CLASS_NAME = "org.apache.spark.sql.kafka010.KafkaMicroBatchStream";
    private static final String KINESIS_MICRO_BATCH_STREAM_CLASS_NAME = "org.apache.spark.sql.connector.kinesis.KinesisV2MicrobatchStream";
    private static final String MONGO_MICRO_BATCH_STREAM_CLASS_NAME = "com.mongodb.spark.sql.connector.read.MongoMicroBatchStream";

    public StreamingDataSourceV2RelationVisitor(@NonNull OpenLineageContext openLineageContext) {
        super(openLineageContext);
        if (openLineageContext == null) {
            throw new NullPointerException("context is marked non-null but is null");
        }
    }

    public List<OpenLineage.InputDataset> apply(LogicalPlan logicalPlan) {
        log.info("Applying {} to a logical plan with type {}", getClass().getSimpleName(), logicalPlan.getClass().getCanonicalName());
        return selectStrategy((StreamingDataSourceV2Relation) logicalPlan).getInputDatasets();
    }

    @Override // io.openlineage.spark.api.QueryPlanVisitor
    public boolean isDefinedAt(LogicalPlan logicalPlan) {
        boolean z = logicalPlan instanceof StreamingDataSourceV2Relation;
        if (log.isDebugEnabled()) {
            log.debug("The result of checking whether {} is an instance of {} is {}", new Object[]{logicalPlan.getClass().getCanonicalName(), StreamingDataSourceV2Relation.class.getCanonicalName(), Boolean.valueOf(z)});
        }
        return z;
    }

    public StreamStrategy selectStrategy(StreamingDataSourceV2Relation streamingDataSourceV2Relation) {
        StreamStrategy noOpStreamStrategy;
        String canonicalName = streamingDataSourceV2Relation.stream().getClass().getCanonicalName();
        if (KAFKA_MICRO_BATCH_STREAM_CLASS_NAME.equals(canonicalName)) {
            noOpStreamStrategy = new KafkaMicroBatchStreamStrategy(inputDataset(), streamingDataSourceV2Relation.schema(), streamingDataSourceV2Relation.stream(), ScalaConversionUtils.asJavaOptional(streamingDataSourceV2Relation.startOffset()));
        } else if (KINESIS_MICRO_BATCH_STREAM_CLASS_NAME.equals(canonicalName)) {
            noOpStreamStrategy = new KinesisMicroBatchStreamStrategy(inputDataset(), streamingDataSourceV2Relation);
        } else if (MONGO_MICRO_BATCH_STREAM_CLASS_NAME.equals(canonicalName)) {
            noOpStreamStrategy = new MongoMicroBatchStreamStrategy(inputDataset(), streamingDataSourceV2Relation);
        } else {
            log.warn("The {} has been selected because no rules have matched for the stream class of {}", NoOpStreamStrategy.class, canonicalName);
            noOpStreamStrategy = new NoOpStreamStrategy(inputDataset(), streamingDataSourceV2Relation.schema(), streamingDataSourceV2Relation.stream(), ScalaConversionUtils.asJavaOptional(streamingDataSourceV2Relation.startOffset()));
        }
        log.info("Selected this strategy: {}", noOpStreamStrategy.getClass().getSimpleName());
        return noOpStreamStrategy;
    }
}
