/*
 * Decompiled with CFR 0.152.
 */
package nakadi;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import nakadi.BatchItemResponse;
import nakadi.BatchItemResponseCollection;
import nakadi.CompressionSupport;
import nakadi.ContentSupplier;
import nakadi.EventRecord;
import nakadi.EventResource;
import nakadi.JsonSupport;
import nakadi.MetricCollector;
import nakadi.NakadiClient;
import nakadi.NakadiException;
import nakadi.Problem;
import nakadi.ResourceLink;
import nakadi.ResourceOptions;
import nakadi.ResourceSupport;
import nakadi.Response;
import nakadi.ResponseBody;
import nakadi.RetryPolicy;
import nakadi.UriBuilder;
import nakadi.VisibleForTesting;
import nakadi.shadow.com.google.common.base.Charsets;
import nakadi.shadow.com.google.common.base.Joiner;
import nakadi.shadow.com.google.common.collect.Lists;
import nakadi.shadow.com.google.common.collect.Maps;
import nakadi.shadow.com.google.gson.reflect.TypeToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventResource} implementation that posts events to
 * {@code <baseURI>/event-types/<event type name>/events} via the client's resource provider.
 *
 * <p>Events that are {@link String}s are treated as pre-serialized JSON and are wrapped in a JSON
 * array as-is; all other events are wrapped in {@link EventRecord}s and serialized with the
 * configured {@link JsonSupport}. When the client enables publishing compression, payloads are
 * compressed with the configured {@link CompressionSupport} and the {@code Content-Length} and
 * {@code Content-Encoding} headers are added to that request.
 *
 * <p>Header maps handed in by callers (and the shared empty sentinel) are never modified by this
 * class: compression headers are written to a per-request copy.
 */
public class EventResourceReal
implements EventResource {
    private static final Logger logger = LoggerFactory.getLogger((String)NakadiClient.class.getSimpleName());
    private static final String PATH_EVENT_TYPES = "event-types";
    private static final String PATH_COLLECTION = "events";
    private static final String APPLICATION_JSON = "application/json";
    private static final List<ResourceLink> LINKS_SENTINEL = Lists.newArrayList();
    // Shared default for the header-less overloads; must never be written to.
    private static final Map<String, Object> SENTINEL_HEADERS = Maps.newHashMap();
    private static final Type TYPE_BIR = new TypeToken<List<BatchItemResponse>>(){}.getType();
    private final NakadiClient client;
    private final JsonSupport jsonSupport;
    private volatile RetryPolicy retryPolicy;
    private volatile String flowId;
    private final boolean enablePublishingCompression;
    private final CompressionSupport compressionSupport;

    /**
     * Creates a resource that uses the client's own JSON and compression support.
     *
     * @param client the client supplying configuration, the resource provider and metrics
     */
    public EventResourceReal(NakadiClient client) {
        this(client, client.jsonSupport(), client.compressionSupport());
    }

    /**
     * Creates a resource with explicitly supplied collaborators.
     *
     * @param client the client; when {@code null}, publishing compression is left disabled
     * @param jsonSupport serializer used for event records and for parsing batch item responses
     * @param compressionSupport compressor used when publishing compression is enabled
     */
    @VisibleForTesting
    EventResourceReal(NakadiClient client, JsonSupport jsonSupport, CompressionSupport compressionSupport) {
        this.client = client;
        this.jsonSupport = jsonSupport;
        this.compressionSupport = compressionSupport;
        this.enablePublishingCompression = client != null && client.enablePublishingCompression();
    }

    /**
     * Runs {@code sender} and reports metrics for it.
     *
     * <p>The send duration is recorded whether or not the sender throws; status-based meters are
     * only updated when a response was actually obtained.
     *
     * @param sender performs the request and yields its response
     * @param client source of the metric collector
     * @param eventCount number of events in the request, used for the {@code sent} meter
     * @return the response produced by {@code sender}
     */
    private static Response timed(Supplier<Response> sender, NakadiClient client, int eventCount) {
        final long start = System.nanoTime();
        Response response = null;
        try {
            response = sender.get();
            return response;
        }
        finally {
            if (response != null) {
                EventResourceReal.emitMetric(client, response, eventCount);
            }
            client.metricCollector().duration(MetricCollector.Timer.eventSend, System.nanoTime() - start, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Marks the {@code sent} meter with {@code eventCount} for status codes 200 to 204 and marks
     * the {@code http207} meter for status code 207.
     */
    private static void emitMetric(NakadiClient client, Response response, int eventCount) {
        final int status = response.statusCode();
        if (status >= 200 && status <= 204) {
            client.metricCollector().mark(MetricCollector.Meter.sent, eventCount);
        }
        if (status == 207) {
            client.metricCollector().mark(MetricCollector.Meter.http207);
        }
    }

    /**
     * Does nothing; the supplied scope is ignored.
     *
     * @param scope ignored
     * @return this resource
     */
    @Override
    @Deprecated
    public EventResource scope(String scope) {
        return this;
    }

    /**
     * Sets the retry policy applied to subsequent requests.
     *
     * @param retryPolicy the policy handed to each new resource
     * @return this resource
     */
    @Override
    public EventResource retryPolicy(RetryPolicy retryPolicy) {
        this.retryPolicy = retryPolicy;
        return this;
    }

    /**
     * Sets the flow id added to the options of subsequent requests.
     *
     * @param flowId the flow id; when {@code null}, no flow id is set on the request options
     * @return this resource
     */
    @Override
    public EventResource flowId(String flowId) {
        this.flowId = flowId;
        return this;
    }

    /**
     * Sends a collection of events without extra headers.
     *
     * @param eventTypeName the event type to publish to
     * @param events the events; must contain at least one element
     * @return the response of the publish request
     */
    @Override
    public final <T> Response send(String eventTypeName, Collection<T> events) {
        // Explicit type witness: makes sure the Collection overload (not the single-event one) is used.
        return this.<T>send(eventTypeName, events, SENTINEL_HEADERS);
    }

    /**
     * Sends a collection of events with the given headers.
     *
     * <p>If the first event is a {@link String}, all events are joined with commas inside a JSON
     * array and sent as-is; otherwise the events are serialized through {@link JsonSupport}.
     *
     * @param eventTypeName the event type to publish to
     * @param events the events; must contain at least one element
     * @param headers headers for the request; not modified by this method
     * @return the response of the publish request
     * @throws NakadiException if {@code events} is empty
     */
    @Override
    public <T> Response send(String eventTypeName, Collection<T> events, Map<String, Object> headers) {
        NakadiException.throwNonNull(eventTypeName, "Please provide an event type name");
        NakadiException.throwNonNull(events, "Please provide one or more events");
        NakadiException.throwNonNull(headers, "Please provide some headers");
        if (events.isEmpty()) {
            throw new NakadiException(Problem.localProblem("event send called with zero events", ""));
        }
        final List<EventRecord<T>> records = events.stream()
            .map(e -> new EventRecord<T>(eventTypeName, e))
            .collect(Collectors.toList());
        if (records.get(0).event() instanceof String) {
            // NOTE(review): unlike the single-String path, this branch never compresses, even when
            // publishing compression is enabled - confirm whether that is intended.
            final ContentSupplier rawJson =
                () -> ("[" + Joiner.on(",").join(events) + "]").getBytes(Charsets.UTF_8);
            return this.sendUsingSupplier(eventTypeName, rawJson, headers, events.size());
        }
        return this.sendBatchOfEvents(records, headers);
    }

    /**
     * Sends a single event without extra headers.
     *
     * @param eventTypeName the event type to publish to
     * @param event the event to send
     * @return the response of the publish request
     */
    @Override
    public <T> Response send(String eventTypeName, T event) {
        return this.send(eventTypeName, event, SENTINEL_HEADERS);
    }

    /**
     * Sends a single event with the given headers.
     *
     * <p>A {@link String} event is wrapped in a JSON array and sent as-is (compressed when
     * publishing compression is enabled); any other event is sent as a one-element collection.
     *
     * @param eventTypeName the event type to publish to
     * @param event the event to send
     * @param headers headers for the request; not modified by this method
     * @return the response of the publish request
     */
    @Override
    public <T> Response send(String eventTypeName, T event, Map<String, Object> headers) {
        NakadiException.throwNonNull(eventTypeName, "Please provide an event type name");
        NakadiException.throwNonNull(event, "Please provide an event");
        NakadiException.throwNonNull(headers, "Please provide some headers");
        if (event instanceof String) {
            final String payload = "[" + event + "]";
            if (this.enablePublishingCompression) {
                // Compression adds headers; write them to a copy so neither the caller's map nor
                // the shared sentinel is polluted for later requests.
                final Map<String, Object> requestHeaders = Maps.newHashMap(headers);
                final ContentSupplier compressed =
                    this.supplyStringAsCompressedAndSetHeaders(payload, requestHeaders);
                return this.sendUsingSupplier(eventTypeName, compressed, requestHeaders, 1);
            }
            return this.sendUsingSupplier(eventTypeName, () -> payload.getBytes(Charsets.UTF_8), headers, 1);
        }
        final List<T> single = Collections.singletonList(event);
        // Explicit type witness selects the Collection overload; the caller's headers are forwarded.
        return this.<T>send(eventTypeName, single, headers);
    }

    /**
     * Sends a batch of events without extra headers and collects per-item responses.
     *
     * @param eventTypeName the event type to publish to
     * @param events the events; must contain at least one element
     * @return the batch item responses; empty unless the status code was 207 or 422
     */
    @Override
    public <T> BatchItemResponseCollection sendBatch(String eventTypeName, List<T> events) {
        return this.sendBatch(eventTypeName, events, SENTINEL_HEADERS);
    }

    /**
     * Sends a batch of events and collects per-item responses.
     *
     * <p>The response body is parsed into batch item responses only for status codes 207 and 422;
     * the response is closed before this method returns.
     *
     * @param eventTypeName the event type to publish to
     * @param events the events; must contain at least one element
     * @param headers headers for the request; not modified by this method
     * @return the batch item responses; empty unless the status code was 207 or 422
     */
    @Override
    public <T> BatchItemResponseCollection sendBatch(String eventTypeName, List<T> events, Map<String, Object> headers) {
        final List<BatchItemResponse> items = Lists.newArrayList();
        try (Response response = this.<T>send(eventTypeName, events, headers)) {
            final int status = response.statusCode();
            if (status == 207 || status == 422) {
                final ResponseBody responseBody = response.responseBody();
                @SuppressWarnings("unchecked") // TYPE_BIR describes List<BatchItemResponse>
                final Collection<BatchItemResponse> parsed =
                    (Collection<BatchItemResponse>) this.jsonSupport.fromJson(responseBody.asReader(), TYPE_BIR);
                if (parsed != null) {
                    items.addAll(parsed);
                }
            }
        }
        return new BatchItemResponseCollection(items, LINKS_SENTINEL, this.client);
    }

    /**
     * Posts the supplied content to the event type's collection URI, with timing and metrics.
     *
     * @param eventTypeName the event type to publish to
     * @param supplier yields the request body bytes
     * @param headers headers for the request
     * @param eventCount number of events contained in the body, used for the {@code sent} meter
     */
    private Response sendUsingSupplier(String eventTypeName, ContentSupplier supplier, Map<String, Object> headers, int eventCount) {
        return EventResourceReal.timed(
            () -> this.client.resourceProvider()
                .newResource()
                .retryPolicy(this.retryPolicy)
                .postEventsThrowing(this.collectionUri(eventTypeName).buildString(), this.options(headers), supplier),
            this.client,
            eventCount);
    }

    /**
     * Serializes the records via {@link JsonSupport} and posts them to the event type of the
     * first record, compressing the payload when publishing compression is enabled.
     */
    private <T> Response sendBatchOfEvents(List<EventRecord<T>> events, Map<String, Object> headers) {
        NakadiException.throwNonNull(events, "Please provide one or more event records");
        final String topic = events.get(0).eventType();
        final List<Object> eventList = events.stream()
            .map(this::mapEventRecordToSerdes)
            .collect(Collectors.toList());
        // Work on a copy: compression adds headers and must not touch the caller's map or the
        // shared sentinel.
        final Map<String, Object> requestHeaders = Maps.newHashMap(headers);
        final ContentSupplier supplier;
        if (this.enablePublishingCompression) {
            supplier = this.supplyObjectAsCompressedAndSetHeaders(eventList, requestHeaders);
        } else {
            supplier = () -> this.jsonSupport.toJsonBytesCompressed(eventList);
        }
        return this.sendUsingSupplier(topic, supplier, requestHeaders, eventList.size());
    }

    /**
     * Converts an event record into the object handed to the JSON serializer.
     */
    @VisibleForTesting
    <T> Object mapEventRecordToSerdes(EventRecord<T> er) {
        return this.jsonSupport.transformEventRecord(er);
    }

    /**
     * Builds JSON request options carrying the client's token provider, the flow id (if set)
     * and the given headers.
     */
    private ResourceOptions options(Map<String, Object> headers) {
        final ResourceOptions options = ResourceSupport.options(APPLICATION_JSON);
        options.tokenProvider(this.client.resourceTokenProvider());
        if (this.flowId != null) {
            options.flowId(this.flowId);
        }
        options.headers(headers);
        return options;
    }

    /**
     * Builds {@code <baseURI>/event-types/<topic>/events}.
     */
    private UriBuilder collectionUri(String topic) {
        return UriBuilder.builder(this.client.baseURI())
            .path(PATH_EVENT_TYPES)
            .path(topic)
            .path(PATH_COLLECTION);
    }

    /**
     * Serializes {@code sending} via {@link JsonSupport}, compresses it, and records the
     * compression headers in {@code headers}.
     */
    private <T> ContentSupplier supplyObjectAsCompressedAndSetHeaders(T sending, Map<String, Object> headers) {
        final byte[] json = this.jsonSupport.toJsonBytesCompressed(sending);
        return this.supplyBytesAsCompressedAndSetHeaders(json, headers);
    }

    /**
     * Encodes {@code sending} as UTF-8, compresses it, and records the compression headers in
     * {@code headers}.
     */
    private ContentSupplier supplyStringAsCompressedAndSetHeaders(String sending, Map<String, Object> headers) {
        final byte[] json = sending.getBytes(Charsets.UTF_8);
        return this.supplyBytesAsCompressedAndSetHeaders(json, headers);
    }

    /**
     * Compresses {@code json} once, up front, and returns a supplier of the compressed bytes.
     *
     * <p>Writes {@code Content-Length} and {@code Content-Encoding} into {@code headers}; callers
     * must pass a map that belongs to the current request only.
     */
    private ContentSupplier supplyBytesAsCompressedAndSetHeaders(byte[] json, Map<String, Object> headers) {
        final byte[] compressed = this.compressionSupport.compress(json);
        headers.put("Content-Length", compressed.length);
        headers.put("Content-Encoding", this.compressionSupport.name());
        return () -> compressed;
    }
}

