package org.apache.flink.statefun.sdk.kafka;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.statefun.sdk.annotations.ForRuntime;
import org.apache.flink.statefun.sdk.core.OptionalProperty;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.kafka.clients.consumer.ConsumerConfig;

/* loaded from: input_file:org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.class */
public final class KafkaIngressBuilder<T> {
    private final IngressIdentifier<T> id;
    private final List<String> topics = new ArrayList();
    private final Properties properties = new Properties();
    private OptionalProperty<String> consumerGroupId = OptionalProperty.withoutDefault();
    private OptionalProperty<KafkaIngressDeserializer<T>> deserializer = OptionalProperty.withoutDefault();
    private OptionalProperty<String> kafkaAddress = OptionalProperty.withoutDefault();
    private OptionalProperty<KafkaIngressAutoResetPosition> autoResetPosition = OptionalProperty.withDefault(KafkaIngressAutoResetPosition.LATEST);
    private OptionalProperty<KafkaIngressStartupPosition> startupPosition = OptionalProperty.withDefault(KafkaIngressStartupPosition.fromLatest());

    private KafkaIngressBuilder(IngressIdentifier<T> ingressIdentifier) {
        this.id = (IngressIdentifier) Objects.requireNonNull(ingressIdentifier);
    }

    public static <T> KafkaIngressBuilder<T> forIdentifier(IngressIdentifier<T> ingressIdentifier) {
        return new KafkaIngressBuilder<>(ingressIdentifier);
    }

    public KafkaIngressBuilder<T> withConsumerGroupId(String str) {
        this.consumerGroupId.set(str);
        return this;
    }

    public KafkaIngressBuilder<T> withKafkaAddress(String str) {
        this.kafkaAddress.set(str);
        return this;
    }

    public KafkaIngressBuilder<T> withTopic(String str) {
        this.topics.add(str);
        return this;
    }

    public KafkaIngressBuilder<T> addTopics(List<String> list) {
        this.topics.addAll(list);
        return this;
    }

    public KafkaIngressBuilder<T> withProperties(Properties properties) {
        this.properties.putAll(properties);
        return this;
    }

    public KafkaIngressBuilder<T> withProperty(String str, String str2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        this.properties.setProperty(str, str2);
        return this;
    }

    public KafkaIngressBuilder<T> withDeserializer(Class<? extends KafkaIngressDeserializer<T>> cls) {
        Objects.requireNonNull(cls);
        this.deserializer.set(instantiateDeserializer(cls));
        return this;
    }

    public KafkaIngressBuilder<T> withAutoResetPosition(KafkaIngressAutoResetPosition kafkaIngressAutoResetPosition) {
        this.autoResetPosition.set(kafkaIngressAutoResetPosition);
        return this;
    }

    public KafkaIngressBuilder<T> withStartupPosition(KafkaIngressStartupPosition kafkaIngressStartupPosition) {
        this.startupPosition.set(kafkaIngressStartupPosition);
        return this;
    }

    public KafkaIngressSpec<T> build() {
        return new KafkaIngressSpec<>(this.id, resolveKafkaProperties(), this.topics, this.deserializer.get(), this.startupPosition.get());
    }

    private Properties resolveKafkaProperties() {
        Properties properties = new Properties();
        properties.putAll(this.properties);
        this.kafkaAddress.overwritePropertiesIfPresent(properties, "bootstrap.servers");
        this.autoResetPosition.overwritePropertiesIfPresent(properties, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
        this.consumerGroupId.overwritePropertiesIfPresent(properties, "group.id");
        return properties;
    }

    /* JADX WARN: Incorrect return type in method signature: <T::Lorg/apache/flink/statefun/sdk/kafka/KafkaIngressDeserializer<*>;>(Ljava/lang/Class<TT;>;)TT; */
    private static KafkaIngressDeserializer instantiateDeserializer(Class cls) {
        try {
            Constructor<T> declaredConstructor = cls.getDeclaredConstructor(new Class[0]);
            declaredConstructor.setAccessible(true);
            return (KafkaIngressDeserializer) declaredConstructor.newInstance(new Object[0]);
        } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
            throw new IllegalStateException("Unable to create an instance of deserializer " + cls.getName(), e);
        } catch (NoSuchMethodException e2) {
            throw new IllegalStateException("Unable to create an instance of deserializer " + cls.getName() + "; has no default constructor", e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @ForRuntime
    public KafkaIngressBuilder<T> withDeserializer(KafkaIngressDeserializer<T> kafkaIngressDeserializer) {
        this.deserializer.set(kafkaIngressDeserializer);
        return this;
    }
}
