package uk.co.gresearch.siembol.configeditor.sync.service;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import uk.co.gresearch.siembol.common.model.StormTopologiesDto;
import uk.co.gresearch.siembol.common.model.StormTopologyDto;
import uk.co.gresearch.siembol.common.model.ZooKeeperAttributesDto;
import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnector;
import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactory;
import uk.co.gresearch.siembol.configeditor.model.ConfigEditorAttributes;
import uk.co.gresearch.siembol.configeditor.model.ConfigEditorResult;

/* loaded from: input_file:uk/co/gresearch/siembol/configeditor/sync/service/StormApplicationProviderImpl.class */
/**
 * {@link StormApplicationProvider} backed by a {@link ZooKeeperConnector}.
 *
 * <p>The serialised list of storm topologies is read from the connector, cached locally as a JSON
 * string and refreshed through a cache listener registered on the connector. Updates are written
 * back through the same connector.
 *
 * <p>The last recorded error (unparsable topologies or a duplicate topology name in an update) is
 * kept and reported by {@link #checkHealth()}; nothing in this class resets it.
 */
public class StormApplicationProviderImpl implements StormApplicationProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final ObjectReader TOPOLOGIES_READER = new ObjectMapper().readerFor(StormTopologiesDto.class);
    private static final ObjectWriter TOPOLOGIES_WRITER = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL).writerFor(StormTopologiesDto.class);
    private static final String WRONG_TOPOLOGIES_FORMAT = "Wrong format of storm topologies {}";
    private static final String NOTHING_TO_UPDATE_MSG = "nothing to update";
    private static final String WRONG_TOPOLOGY_NAME_MSG = "Topology with the name %s of the service %s is not released";
    private static final String REQUESTED_RESTART_TOPOLOGY_WITH_NAME = "Requested restart topology with name {}";
    private static final String INIT_START_MSG = "Starting initialising storm application provider";
    private static final String INIT_COMPLETED_MSG = "Initialisation of storm application provider completed";
    private static final String DUPLICATE_NAME_MSG = "Duplicate topology name %s in updated topologies";
    private static final String UPDATING_TOPOLOGIES_IN_ZOOKEEPER = "Updating topologies in zookeeper {}";
    private final ZooKeeperConnector zooKeeperConnector;
    /** Latest JSON string obtained from the connector. */
    private final AtomicReference<String> topologies = new AtomicReference<>();
    /** Last recorded error; drives {@link #checkHealth()}. */
    private final AtomicReference<Exception> exception = new AtomicReference<>();

    /**
     * Creates the provider, loads the current topologies from the connector and subscribes to
     * subsequent changes.
     *
     * @param zooKeeperConnector connector used to read and write the serialised topologies
     * @throws IllegalStateException if the data currently held by the connector cannot be parsed
     */
    public StormApplicationProviderImpl(ZooKeeperConnector zooKeeperConnector) {
        this.zooKeeperConnector = zooKeeperConnector;
        // Load and validate the initial content before subscribing to further changes.
        updateTopologiesCallback();
        this.zooKeeperConnector.addCacheListener(this::updateTopologiesCallback);
    }

    /**
     * Refreshes the cached JSON from the connector and parses it once so that a malformed value
     * is logged and recorded immediately.
     */
    private void updateTopologiesCallback() {
        this.topologies.set((String) this.zooKeeperConnector.getData());
        getCurrentTopologies();
    }

    /**
     * Deserialises the cached JSON into a {@link StormTopologiesDto}.
     *
     * @return a freshly deserialised copy of the cached topologies
     * @throws IllegalStateException wrapping the {@link IOException} if the cached value cannot be
     *     parsed; the error is also recorded for {@link #checkHealth()}
     */
    private StormTopologiesDto getCurrentTopologies() {
        // Snapshot the cache once so the value parsed is exactly the value reported on failure,
        // even if the cache listener replaces it concurrently.
        String current = this.topologies.get();
        try {
            return TOPOLOGIES_READER.readValue(current);
        } catch (IOException e) {
            this.exception.set(e);
            LOGGER.error(WRONG_TOPOLOGIES_FORMAT, current);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Validates, sorts and serialises the given topologies and writes them through the connector.
     *
     * <p>The list is sorted in place by topology name. A duplicate topology name aborts the write,
     * records the error for {@link #checkHealth()} and is returned as an error result.
     *
     * @param list topologies to store; sorted in place
     * @return an OK result carrying the stored topologies, or a result built from the exception
     *     when validation, serialisation or the write fails
     */
    private ConfigEditorResult sendTopologiesToZookeeper(List<StormTopologyDto> list) {
        Set<String> seenNames = new HashSet<>();
        for (StormTopologyDto topology : list) {
            // Set.add returns false when the name has already been seen.
            if (!seenNames.add(topology.getTopologyName())) {
                String message = String.format(DUPLICATE_NAME_MSG, topology.getTopologyName());
                LOGGER.error(message);
                IllegalStateException duplicate = new IllegalStateException(message);
                this.exception.set(duplicate);
                return ConfigEditorResult.fromException(duplicate);
            }
        }
        list.sort(Comparator.comparing(StormTopologyDto::getTopologyName));
        StormTopologiesDto toStore = new StormTopologiesDto();
        toStore.setTopologies(list);
        toStore.setTimestamp(System.currentTimeMillis());
        try {
            String serialised = TOPOLOGIES_WRITER.writeValueAsString(toStore);
            LOGGER.info(UPDATING_TOPOLOGIES_IN_ZOOKEEPER, serialised);
            this.zooKeeperConnector.setData(serialised);
            ConfigEditorAttributes attributes = new ConfigEditorAttributes();
            attributes.setTopologies(list);
            return new ConfigEditorResult(ConfigEditorResult.StatusCode.OK, attributes);
        } catch (Exception e) {
            LOGGER.error(ExceptionUtils.getStackTrace(e));
            return ConfigEditorResult.fromException(e);
        }
    }

    /**
     * Compares two topologies by name, image, service name and attributes.
     *
     * <p>The topology id is not compared. The comparison is null-safe: two null values are
     * considered equal, a null and a non-null value are not.
     */
    private static boolean areEqualTopologies(StormTopologyDto stormTopologyDto, StormTopologyDto stormTopologyDto2) {
        return Objects.equals(stormTopologyDto.getTopologyName(), stormTopologyDto2.getTopologyName())
                && Objects.equals(stormTopologyDto.getImage(), stormTopologyDto2.getImage())
                && Objects.equals(stormTopologyDto.getServiceName(), stormTopologyDto2.getServiceName())
                && Objects.equals(stormTopologyDto.getAttributes(), stormTopologyDto2.getAttributes());
    }

    /**
     * Returns all currently cached topologies.
     *
     * @return an OK result carrying every cached topology
     * @throws IllegalStateException if the cached topologies cannot be parsed
     */
    @Override // uk.co.gresearch.siembol.configeditor.sync.service.StormApplicationProvider
    public ConfigEditorResult getStormTopologies() {
        ConfigEditorAttributes attributes = new ConfigEditorAttributes();
        attributes.setTopologies(getCurrentTopologies().getTopologies());
        return new ConfigEditorResult(ConfigEditorResult.StatusCode.OK, attributes);
    }

    /**
     * Returns the cached topologies that belong to one service.
     *
     * @param str service name to filter by
     * @return an OK result carrying the topologies whose service name equals {@code str}
     * @throws IllegalStateException if the cached topologies cannot be parsed
     */
    @Override // uk.co.gresearch.siembol.configeditor.sync.service.StormApplicationProvider
    public ConfigEditorResult getStormTopologies(String str) {
        List<StormTopologyDto> ofService = getCurrentTopologies().getTopologies().stream()
                .filter(topology -> topology.getServiceName().equals(str))
                .collect(Collectors.toList());
        ConfigEditorAttributes attributes = new ConfigEditorAttributes();
        attributes.setTopologies(ofService);
        return new ConfigEditorResult(ConfigEditorResult.StatusCode.OK, attributes);
    }

    /**
     * Replaces the topologies of the given services with the supplied ones.
     *
     * <p>Cached topologies of services not contained in {@code set} are kept untouched. For the
     * services in {@code set}, a supplied topology that equals the cached one (see
     * {@link #areEqualTopologies}) keeps the cached instance; any other supplied topology is
     * taken as given. Cached topologies of those services that are not supplied are dropped.
     * Nothing is written when the result does not differ from the cached state.
     *
     * @param list new topologies for the services in {@code set}
     * @param set names of the services whose topologies are being replaced
     * @return the result of the write, or an OK result with a "nothing to update" message
     * @throws IllegalStateException if the cached topologies cannot be parsed
     */
    @Override // uk.co.gresearch.siembol.configeditor.sync.service.StormApplicationProvider
    public ConfigEditorResult updateStormTopologies(List<StormTopologyDto> list, Set<String> set) {
        StormTopologiesDto current = getCurrentTopologies();
        List<StormTopologyDto> updated = new ArrayList<>();
        Map<String, StormTopologyDto> releasedByName = new HashMap<>();
        for (StormTopologyDto topology : current.getTopologies()) {
            if (set.contains(topology.getServiceName())) {
                releasedByName.put(topology.getTopologyName(), topology);
            } else {
                // Topologies of services outside this update are carried over unchanged.
                updated.add(topology);
            }
        }

        boolean changed = false;
        for (StormTopologyDto candidate : list) {
            StormTopologyDto existing = releasedByName.get(candidate.getTopologyName());
            if (existing != null && areEqualTopologies(existing, candidate)) {
                // Unchanged topology: keep the cached instance (including its topology id).
                updated.add(existing);
            } else {
                changed = true;
                updated.add(candidate);
            }
        }

        // A size difference catches topologies that were removed without any being added or changed.
        if (changed || updated.size() != current.getTopologies().size()) {
            return sendTopologiesToZookeeper(updated);
        }
        return ConfigEditorResult.fromMessage(ConfigEditorResult.StatusCode.OK, NOTHING_TO_UPDATE_MSG);
    }

    /**
     * Requests a restart of one topology by assigning it a new random topology id and writing the
     * topologies back through the connector.
     *
     * @param str service name of the topology
     * @param str2 name of the topology to restart
     * @return the result of the write, or a BAD_REQUEST result when no cached topology matches
     *     both the topology name and the service name
     * @throws IllegalStateException if the cached topologies cannot be parsed
     */
    @Override // uk.co.gresearch.siembol.configeditor.sync.service.StormApplicationProvider
    public ConfigEditorResult restartStormTopology(String str, String str2) {
        LOGGER.info(REQUESTED_RESTART_TOPOLOGY_WITH_NAME, str2);
        StormTopologiesDto current = getCurrentTopologies();
        for (StormTopologyDto topology : current.getTopologies()) {
            if (topology.getTopologyName().equals(str2) && topology.getServiceName().equals(str)) {
                topology.setTopologyId(UUID.randomUUID().toString());
                return sendTopologiesToZookeeper(current.getTopologies());
            }
        }
        String message = String.format(WRONG_TOPOLOGY_NAME_MSG, str2, str);
        LOGGER.info(message);
        return ConfigEditorResult.fromMessage(ConfigEditorResult.StatusCode.BAD_REQUEST, message);
    }

    /**
     * Reports the health of the provider.
     *
     * @return UP when no error has been recorded, otherwise DOWN with the last recorded exception
     */
    @Override // uk.co.gresearch.siembol.configeditor.sync.service.StormApplicationProvider
    public Health checkHealth() {
        // Read the reference once so the null check and the reported exception are the same value.
        Exception lastError = this.exception.get();
        return lastError == null ? Health.up().build() : Health.down().withException(lastError).build();
    }

    /**
     * Creates a provider on top of a connector obtained from the given factory.
     *
     * @param zooKeeperConnectorFactory factory used to create the connector
     * @param zooKeeperAttributesDto attributes passed to the factory
     * @return the initialised provider
     * @throws Exception if the connector cannot be created or the provider cannot be initialised
     */
    public static StormApplicationProviderImpl create(ZooKeeperConnectorFactory zooKeeperConnectorFactory, ZooKeeperAttributesDto zooKeeperAttributesDto) throws Exception {
        LOGGER.info(INIT_START_MSG);
        ZooKeeperConnector connector = zooKeeperConnectorFactory.createZookeeperConnector(zooKeeperAttributesDto);
        // Construct first: the constructor can throw, and completion must only be logged on success.
        StormApplicationProviderImpl provider = new StormApplicationProviderImpl(connector);
        LOGGER.info(INIT_COMPLETED_MSG);
        return provider;
    }
}
