package org.apache.fluo.core.worker;

import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.core.async.AsyncCommitObserver;
import org.apache.fluo.core.async.AsyncTransaction;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.impl.TransactionImpl;
import org.apache.fluo.core.log.TracingTransaction;
import org.apache.fluo.core.util.Hex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/fluo/core/worker/WorkTaskAsync.class */
public class WorkTaskAsync implements Runnable {
    private static Logger log = LoggerFactory.getLogger(WorkTaskAsync.class);
    private Environment env;
    private Notification notification;
    private Observers observers;
    private NotificationFinder notificationFinder;
    private NotificationProcessor notificationProcessor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/fluo/core/worker/WorkTaskAsync$WorkTaskCommitObserver.class */
    public class WorkTaskCommitObserver implements AsyncCommitObserver {
        WorkTaskCommitObserver() {
        }

        @Override // org.apache.fluo.core.async.AsyncCommitObserver
        public void committed() {
            WorkTaskAsync.this.notificationProcessor.notificationProcessed(WorkTaskAsync.this.notification);
        }

        @Override // org.apache.fluo.core.async.AsyncCommitObserver
        public void failed(Throwable th) {
            WorkTaskAsync.this.notificationFinder.failedToProcess(WorkTaskAsync.this.notification, TxResult.ERROR);
            WorkTaskAsync.this.notificationProcessor.notificationProcessed(WorkTaskAsync.this.notification);
            WorkTaskAsync.log.error("Failed to process work " + Hex.encNonAscii(WorkTaskAsync.this.notification), th);
        }

        @Override // org.apache.fluo.core.async.AsyncCommitObserver
        public void alreadyAcknowledged() {
            WorkTaskAsync.this.notificationFinder.failedToProcess(WorkTaskAsync.this.notification, TxResult.AACKED);
            WorkTaskAsync.this.notificationProcessor.notificationProcessed(WorkTaskAsync.this.notification);
        }

        @Override // org.apache.fluo.core.async.AsyncCommitObserver
        public void commitFailed() {
            WorkTaskAsync.this.notificationProcessor.requeueNotification(WorkTaskAsync.this.notificationFinder, WorkTaskAsync.this.notification);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WorkTaskAsync(NotificationProcessor notificationProcessor, NotificationFinder notificationFinder, Environment environment, Notification notification, Observers observers) {
        this.notificationProcessor = notificationProcessor;
        this.notificationFinder = notificationFinder;
        this.env = environment;
        this.notification = notification;
        this.observers = observers;
    }

    @Override // java.lang.Runnable
    public void run() {
        Observer observer = this.observers.getObserver(this.notification.getColumn());
        try {
            try {
                AsyncTransaction transactionImpl = new TransactionImpl(this.env, this.notification);
                if (TracingTransaction.isTracingEnabled()) {
                    transactionImpl = new TracingTransaction(transactionImpl, this.notification, observer.getClass());
                }
                observer.process(transactionImpl, this.notification.getRow(), this.notification.getColumn());
                this.env.getSharedResources().getCommitManager().beginCommit(transactionImpl, observer.getClass(), new WorkTaskCommitObserver());
                this.observers.returnObserver(observer);
            } catch (Exception e) {
                log.error("Failed to process work " + Hex.encNonAscii(this.notification), e);
                this.observers.returnObserver(observer);
            }
        } catch (Throwable th) {
            this.observers.returnObserver(observer);
            throw th;
        }
    }
}
