/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.statefun.testutils.function;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.metrics.Counter;
import org.apache.flink.statefun.sdk.metrics.Metrics;

class TestContext
implements Context {
    private final Address selfAddress;
    private final StatefulFunction function;
    private final Map<EgressIdentifier<?>, List<Object>> outputs;
    private final Queue<Envelope> messages;
    private final PriorityQueue<PendingMessage> pendingMessage;
    private Map<Address, List<Object>> responses;
    private Address from;
    private long watermark;

    TestContext(Address selfAddress, StatefulFunction function, Instant startTime) {
        this.selfAddress = Objects.requireNonNull(selfAddress);
        this.function = Objects.requireNonNull(function);
        this.watermark = startTime.toEpochMilli();
        this.messages = new ArrayDeque<Envelope>();
        this.pendingMessage = new PriorityQueue<PendingMessage>(Comparator.comparingLong(a -> a.timer));
        this.outputs = new HashMap();
    }

    public Address self() {
        return this.selfAddress;
    }

    public Address caller() {
        return this.from;
    }

    public void reply(Object message) {
        Address to = this.caller();
        if (to == null) {
            throw new IllegalStateException("The caller address is null");
        }
        this.send(to, message);
    }

    public void send(Address to, Object message) {
        if (to.equals((Object)this.selfAddress)) {
            this.messages.add(new Envelope(this.self(), to, message));
        }
        this.responses.computeIfAbsent(to, ignore -> new ArrayList()).add(message);
    }

    public <T> void send(EgressIdentifier<T> egress, T message) {
        this.outputs.computeIfAbsent(egress, ignore -> new ArrayList()).add(message);
    }

    public void sendAfter(Duration delay, Address to, Object message) {
        this.pendingMessage.add(new PendingMessage(new Envelope(this.self(), to, message), this.watermark + delay.toMillis(), null));
    }

    public void sendAfter(Duration delay, Address to, Object message, String cancellationToken) {
        Objects.requireNonNull(cancellationToken);
        this.pendingMessage.add(new PendingMessage(new Envelope(this.self(), to, message), this.watermark + delay.toMillis(), cancellationToken));
    }

    public void cancelDelayedMessage(String cancellationToken) {
        this.pendingMessage.removeIf(pendingMessage -> Objects.equals(pendingMessage.cancellationToken, cancellationToken));
    }

    public <M, T> void registerAsyncOperation(M metadata, CompletableFuture<T> future) {
        Object value = null;
        Throwable error = null;
        try {
            value = future.get();
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Failed to get results from async action", e);
        }
        catch (ExecutionException e) {
            error = e.getCause();
        }
        AsyncOperationResult.Status status = error == null ? AsyncOperationResult.Status.SUCCESS : AsyncOperationResult.Status.FAILURE;
        AsyncOperationResult result = new AsyncOperationResult(metadata, status, value, error);
        this.messages.add(new Envelope(this.self(), this.self(), result));
    }

    public Metrics metrics() {
        return name -> new Counter(){

            public void inc(long amount) {
            }

            public void dec(long amount) {
            }
        };
    }

    <T> List<T> getEgress(EgressIdentifier<T> identifier) {
        List values = this.outputs.getOrDefault(identifier, Collections.emptyList());
        return values;
    }

    Map<Address, List<Object>> invoke(Address from, Object message) {
        this.messages.add(new Envelope(from, null, message));
        return this.processAllMessages();
    }

    Map<Address, List<Object>> tick(Duration duration) {
        this.watermark += duration.toMillis();
        while (!this.pendingMessage.isEmpty() && this.pendingMessage.peek().timer <= this.watermark) {
            this.messages.add(this.pendingMessage.poll().envelope);
        }
        return this.processAllMessages();
    }

    private Map<Address, List<Object>> processAllMessages() {
        this.responses = new HashMap<Address, List<Object>>();
        while (!this.messages.isEmpty()) {
            Envelope envelope = this.messages.poll();
            if (envelope.to != null && !envelope.to.equals((Object)this.self())) {
                this.send(envelope.to, envelope.message);
                continue;
            }
            this.from = envelope.from;
            this.function.invoke((Context)this, envelope.message);
        }
        return this.responses;
    }

    private static class PendingMessage {
        Envelope envelope;
        String cancellationToken;
        long timer;

        PendingMessage(Envelope envelope, long timer, String cancellationToken) {
            this.envelope = envelope;
            this.timer = timer;
            this.cancellationToken = cancellationToken;
        }
    }

    private static class Envelope {
        Address from;
        Address to;
        Object message;

        Envelope(Address from, Address to, Object message) {
            this.from = from;
            this.to = to;
            this.message = message;
        }
    }
}

