package co.cask.cdap.api.dataset.lib.partitioned;

import ch.qos.logback.classic.Level;
import co.cask.cdap.api.dataset.lib.Partition;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.api.dataset.lib.partitioned.PartitionAcceptor;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.util.List;
import javax.annotation.Nullable;

/* loaded from: input_file:co/cask/cdap/api/dataset/lib/partitioned/AbstractPartitionConsumer.class */
public abstract class AbstractPartitionConsumer implements PartitionConsumer {
    private final PartitionedFileSet partitionedFileSet;
    private final ConsumerConfiguration configuration;
    private final StatePersistor statePersistor;

    public abstract PartitionConsumerResult doConsume(ConsumerWorkingSet consumerWorkingSet, PartitionAcceptor partitionAcceptor);

    public abstract void doFinish(ConsumerWorkingSet consumerWorkingSet, List<PartitionKey> list, boolean z);

    public AbstractPartitionConsumer(PartitionedFileSet partitionedFileSet, StatePersistor statePersistor) {
        this(partitionedFileSet, statePersistor, ConsumerConfiguration.DEFAULT);
    }

    public AbstractPartitionConsumer(PartitionedFileSet partitionedFileSet, StatePersistor statePersistor, ConsumerConfiguration consumerConfiguration) {
        this.partitionedFileSet = partitionedFileSet;
        this.configuration = consumerConfiguration;
        this.statePersistor = statePersistor;
    }

    public PartitionedFileSet getPartitionedFileSet() {
        return this.partitionedFileSet;
    }

    public ConsumerConfiguration getConfiguration() {
        return this.configuration;
    }

    public StatePersistor getStatePersistor() {
        return this.statePersistor;
    }

    @Override // co.cask.cdap.api.dataset.lib.partitioned.PartitionConsumer
    public PartitionConsumerResult consumePartitions() {
        return consumePartitions(Level.OFF_INT);
    }

    @Override // co.cask.cdap.api.dataset.lib.partitioned.PartitionConsumer
    public PartitionConsumerResult consumePartitions(int i) {
        return consumePartitions(new PartitionAcceptor.Limit(i));
    }

    @Override // co.cask.cdap.api.dataset.lib.partitioned.PartitionConsumer
    public PartitionConsumerResult consumePartitions(PartitionAcceptor partitionAcceptor) {
        ConsumerWorkingSet readState = readState();
        PartitionConsumerResult doConsume = doConsume(readState, partitionAcceptor);
        this.statePersistor.persistState(readState.toBytes());
        return doConsume;
    }

    @Override // co.cask.cdap.api.dataset.lib.partitioned.PartitionConsumer
    public void onFinish(List<? extends Partition> list, boolean z) {
        List<PartitionKey> transform = Lists.transform(list, new Function<Partition, PartitionKey>() { // from class: co.cask.cdap.api.dataset.lib.partitioned.AbstractPartitionConsumer.1
            @Override // com.google.common.base.Function
            @Nullable
            public PartitionKey apply(Partition partition) {
                return partition.getPartitionKey();
            }
        });
        ConsumerWorkingSet readState = readState();
        doFinish(readState, transform, z);
        this.statePersistor.persistState(readState.toBytes());
    }

    private ConsumerWorkingSet readState() {
        byte[] readState = this.statePersistor.readState();
        return readState == null ? new ConsumerWorkingSet() : ConsumerWorkingSet.fromBytes(readState);
    }
}
