package org.apache.flink.cdc.composer.flink.translator;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Translates a {@link SourceDef} into the Flink {@link DataStreamSource} that emits
 * {@link Event}s for a pipeline.
 */
@Internal
/* loaded from: input_file:org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.class */
public class DataSourceTranslator {

    /**
     * Creates the data source described by {@code sourceDef} and attaches it to the given
     * execution environment.
     *
     * <p>The {@link DataSourceFactory} is looked up by {@code sourceDef.getType()}; if a JAR path
     * is found for that identifier it is added to the environment. The resulting stream's
     * parallelism is taken from {@link PipelineOptions#PIPELINE_PARALLELISM}.
     *
     * @param sourceDef definition of the source (type, optional name, source configuration)
     * @param streamExecutionEnvironment environment the source is added to
     * @param configuration pipeline configuration; passed to the factory context and read for
     *     the source parallelism
     * @return the source stream with its parallelism set
     * @throws IllegalStateException if the data source yields no provider, or a provider that is
     *     neither a {@link FlinkSourceProvider} nor a {@link FlinkSourceFunctionProvider}
     */
    public DataStreamSource<Event> translate(SourceDef sourceDef, StreamExecutionEnvironment streamExecutionEnvironment, Configuration configuration) {
        DataSourceFactory factory = FactoryDiscoveryUtils.getFactoryByIdentifier(sourceDef.getType(), DataSourceFactory.class);
        DataSource dataSource = factory.createDataSource(
                new FactoryHelper.DefaultContext(
                        sourceDef.getConfig(),
                        configuration,
                        Thread.currentThread().getContextClassLoader()));

        FactoryDiscoveryUtils.getJarPathByIdentifier(sourceDef.getType(), DataSourceFactory.class)
                .ifPresent(jar -> FlinkEnvironmentUtils.addJar(streamExecutionEnvironment, jar));

        int parallelism = (Integer) configuration.get(PipelineOptions.PIPELINE_PARALLELISM);

        // Held without a concrete provider type so the branches below are selected purely by the
        // provider's runtime type; each branch casts to the interface it needs.
        Object provider = dataSource.getEventSourceProvider();

        if (provider instanceof FlinkSourceProvider) {
            return streamExecutionEnvironment
                    .fromSource(
                            ((FlinkSourceProvider) provider).getSource(),
                            WatermarkStrategy.noWatermarks(),
                            // The default name is only formatted when no explicit name is set.
                            sourceDef.getName().orElseGet(() -> generateDefaultSourceName(sourceDef)),
                            new EventTypeInfo())
                    .setParallelism(parallelism);
        }

        if (provider instanceof FlinkSourceFunctionProvider) {
            DataStreamSource<Event> stream = streamExecutionEnvironment
                    .addSource(((FlinkSourceFunctionProvider) provider).getSourceFunction(), new EventTypeInfo())
                    .setParallelism(parallelism);
            // Only an explicitly configured name is applied on this path; otherwise the name
            // assigned by addSource is kept.
            sourceDef.getName().ifPresent(stream::name);
            return stream;
        }

        // A null provider is reported as unsupported instead of failing on getClass().
        throw new IllegalStateException(
                String.format(
                        "Unsupported EventSourceProvider type \"%s\"",
                        provider == null ? null : provider.getClass().getCanonicalName()));
    }

    /**
     * Builds the fallback operator name used when the source definition carries no name.
     *
     * @param sourceDef source definition whose type is embedded in the name
     * @return a name of the form {@code "Flink CDC Event Source: <type>"}
     */
    private String generateDefaultSourceName(SourceDef sourceDef) {
        return String.format("Flink CDC Event Source: %s", sourceDef.getType());
    }
}
