package co.cask.cdap.kafka.flow;

import co.cask.cdap.api.annotation.Tick;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.FailurePolicy;
import co.cask.cdap.api.flow.flowlet.FailureReason;
import co.cask.cdap.api.flow.flowlet.FlowletContext;
import co.cask.cdap.api.flow.flowlet.InputContext;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Service;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.twill.kafka.client.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/kafka/flow/KafkaConsumerFlowlet.class */
/**
 * Base flowlet that polls messages from a set of Kafka topic partitions on a fixed tick and
 * hands each message to one of the {@code processMessage} hooks.
 *
 * <p>Subclasses supply the Kafka configuration ({@link #configureKafka}), the actual fetching
 * ({@link #readMessages}) and the offset persistence ({@link #getBeginOffset},
 * {@link #saveReadOffsets}). Read offsets are advanced while polling, committed in
 * {@link #onSuccess} and rolled back in {@link #onFailure}.
 *
 * @param <KEY> decoded type of the message key; decoders are built in for {@link String},
 *     {@link ByteBuffer} and {@code byte[]}
 * @param <PAYLOAD> decoded type of the message payload; same built-in decoders as {@code KEY}
 * @param <OFFSET> type used to represent an offset within a topic partition
 */
public abstract class KafkaConsumerFlowlet<KEY, PAYLOAD, OFFSET> extends AbstractFlowlet {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerFlowlet.class);

    // Not referenced in this class; presumably a socket timeout in milliseconds for subclasses
    // that open Kafka connections -- TODO confirm against the concrete flowlets.
    protected static final int SO_TIMEOUT = 5000;

    /** Sentinel for {@link #pendingInstances} meaning no instance-count change is awaiting commit. */
    private static final int NO_PENDING_INSTANCES = -1;

    /** Extracts the current read offset of a consumer; used to build the map passed to saveReadOffsets. */
    private final Function<KafkaConsumerInfo<OFFSET>, OFFSET> consumerToOffset =
        new Function<KafkaConsumerInfo<OFFSET>, OFFSET>() {
            @Override
            public OFFSET apply(KafkaConsumerInfo<OFFSET> consumerInfo) {
                return consumerInfo.getReadOffset();
            }
        };

    // Decoders stay null when the subclass does not bind KEY/PAYLOAD to concrete types.
    private Function<ByteBuffer, KEY> keyDecoder;
    private Function<ByteBuffer, PAYLOAD> payloadDecoder;
    private KafkaConfig kafkaConfig;
    // Consumers currently being polled.
    private Map<TopicPartition, KafkaConsumerInfo<OFFSET>> consumerInfos;
    // Consumers prepared for a new instance count; swapped in by onSuccess.
    private Map<TopicPartition, KafkaConsumerInfo<OFFSET>> changedConsumerInfos;
    // Instance count the current consumerInfos were configured for.
    private int instances;
    // Instance count that changedConsumerInfos was prepared for, or NO_PENDING_INSTANCES.
    private int pendingInstances = NO_PENDING_INSTANCES;

    /**
     * Sets up the key/payload decoders from the subclass' type arguments, asks the subclass to
     * configure Kafka and creates one consumer per configured topic partition.
     *
     * @param flowletContext the flowlet runtime context
     * @throws IllegalStateException if the subclass configured neither zookeeper nor brokers
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void initialize(FlowletContext flowletContext) throws Exception {
        super.initialize(flowletContext);
        this.instances = flowletContext.getInstanceCount();

        // Resolve KEY and PAYLOAD as declared by the concrete subclass to pick decoders.
        Type superType = TypeToken.of(getClass()).getSupertype(KafkaConsumerFlowlet.class).getType();
        if (superType instanceof ParameterizedType) {
            Type[] typeArgs = ((ParameterizedType) superType).getActualTypeArguments();
            this.keyDecoder = createKeyDecoder(typeArgs[0]);
            this.payloadDecoder = createPayloadDecoder(typeArgs[1]);
        }

        DefaultKafkaConfigurer configurer = new DefaultKafkaConfigurer();
        configureKafka(configurer);
        if (configurer.getZookeeper() == null && configurer.getBrokers() == null) {
            throw new IllegalStateException("Kafka not configured. Must provide either zookeeper or broker list.");
        }
        this.kafkaConfig = new KafkaConfig(configurer.getZookeeper(), configurer.getBrokers());
        this.consumerInfos = createConsumerInfos(configurer.getTopicPartitions());
        this.changedConsumerInfos = this.consumerInfos;
    }

    /**
     * Tick method, invoked every 100 ms.
     *
     * <p>If the flowlet instance count differs from the one the consumers were configured for,
     * this tick only prepares the new consumer set (applied in {@link #onSuccess}) and reads
     * nothing. Otherwise every consumer is drained through {@link #readMessages}, each message is
     * processed, and the read offsets are saved if any consumer reports pending changes.
     *
     * @throws Exception if processing a message fails
     */
    @Tick(delay = 100, unit = TimeUnit.MILLISECONDS)
    public void pollMessages() throws Exception {
        // Discard anything left over from an earlier attempt that never reached onSuccess.
        this.pendingInstances = NO_PENDING_INSTANCES;

        int currentInstances = getContext().getInstanceCount();
        if (this.instances != currentInstances) {
            DefaultKafkaConfigurer configurer = new DefaultKafkaConfigurer();
            handleInstancesChanged(configurer);
            // Work on a mutable copy; consumerInfos itself stays untouched until commit.
            Map<TopicPartition, KafkaConsumerInfo<OFFSET>> updated = Maps.newHashMap(this.consumerInfos);
            updateConsumerInfos(configurer.getTopicPartitions(), updated);
            this.changedConsumerInfos = updated;
            this.pendingInstances = currentInstances;
            return;
        }

        boolean offsetsChanged = false;
        for (KafkaConsumerInfo<OFFSET> consumerInfo : this.consumerInfos.values()) {
            Iterator<KafkaMessage<OFFSET>> messages = readMessages(consumerInfo);
            while (messages.hasNext()) {
                KafkaMessage<OFFSET> message = messages.next();
                processMessage(message);
                consumerInfo.setReadOffset(message.getNextOffset());
            }
            if (consumerInfo.hasPendingChanges()) {
                offsetsChanged = true;
            }
        }
        if (offsetsChanged) {
            // Lazy view: offsets are read from the consumers when the subclass iterates the map.
            saveReadOffsets(Maps.transformValues(this.consumerInfos, this.consumerToOffset));
        }
    }

    /**
     * Commits the read offsets of all consumers after a successful tick and, if the tick
     * prepared a consumer set for a new instance count, switches over to it.
     *
     * <p>Calls with a non-null {@code obj} are ignored beyond the superclass callback; only the
     * null-input case (presumably the tick invocation -- TODO confirm) touches consumer state.
     */
    @Override
    public void onSuccess(Object obj, InputContext inputContext) {
        super.onSuccess(obj, inputContext);
        if (obj != null) {
            return;
        }
        for (KafkaConsumerInfo<OFFSET> consumerInfo : this.consumerInfos.values()) {
            consumerInfo.commitReadOffset();
        }
        // Only swap when pollMessages actually prepared a new assignment; otherwise a count
        // change that slipped in after polling would commit a stale map and then go unnoticed.
        if (this.pendingInstances != NO_PENDING_INSTANCES) {
            this.instances = this.pendingInstances;
            this.consumerInfos = ImmutableMap.copyOf(this.changedConsumerInfos);
            this.pendingInstances = NO_PENDING_INSTANCES;
        }
    }

    /**
     * Rolls back the read offsets of all consumers when a null-input invocation fails.
     *
     * @return always {@link FailurePolicy#RETRY}
     */
    @Override
    public FailurePolicy onFailure(Object obj, InputContext inputContext, FailureReason failureReason) {
        if (obj == null) {
            for (KafkaConsumerInfo<OFFSET> consumerInfo : this.consumerInfos.values()) {
                consumerInfo.rollbackReadOffset();
            }
        }
        return FailurePolicy.RETRY;
    }

    /**
     * Returns the table used to persist offsets. The default returns {@code null}; this class
     * never calls it, so it is presumably meant for subclasses' offset handling -- TODO confirm.
     */
    protected KeyValueTable getOffsetStore() {
        return null;
    }

    /** Supplies the Kafka connection settings and the topic partitions to consume. */
    protected abstract void configureKafka(KafkaConfigurer kafkaConfigurer);

    /** Returns the messages currently available for the given consumer. */
    protected abstract Iterator<KafkaMessage<OFFSET>> readMessages(KafkaConsumerInfo<OFFSET> kafkaConsumerInfo);

    /** Returns the offset a newly created consumer for the given partition starts reading from. */
    protected abstract OFFSET getBeginOffset(TopicPartition topicPartition);

    /** Persists the given read offsets; called from the tick when any consumer has pending changes. */
    protected abstract void saveReadOffsets(Map<TopicPartition, OFFSET> map);

    /**
     * Hook called when the instance count changed; the topic partitions added to the configurer
     * become the new consumer set. The default adds none, which removes all consumers.
     */
    protected void handleInstancesChanged(KafkaConsumerConfigurer kafkaConsumerConfigurer) {
    }

    /** Returns the Kafka configuration captured in {@link #initialize}. */
    protected final KafkaConfig getKafkaConfig() {
        return this.kafkaConfig;
    }

    /**
     * Processes a raw Kafka message. The default decodes key and payload and delegates to
     * {@link #processMessage(Object, Object)}.
     */
    protected void processMessage(KafkaMessage<OFFSET> kafkaMessage) throws Exception {
        processMessage(decodeKey(kafkaMessage.getKey()), decodePayload(kafkaMessage.getPayload()));
    }

    /**
     * Processes a decoded message. The default ignores the key and delegates to
     * {@link #processMessage(Object)}.
     */
    protected void processMessage(KEY key, PAYLOAD payload) throws Exception {
        processMessage(payload);
    }

    /** Processes a decoded payload. The default does nothing. */
    protected void processMessage(PAYLOAD payload) throws Exception {
    }

    /**
     * Decodes a message key.
     *
     * @return the decoded key, or {@code null} if no key decoder was set up
     */
    protected KEY decodeKey(ByteBuffer byteBuffer) {
        return this.keyDecoder == null ? null : this.keyDecoder.apply(byteBuffer);
    }

    /**
     * Decodes a message payload.
     *
     * @return the decoded payload, or {@code null} if no payload decoder was set up
     */
    protected PAYLOAD decodePayload(ByteBuffer byteBuffer) {
        return this.payloadDecoder == null ? null : this.payloadDecoder.apply(byteBuffer);
    }

    /** Stops the service and waits for it; any failure is logged and deliberately not propagated. */
    protected final void stopService(Service service) {
        try {
            service.stopAndWait();
        } catch (Throwable th) {
            LOG.error("Failed when stopping service {}", service, th);
        }
    }

    /** Returns {@code "<topic>:<partition>"} for the given topic partition. */
    protected String getStoreKey(TopicPartition topicPartition) {
        return topicPartition.getTopic() + ":" + topicPartition.getPartition();
    }

    private Function<ByteBuffer, KEY> createKeyDecoder(Type type) {
        return createDecoder(type, "No decoder for decoding message key");
    }

    private Function<ByteBuffer, PAYLOAD> createPayloadDecoder(Type type) {
        return createDecoder(type, "No decoder for decoding message payload");
    }

    /**
     * Picks a decoder for the given target type. Unsupported types get a decoder that throws
     * {@link IllegalStateException} with {@code failureMessage} when it is first used.
     */
    @SuppressWarnings("unchecked") // each branch returns a decoder whose output type equals 'type'
    private <T> Function<ByteBuffer, T> createDecoder(Type type, String failureMessage) {
        if (String.class.equals(type)) {
            return (Function<ByteBuffer, T>) createStringDecoder();
        }
        if (ByteBuffer.class.equals(type)) {
            return (Function<ByteBuffer, T>) createByteBufferDecoder();
        }
        if (isByteArray(type)) {
            return (Function<ByteBuffer, T>) createBytesDecoder();
        }
        return createFailureDecoder(failureMessage);
    }

    /**
     * Tells whether the type denotes {@code byte[]}. Reflection may report it either as the
     * array class itself or as a {@link GenericArrayType} with a {@code byte} component.
     */
    private static boolean isByteArray(Type type) {
        if (byte[].class.equals(type)) {
            return true;
        }
        return type instanceof GenericArrayType
            && Byte.TYPE.equals(((GenericArrayType) type).getGenericComponentType());
    }

    /** Decodes the remaining bytes as UTF-8 without moving the caller's position or mark. */
    private Function<ByteBuffer, String> createStringDecoder() {
        return new Function<ByteBuffer, String>() {
            @Override
            public String apply(ByteBuffer byteBuffer) {
                return Charsets.UTF_8.decode(byteBuffer.duplicate()).toString();
            }
        };
    }

    /** Passes the buffer through unchanged. */
    private Function<ByteBuffer, ByteBuffer> createByteBufferDecoder() {
        return new Function<ByteBuffer, ByteBuffer>() {
            @Override
            public ByteBuffer apply(ByteBuffer byteBuffer) {
                return byteBuffer;
            }
        };
    }

    /** Copies the remaining bytes into a new array without moving the caller's position or mark. */
    private Function<ByteBuffer, byte[]> createBytesDecoder() {
        return new Function<ByteBuffer, byte[]>() {
            @Override
            public byte[] apply(ByteBuffer byteBuffer) {
                byte[] bytes = new byte[byteBuffer.remaining()];
                byteBuffer.duplicate().get(bytes);
                return bytes;
            }
        };
    }

    /** Creates a decoder that always throws {@link IllegalStateException} with the given message. */
    private <T> Function<ByteBuffer, T> createFailureDecoder(final String message) {
        return new Function<ByteBuffer, T>() {
            @Override
            public T apply(ByteBuffer byteBuffer) {
                throw new IllegalStateException(message);
            }
        };
    }

    /**
     * Builds an immutable map with one consumer per topic partition, each starting at
     * {@link #getBeginOffset}. The map values of {@code fetchSizes} are the per-partition fetch sizes.
     */
    private Map<TopicPartition, KafkaConsumerInfo<OFFSET>> createConsumerInfos(Map<TopicPartition, Integer> fetchSizes) {
        ImmutableMap.Builder<TopicPartition, KafkaConsumerInfo<OFFSET>> builder = ImmutableMap.builder();
        for (Map.Entry<TopicPartition, Integer> entry : fetchSizes.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            int fetchSize = entry.getValue();
            KafkaConsumerInfo<OFFSET> consumerInfo =
                new KafkaConsumerInfo<>(topicPartition, fetchSize, getBeginOffset(topicPartition));
            builder.put(topicPartition, consumerInfo);
        }
        return builder.build();
    }

    /**
     * Brings {@code infos} in line with the requested partitions: drops consumers for partitions
     * no longer requested, creates consumers for new partitions, and recreates a consumer
     * (keeping its read offset) when its fetch size changed.
     *
     * @param fetchSizes requested partitions mapped to their fetch size
     * @param infos mutable consumer map, modified in place
     */
    private void updateConsumerInfos(Map<TopicPartition, Integer> fetchSizes,
                                     Map<TopicPartition, KafkaConsumerInfo<OFFSET>> infos) {
        infos.keySet().retainAll(fetchSizes.keySet());

        for (Map.Entry<TopicPartition, Integer> entry : fetchSizes.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            int fetchSize = entry.getValue();
            KafkaConsumerInfo<OFFSET> existing = infos.get(topicPartition);
            if (existing == null) {
                infos.put(topicPartition,
                    new KafkaConsumerInfo<>(topicPartition, fetchSize, getBeginOffset(topicPartition)));
            } else if (existing.getFetchSize() != fetchSize) {
                infos.put(topicPartition,
                    new KafkaConsumerInfo<>(topicPartition, fetchSize, existing.getReadOffset()));
            }
        }
    }
}
