package org.apache.streampipes.manager.monitoring.runtime;

import com.google.gson.JsonParser;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.streampipes.commons.exceptions.NoMatchingFormatException;
import org.apache.streampipes.commons.exceptions.NoMatchingProtocolException;
import org.apache.streampipes.commons.exceptions.NoMatchingSchemaException;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.client.pipeline.Pipeline;
import org.apache.streampipes.model.graph.DataSourceDescription;
import org.apache.streampipes.storage.couchdb.impl.PipelineStorageImpl;

/* loaded from: input_file:BOOT-INF/lib/streampipes-pipeline-management-0.66.0.jar:org/apache/streampipes/manager/monitoring/runtime/SepStoppedMonitoring.class */
/**
 * Tracks which {@link PipelineObserver}s are interested in each data stream and
 * calls {@link PipelineObserver#update()} on them when a JSON message naming
 * that stream arrives on the Kafka topic subscribed to in {@link #run()}.
 *
 * <p>For every stream that has at least one observer, a monitoring pipeline is
 * built with {@code SepStoppedMonitoringPipelineBuilder} and started; it is
 * stopped again when the stream's last observer entry is removed.
 *
 * <p>The bookkeeping maps are concurrent collections because the callback is
 * handed to a consumer that {@link #run()} starts on its own thread, so lookups
 * presumably happen concurrently with {@link #register} / {@link #remove}.
 * The check-then-act sequences inside {@code register} and {@code remove} are
 * not atomic with respect to each other.
 */
public class SepStoppedMonitoring implements EpRuntimeMonitoring<DataSourceDescription>, Runnable {
    /** Observers per stream, keyed by the stream's element id. */
    private final Map<String, List<PipelineObserver>> streamToObserver = new ConcurrentHashMap<>();
    /** Monitoring pipeline started for each observed stream, keyed by the stream's element id. */
    private final Map<String, Pipeline> streamToStoppedMonitoringPipeline = new ConcurrentHashMap<>();
    private SpKafkaConsumer kafkaConsumerGroup;

    /**
     * Receives raw Kafka messages and forwards them to the observers of the
     * stream named in the message.
     */
    public class KafkaCallback implements InternalEventProcessor<byte[]> {
        private KafkaCallback() {
        }

        /**
         * Decodes the payload as UTF-8 JSON, reads the stream id from the
         * {@code ConsumerProtocol.TOPIC_KEY_NAME} field and updates every
         * observer registered for that stream.
         *
         * @param bArr raw message bytes; expected to be a JSON object
         */
        @Override // org.apache.streampipes.messaging.InternalEventProcessor
        public void onEvent(byte[] bArr) {
            // NOTE(review): the field name comes from a Kafka-internal class; confirm it is the intended key.
            String streamId = new JsonParser()
                    .parse(new String(bArr, StandardCharsets.UTF_8))
                    .getAsJsonObject()
                    .get(ConsumerProtocol.TOPIC_KEY_NAME)
                    .getAsString();
            List<PipelineObserver> observers = SepStoppedMonitoring.this.streamToObserver.get(streamId);
            if (observers == null) {
                // No observer for this stream (never registered, or already removed): nothing to notify.
                return;
            }
            for (PipelineObserver observer : observers) {
                observer.update();
            }
        }
    }

    /**
     * Adds the observer to every stream of its pipeline. The first observer of
     * a stream also triggers building and starting that stream's monitoring
     * pipeline.
     *
     * <p>Any exception is printed and ends processing of the remaining streams.
     *
     * @param pipelineObserver observer whose pipeline id selects the pipeline to load
     * @return always {@code false}
     *         (NOTE(review): success is not reported either; confirm callers ignore the result)
     */
    @Override // org.apache.streampipes.manager.monitoring.runtime.EpRuntimeMonitoring
    public boolean register(PipelineObserver pipelineObserver) {
        try {
            Pipeline pipeline = new PipelineStorageImpl().getPipeline(pipelineObserver.getPipelineId());
            for (SpDataStream stream : pipeline.getStreams()) {
                String streamUri = stream.getElementId();
                List<PipelineObserver> observers = this.streamToObserver.get(streamUri);
                if (observers != null) {
                    observers.add(pipelineObserver);
                    continue;
                }
                // The source URI is the stream URI up to (excluding) its last '/'.
                String sourceUri = streamUri.substring(0, streamUri.lastIndexOf("/"));
                // Build and start the monitoring pipeline BEFORE recording anything, so a
                // failure here cannot leave an observer list without a monitoring pipeline.
                Pipeline monitoringPipeline =
                        new SepStoppedMonitoringPipelineBuilder(sourceUri, streamUri).buildPipeline();
                Operations.startPipeline(monitoringPipeline, false, false, false);

                List<PipelineObserver> created = new CopyOnWriteArrayList<>();
                created.add(pipelineObserver);
                this.streamToObserver.put(streamUri, created);
                this.streamToStoppedMonitoringPipeline.put(streamUri, monitoringPipeline);
            }
            return false;
        } catch (Exception e) {
            // Covers URISyntaxException and the NoMatching*Exception family as well;
            // all of them were handled identically (print and give up).
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Detaches the observer from every stream of its pipeline. When a stream's
     * observer list holds exactly one entry, the list is dropped and the
     * stream's monitoring pipeline is stopped.
     *
     * <p>Streams without an observer list are skipped.
     *
     * @param pipelineObserver observer whose pipeline id selects the pipeline to load
     * @return always {@code false}
     */
    @Override // org.apache.streampipes.manager.monitoring.runtime.EpRuntimeMonitoring
    public boolean remove(PipelineObserver pipelineObserver) {
        Pipeline pipeline = new PipelineStorageImpl().getPipeline(pipelineObserver.getPipelineId());
        if (pipeline == null) {
            // Defensive: nothing to detach from if the pipeline cannot be loaded.
            return false;
        }
        for (SpDataStream stream : pipeline.getStreams()) {
            String streamUri = stream.getElementId();
            if (streamUri == null) {
                continue;
            }
            List<PipelineObserver> observers = this.streamToObserver.get(streamUri);
            if (observers == null) {
                // This stream was never registered (or was already removed).
                continue;
            }
            if (observers.size() == 1) {
                // NOTE(review): the last entry is dropped without checking that it is pipelineObserver.
                this.streamToObserver.remove(streamUri);
                Pipeline monitoringPipeline = this.streamToStoppedMonitoringPipeline.remove(streamUri);
                if (monitoringPipeline != null) {
                    Operations.stopPipeline(monitoringPipeline, false, false, false);
                }
            } else {
                observers.remove(pipelineObserver);
            }
        }
        return false;
    }

    /**
     * Creates the Kafka consumer for the {@code internal.streamepipes.sec.stopped}
     * topic with a {@link KafkaCallback} and starts it on a new thread.
     */
    @Override // java.lang.Runnable
    public void run() {
        this.kafkaConsumerGroup = new SpKafkaConsumer(BackendConfig.INSTANCE.getKafkaUrl(), "internal.streamepipes.sec.stopped", new KafkaCallback());
        new Thread(this.kafkaConsumerGroup).start();
    }

    /**
     * Manual smoke test: registers three observers with hard-coded pipeline
     * ids, waits for a line on standard input, then removes them again.
     *
     * @param strArr unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] strArr) throws IOException {
        SepStoppedMonitoring monitoring = new SepStoppedMonitoring();
        monitoring.run();
        PipelineObserver first = new PipelineObserver("baaaf5b2-5412-4ac1-a7eb-04aeaf0e12b8");
        PipelineObserver second = new PipelineObserver("ef915142-2a08-4166-8bea-8d946ae31cd6");
        PipelineObserver third = new PipelineObserver("b3c0b6ad-05df-4670-a078-83775eeb550b");
        monitoring.register(first);
        monitoring.register(second);
        monitoring.register(third);
        // Block until the operator presses Enter.
        new BufferedReader(new InputStreamReader(System.in)).readLine();
        monitoring.remove(first);
        monitoring.remove(second);
        monitoring.remove(third);
        System.out.println("laalal");
    }
}
