package org.apache.fluo.recipes.core.export;

import com.google.common.collect.Iterators;
import java.util.Iterator;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.recipes.core.serialization.SimpleSerializer;

/* loaded from: input_file:org/apache/fluo/recipes/core/export/ExportObserverImpl.class */
/**
 * Observer that reads queued export entries from an {@link ExportBucket}, deserializes them into
 * {@link SequencedExport} objects and passes them to the configured exporter.
 *
 * @param <K> type that export keys are deserialized to
 * @param <V> type that export values are deserialized to
 */
class ExportObserverImpl<K, V> implements Observer {
    private final String queueId;
    private final Class<K> keyType;
    private final Class<V> valType;
    SimpleSerializer serializer;
    private final org.apache.fluo.recipes.core.export.function.Exporter<K, V> exporter;
    private final long memLimit;

    /**
     * Creates an observer for one export queue.
     *
     * @param str id of the export queue
     * @param fluentConfigurator supplies the key/value class names ({@code keyType},
     *     {@code valueType}) and the buffer size used as the memory limit
     * @param simpleSerializer serializer used to deserialize the raw key and value of each entry
     * @param exporter receives the deserialized exports
     * @throws Exception if the configured key or value class cannot be loaded
     */
    public ExportObserverImpl(String str, FluentConfigurator fluentConfigurator, SimpleSerializer simpleSerializer, org.apache.fluo.recipes.core.export.function.Exporter<K, V> exporter) throws Exception {
        this.queueId = str;
        this.keyType = loadType(fluentConfigurator.keyType);
        this.valType = loadType(fluentConfigurator.valueType);
        this.exporter = exporter;
        this.serializer = simpleSerializer;
        this.memLimit = fluentConfigurator.getBufferSize();
    }

    /**
     * Loads the named class with this object's class loader.
     *
     * <p>The cast is unchecked: the class name comes from configuration, so nothing here verifies
     * that it actually denotes the type the caller expects.
     */
    @SuppressWarnings("unchecked")
    private <T> Class<T> loadType(String className) throws ClassNotFoundException {
        return (Class<T>) getClass().getClassLoader().loadClass(className);
    }

    /**
     * Exports the entries of the bucket identified by {@code bytes}; this is the
     * {@link Observer} callback.
     *
     * <p>Entries are read starting at the bucket's stored continue row (if any), wrapped in a
     * {@link MemLimitIterator}, deserialized lazily and handed to the exporter. Afterwards the
     * bucket's continue row is set or cleared and the observer is re-notified when needed.
     *
     * @param transactionBase transaction used to access the bucket
     * @param bytes row passed to the {@link ExportBucket} constructor
     * @param column the column that triggered this observer (not used here)
     * @throws Exception if reading the bucket or exporting fails
     */
    public void process(TransactionBase transactionBase, Bytes bytes, Column column) throws Exception {
        ExportBucket bucket = new ExportBucket(transactionBase, bytes);
        Bytes continueRow = bucket.getContinueRow();

        Iterator<ExportEntry> input = bucket.getExportIterator(continueRow);
        // NOTE(review): the third argument (8 + queue id length) is presumably a per-entry size
        // overhead -- confirm against MemLimitIterator.
        MemLimitIterator memLimited = new MemLimitIterator(input, this.memLimit, 8 + this.queueId.length());

        Iterator<SequencedExport<K, V>> exports = Iterators.transform(memLimited,
            entry -> new SequencedExport<>(
                this.serializer.deserialize(entry.key, this.keyType),
                this.serializer.deserialize(entry.value, this.valType),
                entry.seq));
        this.exporter.export(Iterators.consumingIterator(exports));

        // Run again if entries remain unread, or if this pass started from a continue row.
        if (input.hasNext() || continueRow != null) {
            bucket.notifyExportObserver();
        }

        if (input.hasNext() && !memLimited.hasNext()) {
            // The limited view is exhausted although the bucket still has entries: remember
            // where to resume on the next pass.
            bucket.setContinueRow(input.next());
        } else if (continueRow != null) {
            // No new resume point was recorded, so drop the one this pass started from.
            bucket.clearContinueRow();
        }
    }
}
