package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.class */
public class RocketMqConsumerThread implements Runnable {
    private final DefaultLitePullConsumer consumer;
    private final ConsumerMetadata metadata;
    private final LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> tasks = new LinkedBlockingQueue<>();

    public RocketMqConsumerThread(ConsumerMetadata consumerMetadata) {
        this.metadata = consumerMetadata;
        this.consumer = RocketMqAdminUtil.initDefaultLitePullConsumer(this.metadata.getBaseConfig(), !consumerMetadata.isEnabledCommitCheckpoint());
        try {
            this.consumer.start();
        } catch (MQClientException e) {
            throw new RocketMqConnectorException(RocketMqConnectorErrorCode.CONSUMER_START_ERROR, e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                try {
                    Consumer<DefaultLitePullConsumer> poll = this.tasks.poll(1L, TimeUnit.SECONDS);
                    if (poll != null) {
                        poll.accept(this.consumer);
                    }
                } catch (InterruptedException e) {
                    throw new RocketMqConnectorException(RocketMqConnectorErrorCode.CONSUME_THREAD_RUN_ERROR, e);
                }
            } finally {
                this.consumer.shutdown();
            }
        }
    }

    public LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> getTasks() {
        return this.tasks;
    }
}
