package org.apache.rocketmq.streams.core;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RStreamsException;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.running.WorkerThread;
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/RocketMQStream.class */
/**
 * Entry point that runs a stream topology on a fixed number of {@link WorkerThread}s.
 *
 * <p>{@link #start()} creates and starts {@code StreamConfig.STREAMS_PARALLEL_THREAD_NUM}
 * workers, each of which receives the topology, the user properties and a shared
 * scheduled executor. {@link #stop()} shuts those workers down again.
 *
 * <p>{@code start()} and {@code stop()} are both synchronized on this instance because
 * they mutate the same non-thread-safe worker list. The shared scheduled executor is
 * not shut down by {@code stop()}; it is handed to the workers created by a later
 * {@code start()} call.
 */
public class RocketMQStream {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQStream.class.getName());

    /** Process-wide counter used to give every scheduler thread a unique name suffix. */
    private static final AtomicInteger index = new AtomicInteger(1);

    private final TopologyBuilder topologyBuilder;
    private final Properties properties;

    /** Workers started by the last successful {@link #start()}; guarded by {@code this}. */
    private final List<WorkerThread> workerThreads = new ArrayList<>();

    private final AtomicBoolean started = new AtomicBoolean(false);

    /** Scheduler passed to every worker; its threads are named {@code ScanIdleWindowThread_<n>}. */
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(
        StreamConfig.SCHEDULED_THREAD_NUM.intValue(),
        runnable -> new Thread(runnable, "ScanIdleWindowThread_" + index.getAndIncrement()));

    /**
     * Creates a stream for the given topology. Nothing is started until {@link #start()}.
     *
     * @param topologyBuilder topology to run; also supplies the job id used in names and logs
     * @param properties      configuration handed unchanged to every worker thread
     */
    public RocketMQStream(TopologyBuilder topologyBuilder, Properties properties) {
        this.topologyBuilder = topologyBuilder;
        this.properties = properties;
    }

    /**
     * Starts the worker threads. Calling this on an already started stream only logs
     * and returns.
     *
     * <p>If any worker cannot be created or started, the workers that were already
     * started are shut down and the stream is marked as not running again, so the
     * caller can retry {@code start()} later.
     *
     * @throws RStreamsException wrapping whatever was thrown while starting the workers
     */
    public synchronized void start() {
        String jobId = this.topologyBuilder.getJobId();
        if (this.started.get()) {
            logger.info("RocketMQStream has been started, jobId=[{}].", jobId);
            return;
        }
        // The monitor already serialises start/stop, so a plain set is sufficient here.
        this.started.set(true);
        try {
            int parallelism = StreamConfig.STREAMS_PARALLEL_THREAD_NUM.intValue();
            for (int i = 0; i < parallelism; i++) {
                String threadName = String.join("_", Constant.WORKER_THREAD_NAME, jobId, String.valueOf(i));
                WorkerThread workerThread =
                    new WorkerThread(threadName, this.topologyBuilder, this.properties, this.executor);
                workerThread.start();
                // Registered only after a successful start, so the list never holds a dead worker.
                this.workerThreads.add(workerThread);
            }
        } catch (Throwable th) {
            logger.error("start RocketMQStream error, jobId=[{}].", jobId, th);
            rollbackFailedStart(jobId);
            throw new RStreamsException(th);
        }
    }

    /**
     * Shuts down every worker started by {@link #start()}, forgets them and marks the
     * stream as not running. Safe to call when the stream was never started.
     */
    public synchronized void stop() {
        for (WorkerThread workerThread : this.workerThreads) {
            workerThread.shutdown();
        }
        this.workerThreads.clear();
        this.started.set(false);
    }

    /**
     * @return {@code true} between a {@link #start()} call and the next {@link #stop()}
     *         (or a failed start), {@code false} otherwise
     */
    public boolean isRunning() {
        return this.started.get();
    }

    /**
     * Undoes a partially completed start: best-effort shutdown of the workers that did
     * start, then resets the running flag. Shutdown failures are logged rather than
     * thrown so they cannot mask the exception that caused the start to fail.
     */
    private void rollbackFailedStart(String jobId) {
        for (WorkerThread workerThread : this.workerThreads) {
            try {
                workerThread.shutdown();
            } catch (Throwable shutdownError) {
                logger.warn("shutdown worker thread error while rolling back start, jobId=[{}].", jobId, shutdownError);
            }
        }
        this.workerThreads.clear();
        this.started.set(false);
    }
}
