/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core.test;

import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.OutboxImpl;
import com.hazelcast.jet.impl.execution.OutboxInternal;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import java.time.LocalTime;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

public final class TestOutbox
implements OutboxInternal {
    private final Queue<Object>[] buckets;
    private final Queue<Map.Entry<Object, Object>> snapshotQueue = new ArrayDeque<Map.Entry<Object, Object>>();
    private final OutboxImpl outbox;
    private final SerializationService serializationService;
    private final int[] allOrdinals;

    public TestOutbox(int ... capacities) {
        this(capacities, 0);
    }

    public TestOutbox(int[] edgeCapacities, int snapshotCapacity) {
        Preconditions.checkNotNegative(snapshotCapacity, "snapshotCapacity must be >= 0 (0 for no snapshot queue)");
        this.buckets = new Queue[edgeCapacities.length];
        Arrays.setAll(this.buckets, i -> new ArrayDeque());
        this.allOrdinals = IntStream.range(0, edgeCapacities.length).toArray();
        OutboundCollector[] outstreams = new OutboundCollector[edgeCapacities.length + (snapshotCapacity > 0 ? 1 : 0)];
        Arrays.setAll(outstreams, i -> i < edgeCapacities.length ? e -> TestOutbox.addToQueue(this.buckets[i], edgeCapacities[i], e) : e -> TestOutbox.addToQueue(this.snapshotQueue, snapshotCapacity, this.deserializeSnapshotEntry((Map.Entry)e)));
        this.serializationService = new DefaultSerializationServiceBuilder().build();
        this.outbox = new OutboxImpl(outstreams, snapshotCapacity > 0, new ProgressTracker(), this.serializationService, Integer.MAX_VALUE, new AtomicLongArray(outstreams.length + (snapshotCapacity > 0 ? 1 : 0)));
        this.outbox.reset();
    }

    private static <E> ProgressState addToQueue(Queue<? super E> queue, int capacity, E o) {
        if (capacity > queue.size()) {
            queue.offer(o);
            return ProgressState.DONE;
        }
        return ProgressState.NO_PROGRESS;
    }

    @Override
    public int bucketCount() {
        return this.outbox.bucketCount();
    }

    @Override
    public boolean offer(int ordinal, @Nonnull Object item) {
        int[] nArray;
        if (ordinal == -1) {
            nArray = this.allOrdinals;
        } else {
            int[] nArray2 = new int[1];
            nArray = nArray2;
            nArray2[0] = ordinal;
        }
        return this.offer(nArray, item);
    }

    @Override
    public boolean offer(@Nonnull Object item) {
        return this.offer(this.allOrdinals, item);
    }

    @Override
    public boolean offer(@Nonnull int[] ordinals, @Nonnull Object item) {
        return this.outbox.offer(ordinals, item);
    }

    @Override
    public boolean offerToSnapshot(@Nonnull Object key, @Nonnull Object value) {
        return this.outbox.offerToSnapshot(key, value);
    }

    public <T> Queue<T> queue(int ordinal) {
        return this.buckets[ordinal];
    }

    public Queue<Map.Entry<Object, Object>> snapshotQueue() {
        return this.snapshotQueue;
    }

    public <T> void drainQueueAndReset(int queueOrdinal, Collection<T> target, boolean logItems) {
        this.drainInternal(this.queue(queueOrdinal), target, logItems, "Output-" + queueOrdinal);
    }

    public <T> void drainQueuesAndReset(List<? extends Collection<T>> target, boolean logItems) {
        for (int ordinal : this.allOrdinals) {
            this.drainQueueAndReset(ordinal, target.get(ordinal), logItems);
        }
    }

    public <K, V> void drainSnapshotQueueAndReset(Collection<? super Map.Entry<K, V>> target, boolean logItems) {
        this.drainInternal(this.snapshotQueue(), target, logItems, "Output-ss");
    }

    private <K, V> Map.Entry<K, V> deserializeSnapshotEntry(Map.Entry<Data, Data> t) {
        return Util.entry(this.serializationService.toObject(t.getKey()), this.serializationService.toObject(t.getValue()));
    }

    private <T> void drainInternal(Queue<? extends T> q, Collection<? super T> target, boolean logItems, String prefix) {
        T o;
        while ((o = q.poll()) != null) {
            target.add(o);
            if (!logItems) continue;
            System.out.println(LocalTime.now() + " " + prefix + ": " + o);
        }
        this.reset();
    }

    @Override
    public void reset() {
        this.outbox.reset();
    }

    @Override
    public boolean hasUnfinishedItem() {
        return this.outbox.hasUnfinishedItem();
    }

    @Override
    public void block() {
        this.outbox.block();
    }

    @Override
    public void unblock() {
        this.outbox.unblock();
    }

    @Override
    public long lastForwardedWm(byte key) {
        return this.outbox.lastForwardedWm(key);
    }

    public String toString() {
        return Arrays.toString(this.buckets);
    }
}

