package org.apache.eventmesh.connector.standalone.broker.task;

import io.cloudevents.CloudEvent;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.connector.standalone.broker.StandaloneBroker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/standalone/broker/task/SubScribeTask.class */
public class SubScribeTask implements Runnable {
    private String topicName;
    private StandaloneBroker standaloneBroker;
    private EventListener listener;
    private AtomicInteger offset;
    private final Logger logger = LoggerFactory.getLogger(SubScribeTask.class);
    private volatile boolean isRunning = true;

    /* renamed from: org.apache.eventmesh.connector.standalone.broker.task.SubScribeTask$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/eventmesh/connector/standalone/broker/task/SubScribeTask$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$eventmesh$api$EventMeshAction = new int[EventMeshAction.values().length];

        static {
            try {
                $SwitchMap$org$apache$eventmesh$api$EventMeshAction[EventMeshAction.CommitMessage.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$eventmesh$api$EventMeshAction[EventMeshAction.ReconsumeLater.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$eventmesh$api$EventMeshAction[EventMeshAction.ManualAck.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public SubScribeTask(String str, StandaloneBroker standaloneBroker, EventListener eventListener) {
        this.topicName = str;
        this.standaloneBroker = standaloneBroker;
        this.listener = eventListener;
    }

    @Override // java.lang.Runnable
    public void run() {
        CloudEvent message;
        CloudEvent message2;
        while (this.isRunning) {
            try {
                this.logger.debug("execute subscribe task, topic: {}, offset: {}", this.topicName, this.offset);
                if (this.offset == null && (message2 = this.standaloneBroker.getMessage(this.topicName)) != null) {
                    if (message2.getExtension("offset") != null) {
                        this.offset = new AtomicInteger(((Integer) message2.getExtension("offset")).intValue());
                    } else {
                        this.offset = new AtomicInteger(0);
                    }
                }
                if (this.offset != null && (message = this.standaloneBroker.getMessage(this.topicName, this.offset.get())) != null) {
                    this.listener.consume(message, new EventMeshAsyncConsumeContext() { // from class: org.apache.eventmesh.connector.standalone.broker.task.SubScribeTask.1
                        public void commit(EventMeshAction eventMeshAction) {
                            switch (AnonymousClass2.$SwitchMap$org$apache$eventmesh$api$EventMeshAction[eventMeshAction.ordinal()]) {
                                case 1:
                                    SubScribeTask.this.logger.info("message commit, topic: {}, current offset:{}", SubScribeTask.this.topicName, Integer.valueOf(SubScribeTask.this.offset.get()));
                                    return;
                                case 2:
                                default:
                                    return;
                                case 3:
                                    SubScribeTask.this.offset.incrementAndGet();
                                    SubScribeTask.this.logger.info("message ack, topic: {}, current offset:{}", SubScribeTask.this.topicName, Integer.valueOf(SubScribeTask.this.offset.get()));
                                    return;
                            }
                        }
                    });
                }
            } catch (Exception e) {
                Logger logger = this.logger;
                Object[] objArr = new Object[3];
                objArr[0] = this.topicName;
                objArr[1] = this.offset == null ? null : Integer.valueOf(this.offset.get());
                objArr[2] = e;
                logger.error("consumer error, topic: {}, offset: {}", objArr);
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e2) {
                Logger logger2 = this.logger;
                Object[] objArr2 = new Object[4];
                objArr2[0] = this.topicName;
                objArr2[1] = this.offset == null ? null : Integer.valueOf(this.offset.get());
                objArr2[2] = Thread.currentThread().getName();
                objArr2[3] = e2;
                logger2.error("Thread is interrupted, topic: {}, offset: {} thread name: {}", objArr2);
                Thread.currentThread().interrupt();
            }
        }
    }

    public void shutdown() {
        this.isRunning = false;
    }
}
