package org.apache.beam.sdk.io.kafka;

import java.util.Collection;
import java.util.Map;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.SpelParserConfiguration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/ConsumerSpEL.class */
public class ConsumerSpEL {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerSpEL.class);
    private static final SpelParserConfiguration config = new SpelParserConfiguration(true, true);
    private static final ExpressionParser parser = new SpelExpressionParser(config);
    private static final Expression seek2endExpression = parser.parseExpression("#consumer.seekToEnd(#tp)");
    private static final Expression assignExpression = parser.parseExpression("#consumer.assign(#tp)");
    private static boolean hasRecordTimestamp;
    private static boolean hasHeaders;
    private static boolean hasOffsetsForTimes;
    private static boolean deserializerSupportsHeaders;

    ConsumerSpEL() {
    }

    public static void evaluateSeek2End(Consumer<?, ?> consumer, TopicPartition topicPartition) {
        StandardEvaluationContext standardEvaluationContext = new StandardEvaluationContext();
        standardEvaluationContext.setVariable("consumer", consumer);
        standardEvaluationContext.setVariable("tp", topicPartition);
        seek2endExpression.getValue(standardEvaluationContext);
    }

    public static void evaluateAssign(Consumer<?, ?> consumer, Collection<TopicPartition> collection) {
        StandardEvaluationContext standardEvaluationContext = new StandardEvaluationContext();
        standardEvaluationContext.setVariable("consumer", consumer);
        standardEvaluationContext.setVariable("tp", collection);
        assignExpression.getValue(standardEvaluationContext);
    }

    public static long getRecordTimestamp(ConsumerRecord<byte[], byte[]> consumerRecord) {
        if (hasRecordTimestamp) {
            return consumerRecord.timestamp();
        }
        return -1L;
    }

    public static KafkaTimestampType getRecordTimestampType(ConsumerRecord<byte[], byte[]> consumerRecord) {
        return hasRecordTimestamp ? KafkaTimestampType.forOrdinal(consumerRecord.timestampType().ordinal()) : KafkaTimestampType.NO_TIMESTAMP_TYPE;
    }

    public static boolean hasOffsetsForTimes() {
        return hasOffsetsForTimes;
    }

    public static boolean hasHeaders() {
        return hasHeaders;
    }

    public static boolean deserializerSupportsHeaders() {
        return deserializerSupportsHeaders;
    }

    public static <T> T deserializeKey(Deserializer<T> deserializer, ConsumerRecord<byte[], byte[]> consumerRecord) {
        return deserializerSupportsHeaders ? (T) deserializer.deserialize(consumerRecord.topic(), consumerRecord.headers(), (byte[]) consumerRecord.key()) : (T) deserializer.deserialize(consumerRecord.topic(), (byte[]) consumerRecord.key());
    }

    public static <T> T deserializeValue(Deserializer<T> deserializer, ConsumerRecord<byte[], byte[]> consumerRecord) {
        return deserializerSupportsHeaders ? (T) deserializer.deserialize(consumerRecord.topic(), consumerRecord.headers(), (byte[]) consumerRecord.value()) : (T) deserializer.deserialize(consumerRecord.topic(), (byte[]) consumerRecord.value());
    }

    public static long offsetForTime(Consumer<?, ?> consumer, TopicPartition topicPartition, Instant instant) {
        Preconditions.checkArgument(hasOffsetsForTimes, "This Kafka Client must support Consumer.OffsetsForTimes().");
        OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp) Iterables.getOnlyElement(consumer.offsetsForTimes(ImmutableMap.of(topicPartition, Long.valueOf(instant.getMillis()))).values());
        if (offsetAndTimestamp == null) {
            throw new RuntimeException("There are no messages has a timestamp that is greater than or equals to the target time or the message format version in this partition is before 0.10.0, topicPartition is: " + topicPartition);
        }
        return offsetAndTimestamp.offset();
    }

    static {
        try {
            hasRecordTimestamp = ConsumerRecord.class.getMethod("timestamp", (Class[]) null).getReturnType().equals(Long.TYPE);
        } catch (NoSuchMethodException | SecurityException e) {
            LOG.debug("Timestamp for Kafka message is not available.");
        }
        try {
            hasHeaders = "org.apache.kafka.common.header.Headers".equals(ConsumerRecord.class.getMethod("headers", (Class[]) null).getReturnType().getName());
        } catch (NoSuchMethodException | SecurityException e2) {
            LOG.debug("Headers is not available");
        }
        try {
            hasOffsetsForTimes = Consumer.class.getMethod("offsetsForTimes", Map.class).getReturnType().equals(Map.class);
        } catch (NoSuchMethodException | SecurityException e3) {
            LOG.debug("OffsetsForTimes is not available.");
        }
        try {
            deserializerSupportsHeaders = "T".equals(Deserializer.class.getDeclaredMethod("deserialize", String.class, Headers.class, byte[].class).getGenericReturnType().getTypeName());
        } catch (NoSuchMethodException | SecurityException e4) {
            LOG.debug("Deserializer interface does not support Kafka headers");
        }
    }
}
