package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.util.Collections;
import java.util.Optional;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;

/* loaded from: input_file:flink-table-planner.jar:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.class */
public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> implements MultipleTransformationTranslator<RowData> {
    public static final String SOURCE_TRANSFORMATION = "source";
    public static final String FIELD_NAME_SCAN_TABLE_SOURCE = "scanTableSource";

    @JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE)
    private final DynamicTableSourceSpec tableSourceSpec;

    /* JADX INFO: Access modifiers changed from: protected */
    public CommonExecTableSourceScan(int i, ExecNodeContext execNodeContext, ReadableConfig readableConfig, DynamicTableSourceSpec dynamicTableSourceSpec, LogicalType logicalType, String str) {
        super(i, execNodeContext, readableConfig, Collections.emptyList(), logicalType, str);
        this.tableSourceSpec = dynamicTableSourceSpec;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    public String getSimplifiedName() {
        return this.tableSourceSpec.getContextResolvedTable().getIdentifier().getObjectName();
    }

    public DynamicTableSourceSpec getTableSourceSpec() {
        return this.tableSourceSpec;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    public Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        StreamExecutionEnvironment execEnv = plannerBase.getExecEnv();
        TransformationMetadata createTransformationMeta = createTransformationMeta(SOURCE_TRANSFORMATION, execNodeConfig);
        InternalTypeInfo of = InternalTypeInfo.of(getOutputType());
        SourceFunctionProvider scanRuntimeProvider = this.tableSourceSpec.getScanTableSource(plannerBase.getFlinkContext()).getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
        if (scanRuntimeProvider instanceof SourceFunctionProvider) {
            SourceFunctionProvider sourceFunctionProvider = scanRuntimeProvider;
            return createTransformationMeta.fill(createSourceFunctionTransformation(execEnv, sourceFunctionProvider.createSourceFunction(), sourceFunctionProvider.isBounded(), createTransformationMeta.getName(), of));
        }
        if (scanRuntimeProvider instanceof InputFormatProvider) {
            return createTransformationMeta.fill(createInputFormatTransformation(execEnv, ((InputFormatProvider) scanRuntimeProvider).createInputFormat(), of, createTransformationMeta.getName()));
        }
        if (scanRuntimeProvider instanceof SourceProvider) {
            return createTransformationMeta.fill(execEnv.fromSource(((SourceProvider) scanRuntimeProvider).createSource(), WatermarkStrategy.noWatermarks(), createTransformationMeta.getName(), of).getTransformation());
        }
        if (scanRuntimeProvider instanceof DataStreamScanProvider) {
            Transformation<RowData> transformation = ((DataStreamScanProvider) scanRuntimeProvider).produceDataStream(createProviderContext(execNodeConfig), execEnv).getTransformation();
            createTransformationMeta.fill(transformation);
            transformation.setOutputType(of);
            return transformation;
        }
        if (!(scanRuntimeProvider instanceof TransformationScanProvider)) {
            throw new UnsupportedOperationException(scanRuntimeProvider.getClass().getSimpleName() + " is unsupported now.");
        }
        Transformation<RowData> createTransformation = ((TransformationScanProvider) scanRuntimeProvider).createTransformation(createProviderContext(execNodeConfig));
        createTransformationMeta.fill(createTransformation);
        createTransformation.setOutputType(of);
        return createTransformation;
    }

    private ProviderContext createProviderContext(ReadableConfig readableConfig) {
        return str -> {
            return (!(this instanceof StreamExecNode) || ((Boolean) readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)).booleanValue()) ? Optional.empty() : Optional.of(createTransformationUid(str));
        };
    }

    protected Transformation<RowData> createSourceFunctionTransformation(StreamExecutionEnvironment streamExecutionEnvironment, SourceFunction<RowData> sourceFunction, boolean z, String str, TypeInformation<RowData> typeInformation) {
        streamExecutionEnvironment.clean(sourceFunction);
        return new LegacySourceTransformation(str, new StreamSource(sourceFunction, !z), typeInformation, sourceFunction instanceof ParallelSourceFunction ? streamExecutionEnvironment.getParallelism() : 1, z ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED);
    }

    protected abstract Transformation<RowData> createInputFormatTransformation(StreamExecutionEnvironment streamExecutionEnvironment, InputFormat<RowData, ?> inputFormat, InternalTypeInfo<RowData> internalTypeInfo, String str);
}
