/*
 * Decompiled with CFR 0.152.
 */
package io.castled.core;

import io.castled.exceptions.CastledRuntimeException;
import io.castled.utils.ThreadUtils;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CastledBlockingQueue<T> {
    private static final Logger log = LoggerFactory.getLogger(CastledBlockingQueue.class);
    private final Consumer<T> consumer;
    private final ExecutorService executorService;
    private final BlockingQueue<T> blockingQueue;
    private volatile boolean shutDown = false;
    private boolean exitOnError = false;
    private volatile Exception failureException;

    public CastledBlockingQueue(Consumer<T> childConsumer, int parallelism, int maxCapacity, boolean exitOnError) {
        this.consumer = this.decorateConsumer(childConsumer);
        this.executorService = Executors.newFixedThreadPool(parallelism);
        log.info("Parallelism {} , Max Capacity {}", (Object)parallelism, (Object)maxCapacity);
        this.blockingQueue = new ArrayBlockingQueue<T>(maxCapacity);
        this.exitOnError = exitOnError;
        for (int i = 0; i < parallelism; ++i) {
            this.executorService.execute(() -> {
                while (!this.shutDown) {
                    try {
                        T payload = this.blockingQueue.poll(1L, TimeUnit.SECONDS);
                        if (payload == null) continue;
                        this.consumer.accept(payload);
                    }
                    catch (Exception e) {
                        log.error("Blocking queue Consumer failed", (Throwable)e);
                        if (!exitOnError) continue;
                        this.failureException = e;
                        break;
                    }
                }
            });
        }
    }

    public Consumer<T> decorateConsumer(Consumer<T> consumer) {
        return consumer;
    }

    public void writePayload(T payload, int timeout, TimeUnit timeUnit) throws TimeoutException {
        if (this.failureException != null) {
            this.shutdownNow();
            throw new CastledRuntimeException(this.failureException);
        }
        try {
            boolean success = this.blockingQueue.offer(payload, timeout, timeUnit);
            if (!success) {
                throw new TimeoutException();
            }
        }
        catch (InterruptedException e) {
            log.error("Blocking queue write payload interrupted", (Throwable)e);
            throw new CastledRuntimeException(e);
        }
    }

    public void flush(long timeOutMs) throws TimeoutException {
        while (!this.blockingQueue.isEmpty()) {
            if (this.exitOnError && this.failureException != null) {
                this.shutdownNow();
                throw new CastledRuntimeException(this.failureException);
            }
            ThreadUtils.interruptIgnoredSleep(500L);
        }
        this.shutDown = true;
        this.executorService.shutdown();
        try {
            this.executorService.awaitTermination(60L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            this.executorService.shutdownNow();
        }
        if (this.exitOnError && this.failureException != null) {
            this.shutdownNow();
            throw new CastledRuntimeException(this.failureException);
        }
    }

    private void shutdownNow() {
        this.shutDown = true;
        this.executorService.shutdownNow();
    }
}

