/*
 * Decompiled with CFR 0.152.
 */
package io.fluxzero.proxy;

import io.fluxzero.common.Guarantee;
import io.fluxzero.common.MessageType;
import io.fluxzero.common.Registration;
import io.fluxzero.common.api.Metadata;
import io.fluxzero.common.api.SerializedMessage;
import io.fluxzero.proxy.ProxySerializer;
import io.fluxzero.sdk.common.Message;
import io.fluxzero.sdk.common.serialization.Serializer;
import io.fluxzero.sdk.common.serialization.jackson.JacksonSerializer;
import io.fluxzero.sdk.configuration.client.Client;
import io.fluxzero.sdk.publishing.client.GatewayClient;
import io.fluxzero.sdk.publishing.correlation.DefaultCorrelationDataProvider;
import io.fluxzero.sdk.tracking.ConsumerConfiguration;
import io.fluxzero.sdk.tracking.Tracker;
import io.fluxzero.sdk.tracking.client.DefaultTracker;
import io.fluxzero.sdk.tracking.metrics.HandleMessageEvent;
import io.fluxzero.sdk.tracking.metrics.ProcessBatchEvent;
import io.fluxzero.sdk.web.WebRequest;
import io.fluxzero.sdk.web.WebRequestSettings;
import io.fluxzero.sdk.web.WebResponse;
import java.beans.ConstructorProperties;
import java.io.ByteArrayInputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes {@link MessageType#WEBREQUEST} messages and forwards those that carry an absolute URL to
 * the addressed HTTP endpoint, publishing the outcome as a {@link MessageType#WEBRESPONSE} message.
 *
 * <p>Every request is associated with a consumer name taken from its {@link WebRequestSettings}
 * (falling back to {@link #defaultSettings}). An instance only forwards requests whose consumer name
 * matches its own. The main consumer (the one created without a minimum index) additionally starts a
 * dedicated consumer the first time it sees a request for a consumer name that is not yet running.
 */
public class ForwardProxyConsumer
implements Consumer<List<SerializedMessage>> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ForwardProxyConsumer.class);
    /** Shared HTTP client: follows normal redirects and uses a 5 second connect timeout. */
    private static final HttpClient httpClient = HttpClient.newBuilder()
            .followRedirects(HttpClient.Redirect.NORMAL)
            .connectTimeout(Duration.ofSeconds(5L))
            .build();
    /** Settings applied to requests that do not carry a {@code "settings"} metadata entry. */
    protected static final WebRequestSettings defaultSettings = WebRequestSettings.builder().build();
    /** Serializer used for response payloads. */
    protected static final Serializer serializer = new ProxySerializer();
    /** Serializer used for metrics messages. */
    protected static final Serializer metricsSerializer = new JacksonSerializer();
    /** Registrations of the consumers started by this instance, keyed by consumer name. */
    protected final Map<String, Registration> runningConsumers = new ConcurrentHashMap<>();
    private final Client client;
    private final String consumerName;
    /** Index handed to the tracker configuration; {@code null} marks the main consumer. */
    private final Long minIndex;

    /**
     * Starts the main forward proxy consumer for the given client, using the consumer name of
     * {@link #defaultSettings}.
     *
     * @param client the client used for tracking requests and publishing responses and metrics
     * @return a registration that cancels every consumer registered with the main consumer
     */
    public static Registration start(Client client) {
        ForwardProxyConsumer consumer = new ForwardProxyConsumer(client, defaultSettings.getConsumer(), null);
        consumer.runningConsumers.computeIfAbsent(defaultSettings.getConsumer(), c -> consumer.start());
        return () -> {
            Collection<Registration> running = consumer.runningConsumers.values();
            running.forEach(Registration::cancel);
            running.clear();
        };
    }

    /**
     * Starts tracking web requests for this consumer with 4 threads.
     *
     * @return the registration returned by the tracker
     */
    protected Registration start() {
        if (isMainConsumer()) {
            log.info("Starting consumer {}", consumerName);
        } else {
            log.info("Starting consumer {} at {}", consumerName, minIndex);
        }
        ConsumerConfiguration configuration = ConsumerConfiguration.builder()
                .name(consumerName)
                .minIndex(minIndex)
                .threads(4)
                .build();
        return DefaultTracker.start(this, MessageType.WEBREQUEST, configuration, client);
    }

    /**
     * Processes a batch of web requests.
     *
     * <p>Requests addressed to this consumer are forwarded when their URL is absolute; requests with
     * a relative URL are skipped. Requests addressed to another consumer are ignored, except that
     * the main consumer starts that consumer (beginning at the request's index) if it is not running
     * yet. A failure while handling one request is logged and answered with an error response; it
     * never aborts the rest of the batch. Batch metrics are published once the loop finishes.
     *
     * @param serializedMessages the batch of web request messages
     */
    @Override
    public void accept(List<SerializedMessage> serializedMessages) {
        Instant start = Instant.now();
        try {
            for (SerializedMessage s : serializedMessages) {
                try {
                    WebRequestSettings settings = getSettings(s);
                    if (consumerName.equals(settings.getConsumer())) {
                        URI uri = URI.create(WebRequest.getUrl(s.getMetadata()));
                        if (uri.isAbsolute()) {
                            handle(s, uri, settings);
                        }
                    } else if (isMainConsumer()) {
                        runningConsumers.computeIfAbsent(settings.getConsumer(),
                                c -> new ForwardProxyConsumer(client, c, s.getIndex()).start());
                    }
                } catch (Throwable e) {
                    log.error("Failed to handle external request {}. Continuing..", s.getMessageId(), e);
                    try {
                        sendResponse(asWebResponse(e), s);
                    } catch (Throwable e2) {
                        e2.addSuppressed(e);
                        log.error("Failed to send error response. Continuing..", e2);
                    }
                }
            }
        } finally {
            publishProcessBatchMetrics(start);
        }
    }

    /**
     * Forwards a single request, publishes its handling metrics and sends the response.
     *
     * <p>The metrics are published before the response is sent, so a failure while sending the
     * response is not reflected in them.
     *
     * @param request  the web request message
     * @param uri      the absolute target URI
     * @param settings the settings that apply to this request
     */
    protected void handle(SerializedMessage request, URI uri, WebRequestSettings settings) {
        Instant start = Instant.now();
        WebResponse webResponse;
        try {
            webResponse = executeRequest(asHttpRequest(request, uri, settings));
        } catch (Throwable e) {
            publishHandleMessageMetrics(request, true, start);
            throw e;
        }
        publishHandleMessageMetrics(request, false, start);
        sendResponse(webResponse, request);
    }

    /**
     * Converts a web request message into an {@link HttpRequest}, copying method, headers and body
     * from the message and HTTP version and timeout from the settings.
     *
     * @param request  the web request message
     * @param uri      the target URI
     * @param settings supplies HTTP version and request timeout
     * @return the request to execute
     */
    protected HttpRequest asHttpRequest(SerializedMessage request, URI uri, WebRequestSettings settings) {
        Metadata metadata = request.getMetadata();
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .version(HttpClient.Version.valueOf(settings.getHttpVersion().name()))
                .timeout(settings.getTimeout());
        WebRequest.getHeaders(metadata)
                .forEach((name, values) -> values.forEach(v -> builder.header(name, v)));
        builder.uri(uri).method(WebRequest.getMethod(metadata), getBodyPublisher(request));
        return builder.build();
    }

    /**
     * Returns the settings stored under the {@code "settings"} metadata key of the request, or
     * {@link #defaultSettings} when there are none.
     *
     * @param request the web request message
     * @return the settings to apply, never {@code null}
     */
    protected WebRequestSettings getSettings(SerializedMessage request) {
        return Optional.ofNullable(request.getMetadata().get("settings", WebRequestSettings.class))
                .orElse(defaultSettings);
    }

    /**
     * Executes the HTTP request and converts the outcome into a {@link WebResponse}. Failures are
     * logged and turned into an error response instead of being thrown.
     *
     * @param httpRequest the request to execute
     * @return the response received, or an error response if the request failed
     */
    protected WebResponse executeRequest(HttpRequest httpRequest) {
        try {
            HttpResponse<byte[]> response = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofByteArray());
            return asWebResponse(response);
        } catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // HttpClient.send clears the interrupt status when it throws; restore it so the
                // calling thread can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            log.error("Failed to handle external request. Returning error.. ", e);
            return asWebResponse(e);
        }
    }

    /**
     * Publishes the response as a {@link MessageType#WEBRESPONSE} message, copying the request id
     * and targeting the source of the request.
     *
     * @param response the response to publish
     * @param request  the request being answered
     */
    protected void sendResponse(WebResponse response, SerializedMessage request) {
        Metadata responseMetadata = response.getMetadata().addIfAbsent(
                DefaultCorrelationDataProvider.INSTANCE.getCorrelationData(client, request, MessageType.WEBREQUEST));
        SerializedMessage serializedResponse = new SerializedMessage(
                serializer.serialize(response.getPayload()).withFormat("application/octet-stream"),
                responseMetadata, response.getMessageId(), response.getTimestamp().toEpochMilli());
        serializedResponse.setRequestId(request.getRequestId());
        serializedResponse.setTarget(request.getSource());
        client.getGatewayClient(MessageType.WEBRESPONSE).append(Guarantee.NONE, serializedResponse);
    }

    /**
     * Converts an HTTP response into a {@link WebResponse} with the same status, headers and body.
     *
     * @param response the HTTP response
     * @return the equivalent web response
     */
    protected WebResponse asWebResponse(HttpResponse<byte[]> response) {
        WebResponse.Builder builder = WebResponse.builder();
        response.headers().map().forEach((name, values) -> values.forEach(v -> builder.header(name, v)));
        return builder.status(response.statusCode()).payload(response.body()).build();
    }

    /**
     * Converts a failure into a {@code 502} response whose body is the UTF-8 encoded exception
     * message, or a generic message if the exception has none.
     *
     * @param e the failure
     * @return the error response
     */
    protected WebResponse asWebResponse(Throwable e) {
        String message = Optional.ofNullable(e.getMessage()).orElse("Exception while handling request in proxy");
        // Encode explicitly: the platform default charset differs between environments.
        return WebResponse.builder().status(502).payload(message.getBytes(StandardCharsets.UTF_8)).build();
    }

    /**
     * Creates the body publisher for the request. No body is sent when the payload type is missing
     * or {@code Void}, or when the payload is {@code null} or empty.
     *
     * @param request the web request message
     * @return a publisher for the request body
     */
    protected HttpRequest.BodyPublisher getBodyPublisher(SerializedMessage request) {
        String type = request.getData().getType();
        if (type == null || Void.class.getName().equals(type)) {
            return HttpRequest.BodyPublishers.noBody();
        }
        byte[] body = request.getData().getValue();
        if (body == null || body.length == 0) {
            return HttpRequest.BodyPublishers.noBody();
        }
        return HttpRequest.BodyPublishers.ofInputStream(() -> new ByteArrayInputStream(body));
    }

    /**
     * Publishes a {@link HandleMessageEvent} for the request. Failures are logged and never
     * propagated.
     *
     * @param request           the handled request
     * @param exceptionalResult whether handling ended with an exception
     * @param start             the moment handling started
     */
    protected void publishHandleMessageMetrics(SerializedMessage request, boolean exceptionalResult, Instant start) {
        try {
            Metadata metadata = Metadata.of(
                    DefaultCorrelationDataProvider.INSTANCE.getCorrelationData(client, request, MessageType.WEBREQUEST));
            HandleMessageEvent event = new HandleMessageEvent(
                    consumerName, ForwardProxyConsumer.class.getSimpleName(), request.getIndex(),
                    MessageType.WEBREQUEST, null, formatType(request), exceptionalResult,
                    start.until(Instant.now(), ChronoUnit.NANOS), true);
            appendMetrics(event, metadata);
        } catch (Throwable e) {
            log.error("Failed to publish HandleMessage metrics", e);
        }
    }

    /**
     * Describes the request as {@code "<method> <url>"}, falling back to the message type if method
     * or URL cannot be read.
     *
     * @param request the web request message
     * @return the description used in metrics
     */
    protected String formatType(SerializedMessage request) {
        try {
            Metadata metadata = request.getMetadata();
            return "%s %s".formatted(WebRequest.getMethod(metadata), WebRequest.getUrl(metadata));
        } catch (Exception ignored) {
            // Metrics are best effort: fall back to the raw message type.
            return request.getType();
        }
    }

    /**
     * Publishes a {@link ProcessBatchEvent} for the batch of the current tracker. Failures,
     * including the absence of a current tracker, are logged and never propagated.
     *
     * @param start the moment batch processing started
     */
    protected void publishProcessBatchMetrics(Instant start) {
        try {
            Metadata metadata = Metadata.of(
                    DefaultCorrelationDataProvider.INSTANCE.getCorrelationData(client, null, null));
            Tracker tracker = Tracker.current().orElseThrow();
            ProcessBatchEvent event = new ProcessBatchEvent(
                    consumerName, tracker.getTrackerId(), MessageType.WEBREQUEST, null,
                    tracker.getMessageBatch().getSegment(), tracker.getMessageBatch().getLastIndex(),
                    tracker.getMessageBatch().getSize(), start.until(Instant.now(), ChronoUnit.NANOS));
            appendMetrics(event, metadata);
        } catch (Throwable e) {
            log.error("Failed to publish ProcessBatch metrics", e);
        }
    }

    /** Serializes the metrics event with its metadata and appends it to the metrics gateway. */
    private void appendMetrics(Object event, Metadata metadata) {
        Message metricsMessage = new Message(event, metadata);
        GatewayClient metricsGateway = client.getGatewayClient(MessageType.METRICS);
        metricsGateway.append(Guarantee.NONE, metricsMessage.serialize(metricsSerializer));
    }

    /**
     * Creates a consumer.
     *
     * @param client       the client used for tracking and publishing
     * @param consumerName the name of this consumer; only requests addressed to it are forwarded
     * @param minIndex     index passed to the tracker configuration, or {@code null} for the main
     *                     consumer
     */
    @ConstructorProperties(value={"client", "consumerName", "minIndex"})
    @Generated
    protected ForwardProxyConsumer(Client client, String consumerName, Long minIndex) {
        this.client = client;
        this.consumerName = consumerName;
        this.minIndex = minIndex;
    }

    /**
     * Tells whether this is the main consumer, i.e. the one created without a minimum index.
     *
     * @return {@code true} if {@code minIndex} is {@code null}
     */
    @Generated
    protected boolean isMainConsumer() {
        // minIndex is final, so there is nothing to cache or synchronize on.
        return minIndex == null;
    }
}

