package uk.co.gresearch.siembol.parsers.storm;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.co.gresearch.siembol.common.model.StormParsingApplicationAttributesDto;
import uk.co.gresearch.siembol.common.model.ZooKeeperAttributesDto;
import uk.co.gresearch.siembol.common.storm.KafkaBatchWriterMessage;
import uk.co.gresearch.siembol.common.storm.KafkaBatchWriterMessages;
import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnector;
import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactory;
import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactoryImpl;
import uk.co.gresearch.siembol.parsers.application.factory.ParsingApplicationFactoryAttributes;
import uk.co.gresearch.siembol.parsers.application.factory.ParsingApplicationFactoryImpl;
import uk.co.gresearch.siembol.parsers.application.factory.ParsingApplicationFactoryResult;
import uk.co.gresearch.siembol.parsers.application.parsing.ParsingApplicationParser;
import uk.co.gresearch.siembol.parsers.application.parsing.ParsingApplicationResult;

/* loaded from: input_file:uk/co/gresearch/siembol/parsers/storm/ParsingApplicationBolt.class */
public class ParsingApplicationBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String PARSERCONFIG_UPDATE_TRY_MSG_FORMAT = "Trying to update parsing app: %s, by parser configs: %s, ";
    private static final String INIT_EXCEPTION_MSG_FORMAT = "Parsing application exception: %s during initialising";
    private static final String FACTORY_EXCEPTION_MSG_FORMAT = "Exception during creation of parsing application: %s";
    private static final String UPDATE_EXCEPTION_LOG = "Exception during parserconfig update: {}";
    private static final String ERROR_INIT_MESSAGE = "Parsing application exception: Parsing app initialisation error";
    private static final String INIT_START = "Parsing application initialisation start";
    private static final String INIT_COMPLETED = "Parsing application initialisation completed";
    private static final String PARSERS_UPDATE_START = "Parser config update start";
    private static final String PARSERS_UPDATE_COMPLETED = "Parser config update completed";
    private static final String INVALID_TYPE_IN_TUPLE = "Invalid type in tuple";
    private final AtomicReference<ParsingApplicationParser> parsingApplicationParser;
    private final ZooKeeperAttributesDto zookeperAttributes;
    private final String parsingAppSpecification;
    private OutputCollector collector;
    private ZooKeeperConnector zooKeeperConnector;
    private final ZooKeeperConnectorFactory zooKeeperConnectorFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ParsingApplicationBolt(StormParsingApplicationAttributesDto stormParsingApplicationAttributesDto, ParsingApplicationFactoryAttributes parsingApplicationFactoryAttributes, ZooKeeperConnectorFactory zooKeeperConnectorFactory) throws Exception {
        this.parsingApplicationParser = new AtomicReference<>();
        this.zookeperAttributes = stormParsingApplicationAttributesDto.getZookeeperAttributes();
        this.parsingAppSpecification = parsingApplicationFactoryAttributes.getApplicationParserSpecification();
        this.zooKeeperConnectorFactory = zooKeeperConnectorFactory;
    }

    public ParsingApplicationBolt(StormParsingApplicationAttributesDto stormParsingApplicationAttributesDto, ParsingApplicationFactoryAttributes parsingApplicationFactoryAttributes) throws Exception {
        this(stormParsingApplicationAttributesDto, parsingApplicationFactoryAttributes, new ZooKeeperConnectorFactoryImpl());
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        try {
            LOG.info(INIT_START);
            this.zooKeeperConnector = this.zooKeeperConnectorFactory.createZookeeperConnector(this.zookeperAttributes);
            updateParsers();
            if (this.parsingApplicationParser.get() == null) {
                throw new IllegalStateException(ERROR_INIT_MESSAGE);
            }
            this.zooKeeperConnector.addCacheListener(this::updateParsers);
            LOG.info(INIT_COMPLETED);
        } catch (Exception e) {
            String format = String.format(INIT_EXCEPTION_MSG_FORMAT, ExceptionUtils.getStackTrace(e));
            LOG.error(format);
            throw new IllegalStateException(format);
        }
    }

    private void updateParsers() {
        try {
            ParsingApplicationFactoryImpl parsingApplicationFactoryImpl = new ParsingApplicationFactoryImpl();
            LOG.info(PARSERS_UPDATE_START);
            String data = this.zooKeeperConnector.getData();
            LOG.info(String.format(PARSERCONFIG_UPDATE_TRY_MSG_FORMAT, this.parsingAppSpecification, StringUtils.left(data, 100)));
            ParsingApplicationFactoryResult create = parsingApplicationFactoryImpl.create(this.parsingAppSpecification, data);
            if (create.getStatusCode() != ParsingApplicationFactoryResult.StatusCode.OK) {
                String format = String.format(FACTORY_EXCEPTION_MSG_FORMAT, create.getAttributes().getMessage());
                LOG.error(format);
                throw new IllegalStateException(format);
            }
            this.parsingApplicationParser.set(create.getAttributes().getApplicationParser());
            LOG.info(PARSERS_UPDATE_COMPLETED);
        } catch (Exception e) {
            LOG.error(UPDATE_EXCEPTION_LOG, ExceptionUtils.getStackTrace(e));
        }
    }

    public void execute(Tuple tuple) {
        ParsingApplicationParser parsingApplicationParser = this.parsingApplicationParser.get();
        String stringByField = tuple.getStringByField(ParsingApplicationTuples.METADATA.toString());
        Object valueByField = tuple.getValueByField(ParsingApplicationTuples.LOG.toString());
        if (!(valueByField instanceof byte[])) {
            throw new IllegalArgumentException(INVALID_TYPE_IN_TUPLE);
        }
        ArrayList<ParsingApplicationResult> parse = parsingApplicationParser.parse(stringByField, (byte[]) valueByField);
        if (!parse.isEmpty()) {
            KafkaBatchWriterMessages kafkaBatchWriterMessages = new KafkaBatchWriterMessages();
            parse.forEach(parsingApplicationResult -> {
                parsingApplicationResult.getMessages().forEach(str -> {
                    kafkaBatchWriterMessages.add(new KafkaBatchWriterMessage(parsingApplicationResult.getTopic(), str));
                });
            });
            this.collector.emit(tuple, new Values(new Object[]{kafkaBatchWriterMessages}));
        }
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{ParsingApplicationTuples.PARSING_MESSAGES.toString()}));
    }
}
