package org.apache.storm.kafka.trident;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/trident/TridentKafkaState.class */
public class TridentKafkaState<K, V> implements State {
    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
    private KafkaProducer<K, V> producer;
    private TridentTupleToKafkaMapper<K, V> mapper;
    private KafkaTopicSelector topicSelector;

    public TridentKafkaState<K, V> withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper<K, V> tridentTupleToKafkaMapper) {
        this.mapper = tridentTupleToKafkaMapper;
        return this;
    }

    public TridentKafkaState<K, V> withKafkaTopicSelector(KafkaTopicSelector kafkaTopicSelector) {
        this.topicSelector = kafkaTopicSelector;
        return this;
    }

    public void beginCommit(Long l) {
        LOG.debug("beginCommit is Noop.");
    }

    public void commit(Long l) {
        LOG.debug("commit is Noop.");
    }

    public void prepare(Properties properties) {
        Objects.requireNonNull(this.mapper, "mapper can not be null");
        Objects.requireNonNull(this.topicSelector, "topicSelector can not be null");
        this.producer = new KafkaProducer<>(properties);
    }

    public void updateState(List<TridentTuple> list, TridentCollector tridentCollector) {
        String str = null;
        try {
            long currentTimeMillis = System.currentTimeMillis();
            ArrayList arrayList = new ArrayList(list.size());
            for (TridentTuple tridentTuple : list) {
                str = this.topicSelector.getTopic(tridentTuple);
                V messageFromTuple = this.mapper.getMessageFromTuple(tridentTuple);
                K keyFromTuple = this.mapper.getKeyFromTuple(tridentTuple);
                if (str == null) {
                    LOG.warn("skipping key = {}, topic selector returned null.", keyFromTuple);
                } else if (messageFromTuple != null) {
                    arrayList.add(this.producer.send(new ProducerRecord(str, keyFromTuple, messageFromTuple)));
                } else {
                    LOG.warn("skipping Message with Key {} as message was null", keyFromTuple);
                }
            }
            int size = arrayList.size();
            ArrayList arrayList2 = new ArrayList(size);
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                try {
                    ((Future) it.next()).get();
                } catch (ExecutionException e) {
                    arrayList2.add(e);
                }
            }
            if (arrayList2.size() <= 0) {
                LOG.info("Emitted record {} sucessfully in {} ms to topic {} ", new Object[]{Integer.valueOf(size), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), str});
                return;
            }
            StringBuilder sb = new StringBuilder("Could not retrieve result for messages ");
            sb.append(list).append(" from topic = ").append(str).append(" because of the following exceptions:").append(System.lineSeparator());
            Iterator it2 = arrayList2.iterator();
            while (it2.hasNext()) {
                sb = sb.append(((ExecutionException) it2.next()).getMessage()).append(System.lineSeparator());
            }
            String sb2 = sb.toString();
            LOG.error(sb2);
            throw new FailedException(sb2);
        } catch (Exception e2) {
            String str2 = "Could not send messages " + list + " to topic = " + str;
            LOG.warn(str2, e2);
            throw new FailedException(str2, e2);
        }
    }
}
