package org.apache.flink.statefun.sdk.kafka;

import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.statefun.sdk.IngressType;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;

/* loaded from: input_file:org/apache/flink/statefun/sdk/kafka/KafkaIngressSpec.class */
/**
 * Immutable-by-reference description of a Kafka ingress: the consumer properties, the topics to
 * read, the deserializer for incoming records, and the position to start consuming from.
 *
 * <p>All arguments are validated eagerly in the constructor. Note that the {@link Properties} and
 * the topic list are stored and returned by reference (no defensive copy is made), so later
 * mutations by the caller are visible through this spec.
 *
 * @param <T> the type this spec is parameterized with, shared by the ingress identifier and the
 *     deserializer
 */
public class KafkaIngressSpec<T> implements IngressSpec<T> {
    /** Property key that must be present to locate the Kafka brokers. */
    private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";

    /** Property key that must be present to identify the consumer group. */
    private static final String GROUP_ID_KEY = "group.id";

    private final Properties properties;
    private final List<String> topics;
    private final KafkaIngressDeserializer<T> deserializer;
    private final KafkaIngressStartupPosition startupPosition;
    private final IngressIdentifier<T> ingressIdentifier;

    /**
     * Creates a validated Kafka ingress spec.
     *
     * <p>Validation runs in this order: properties, topics, startup position, deserializer,
     * identifier. The first failing check determines the exception that is thrown.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; it is kept public here so that existing callers keep compiling.
     *
     * @param ingressIdentifier identifier of this ingress; must not be null
     * @param properties Kafka consumer properties; must not be null and must contain the
     *     {@code bootstrap.servers} and {@code group.id} keys
     * @param list topics to consume from; must not be null or empty
     * @param kafkaIngressDeserializer deserializer for consumed records; must not be null
     * @param kafkaIngressStartupPosition position to start consuming from; must not be null
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if a required property is missing or the topic list is
     *     empty
     */
    public KafkaIngressSpec(IngressIdentifier<T> ingressIdentifier, Properties properties, List<String> list, KafkaIngressDeserializer<T> kafkaIngressDeserializer, KafkaIngressStartupPosition kafkaIngressStartupPosition) {
        this.properties = requireValidProperties(properties);
        this.topics = requireValidTopics(list);
        this.startupPosition = requireValidStartupPosition(kafkaIngressStartupPosition, properties);
        // No raw-type casts: requireNonNull already returns the fully parameterized type.
        this.deserializer = Objects.requireNonNull(kafkaIngressDeserializer, "deserializer");
        this.ingressIdentifier = Objects.requireNonNull(ingressIdentifier, "ingressIdentifier");
    }

    /** Returns the identifier this spec was created with. */
    @Override // org.apache.flink.statefun.sdk.io.IngressSpec
    public IngressIdentifier<T> id() {
        return this.ingressIdentifier;
    }

    /** Returns the Kafka ingress type constant. */
    @Override // org.apache.flink.statefun.sdk.io.IngressSpec
    public IngressType type() {
        return Constants.KAFKA_INGRESS_TYPE;
    }

    /** Returns the consumer properties instance passed to the constructor (not a copy). */
    public Properties properties() {
        return this.properties;
    }

    /** Returns the topic list instance passed to the constructor (not a copy). */
    public List<String> topics() {
        return this.topics;
    }

    /** Returns the deserializer passed to the constructor. */
    public KafkaIngressDeserializer<T> deserializer() {
        return this.deserializer;
    }

    /** Returns the startup position passed to the constructor. */
    public KafkaIngressStartupPosition startupPosition() {
        return this.startupPosition;
    }

    /**
     * Checks that the properties are non-null and define both the broker address and the
     * consumer group id.
     *
     * @return the same {@code properties} instance
     * @throws NullPointerException if {@code properties} is null
     * @throws IllegalArgumentException if either required key is absent
     */
    private static Properties requireValidProperties(Properties properties) {
        Objects.requireNonNull(properties, "properties");
        if (!properties.containsKey(BOOTSTRAP_SERVERS_KEY)) {
            throw new IllegalArgumentException("Missing setting for Kafka address.");
        }
        if (!properties.containsKey(GROUP_ID_KEY)) {
            throw new IllegalArgumentException("Missing setting for consumer group id.");
        }
        return properties;
    }

    /**
     * Checks that the topic list is non-null and has at least one entry.
     *
     * @return the same {@code list} instance
     * @throws NullPointerException if {@code list} is null
     * @throws IllegalArgumentException if {@code list} is empty
     */
    private static List<String> requireValidTopics(List<String> list) {
        Objects.requireNonNull(list, "topics");
        if (list.isEmpty()) {
            throw new IllegalArgumentException("Must define at least one Kafka topic to consume from.");
        }
        return list;
    }

    /**
     * Checks that a startup position based on committed group offsets is backed by a consumer
     * group id in the properties.
     *
     * <p>When called from the constructor the group id has already been enforced by
     * {@link #requireValidProperties(Properties)}, so the failure branch here only acts as a
     * safeguard should that earlier check ever be relaxed.
     *
     * @return the same {@code kafkaIngressStartupPosition} instance
     * @throws NullPointerException if {@code kafkaIngressStartupPosition} is null
     * @throws IllegalStateException if group offsets are requested but no group id is set
     */
    private static KafkaIngressStartupPosition requireValidStartupPosition(KafkaIngressStartupPosition kafkaIngressStartupPosition, Properties properties) {
        // Fail with a named NPE instead of an anonymous one from the dereference below.
        Objects.requireNonNull(kafkaIngressStartupPosition, "startupPosition");
        if (kafkaIngressStartupPosition.isGroupOffsets() && !properties.containsKey(GROUP_ID_KEY)) {
            throw new IllegalStateException("The ingress is configured to start from committed consumer group offsets in Kafka, but no consumer group id was set.\nPlease set the group id with the withConsumerGroupId(String) method.");
        }
        return kafkaIngressStartupPosition;
    }
}
