package org.apache.inlong.sdk.sort.fetcher.kafka;

import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.sdk.sort.api.AbstractTopicFetcherBuilder;
import org.apache.inlong.sdk.sort.api.Deserializer;
import org.apache.inlong.sdk.sort.api.Interceptor;
import org.apache.inlong.sdk.sort.api.TopicFetcher;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer;
import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor;

/* loaded from: input_file:org/apache/inlong/sdk/sort/fetcher/kafka/KafkaTopicFetcherBuilderImpl.class */
/**
 * Builder that creates Kafka-backed {@link TopicFetcher} instances.
 *
 * <p>In addition to the settings inherited from {@link AbstractTopicFetcherBuilder}, the Kafka
 * bootstrap servers must be supplied through {@link #bootstrapServers(String)} before calling
 * {@link #subscribe()}.
 */
public class KafkaTopicFetcherBuilderImpl extends AbstractTopicFetcherBuilder {
    private String bootstrapServers;

    /**
     * Sets the Kafka bootstrap servers handed to the fetcher that {@link #subscribe()} creates.
     *
     * @param str the bootstrap servers string
     * @return this builder, to allow call chaining
     */
    public KafkaTopicFetcherBuilderImpl bootstrapServers(String str) {
        this.bootstrapServers = str;
        return this;
    }

    /**
     * Builds and initializes a Kafka fetcher from the current builder state.
     *
     * <p>A non-empty {@code topics} collection takes precedence and yields a multi-topic fetcher;
     * otherwise a non-null {@code topic} yields a single-topic fetcher. When no deserializer has
     * been configured a {@link MessageDeserializer} is used.
     *
     * @return the initialized fetcher
     * @throws IllegalStateException if the context or the bootstrap servers are not set, or if
     *         the created fetcher reports a failed {@code init()}
     * @throws IllegalArgumentException if neither {@code topics} nor {@code topic} is assigned
     */
    @Override
    public TopicFetcher subscribe() {
        if (this.context == null) {
            throw new IllegalStateException("context is null");
        }
        if (this.bootstrapServers == null) {
            throw new IllegalStateException("kafka bootstrapServers is null");
        }
        // Only build the default deserializer when the caller did not provide one.
        if (this.deserializer == null) {
            this.deserializer = new MessageDeserializer();
        }
        if (CollectionUtils.isNotEmpty(this.topics)) {
            return subscribeMultiTopic();
        }
        if (Objects.nonNull(this.topic)) {
            return subscribeSingleTopic();
        }
        throw new IllegalArgumentException("subscribe kafka fetcher, but never assign any topic");
    }

    /**
     * Creates a fetcher for the single configured {@code topic}.
     *
     * @return the initialized single-topic fetcher
     * @throws IllegalStateException if the fetcher's {@code init()} returns {@code false}
     */
    private TopicFetcher subscribeSingleTopic() {
        configureInterceptor(this.topic);
        KafkaSingleTopicFetcher fetcher = new KafkaSingleTopicFetcher(
                this.topic, this.context, this.interceptor, this.deserializer, this.bootstrapServers);
        if (!fetcher.init()) {
            throw new IllegalStateException("init kafka single topic fetcher failed");
        }
        return fetcher;
    }

    /**
     * Creates one fetcher for all configured {@code topics}. The interceptor is configured from
     * the first topic of the collection.
     *
     * @return the initialized multi-topic fetcher
     * @throws IllegalStateException if the fetcher's {@code init()} returns {@code false}
     */
    private TopicFetcher subscribeMultiTopic() {
        // subscribe() only calls this with a non-empty collection; fail explicitly if that
        // precondition is ever violated instead of relying on an unchecked Optional.get().
        InLongTopic firstTopic = this.topics.stream().findFirst().orElseThrow(
                () -> new IllegalArgumentException("subscribe kafka fetcher, but never assign any topic"));
        configureInterceptor(firstTopic);
        KafkaMultiTopicsFetcher fetcher = new KafkaMultiTopicsFetcher(
                this.topics, this.context, this.interceptor, this.deserializer, this.bootstrapServers,
                resolveFetchKey(firstTopic));
        if (!fetcher.init()) {
            throw new IllegalStateException("init kafka multi topic fetcher failed");
        }
        return fetcher;
    }

    /** Falls back to a {@link MsgTimeInterceptor} when none is set, then configures it with the topic. */
    private void configureInterceptor(InLongTopic inLongTopic) {
        if (this.interceptor == null) {
            this.interceptor = new MsgTimeInterceptor();
        }
        this.interceptor.configure(inLongTopic);
    }

    /**
     * Returns the configured fetch key, or derives one from the topic's cluster id plus a random
     * long. The fallback is computed lazily so that the topic's cluster is only dereferenced
     * when no explicit fetch key was provided.
     */
    private String resolveFetchKey(InLongTopic inLongTopic) {
        if (this.fetchKey != null) {
            return this.fetchKey;
        }
        return inLongTopic.getInLongCluster().getClusterId() + new Random().nextLong();
    }
}
