package fun.langel.datawharf.drive;

import fun.langel.datawharf.Channels;
import fun.langel.datawharf.Consumer;
import fun.langel.datawharf.ConsumerThread;
import java.lang.reflect.Constructor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:fun/langel/datawharf/drive/ConsumerDrive.class */
/**
 * {@link Drive} that owns a group of {@link ConsumerThread}s.
 *
 * <p>On {@link #begin()} one {@link Consumer} is instantiated reflectively per thread slot,
 * wrapped in a {@link ConsumerThread}, given one queue buffer from the supplied
 * {@link Channels} (slot index modulo the channel count) and started. On {@link #finish()}
 * every started thread is asked to shut down and is interrupted.
 *
 * <p>Start-up is best effort: a slot whose consumer cannot be created or started is logged
 * and left empty; the remaining slots are still attempted.
 */
public class ConsumerDrive implements Drive {
    private static final Logger LOGGER = Logger.getLogger(ConsumerDrive.class.getName());

    /** Prefix of every consumer thread name; the slot index is appended. */
    private static final String THREAD_NAME_PREFIX = "data-wharf-consumer-";

    /** Started threads, indexed by slot; {@code null} before begin() and after finish(). */
    private ConsumerThread[] consumerThreads;
    private final Class<? extends Consumer> consumerKlass;
    private final int consumeThreadNum;
    private final int consumeIdle;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Channels channels;

    /**
     * Creates a drive; no threads are created until {@link #begin()} is called.
     *
     * @param cls      consumer implementation; instantiated through its no-arg constructor
     * @param i        number of consumer threads to create
     * @param i2       idle value handed unchanged to each {@link ConsumerThread}
     *                 (presumably an idle/poll interval; unit not visible here, confirm
     *                 against ConsumerThread)
     * @param channels source of the queue buffers attached to the consumer threads
     */
    public ConsumerDrive(Class<? extends Consumer> cls, int i, int i2, Channels channels) {
        this.consumerKlass = cls;
        this.consumeThreadNum = i;
        this.consumeIdle = i2;
        this.channels = channels;
    }

    /** Returns {@code true} between a {@link #begin()} and the following {@link #finish()}. */
    @Override
    public boolean running() {
        return this.running.get();
    }

    /**
     * Creates and starts the consumer threads, then marks the drive as running.
     *
     * <p>Calling this while the drive is already running does nothing; otherwise a second
     * call would replace the thread array and leave the earlier threads running with no
     * way to stop them.
     *
     * @throws NegativeArraySizeException if the configured thread count is negative
     */
    @Override
    public void begin() {
        if (this.running.get()) {
            return;
        }
        createConsumeThreadsAndAllocateBuffer();
        this.running.set(true);
    }

    /** Stops every started consumer thread and marks the drive as not running. */
    @Override
    public void finish() {
        shutdownConsumeThreads();
        this.running.set(false);
    }

    /**
     * Fills {@link #consumerThreads} with one started thread per slot.
     *
     * <p>Failures raised as exceptions are logged and the affected slot stays {@code null};
     * {@link Error}s are not caught and propagate to the caller.
     */
    private void createConsumeThreadsAndAllocateBuffer() {
        this.consumerThreads = new ConsumerThread[this.consumeThreadNum];

        // The constructor is the same for every slot, so resolve it once. The declared
        // lookup also finds non-public no-arg constructors, which setAccessible then opens.
        final Constructor<? extends Consumer> constructor;
        try {
            constructor = this.consumerKlass.getDeclaredConstructor();
            constructor.setAccessible(true);
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE,
                    "No usable no-arg constructor for consumer class " + this.consumerKlass
                            + "; no consumer threads were started", e);
            return;
        }

        for (int slot = 0; slot < this.consumerThreads.length; slot++) {
            try {
                ConsumerThread consumerThread =
                        new ConsumerThread(THREAD_NAME_PREFIX + slot, constructor.newInstance(), this.consumeIdle);
                // Spread the threads over the available channels round-robin.
                consumerThread.addDataSource(this.channels.getQueueBuffer(slot % this.channels.getChannelSize()));
                consumerThread.start();
                // Only record the thread once it has actually been started.
                this.consumerThreads[slot] = consumerThread;
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE,
                        "Failed to start consumer thread " + THREAD_NAME_PREFIX + slot, e);
            }
        }
    }

    /**
     * Shuts down and interrupts every thread that was started, then forgets them.
     *
     * <p>Keyed on the thread array rather than the running flag so that threads from a
     * partially completed start-up are also stopped. Slots that failed to start are
     * {@code null} and are skipped.
     */
    private void shutdownConsumeThreads() {
        ConsumerThread[] threads = this.consumerThreads;
        if (threads == null) {
            return;
        }
        this.consumerThreads = null;
        for (ConsumerThread consumerThread : threads) {
            if (consumerThread == null) {
                continue;
            }
            consumerThread.shutdown();
            consumerThread.interrupt();
        }
    }
}
