package com.github.bishoku.chunkprocessor;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:com/github/bishoku/chunkprocessor/ChunkProcessorHelper.class */
public class ChunkProcessorHelper<T> {
    private CompletableFuture<Void>[] taskList;
    private final BlockingQueue<T> source;
    private final AtomicBoolean finished;
    private final AtomicBoolean forceToFinish;
    private final AtomicBoolean started;
    private final AtomicInteger totalProcessedRecordCount;
    private final FinishListener finishListener;
    private final int chunkSize;
    private final long chunkInterval;
    private final int workerCount;
    private final ChunkProcessor<T> chunkProcessor;
    private final RecordProcessor<T> recordProcessor;
    private ExecutorService executor;

    @FunctionalInterface
    /* loaded from: input_file:com/github/bishoku/chunkprocessor/ChunkProcessorHelper$ChunkProcessor.class */
    public interface ChunkProcessor<T> {
        int process(List<T> list) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:com/github/bishoku/chunkprocessor/ChunkProcessorHelper$FinishListener.class */
    public interface FinishListener {
        void finished(Void r1);
    }

    @FunctionalInterface
    /* loaded from: input_file:com/github/bishoku/chunkprocessor/ChunkProcessorHelper$RecordProcessor.class */
    public interface RecordProcessor<T> {
        void process(T t) throws Exception;
    }

    public static <T> ChunkProcessorHelper<T> newInstance(RecordProcessor<T> recordProcessor) {
        return new ChunkProcessorHelper<>(recordProcessor);
    }

    public static <T> ChunkProcessorHelper<T> newInstance(ChunkProcessor<T> chunkProcessor) {
        return new ChunkProcessorHelper<>(chunkProcessor);
    }

    public static <T> ChunkProcessorHelper<T> newInstance(int i, RecordProcessor<T> recordProcessor) {
        return new ChunkProcessorHelper<>(i, recordProcessor);
    }

    public static <T> ChunkProcessorHelper<T> newInstance(int i, ChunkProcessor<T> chunkProcessor) {
        return new ChunkProcessorHelper<>(i, chunkProcessor);
    }

    private ChunkProcessorHelper(RecordProcessor<T> recordProcessor) {
        this(Runtime.getRuntime().availableProcessors(), recordProcessor);
    }

    private ChunkProcessorHelper(ChunkProcessor<T> chunkProcessor) {
        this(Runtime.getRuntime().availableProcessors(), chunkProcessor);
    }

    private ChunkProcessorHelper(int i, RecordProcessor<T> recordProcessor) {
        this(100, 500L, i, null, recordProcessor);
    }

    private ChunkProcessorHelper(int i, ChunkProcessor<T> chunkProcessor) {
        this(100, 500L, i, chunkProcessor, null);
    }

    private ChunkProcessorHelper(int i, long j, int i2, ChunkProcessor<T> chunkProcessor, RecordProcessor<T> recordProcessor) {
        this.source = new LinkedBlockingQueue();
        this.finished = new AtomicBoolean(false);
        this.forceToFinish = new AtomicBoolean(false);
        this.started = new AtomicBoolean(false);
        this.totalProcessedRecordCount = new AtomicInteger(0);
        this.chunkSize = i;
        this.chunkInterval = j;
        this.workerCount = i2;
        this.chunkProcessor = chunkProcessor;
        this.recordProcessor = recordProcessor;
        this.finishListener = r9 -> {
            this.executor.shutdown();
            try {
                this.executor.awaitTermination(j * 2, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace(System.out);
            }
        };
    }

    public void start() {
        this.taskList = new CompletableFuture[this.workerCount];
        this.executor = Executors.newWorkStealingPool(this.workerCount);
        for (int i = 0; i < this.workerCount; i++) {
            Worker worker = null;
            if (this.chunkProcessor != null) {
                worker = new Worker(this.chunkProcessor, this.source, this.finished, this.forceToFinish, this.totalProcessedRecordCount, this.chunkSize, this.chunkInterval);
            } else if (this.recordProcessor != null) {
                worker = new Worker(this.recordProcessor, this.source, this.finished, this.forceToFinish, this.totalProcessedRecordCount, this.chunkSize, this.chunkInterval);
            }
            if (worker != null) {
                this.taskList[i] = CompletableFuture.runAsync(worker);
            }
        }
        this.started.set(true);
    }

    public void add(T t) {
        if (!this.started.get()) {
            throw new UnsupportedOperationException("start() method must be invoked first");
        }
        this.source.add(t);
    }

    public void add(List<T> list) {
        if (!this.started.get()) {
            throw new UnsupportedOperationException("start() method must be invoked first");
        }
        this.source.addAll(list);
    }

    public void finish() throws Exception {
        try {
            finishAsync().get();
        } catch (InterruptedException | ExecutionException e) {
            this.forceToFinish.set(true);
            throw e;
        }
    }

    public CompletableFuture<Void> finishAsync() {
        return internalFinish();
    }

    private CompletableFuture<Void> internalFinish() {
        this.finished.set(true);
        CompletableFuture<Void> allOf = CompletableFuture.allOf(this.taskList);
        FinishListener finishListener = this.finishListener;
        allOf.thenAccept(finishListener::finished);
        return allOf;
    }

    public int getProcessedRecordCount() {
        return this.totalProcessedRecordCount.get();
    }
}
