package uk.co.gresearch.siembol.common.storm;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.co.gresearch.siembol.common.model.KafkaBatchWriterAttributesDto;

/* loaded from: input_file:uk/co/gresearch/siembol/common/storm/KafkaBatchWriterBolt.class */
public class KafkaBatchWriterBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final int ACK_INTERVAL_ACK_IN_SEC = 1;
    private static final String AUTH_EXCEPTION_MESSAGE = "Authorization exception {} during writing messages to the kafka";
    private static final String KAFKA_EXCEPTION_MESSAGE = "Kafka exception {} during writing messages to the kafka";
    private static final String SENDING_MESSAGE_LOG = "Sending message: {} to the topic: {}";
    private static final String MISSING_MESSAGES_MSG = "Missing messages in tuple";
    private final int batchSize;
    private final String fieldName;
    private OutputCollector collector;
    private Producer<String, String> producer;
    private final ArrayList<Tuple> anchors = new ArrayList<>();
    private final ArrayList<KafkaBatchWriterMessage> messages = new ArrayList<>();
    private final Properties props = new Properties();

    public KafkaBatchWriterBolt(KafkaBatchWriterAttributesDto kafkaBatchWriterAttributesDto, String str) {
        kafkaBatchWriterAttributesDto.getProducerProperties().getRawMap().entrySet().forEach(entry -> {
            this.props.put(entry.getKey(), entry.getValue());
        });
        this.batchSize = kafkaBatchWriterAttributesDto.getBatchSize().intValue();
        this.fieldName = str;
    }

    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            if (this.messages.isEmpty()) {
                return;
            }
            writeTuples();
            return;
        }
        Object valueByField = tuple.getValueByField(this.fieldName);
        if (!(valueByField instanceof KafkaBatchWriterMessages)) {
            LOG.error(MISSING_MESSAGES_MSG);
            throw new IllegalStateException(MISSING_MESSAGES_MSG);
        }
        this.messages.addAll((KafkaBatchWriterMessages) valueByField);
        this.anchors.add(tuple);
        if (this.messages.size() > this.batchSize) {
            writeTuples();
        }
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.producer = new KafkaProducer(this.props, new StringSerializer(), new StringSerializer());
    }

    public void cleanup() {
        this.producer.close();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    public Map<String, Object> getComponentConfiguration() {
        return TupleUtils.putTickFrequencyIntoComponentConfig((Map) null, 1);
    }

    private void writeTuples() {
        try {
            try {
                try {
                    this.messages.forEach(kafkaBatchWriterMessage -> {
                        LOG.debug(SENDING_MESSAGE_LOG, kafkaBatchWriterMessage.getMessage(), kafkaBatchWriterMessage.getTopic());
                        this.producer.send(new ProducerRecord(kafkaBatchWriterMessage.getTopic(), String.valueOf(kafkaBatchWriterMessage.getMessage().hashCode()), kafkaBatchWriterMessage.getMessage()));
                    });
                    this.producer.flush();
                    this.anchors.forEach(tuple -> {
                        this.collector.ack(tuple);
                    });
                    this.anchors.clear();
                    this.messages.clear();
                } catch (KafkaException e) {
                    LOG.error(KAFKA_EXCEPTION_MESSAGE, ExceptionUtils.getStackTrace(e));
                    this.anchors.forEach(tuple2 -> {
                        this.collector.fail(tuple2);
                    });
                    this.anchors.clear();
                    this.messages.clear();
                }
            } catch (AuthorizationException e2) {
                LOG.error(AUTH_EXCEPTION_MESSAGE, ExceptionUtils.getStackTrace(e2));
                this.producer.close();
                throw new IllegalStateException((Throwable) e2);
            }
        } catch (Throwable th) {
            this.anchors.clear();
            this.messages.clear();
            throw th;
        }
    }
}
