package co.cask.cdap.data2.transaction.stream;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data.file.FileReader;
import co.cask.cdap.data.file.ReadFilter;
import co.cask.cdap.data.file.filter.TTLReadFilter;
import co.cask.cdap.data.stream.MultiLiveStreamFileReader;
import co.cask.cdap.data.stream.StreamEventOffset;
import co.cask.cdap.data.stream.StreamFileOffset;
import co.cask.cdap.data.stream.StreamFileType;
import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.transaction.queue.QueueConstants;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.StreamId;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/AbstractStreamFileConsumerFactory.class */
/**
 * Base {@link StreamConsumerFactory} that builds {@link StreamConsumer}s reading from stream files.
 *
 * <p>This class looks up the stream configuration and the persisted consumer state, builds a
 * {@link MultiLiveStreamFileReader} positioned either at the persisted offsets or at freshly
 * computed ones, and then delegates the actual consumer construction and state-table removal to
 * the subclass through the abstract {@code create} and {@code dropTable} methods.
 */
public abstract class AbstractStreamFileConsumerFactory implements StreamConsumerFactory {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamFileConsumerFactory.class);

    /** Configuration key read by {@link #getFileOffsets} for the number of file offsets to generate. */
    private static final String CONTAINER_INSTANCES_KEY = "stream.container.instances";

    /** Configuration key read by {@link #getFileOffsets} for the stream file name prefix. */
    private static final String FILE_PREFIX_KEY = "stream.file.prefix";

    private final CConfiguration cConf;
    private final StreamAdmin streamAdmin;
    private final StreamConsumerStateStoreFactory stateStoreFactory;

    /** Leading part of every table name produced by {@link #getTableId}: {@code <system namespace>.<stream queue type>}. */
    private final String tablePrefix = String.format("%s.%s", NamespaceId.SYSTEM.getEntityName(), QueueConstants.QueueType.STREAM.toString());

    /**
     * Stores the collaborators used by the factory methods.
     *
     * <p>NOTE(review): the decompiler reported that this constructor's access modifier was changed
     * from {@code protected}; it is kept {@code public} here so the visible interface is unchanged.
     *
     * @param cConfiguration configuration consulted when generating file offsets
     * @param streamAdmin used to look up stream configurations and to reconfigure consumer groups
     * @param streamConsumerStateStoreFactory used to obtain the consumer state store of a stream
     */
    public AbstractStreamFileConsumerFactory(CConfiguration cConfiguration, StreamAdmin streamAdmin, StreamConsumerStateStoreFactory streamConsumerStateStoreFactory) {
        this.cConf = cConfiguration;
        this.streamAdmin = streamAdmin;
        this.stateStoreFactory = streamConsumerStateStoreFactory;
    }

    /**
     * Creates the concrete consumer. Called by {@link #create(StreamId, String, ConsumerConfig)}
     * once the state store, the consumer state and the file reader have been prepared.
     *
     * @param tableId table id derived from the stream and the caller-supplied name
     * @param streamConfig configuration of the stream being consumed
     * @param consumerConfig configuration of the consumer
     * @param streamConsumerStateStore state store obtained for the stream
     * @param streamConsumerState state loaded for the consumer's group and instance
     * @param fileReader reader over the stream files
     * @param readFilter optional extra filter; the public {@code create} passes a {@link TTLReadFilter}
     * @return the new consumer
     * @throws IOException if the consumer cannot be created
     */
    protected abstract StreamConsumer create(TableId tableId, StreamConfig streamConfig, ConsumerConfig consumerConfig, StreamConsumerStateStore streamConsumerStateStore, StreamConsumerState streamConsumerState, FileReader<StreamEventOffset, Iterable<StreamFileOffset>> fileReader, @Nullable ReadFilter readFilter) throws IOException;

    /**
     * Drops the table identified by {@code tableId}. Called first by {@link #dropAll}.
     *
     * @param tableId table to drop
     * @throws IOException if the table cannot be dropped
     */
    protected abstract void dropTable(TableId tableId) throws IOException;

    /**
     * Adds one {@link StreamFileOffset} (at file offset 0) per configured instance to {@code fileOffsets}.
     * Each offset points at the event file named {@code <prefix>.<index>} with sequence id 0 under
     * {@code partitionLocation}.
     *
     * @param partitionLocation location under which the stream file locations are created
     * @param fileOffsets collection receiving the generated offsets
     * @param generation generation recorded in each generated offset
     * @throws IOException if a stream file location cannot be created
     */
    protected void getFileOffsets(Location partitionLocation, Collection<? super StreamFileOffset> fileOffsets, int generation) throws IOException {
        int instances = this.cConf.getInt(CONTAINER_INSTANCES_KEY);
        String filePrefix = this.cConf.get(FILE_PREFIX_KEY);
        for (int instance = 0; instance < instances; instance++) {
            Location eventLocation = StreamUtils.createStreamLocation(partitionLocation, filePrefix + '.' + instance, 0, StreamFileType.EVENT);
            fileOffsets.add(new StreamFileOffset(eventLocation, 0L, generation));
        }
    }

    /**
     * Creates a consumer for the given stream.
     *
     * @param streamId stream to consume from
     * @param namespace used only as the trailing segment of the consumer table name
     *                  (presumably the consumer's namespace; confirm against callers)
     * @param consumerConfig consumer configuration; its group id and instance id select the persisted state
     * @return the consumer produced by the subclass
     * @throws IOException if the state or reader cannot be set up
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamConsumerFactory
    public final StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig consumerConfig) throws IOException {
        StreamConfig streamConfig = StreamUtils.ensureExists(this.streamAdmin, streamId);
        TableId tableId = getTableId(streamId, namespace);
        StreamConsumerStateStore stateStore = this.stateStoreFactory.create(streamConfig);
        StreamConsumerState consumerState = stateStore.get(consumerConfig.getGroupId(), consumerConfig.getInstanceId());
        MultiLiveStreamFileReader reader = createReader(streamConfig, consumerState);
        return create(tableId, streamConfig, consumerConfig, stateStore, consumerState, reader, new TTLReadFilter(streamConfig.getTTL()));
    }

    /**
     * Drops the consumer table for the stream and then reconfigures every given group to 0 instances.
     *
     * @param streamId stream whose consumers are dropped
     * @param namespace used only as the trailing segment of the consumer table name
     * @param groupIds ids of the groups to reconfigure with an instance count of 0
     * @throws IOException if dropping the table fails, or wrapping any non-IO, non-runtime failure
     *                     raised while reconfiguring the groups
     */
    @Override // co.cask.cdap.data2.transaction.stream.StreamConsumerFactory
    public void dropAll(StreamId streamId, String namespace, Iterable<Long> groupIds) throws IOException {
        dropTable(getTableId(streamId, namespace));
        HashMap<Long, Integer> groupInstances = Maps.newHashMap();
        for (Long groupId : groupIds) {
            groupInstances.put(groupId, 0);
        }
        try {
            this.streamAdmin.configureGroups(streamId, groupInstances);
        } catch (Exception e) {
            // Rethrow IOException and unchecked exceptions unchanged; wrap everything else.
            Throwables.propagateIfPossible(e, IOException.class);
            throw new IOException(e);
        }
    }

    /** Builds the table id {@code <tablePrefix>.<stream name>.<namespace>} in the stream's namespace. */
    private TableId getTableId(StreamId streamId, String namespace) {
        String tableName = String.format("%s.%s.%s", this.tablePrefix, streamId.getEntityName(), namespace);
        return TableId.from(streamId.getNamespace(), tableName);
    }

    /**
     * Creates and initializes the file reader for a consumer.
     *
     * <p>If the consumer state is non-empty and every offset in it is both within the TTL window and
     * of the stream's current generation, the reader is created from that state. Otherwise the
     * reader starts at offset 0 of the earliest non-expired partition found under the generation
     * location, defaulting to the partition one partition-duration before now.
     *
     * @param streamConfig configuration of the stream
     * @param consumerState persisted state of the consumer
     * @return an initialized reader
     * @throws IOException if listing partitions or initializing the reader fails
     */
    private MultiLiveStreamFileReader createReader(final StreamConfig streamConfig, StreamConsumerState consumerState) throws IOException {
        Location streamLocation = streamConfig.getLocation();
        Preconditions.checkNotNull(streamLocation, "Stream location is null for %s", streamConfig.getStreamId());
        final int generation = StreamUtils.getGeneration(streamConfig);
        Location generationLocation = StreamUtils.createGenerationLocation(streamLocation, generation);
        final long currentTime = System.currentTimeMillis();

        Predicate<StreamFileOffset> isUsable = new Predicate<StreamFileOffset>() {
            @Override
            public boolean apply(StreamFileOffset offset) {
                return offset.getPartitionEnd() >= currentTime - streamConfig.getTTL()
                    && generation == offset.getGeneration();
            }
        };
        if (!Iterables.isEmpty(consumerState.getState()) && Iterables.all(consumerState.getState(), isUsable)) {
            LOG.info("Create file reader with consumer state: {}", consumerState);
            MultiLiveStreamFileReader stateReader = new MultiLiveStreamFileReader(streamConfig, consumerState.getState());
            stateReader.initialize();
            return stateReader;
        }

        long partitionDuration = streamConfig.getPartitionDuration();
        // Default start: the partition one partition-duration before now.
        long startTime = StreamUtils.getPartitionStartTime(currentTime - partitionDuration, partitionDuration);
        long earliestNonExpiredTime = StreamUtils.getPartitionStartTime(currentTime - streamConfig.getTTL(), partitionDuration);
        for (Location candidate : generationLocation.list()) {
            if (!candidate.isDirectory() || !StreamUtils.isPartition(candidate.getName())) {
                continue;
            }
            long candidateStart = StreamUtils.getPartitionStartTime(candidate.getName());
            // Move the start back to the oldest partition that has not expired.
            if (candidateStart >= earliestNonExpiredTime && candidateStart < startTime) {
                startTime = candidateStart;
            }
        }

        Location partitionLocation = StreamUtils.createPartitionLocation(generationLocation, startTime, partitionDuration);
        ArrayList<StreamFileOffset> fileOffsets = Lists.newArrayList();
        getFileOffsets(partitionLocation, fileOffsets, generation);
        LOG.info("Empty consumer state. Create file reader with file offsets: groupId={}, instanceId={} states={}", consumerState.getGroupId(), consumerState.getInstanceId(), fileOffsets);
        MultiLiveStreamFileReader freshReader = new MultiLiveStreamFileReader(streamConfig, fileOffsets);
        freshReader.initialize();
        return freshReader;
    }
}
