package org.apache.druid.indexing.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

/* loaded from: input_file:org/apache/druid/indexing/kafka/KafkaRecordSupplier.class */
public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity> {
    private final KafkaConsumer<byte[], byte[]> consumer;
    private boolean closed;

    public KafkaRecordSupplier(Map<String, Object> map, ObjectMapper objectMapper) {
        this(getKafkaConsumer(objectMapper, map));
    }

    @VisibleForTesting
    public KafkaRecordSupplier(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }

    public void assign(Set<StreamPartition<Integer>> set) {
        wrapExceptions(() -> {
            this.consumer.assign((Collection) set.stream().map(streamPartition -> {
                return new TopicPartition(streamPartition.getStream(), ((Integer) streamPartition.getPartitionId()).intValue());
            }).collect(Collectors.toSet()));
        });
    }

    public void seek(StreamPartition<Integer> streamPartition, Long l) {
        wrapExceptions(() -> {
            this.consumer.seek(new TopicPartition(streamPartition.getStream(), ((Integer) streamPartition.getPartitionId()).intValue()), l.longValue());
        });
    }

    public void seekToEarliest(Set<StreamPartition<Integer>> set) {
        wrapExceptions(() -> {
            this.consumer.seekToBeginning((Collection) set.stream().map(streamPartition -> {
                return new TopicPartition(streamPartition.getStream(), ((Integer) streamPartition.getPartitionId()).intValue());
            }).collect(Collectors.toList()));
        });
    }

    public void seekToLatest(Set<StreamPartition<Integer>> set) {
        wrapExceptions(() -> {
            this.consumer.seekToEnd((Collection) set.stream().map(streamPartition -> {
                return new TopicPartition(streamPartition.getStream(), ((Integer) streamPartition.getPartitionId()).intValue());
            }).collect(Collectors.toList()));
        });
    }

    /* renamed from: getAssignment, reason: merged with bridge method [inline-methods] */
    public Set<StreamPartition<Integer>> m11getAssignment() {
        return (Set) wrapExceptions(() -> {
            return (Set) this.consumer.assignment().stream().map(topicPartition -> {
                return new StreamPartition(topicPartition.topic(), Integer.valueOf(topicPartition.partition()));
            }).collect(Collectors.toSet());
        });
    }

    @Nonnull
    public List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> poll(long j) {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.consumer.poll(Duration.ofMillis(j)).iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            arrayList.add(new OrderedPartitionableRecord(consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(consumerRecord))));
        }
        return arrayList;
    }

    public Long getLatestSequenceNumber(StreamPartition<Integer> streamPartition) {
        Long position = getPosition(streamPartition);
        seekToLatest(Collections.singleton(streamPartition));
        Long position2 = getPosition(streamPartition);
        seek(streamPartition, position);
        return position2;
    }

    public Long getEarliestSequenceNumber(StreamPartition<Integer> streamPartition) {
        Long position = getPosition(streamPartition);
        seekToEarliest(Collections.singleton(streamPartition));
        Long position2 = getPosition(streamPartition);
        seek(streamPartition, position);
        return position2;
    }

    public Long getPosition(StreamPartition<Integer> streamPartition) {
        return (Long) wrapExceptions(() -> {
            return Long.valueOf(this.consumer.position(new TopicPartition(streamPartition.getStream(), ((Integer) streamPartition.getPartitionId()).intValue())));
        });
    }

    public Set<Integer> getPartitionIds(String str) {
        return (Set) wrapExceptions(() -> {
            List partitionsFor = this.consumer.partitionsFor(str);
            if (partitionsFor == null) {
                throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", new Object[]{str});
            }
            return (Set) partitionsFor.stream().map((v0) -> {
                return v0.partition();
            }).collect(Collectors.toSet());
        });
    }

    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.consumer.close();
    }

    public static void addConsumerPropertiesFromConfig(Properties properties, ObjectMapper objectMapper, Map<String, Object> map) {
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            String key = entry.getKey();
            if (!KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY.equals(key)) {
                if (key.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY) || key.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY) || key.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) {
                    properties.setProperty(key, ((PasswordProvider) objectMapper.convertValue(entry.getValue(), PasswordProvider.class)).getPassword());
                } else {
                    properties.setProperty(key, String.valueOf(entry.getValue()));
                }
            }
        }
        Object obj = map.get(KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY);
        if (obj != null) {
            for (Map.Entry entry2 : ((DynamicConfigProvider) objectMapper.convertValue(obj, DynamicConfigProvider.class)).getConfig().entrySet()) {
                properties.setProperty((String) entry2.getKey(), (String) entry2.getValue());
            }
        }
    }

    private static Deserializer getKafkaDeserializer(Properties properties, String str) {
        try {
            Class<?> cls = Class.forName(properties.getProperty(str, ByteArrayDeserializer.class.getTypeName()));
            Type genericReturnType = cls.getMethod("deserialize", String.class, byte[].class).getGenericReturnType();
            if (genericReturnType == byte[].class) {
                return (Deserializer) cls.getConstructor(new Class[0]).newInstance(new Object[0]);
            }
            throw new IllegalArgumentException("Kafka deserializers must return a byte array (byte[]), " + cls.getName() + " returns " + genericReturnType.getTypeName());
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new StreamException(e);
        }
    }

    private static KafkaConsumer<byte[], byte[]> getKafkaConsumer(ObjectMapper objectMapper, Map<String, Object> map) {
        Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
        Properties properties = new Properties();
        addConsumerPropertiesFromConfig(properties, objectMapper, map);
        properties.putIfAbsent("isolation.level", "read_committed");
        properties.putIfAbsent("group.id", StringUtils.format("kafka-supervisor-%s", new Object[]{IdUtils.getRandomId()}));
        properties.putAll(consumerProperties);
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(KafkaRecordSupplier.class.getClassLoader());
            KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(properties, getKafkaDeserializer(properties, "key.deserializer"), getKafkaDeserializer(properties, "value.deserializer"));
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            return kafkaConsumer;
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    private static <T> T wrapExceptions(Callable<T> callable) {
        try {
            return callable.call();
        } catch (Exception e) {
            throw new StreamException(e);
        }
    }

    private static void wrapExceptions(Runnable runnable) {
        wrapExceptions(() -> {
            runnable.run();
            return null;
        });
    }

    /* renamed from: getPosition, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m8getPosition(StreamPartition streamPartition) {
        return getPosition((StreamPartition<Integer>) streamPartition);
    }

    /* renamed from: getEarliestSequenceNumber, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m9getEarliestSequenceNumber(StreamPartition streamPartition) {
        return getEarliestSequenceNumber((StreamPartition<Integer>) streamPartition);
    }

    /* renamed from: getLatestSequenceNumber, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m10getLatestSequenceNumber(StreamPartition streamPartition) {
        return getLatestSequenceNumber((StreamPartition<Integer>) streamPartition);
    }

    public /* bridge */ /* synthetic */ void seek(StreamPartition streamPartition, Object obj) throws InterruptedException {
        seek((StreamPartition<Integer>) streamPartition, (Long) obj);
    }
}
