/*
 * Decompiled with CFR 0.152.
 */
package io.aleph0.yap.messaging.test;

import io.aleph0.yap.core.Sink;
import io.aleph0.yap.core.Source;
import io.aleph0.yap.messaging.core.RelayMetrics;
import io.aleph0.yap.messaging.core.RelayProcessorWorker;
import io.aleph0.yap.messaging.test.Scheduler;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestRelayProcessorWorker<ValueT>
implements RelayProcessorWorker<ValueT> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestRelayProcessorWorker.class);
    private final AtomicLong submittedMetrics = new AtomicLong(0L);
    private final AtomicLong acknowledgedMetrics = new AtomicLong(0L);
    private final AtomicLong awaitingMetrics = new AtomicLong(0L);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Scheduler scheduler;

    public TestRelayProcessorWorker() {
        this(Scheduler.defaultScheduler());
    }

    public TestRelayProcessorWorker(Scheduler scheduler) {
        this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(Source<ValueT> source, Sink<ValueT> sink) throws IOException, InterruptedException {
        try {
            AtomicReference<Object> failureCause = new AtomicReference<Object>(null);
            try {
                Object value = source.take();
                while (value != null) {
                    this.throwIfPresent(failureCause);
                    Duration delay = this.scheduler.schedule();
                    if (delay.isNegative()) {
                        throw new IllegalArgumentException("scheduler returned negative delay");
                    }
                    Object thevalue = value;
                    this.executor.schedule(() -> {
                        try {
                            sink.put(thevalue);
                            this.acknowledgedMetrics.incrementAndGet();
                            this.awaitingMetrics.decrementAndGet();
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            LOGGER.atError().setCause((Throwable)e).log("Interrupted while trying to put delayed message. Failing task...");
                            failureCause.compareAndSet(null, e);
                        }
                        catch (Throwable e) {
                            LOGGER.atError().setCause(e).log("Failed to put delayed message. Failing task...");
                            failureCause.compareAndSet(null, e);
                        }
                    }, delay.toNanos(), TimeUnit.NANOSECONDS);
                    this.submittedMetrics.incrementAndGet();
                    this.awaitingMetrics.incrementAndGet();
                    value = source.take();
                }
            }
            finally {
                this.executor.shutdown();
                this.executor.awaitTermination(30L, TimeUnit.SECONDS);
            }
            this.throwIfPresent(failureCause);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.atError().setCause((Throwable)e).log("Simulated relay interrupted. Failing task...");
            throw e;
        }
        catch (RuntimeException e) {
            LOGGER.atError().setCause((Throwable)e).log("Simulated relay failed. Failing task...");
            throw e;
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            LOGGER.atError().setCause(cause).log("Simulated relay failed. Failing task...");
            if (cause instanceof Error) {
                Error x = (Error)cause;
                throw x;
            }
            if (cause instanceof IOException) {
                IOException x = (IOException)cause;
                throw x;
            }
            if (cause instanceof RuntimeException) {
                RuntimeException x = (RuntimeException)cause;
                throw x;
            }
            if (cause instanceof Exception) {
                Exception x = (Exception)cause;
                throw new IOException("Simulated relay failed", x);
            }
            throw new AssertionError("Unexpected error", e);
        }
    }

    private void throwIfPresent(AtomicReference<Throwable> failureCause) throws InterruptedException, ExecutionException {
        Throwable fc = failureCause.get();
        if (fc != null) {
            if (fc instanceof Error) {
                Error e = (Error)fc;
                throw e;
            }
            if (fc instanceof InterruptedException) {
                InterruptedException e = (InterruptedException)fc;
                throw e;
            }
            if (fc instanceof Exception) {
                Exception e = (Exception)fc;
                throw new ExecutionException(e);
            }
            throw new AssertionError("Unexpected error", fc);
        }
    }

    public RelayMetrics checkMetrics() {
        long submitted = this.submittedMetrics.get();
        long acknowledged = this.acknowledgedMetrics.get();
        long awaiting = this.awaitingMetrics.get();
        return new RelayMetrics(submitted, acknowledged, awaiting);
    }

    public RelayMetrics flushMetrics() {
        RelayMetrics result = this.checkMetrics();
        this.submittedMetrics.set(0L);
        this.acknowledgedMetrics.set(0L);
        return result;
    }
}

