package co.decodable.sdk.pipeline.internal;

import co.decodable.sdk.pipeline.DecodableStreamSource;
import co.decodable.sdk.pipeline.DecodableStreamSourceBuilder;
import co.decodable.sdk.pipeline.EnvironmentAccess;
import co.decodable.sdk.pipeline.StartupMode;
import co.decodable.sdk.pipeline.internal.config.StreamConfig;
import co.decodable.sdk.pipeline.internal.config.StreamConfigMapping;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

/* loaded from: input_file:co/decodable/sdk/pipeline/internal/DecodableStreamSourceBuilderImpl.class */
/**
 * Default {@link DecodableStreamSourceBuilder} implementation.
 *
 * <p>Collects the stream name/id, an optional startup mode and the value deserialization schema,
 * and on {@link #build()} assembles a Flink {@link KafkaSource} from the stream configuration
 * resolved through {@link StreamConfigMapping}.
 *
 * @param <T> type of the records produced by the resulting source
 */
public class DecodableStreamSourceBuilderImpl<T> implements DecodableStreamSourceBuilder<T> {
    private String streamId;
    private String streamName;
    private StartupMode startupMode;
    private DeserializationSchema<T> deserializationSchema;

    /**
     * Sets the stream name that is handed to the stream configuration lookup in {@link #build()}.
     *
     * @param str stream name
     * @return this builder
     */
    @Override
    public DecodableStreamSourceBuilder<T> withStreamName(String str) {
        this.streamName = str;
        return this;
    }

    /**
     * Sets the stream id that is handed to the stream configuration lookup in {@link #build()}.
     *
     * @param str stream id
     * @return this builder
     */
    @Override
    public DecodableStreamSourceBuilder<T> withStreamId(String str) {
        this.streamId = str;
        return this;
    }

    /**
     * Sets the startup mode used when the resolved stream configuration does not define one.
     *
     * @param startupMode fallback startup mode; may be {@code null}
     * @return this builder
     */
    @Override
    public DecodableStreamSourceBuilder<T> withStartupMode(StartupMode startupMode) {
        this.startupMode = startupMode;
        return this;
    }

    /**
     * Sets the schema used to deserialize record values; required before calling {@link #build()}.
     *
     * @param deserializationSchema value deserialization schema
     * @return this builder
     */
    @Override
    public DecodableStreamSourceBuilder<T> withDeserializationSchema(DeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
        return this;
    }

    /**
     * Builds the stream source from the collected settings.
     *
     * <p>The startup mode found in the resolved stream configuration takes precedence; the mode
     * given via {@link #withStartupMode(StartupMode)} is only used when the configuration has
     * none. If neither is set, no starting offsets are configured on the Kafka source builder.
     *
     * @return a source wrapping the configured {@link KafkaSource}
     * @throws NullPointerException if no deserialization schema has been set
     * @throws IllegalArgumentException if the effective startup mode is not supported
     */
    @Override
    public DecodableStreamSource<T> build() {
        Objects.requireNonNull(this.deserializationSchema, "deserializationSchema");

        StreamConfig streamConfig =
                new StreamConfigMapping(EnvironmentAccess.getEnvironment().getEnvironmentConfiguration())
                        .determineConfig(this.streamName, this.streamId);

        // The explicit <T> witness is required: without it the chained call infers Object and
        // setValueOnlyDeserializer(DeserializationSchema<T>) is rejected by the compiler.
        KafkaSourceBuilder<T> builder =
                KafkaSource.<T>builder()
                        .setBootstrapServers(streamConfig.bootstrapServers())
                        .setTopics(streamConfig.topic())
                        .setProperties(toProperties(streamConfig.kafkaProperties()))
                        .setValueOnlyDeserializer(this.deserializationSchema);

        // Configuration-provided mode wins; the builder's own value is only a fallback.
        StartupMode configuredMode = streamConfig.startupMode();
        StartupMode effectiveMode = configuredMode != null ? configuredMode : this.startupMode;
        if (effectiveMode != null) {
            builder.setStartingOffsets(toOffsetsInitializer(effectiveMode));
        }

        KafkaSource<T> delegate = builder.build();
        return new DecodableStreamSourceImpl<>(delegate);
    }

    /** Copies all entries of the given map into a new {@link Properties} instance. */
    private static Properties toProperties(Map<String, String> map) {
        Properties properties = new Properties();
        properties.putAll(map);
        return properties;
    }

    /**
     * Translates a {@link StartupMode} into the matching Kafka {@link OffsetsInitializer}.
     *
     * @throws IllegalArgumentException for any mode other than earliest/latest offset
     */
    private static OffsetsInitializer toOffsetsInitializer(StartupMode startupMode) {
        switch (startupMode) {
            case EARLIEST_OFFSET:
                return OffsetsInitializer.earliest();
            case LATEST_OFFSET:
                return OffsetsInitializer.latest();
            default:
                throw new IllegalArgumentException("Unexpected startup mode: " + startupMode);
        }
    }
}
