/*
 * Decompiled with CFR 0.152.
 */
package io.cellery.observability.tracing.receiver;

import com.google.gson.Gson;
import com.sun.net.httpserver.HttpContext;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import io.cellery.observability.tracing.receiver.internal.Codec;
import io.cellery.observability.tracing.receiver.internal.ZipkinSpan;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;

@Extension(
        name = "tracing-receiver",
        namespace = "source",
        description = "This is the tracing Receiver for Cellery. This accepts Zipkin encoded tracing data. "
                + "The event source outputs a map of attributes. Therefore a key-value mapper needs to be used.",
        parameters = {
                @Parameter(
                        name = "ip",
                        type = {DataType.STRING},
                        description = "IP to which the server connector should be bound to",
                        optional = true,
                        defaultValue = "0.0.0.0"),
                @Parameter(
                        name = "host",
                        type = {DataType.STRING},
                        description = "Host name or IP to which the server connector should be bound to. "
                                + "Takes precedence over 'ip' when both are provided",
                        optional = true,
                        defaultValue = "0.0.0.0"),
                @Parameter(
                        name = "port",
                        type = {DataType.INT},
                        description = "Port on which the server connector should listen on",
                        optional = true,
                        defaultValue = "9411"),
                @Parameter(
                        name = "api.context",
                        type = {DataType.STRING},
                        description = "HTTP context path on which the tracing data is received",
                        optional = true,
                        defaultValue = "/api/v1/spans")
        },
        examples = {
                @Example(
                        syntax = "@source(type='tracing-receiver', "
                                + "@map(type='keyvalue', fail.on.missing.attribute='false'))\n"
                                + "define stream ZipkinStreamIn (traceId string, id string, parentId string, "
                                + "operationName string, serviceName string, spanKind string, timestamp long, "
                                + "duration long, tags string)",
                        description = "This produces events when Zipkin tracing data is received on any "
                                + "interface on port 9411. The stream definition of the event source is fixed "
                                + "since it depends on the Zipkin format")
        }
)
public class TracingEventSource
extends Source {
    private static final Logger logger = Logger.getLogger(TracingEventSource.class.getName());
    private static final Gson gson = new Gson();

    /** Embedded HTTP server; non-null only between {@link #connect} and {@link #disconnect()}. */
    private HttpServer httpServer;
    private HttpServerListener httpServerListener;
    private String host;
    private String apiContext;
    private int port;

    /**
     * Reads the static options of the source and prepares the HTTP handler. No socket is opened here;
     * that happens in {@link #connect}.
     *
     * <p>Options read: {@code host} (falling back to the documented {@code ip} option, then to
     * {@code 0.0.0.0}), {@code port} (default {@code 9411}) and {@code api.context}
     * (default {@code /api/v1/spans}).
     *
     * @param sourceEventListener listener to which every decoded span is published
     * @param optionHolder holder of the options given in the {@code @source} annotation
     * @param requestedTransportPropertyNames not used by this source
     * @param configReader not used by this source
     * @param siddhiAppContext not used by this source
     * @throws NumberFormatException if the {@code port} option is not a valid integer
     */
    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        // The extension metadata documents "ip" while earlier versions only honoured "host";
        // accept both so that neither spelling is silently ignored. "host" wins when both are set.
        String bindAddress = optionHolder.validateAndGetStaticValue("ip", "0.0.0.0");
        this.host = optionHolder.validateAndGetStaticValue("host", bindAddress);
        this.port = Integer.parseInt(optionHolder.validateAndGetStaticValue("port", "9411"));
        this.apiContext = optionHolder.validateAndGetStaticValue("api.context", "/api/v1/spans");
        this.httpServerListener = new HttpServerListener(sourceEventListener);
    }

    /**
     * Creates the HTTP server on the configured host and port, registers the span handler on the
     * configured context path and starts the server.
     *
     * @param connectionCallback not used by this source
     * @throws ConnectionUnavailableException if the HTTP server cannot be created; the underlying
     *         {@link IOException} is attached as the cause
     */
    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        try {
            this.httpServer = HttpServer.create(new InetSocketAddress(this.host, this.port), 0);
        } catch (IOException e) {
            // Keep the cause so that the reason (e.g. address already in use) is not lost.
            throw new ConnectionUnavailableException("Failed to instantiate HTTP Server", e);
        }
        HttpContext context = this.httpServer.createContext(this.apiContext);
        context.setHandler(this.httpServerListener);
        this.httpServer.start();
        if (logger.isDebugEnabled()) {
            logger.debug("Started HTTP Server started and receiving requests on http://" + this.host + ":" + this.port + this.apiContext);
        }
    }

    /**
     * Stops the HTTP server, if one is running, without a grace period and forgets the reference.
     * Calling this when no server is running is a no-op.
     */
    public void disconnect() {
        if (this.httpServer == null) {
            return;
        }
        this.httpServer.stop(0);
        if (logger.isDebugEnabled()) {
            logger.debug("HTTP Server Shutdown");
        }
        this.httpServer = null;
    }

    /** Nothing to release here; the server is stopped in {@link #disconnect()}. */
    public void destroy() {
    }

    /** Pausing is not implemented for this source; this method does nothing. */
    public void pause() {
    }

    /** Resuming is not implemented for this source; this method does nothing. */
    public void resume() {
    }

    /**
     * This source keeps no state to snapshot.
     *
     * @return always {@code null}
     */
    public Map<String, Object> currentState() {
        return null;
    }

    /**
     * This source keeps no state, so the given state is ignored.
     *
     * @param state ignored
     */
    public void restoreState(Map<String, Object> state) {
    }

    /**
     * @return the event object types emitted by this source; every event is a {@link Map}
     */
    public Class[] getOutputEventClasses() {
        return new Class[]{Map.class};
    }

    /**
     * HTTP handler which decodes the request body into Zipkin spans and publishes one
     * attribute map per span to the source event listener.
     */
    public static class HttpServerListener
    implements HttpHandler {
        private static final Logger logger = Logger.getLogger(HttpServerListener.class.getName());
        private final SourceEventListener sourceEventListener;

        HttpServerListener(SourceEventListener sourceEventListener) {
            this.sourceEventListener = sourceEventListener;
        }

        /**
         * Reads the full request body, forwards it for decoding and answers with an empty response:
         * {@code 200} once the payload has been handed over (even if it could not be decoded, see
         * {@link #handleEventReceive}), or {@code 500} if the body could not be read.
         *
         * @param httpExchange the exchange to serve; always closed before this method returns
         * @throws IOException if the response headers cannot be sent
         */
        @Override
        public void handle(HttpExchange httpExchange) throws IOException {
            try {
                byte[] payload;
                // Consume and close the request body before responding.
                try (InputStream requestBody = httpExchange.getRequestBody()) {
                    payload = IOUtils.toByteArray(requestBody);
                } catch (IOException e) {
                    logger.error("Failed to read tracing data from the request body", e);
                    httpExchange.sendResponseHeaders(500, -1L);
                    return;
                }
                String contentType = httpExchange.getRequestHeaders().getFirst("Content-Type");
                this.handleEventReceive(payload, contentType);
                httpExchange.sendResponseHeaders(200, -1L);
            } finally {
                // Release the exchange on every path, including failures while publishing
                // events or while sending the response headers.
                httpExchange.close();
            }
        }

        /**
         * Decodes the payload and emits one event per span. A payload that cannot be decoded is
         * logged and dropped; no event is emitted for it.
         *
         * @param byteArray raw request body
         * @param contentType value of the {@code Content-Type} request header, used only for debug
         *        logging; may be {@code null}
         */
        private void handleEventReceive(byte[] byteArray, String contentType) {
            if (logger.isDebugEnabled()) {
                logger.debug("Received message of type " + contentType);
            }
            List<ZipkinSpan> spans = null;
            try {
                spans = Codec.decodeData(byteArray);
                if (logger.isDebugEnabled()) {
                    logger.debug("Decoded " + spans.size() + " Zipkin Spans");
                }
            } catch (Throwable t) {
                // Deliberately broad: a decoding failure of any kind must not escape into the
                // HTTP server thread that invoked this handler.
                logger.error("Failed to parse received tracing data", t);
            }
            if (spans == null) {
                return;
            }
            for (ZipkinSpan span : spans) {
                Map<String, Object> attributes = new HashMap<String, Object>();
                attributes.put("traceId", span.getTraceId());
                attributes.put("id", span.getId());
                attributes.put("parentId", span.getParentId());
                attributes.put("operationName", span.getName());
                attributes.put("serviceName", span.getServiceName());
                attributes.put("spanKind", span.getKind());
                // NOTE(review): Zipkin reports timestamp and duration in microseconds, so the
                // division presumably converts them to milliseconds - confirm against ZipkinSpan.
                attributes.put("timestamp", span.getTimestamp() / 1000L);
                attributes.put("duration", span.getDuration() / 1000L);
                // Tags are flattened into a single JSON string attribute.
                attributes.put("tags", gson.toJson(span.getTags()));
                this.sourceEventListener.onEvent(attributes, new String[0]);
                if (logger.isDebugEnabled()) {
                    logger.debug("Emitted event - span " + span.getTraceId() + "-" + span.getId() + " to event source listener");
                }
            }
        }
    }
}

