/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Map;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.Iterables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.SpelParserConfiguration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

/**
 * Reflection- and SpEL-based shim around the Kafka {@link Consumer} API.
 *
 * <p>{@code seekToEnd} and {@code assign} are invoked through Spring Expression Language
 * instead of being called directly, and {@code ConsumerRecord.timestamp()} /
 * {@code Consumer.offsetsForTimes(Map)} are looked up reflectively at construction time.
 * NOTE(review): presumably this lets the same code work with Kafka client versions whose
 * method signatures differ or that lack these methods altogether - confirm against the
 * client versions this module is expected to support.
 *
 * <p>All state is computed once in the constructor and never modified afterwards.
 */
class ConsumerSpEL {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerSpEL.class);

    // Both flags enabled: auto-grow null references and auto-grow collections
    // (see Spring's two-boolean SpelParserConfiguration constructor).
    private final SpelParserConfiguration config = new SpelParserConfiguration(true, true);
    private final ExpressionParser parser = new SpelExpressionParser(this.config);
    private final Expression seek2endExpression = this.parser.parseExpression("#consumer.seekToEnd(#tp)");
    private final Expression assignExpression = this.parser.parseExpression("#consumer.assign(#tp)");

    /** {@code ConsumerRecord.timestamp()}, or {@code null} when the lookup failed. */
    private final Method timestampMethod;
    /** True only when {@link #timestampMethod} was found and returns primitive {@code long}. */
    private final boolean hasRecordTimestamp;
    /** {@code OffsetAndTimestamp.offset()}, or {@code null} when the lookup failed. */
    private final Method offsetGetterMethod;
    /** {@code Consumer.offsetsForTimes(Map)}, or {@code null} when the lookup failed. */
    private final Method offsetsForTimesMethod;
    /** True only when both offset-related methods were found and the latter returns a {@code Map}. */
    private final boolean hasOffsetsForTimes;

    /**
     * Probes the Kafka client classes on the classpath for the optional methods used by this
     * class. A failed lookup is not an error: the corresponding capability flag simply stays
     * {@code false} and a debug message is logged.
     */
    public ConsumerSpEL() {
        Method timestamp = null;
        boolean recordTimestampAvailable = false;
        try {
            timestamp = ConsumerRecord.class.getMethod("timestamp");
            recordTimestampAvailable = timestamp.getReturnType().equals(Long.TYPE);
        } catch (NoSuchMethodException | SecurityException ignored) {
            // Best effort: callers fall back to wall-clock time in getRecordTimestamp().
            LOG.debug("Timestamp for Kafka message is not available.");
        }
        this.timestampMethod = timestamp;
        this.hasRecordTimestamp = recordTimestampAvailable;

        Method offsetGetter = null;
        Method offsetsForTimes = null;
        boolean offsetsForTimesAvailable = false;
        try {
            offsetGetter = Class.forName("org.apache.kafka.clients.consumer.OffsetAndTimestamp").getMethod("offset");
            offsetsForTimes = Consumer.class.getMethod("offsetsForTimes", Map.class);
            offsetsForTimesAvailable = offsetsForTimes.getReturnType().equals(Map.class);
        } catch (ClassNotFoundException | NoSuchMethodException | SecurityException ignored) {
            // Best effort: offsetForTime() rejects calls while hasOffsetsForTimes is false.
            LOG.debug("OffsetsForTimes is not available.");
        }
        this.offsetGetterMethod = offsetGetter;
        this.offsetsForTimesMethod = offsetsForTimes;
        this.hasOffsetsForTimes = offsetsForTimesAvailable;
    }

    /**
     * Evaluates {@code consumer.seekToEnd(topicPartitions)} through SpEL.
     *
     * @param consumer the Kafka consumer to operate on
     * @param topicPartitions the partition bound to the {@code #tp} expression variable
     */
    public void evaluateSeek2End(Consumer consumer, TopicPartition topicPartitions) {
        this.seek2endExpression.getValue(contextFor(consumer, topicPartitions));
    }

    /**
     * Evaluates {@code consumer.assign(topicPartitions)} through SpEL.
     *
     * @param consumer the Kafka consumer to operate on
     * @param topicPartitions the partitions bound to the {@code #tp} expression variable
     */
    public void evaluateAssign(Consumer consumer, Collection<TopicPartition> topicPartitions) {
        this.assignExpression.getValue(contextFor(consumer, topicPartitions));
    }

    /**
     * Returns the timestamp of a record in milliseconds.
     *
     * <p>Falls back to {@link System#currentTimeMillis()} when the client has no usable
     * {@code timestamp()} method or when the record reports a non-positive timestamp.
     *
     * @param rawRecord the record to inspect
     * @return the record timestamp, or the current wall-clock time as a fallback
     * @throws RuntimeException wrapping any reflective invocation failure
     */
    public long getRecordTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
        if (!this.hasRecordTimestamp) {
            return System.currentTimeMillis();
        }
        try {
            long timestamp = (Long) this.timestampMethod.invoke(rawRecord);
            return timestamp > 0L ? timestamp : System.currentTimeMillis();
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reports whether {@link #offsetForTime} can be used with the Kafka client found at
     * construction time.
     *
     * @return {@code true} if the reflective lookups for {@code offsetsForTimes} succeeded
     */
    public boolean hasOffsetsForTimes() {
        return this.hasOffsetsForTimes;
    }

    /**
     * Looks up an offset for {@code topicPartition} by calling
     * {@code consumer.offsetsForTimes} with {@code time} in milliseconds and reading
     * {@code offset()} from the single returned entry.
     *
     * @param consumer the Kafka consumer to query
     * @param topicPartition the partition to search
     * @param time the target time
     * @return the offset reported by the client for the given time
     * @throws IllegalArgumentException if {@link #hasOffsetsForTimes()} is {@code false}
     * @throws RuntimeException if the client returns a {@code null} entry for the partition,
     *     or wrapping any reflective invocation failure
     */
    public long offsetForTime(Consumer<?, ?> consumer, TopicPartition topicPartition, Instant time) {
        Preconditions.checkArgument(this.hasOffsetsForTimes, "This Kafka Client must support Consumer.OffsetsForTimes().");
        Map<TopicPartition, Long> timestampsToSearch = ImmutableMap.of(topicPartition, time.getMillis());
        try {
            Map<?, ?> offsetsByTimes = (Map<?, ?>) this.offsetsForTimesMethod.invoke(consumer, timestampsToSearch);
            // Exactly one partition was queried, so exactly one value is expected back.
            Object offsetAndTimestamp = Iterables.getOnlyElement(offsetsByTimes.values());
            if (offsetAndTimestamp == null) {
                throw new RuntimeException("There are no messages has a timestamp that is greater than or equals to the target time or the message format version in this partition is before 0.10.0, topicPartition is: " + topicPartition);
            }
            return (Long) this.offsetGetterMethod.invoke(offsetAndTimestamp);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }

    /** Builds a fresh evaluation context exposing {@code #consumer} and {@code #tp}. */
    private static StandardEvaluationContext contextFor(Object consumer, Object partitions) {
        StandardEvaluationContext context = new StandardEvaluationContext();
        context.setVariable("consumer", consumer);
        context.setVariable("tp", partitions);
        return context;
    }
}

