package net.intelie.liverig.plugin.collectors;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import net.intelie.live.Live;
import net.intelie.live.Query;
import net.intelie.live.QueryEvent;
import net.intelie.live.QueryListener;
import net.intelie.liverig.plugin.assets.AssetTypeService;
import net.intelie.liverig.util.SafeConsumer;
import net.intelie.pipes.time.Clock;
import net.intelie.pipes.types.Type;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/intelie/liverig/plugin/collectors/QueryRunnerToUpdateCollectorStatus.class */
class QueryRunnerToUpdateCollectorStatus {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunnerToUpdateCollectorStatus.class);

    @NotNull
    private final Live.Queries queries;

    @NotNull
    private final AssetTypeService collectorTypeService;

    @NotNull
    private final CollectorPartService collectorPartService;

    @NotNull
    private final CollectorOfflineEventListener collectorOfflineEventListener;

    @NotNull
    private final CollectorInternalEventListener collectorInternalEventListener;
    static final String UNKNOWN_COLLECTOR_INSTANCE = "unknown";
    static final String NO_AUTH = "no_auth";
    private final CollectorService collectorService;
    private final ConcurrentHashMap<String, Map<String, CollectorSourceSummary>> cacheCollectorSourcesTime = new ConcurrentHashMap<>();
    private Live.Action action = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:net/intelie/liverig/plugin/collectors/QueryRunnerToUpdateCollectorStatus$Consumer.class */
    public interface Consumer {
        void consume(@NotNull CollectorSummary collectorSummary, @NotNull Map<String, Object> map);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueryRunnerToUpdateCollectorStatus(@NotNull Live live, @NotNull AssetTypeService assetTypeService, @NotNull CollectorService collectorService) throws Exception {
        this.queries = live.queries();
        this.collectorService = collectorService;
        this.collectorTypeService = assetTypeService;
        this.collectorPartService = (CollectorPartService) Objects.requireNonNull((CollectorPartService) assetTypeService.getPartService(CollectorPartService.class));
        Clock clock = live.time().clock();
        this.collectorOfflineEventListener = new CollectorOfflineEventListener(assetTypeService, this.collectorPartService);
        this.collectorInternalEventListener = new CollectorInternalEventListener(clock, this.collectorPartService, assetTypeService, this.cacheCollectorSourcesTime);
        try {
            LOGGER.info("will update collectors states to offline");
            this.collectorOfflineEventListener.changeAllInstancesToOfflineWhenPluginStarted();
        } catch (Exception e) {
            LOGGER.error("Error updating collector states to offline", e);
        }
        collectorService.registerObserver(live, this::runGlobalQueriesToUpdateCollector);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void runGlobalQueriesToUpdateCollector() {
        try {
            if (this.action != null) {
                this.action.undo();
            }
        } catch (Exception e) {
            LOGGER.error("Unexpected error", e);
        }
        if (this.collectorService.getCollectorsFeatureDisabled()) {
            return;
        }
        try {
            this.action = this.queries.run(new Query[]{new Query(CollectorExpressions.internalQuery()).description("Monitoring internal events to create collector state").preloadWindow(false).follow().listenWith(new QueryListener.Empty() { // from class: net.intelie.liverig.plugin.collectors.QueryRunnerToUpdateCollectorStatus.1
                public void onEvent(QueryEvent queryEvent, boolean z) {
                    CollectorInternalEventListener collectorInternalEventListener = QueryRunnerToUpdateCollectorStatus.this.collectorInternalEventListener;
                    Objects.requireNonNull(collectorInternalEventListener);
                    queryEvent.forEach(SafeConsumer.safeConsumer(collectorInternalEventListener::processInternalEvent));
                }
            }), new Query(CollectorExpressions.collectorRawQuery()).description("Monitoring raw events create cache of collectors sources online").preloadWindow(false).follow().listenWith(new QueryListener.Empty() { // from class: net.intelie.liverig.plugin.collectors.QueryRunnerToUpdateCollectorStatus.2
                public void onEvent(QueryEvent queryEvent, boolean z) {
                    queryEvent.forEach(map -> {
                        CollectorRawEventListener.processRawCollectorEvent(map, QueryRunnerToUpdateCollectorStatus.this.cacheCollectorSourcesTime);
                    });
                }
            }), new Query(CollectorExpressions.statusOfflineQuery()).description("Monitoring internal events to change collector state to offline").preloadWindow(false).follow().listenWith(new QueryListener.Empty() { // from class: net.intelie.liverig.plugin.collectors.QueryRunnerToUpdateCollectorStatus.3
                public void onEvent(QueryEvent queryEvent, boolean z) {
                    CollectorOfflineEventListener collectorOfflineEventListener = QueryRunnerToUpdateCollectorStatus.this.collectorOfflineEventListener;
                    Objects.requireNonNull(collectorOfflineEventListener);
                    queryEvent.forEach(SafeConsumer.safeConsumer(collectorOfflineEventListener::processStatusOfflineEvent));
                }
            }), new Query(CollectorExpressions.serverReportQuery()).description("Monitoring internal events to get collector error ").span("last hour").preloadWindow(false).follow().listenWith(new QueryListener.Empty() { // from class: net.intelie.liverig.plugin.collectors.QueryRunnerToUpdateCollectorStatus.4
                public void onEvent(QueryEvent queryEvent, boolean z) {
                    queryEvent.forEach(map -> {
                        QueryRunnerToUpdateCollectorStatus.this.updateCollectorSummaryStateByEvent(map, (v0, v1) -> {
                            v0.updateCollectorSummaryServerEventErrors(v1);
                        });
                    });
                }
            }), new Query(CollectorExpressions.serverReportConnectedQuery()).description("Monitoring internal events to get collector ip by connected event ").span("last hour").preloadWindow(false).follow().listenWith(new QueryListener.Empty() { // from class: net.intelie.liverig.plugin.collectors.QueryRunnerToUpdateCollectorStatus.5
                public void onEvent(QueryEvent queryEvent, boolean z) {
                    queryEvent.forEach(map -> {
                        QueryRunnerToUpdateCollectorStatus.this.updateCollectorSummaryStateByEvent(map, (v0, v1) -> {
                            v0.updateCollectorSummaryRemoteAddress(v1);
                        });
                    });
                }
            }), new Query(CollectorExpressions.latencyCollectorQuery()).description("Monitoring internal events to get latency").preloadWindow(false).follow().listenWith(new QueryListener.Empty() { // from class: net.intelie.liverig.plugin.collectors.QueryRunnerToUpdateCollectorStatus.6
                public void onEvent(QueryEvent queryEvent, boolean z) {
                    queryEvent.forEach(map -> {
                        QueryRunnerToUpdateCollectorStatus.this.updateCollectorSummaryStateByEvent(map, (v0, v1) -> {
                            v0.updateCollectorSummaryLatency(v1);
                        });
                    });
                }
            }), new Query(CollectorExpressions.sentBytesRateCollectorQuery()).description("Monitoring internal events to get upload bytes rates").preloadWindow(false).follow().listenWith(new QueryListener.Empty() { // from class: net.intelie.liverig.plugin.collectors.QueryRunnerToUpdateCollectorStatus.7
                public void onEvent(QueryEvent queryEvent, boolean z) {
                    queryEvent.forEach(map -> {
                        QueryRunnerToUpdateCollectorStatus.this.updateCollectorSummaryStateByEvent(map, (v0, v1) -> {
                            v0.updateCollectorSummaryUploadBytes(v1);
                        });
                    });
                }
            }), new Query(CollectorExpressions.sentBytesRateSourceQuery()).description("Monitoring internal events to get upload bytes rates by source using perSourceCounters").preloadWindow(false).follow().listenWith(new QueryListener.Empty() { // from class: net.intelie.liverig.plugin.collectors.QueryRunnerToUpdateCollectorStatus.8
                public void onEvent(QueryEvent queryEvent, boolean z) {
                    queryEvent.forEach(map -> {
                        QueryRunnerToUpdateCollectorStatus.this.updateCollectorSummaryStateByEvent(map, (v0, v1) -> {
                            v0.updateCollectorSourcesSummaryUploadBytes(v1);
                        });
                    });
                }
            }), new Query(CollectorExpressions.averageChannelsBySource()).description("Monitoring raw events to get average transmitted channels by source every 5 minutes").preloadWindow(false).follow().listenWith(new QueryListener.Empty() { // from class: net.intelie.liverig.plugin.collectors.QueryRunnerToUpdateCollectorStatus.9
                public void onEvent(QueryEvent queryEvent, boolean z) {
                    queryEvent.forEach(map -> {
                        QueryRunnerToUpdateCollectorStatus.this.updateCollectorSummaryStateByEvent(map, (v0, v1) -> {
                            v0.updateCollectorSummaryAverageChannelsTransmitting(v1);
                        });
                    });
                }
            })});
        } catch (Exception e2) {
            LOGGER.error("Error starting query to update collector states ", e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateCollectorSummaryStateByEvent(@NotNull Map<String, Object> map, @NotNull Consumer consumer) {
        String instanceOrUnknown = getInstanceOrUnknown(map);
        if (instanceOrUnknown == null) {
            return;
        }
        this.collectorPartService.createCollectorIfNotExists(instanceOrUnknown);
        this.collectorTypeService.list().stream().map(asset -> {
            return (Collector) asset;
        }).filter(collector -> {
            return instanceOrUnknown.equalsIgnoreCase(collector.getInstance());
        }).forEach(SafeConsumer.safeConsumer(collector2 -> {
            CollectorSummaryUtil.setCollectorState(collector2.getId(), AssetObserverToUpdateCollectorStatus.COLLECTORS_SUMMARY, stateEntry -> {
                Map<String, CollectorSummary> collectorMapFromAssetState = CollectorSummaryUtil.getCollectorMapFromAssetState(stateEntry);
                consumer.consume(collectorMapFromAssetState.computeIfAbsent(instanceOrUnknown, str -> {
                    return new CollectorSummary(map, instanceOrUnknown);
                }), map);
                return collectorMapFromAssetState;
            }, this.collectorTypeService, this.collectorPartService);
        }));
    }

    @Nullable
    private static String getInstanceOrUnknown(@NotNull Map<String, Object> map) {
        String cast = Type.STRING.cast(map.get("instance"));
        return (cast == null && NO_AUTH.equalsIgnoreCase(Type.STRING.cast(map.get("event")))) ? UNKNOWN_COLLECTOR_INSTANCE : cast;
    }
}
