package org.apache.camel.component.wal;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.wal.EntryInfo;
import org.apache.camel.component.wal.LogEntry;
import org.apache.camel.resume.Deserializable;
import org.apache.camel.resume.Offset;
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.resume.ResumeStrategyConfiguration;
import org.apache.camel.spi.annotations.JdkService;
import org.apache.camel.support.resume.OffsetKeys;
import org.apache.camel.support.resume.Offsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@JdkService("write-ahead-resume-strategy")
/* loaded from: input_file:org/apache/camel/component/wal/WriteAheadResumeStrategy.class */
public class WriteAheadResumeStrategy implements ResumeStrategy, CamelContextAware {
    private static final Logger LOG = LoggerFactory.getLogger(WriteAheadResumeStrategy.class);
    private File logFile;
    private LogWriter logWriter;
    private ResumeStrategy resumeStrategy;
    private WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration;
    private CamelContext camelContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/camel/component/wal/WriteAheadResumeStrategy$DelegateCallback.class */
    public static class DelegateCallback implements ResumeStrategy.UpdateCallBack {
        private final ResumeStrategy.UpdateCallBack updateCallBack;
        private final ResumeStrategy.UpdateCallBack flushCallBack;

        public DelegateCallback(ResumeStrategy.UpdateCallBack updateCallBack, ResumeStrategy.UpdateCallBack updateCallBack2) {
            this.updateCallBack = updateCallBack;
            this.flushCallBack = updateCallBack2;
        }

        public void onUpdate(Throwable th) {
            this.flushCallBack.onUpdate(th);
            this.updateCallBack.onUpdate(th);
        }
    }

    public WriteAheadResumeStrategy() {
    }

    public WriteAheadResumeStrategy(WriteAheadResumeStrategyConfiguration writeAheadResumeStrategyConfiguration) {
        this.resumeStrategyConfiguration = writeAheadResumeStrategyConfiguration;
    }

    public void setAdapter(ResumeAdapter resumeAdapter) {
        this.resumeStrategy.setAdapter(resumeAdapter);
    }

    public ResumeAdapter getAdapter() {
        return this.resumeStrategy.getAdapter();
    }

    public <T extends Resumable> void updateLastOffset(T t) throws Exception {
        updateLastOffset((WriteAheadResumeStrategy) t, (ResumeStrategy.UpdateCallBack) null);
    }

    public <T extends Resumable> void updateLastOffset(T t, ResumeStrategy.UpdateCallBack updateCallBack) throws Exception {
        OffsetKey<?> offsetKey = t.getOffsetKey();
        Offset<?> lastOffset = t.getLastOffset();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Updating offset on Kafka with key {} to {}", offsetKey.getValue(), lastOffset.getValue());
        }
        updateLastOffset(offsetKey, lastOffset, updateCallBack);
    }

    private void handleResult(EntryInfo.CachedEntryInfo cachedEntryInfo, Throwable th) {
        try {
            if (th == null) {
                this.logWriter.updateState(cachedEntryInfo, LogEntry.EntryState.PROCESSED);
            } else {
                this.logWriter.updateState(cachedEntryInfo, LogEntry.EntryState.FAILED);
            }
        } catch (IOException e) {
            if (th == null) {
                LOG.error("Unable to update state: {}", e.getMessage(), e);
            } else {
                LOG.error("Unable to mark the record as failed: {}", e.getMessage(), e);
            }
        }
    }

    private void handleResult(PersistedLogEntry persistedLogEntry, Throwable th) {
        try {
            if (th == null) {
                this.logWriter.updateState(persistedLogEntry, LogEntry.EntryState.PROCESSED);
            } else {
                this.logWriter.updateState(persistedLogEntry, LogEntry.EntryState.FAILED);
            }
        } catch (IOException e) {
            if (th == null) {
                LOG.error("Unable to update state: {}", e.getMessage(), e);
            } else {
                LOG.error("Unable to mark the record as failed: {}", e.getMessage(), e);
            }
        }
    }

    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception {
        updateLastOffset(offsetKey, offset, null);
    }

    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, ResumeStrategy.UpdateCallBack updateCallBack) throws Exception {
        try {
            tryUpdateDelegate(offsetKey, offset, this.logWriter.append(new LogEntry(LogEntry.EntryState.NEW, 0, offsetKey.serialize().array(), 0, offset.serialize().array())), updateCallBack);
        } catch (IOException e) {
            LOG.error("Unable to append a new record to the transaction log. The system will try to update the record on the delegate strategy before forcing the failure");
            tryUpdateDelegate(offsetKey, offset, (EntryInfo.CachedEntryInfo) null, updateCallBack);
            throw e;
        }
    }

    private void tryUpdateDelegate(OffsetKey<?> offsetKey, Offset<?> offset, EntryInfo.CachedEntryInfo cachedEntryInfo, ResumeStrategy.UpdateCallBack updateCallBack) throws Exception {
        try {
            this.resumeStrategy.updateLastOffset(offsetKey, offset, resolveUpdateCallBack(cachedEntryInfo, updateCallBack));
        } catch (Throwable th) {
            if (cachedEntryInfo != null) {
                this.logWriter.updateState(cachedEntryInfo, LogEntry.EntryState.FAILED);
            } else {
                LOG.warn("Not updating the state on the transaction log before there's no entry information: it's likely that a previous attempt to append the record has failed and the system is now in error");
            }
            throw th;
        }
    }

    private void tryUpdateDelegate(OffsetKey<?> offsetKey, Offset<?> offset, PersistedLogEntry persistedLogEntry, ResumeStrategy.UpdateCallBack updateCallBack) throws Exception {
        try {
            this.resumeStrategy.updateLastOffset(offsetKey, offset, resolveUpdateCallBack(persistedLogEntry, updateCallBack));
        } catch (Throwable th) {
            this.logWriter.updateState(persistedLogEntry, LogEntry.EntryState.FAILED);
            throw th;
        }
    }

    private ResumeStrategy.UpdateCallBack resolveUpdateCallBack(EntryInfo.CachedEntryInfo cachedEntryInfo, ResumeStrategy.UpdateCallBack updateCallBack) {
        return updateCallBack == null ? th -> {
            handleResult(cachedEntryInfo, th);
        } : new DelegateCallback(updateCallBack, th2 -> {
            handleResult(cachedEntryInfo, th2);
        });
    }

    private ResumeStrategy.UpdateCallBack resolveUpdateCallBack(PersistedLogEntry persistedLogEntry, ResumeStrategy.UpdateCallBack updateCallBack) {
        return updateCallBack == null ? th -> {
            handleResult(persistedLogEntry, th);
        } : new DelegateCallback(updateCallBack, th2 -> {
            handleResult(persistedLogEntry, th2);
        });
    }

    public void loadCache() throws Exception {
        PersistedLogEntry readEntry;
        LogEntry.EntryState entryState;
        LOG.debug("Loading cache for the delegate strategy");
        this.resumeStrategy.loadCache();
        LOG.debug("Done loading cache for the delegate strategy");
        LogReader logReader = new LogReader(this.logFile);
        try {
            int i = 0;
            LOG.trace("Starting to read log entries");
            do {
                readEntry = logReader.readEntry();
                if (readEntry != null && ((entryState = readEntry.getEntryState()) == LogEntry.EntryState.NEW || entryState == LogEntry.EntryState.FAILED)) {
                    Deserializable adapter = this.resumeStrategy.getAdapter();
                    if (adapter instanceof Deserializable) {
                        Deserializable deserializable = adapter;
                        tryUpdateDelegate(OffsetKeys.of(deserializable.deserializeKey(ByteBuffer.wrap(readEntry.getKey()))), Offsets.of(deserializable.deserializeValue(ByteBuffer.wrap(readEntry.getValue()))), readEntry, (ResumeStrategy.UpdateCallBack) null);
                        i++;
                    }
                }
            } while (readEntry != null);
            LOG.trace("Finished reading log entries");
            if (i == 0) {
                this.logWriter.reset();
            }
            logReader.close();
        } catch (Throwable th) {
            try {
                logReader.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    public void start() {
        try {
            this.logFile = this.resumeStrategyConfiguration.getLogFile();
            this.resumeStrategy = this.resumeStrategyConfiguration.getDelegateResumeStrategy();
            this.logWriter = new LogWriter(this.logFile, new DefaultLogSupervisor(this.resumeStrategyConfiguration.getSupervisorInterval(), this.camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "SingleNodeKafkaResumeStrategy", 1)));
            this.resumeStrategy.start();
        } catch (Exception e) {
            throw new RuntimeCamelException(e);
        }
    }

    public void stop() {
        LOG.trace("Stopping the delegate strategy");
        this.resumeStrategy.stop();
        LOG.trace("Done stopping the delegate strategy");
        LOG.trace("Closing the writer");
        this.logWriter.close();
        LOG.trace("Writer is closed");
    }

    public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) {
        this.resumeStrategyConfiguration = (WriteAheadResumeStrategyConfiguration) resumeStrategyConfiguration;
    }

    public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
        return this.resumeStrategyConfiguration;
    }

    public CamelContext getCamelContext() {
        return this.camelContext;
    }

    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }
}
