/*
 * Decompiled with CFR 0.152.
 */
package de.stklcode.pubtrans.ura.reader;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.stklcode.pubtrans.ura.UraClientConfiguration;
import de.stklcode.pubtrans.ura.model.Trip;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class AsyncUraTripReader
implements AutoCloseable {
    private static final Integer RES_TYPE_PREDICTION = 1;
    private static final Integer RES_TYPE_URA_VERSION = 4;
    private final List<Consumer<Trip>> consumers;
    private final URI uri;
    private final UraClientConfiguration config;
    private JsonLineSubscriber subscriber;
    private CompletableFuture<Void> future;

    public AsyncUraTripReader(URI uri, Consumer<Trip> consumer) {
        this(uri, null, new ArrayList<Consumer<Trip>>(0));
        this.consumers.add(consumer);
    }

    public AsyncUraTripReader(URI uri, List<Consumer<Trip>> consumers) {
        this(uri, null, consumers);
    }

    public AsyncUraTripReader(URI uri, UraClientConfiguration config, List<Consumer<Trip>> consumers) {
        this.uri = uri;
        this.config = config;
        this.consumers = new ArrayList<Consumer<Trip>>(consumers);
    }

    public void open() {
        if (this.future != null) {
            throw new IllegalStateException("Reader already opened");
        }
        this.subscriber = new JsonLineSubscriber();
        HttpClient.Builder clientBuilder = HttpClient.newBuilder();
        if (this.config != null && this.config.getConnectTimeout() != null) {
            clientBuilder.connectTimeout(this.config.getConnectTimeout());
        }
        HttpRequest.Builder reqBuilder = HttpRequest.newBuilder(this.uri).GET();
        if (this.config != null && this.config.getTimeout() != null) {
            reqBuilder.timeout(this.config.getTimeout());
        }
        clientBuilder.build().sendAsync(reqBuilder.build(), HttpResponse.BodyHandlers.fromLineSubscriber(this.subscriber)).exceptionally(throwable -> {
            this.subscriber.onError((Throwable)throwable);
            return null;
        });
        this.future = this.subscriber.getState();
    }

    public void addConsumer(Consumer<Trip> consumer) {
        this.consumers.add(consumer);
    }

    @Override
    public void close() {
        if (this.future == null) {
            return;
        }
        this.subscriber.cancel();
        try {
            this.future.get(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            throw new IllegalStateException("Failed to close API connection", e);
        }
        catch (TimeoutException e) {
            this.future.cancel(true);
        }
        finally {
            this.future = null;
        }
    }

    private class JsonLineSubscriber
    implements Flow.Subscriber<String> {
        private final ObjectMapper mapper = new ObjectMapper();
        private final CompletableFuture<Void> state = new CompletableFuture();
        private Flow.Subscription subscription;
        private String version = null;

        private JsonLineSubscriber() {
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            this.subscription.request(1L);
        }

        @Override
        public void onNext(String item) {
            try {
                List l = (List)this.mapper.readValue(item, (JavaType)this.mapper.getTypeFactory().constructCollectionType(List.class, Serializable.class));
                if (l != null && !l.isEmpty()) {
                    if (((Serializable)l.get(0)).equals(RES_TYPE_URA_VERSION)) {
                        this.version = ((Serializable)l.get(1)).toString();
                    } else if (((Serializable)l.get(0)).equals(RES_TYPE_PREDICTION)) {
                        Trip trip = new Trip(l, this.version);
                        AsyncUraTripReader.this.consumers.forEach(c -> c.accept(trip));
                    }
                }
                this.subscription.request(1L);
            }
            catch (IOException e) {
                this.onError(e);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            this.state.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            this.state.complete(null);
        }

        public CompletableFuture<Void> getState() {
            return this.state;
        }

        public void cancel() {
            this.state.complete(null);
            if (this.subscription != null) {
                this.subscription.cancel();
            }
        }
    }
}

