package co.cask.cdap.data2.transaction.stream;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data.file.FileReader;
import co.cask.cdap.data.file.ReadFilter;
import co.cask.cdap.data.file.ReadFilters;
import co.cask.cdap.data.stream.StreamEventOffset;
import co.cask.cdap.data.stream.StreamFileOffset;
import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.KeyValue;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.DequeueResult;
import co.cask.cdap.data2.queue.DequeueStrategy;
import co.cask.cdap.data2.transaction.queue.ConsumerEntryState;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.proto.Id;
import co.cask.tephra.Transaction;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/AbstractStreamFileConsumer.class */
public abstract class AbstractStreamFileConsumer implements StreamConsumer {
    protected static final int MAX_SCAN_ROWS = 1000;
    protected final byte[] stateColumnName;
    private final long txTimeoutNano;
    private final Id.Stream streamName;
    private final StreamConfig streamConfig;
    private final ConsumerConfig consumerConfig;
    private final StreamConsumerStateStore consumerStateStore;
    private final FileReader<StreamEventOffset, Iterable<StreamFileOffset>> reader;
    private final ReadFilter readFilter;
    private final Map<byte[], SortedMap<byte[], byte[]>> entryStates;
    private final Set<byte[]> entryStatesScanCompleted;
    private final StreamConsumerState consumerState;
    private final List<StreamEventOffset> eventCache;
    private Transaction transaction;
    private List<PollStreamEvent> polledEvents;
    private long nextPersistStateTime;
    private boolean committed;
    private boolean closed;
    private StreamConsumerState lastPersistedState;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamFileConsumer.class);
    private static final HashFunction ROUND_ROBIN_HASHER = Hashing.murmur3_32();
    private static final long STATE_PERSIST_MIN_INTERVAL = TimeUnit.SECONDS.toNanos(1);
    private static final DequeueResult<StreamEvent> EMPTY_RESULT = DequeueResult.Empty.result();
    private static final Function<PollStreamEvent, byte[]> EVENT_ROW_KEY = new Function<PollStreamEvent, byte[]>() { // from class: co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer.1
        public byte[] apply(PollStreamEvent pollStreamEvent) {
            return pollStreamEvent.getStateRow();
        }
    };
    private static final Comparator<byte[]> ROW_PREFIX_COMPARATOR = new Comparator<byte[]>() { // from class: co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer.2
        @Override // java.util.Comparator
        public int compare(byte[] bArr, byte[] bArr2) {
            return Bytes.compareTo(bArr, 0, bArr.length - 8, bArr2, 0, bArr2.length - 8);
        }
    };
    private static final Function<PollStreamEvent, StreamEventOffset> CONVERT_STREAM_EVENT_OFFSET = new Function<PollStreamEvent, StreamEventOffset>() { // from class: co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer.3
        public StreamEventOffset apply(PollStreamEvent pollStreamEvent) {
            return pollStreamEvent.getStreamEventOffset();
        }
    };
    private static final Function<PollStreamEvent, StreamEvent> CONVERT_STREAM_EVENT = new Function<PollStreamEvent, StreamEvent>() { // from class: co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer.4
        public StreamEvent apply(PollStreamEvent pollStreamEvent) {
            return pollStreamEvent;
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data2/transaction/stream/AbstractStreamFileConsumer$PollStreamEvent.class */
    public static final class PollStreamEvent extends StreamEvent {
        private final byte[] stateRow;
        private final StreamEventOffset streamEventOffset;

        protected PollStreamEvent(StreamEventOffset streamEventOffset, byte[] bArr) {
            super(streamEventOffset);
            this.streamEventOffset = streamEventOffset;
            this.stateRow = bArr;
        }

        public StreamEventOffset getStreamEventOffset() {
            return this.streamEventOffset;
        }

        /* renamed from: getBody, reason: merged with bridge method [inline-methods] */
        public ByteBuffer m179getBody() {
            return ((ByteBuffer) this.streamEventOffset.getBody()).slice();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public byte[] getStateRow() {
            return this.stateRow;
        }
    }

    /* loaded from: input_file:co/cask/cdap/data2/transaction/stream/AbstractStreamFileConsumer$SimpleDequeueResult.class */
    private final class SimpleDequeueResult implements DequeueResult<StreamEvent> {
        private final List<PollStreamEvent> events;

        private SimpleDequeueResult(List<PollStreamEvent> list) {
            this.events = ImmutableList.copyOf(list);
        }

        @Override // co.cask.cdap.data2.queue.DequeueResult
        public boolean isEmpty() {
            return this.events.isEmpty();
        }

        @Override // co.cask.cdap.data2.queue.DequeueResult
        public void reclaim() {
            AbstractStreamFileConsumer.this.polledEvents.clear();
            AbstractStreamFileConsumer.this.polledEvents.addAll(this.events);
            AbstractStreamFileConsumer.this.eventCache.removeAll(Lists.transform(this.events, AbstractStreamFileConsumer.CONVERT_STREAM_EVENT_OFFSET));
        }

        @Override // co.cask.cdap.data2.queue.DequeueResult
        public int size() {
            return this.events.size();
        }

        @Override // java.lang.Iterable
        public Iterator<StreamEvent> iterator() {
            return Iterators.transform(this.events.iterator(), AbstractStreamFileConsumer.CONVERT_STREAM_EVENT);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:co/cask/cdap/data2/transaction/stream/AbstractStreamFileConsumer$StateScanner.class */
    public interface StateScanner extends Closeable {
        boolean nextStateRow() throws IOException;

        byte[] getRow();

        byte[] getState();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractStreamFileConsumer(CConfiguration cConfiguration, StreamConfig streamConfig, ConsumerConfig consumerConfig, FileReader<StreamEventOffset, Iterable<StreamFileOffset>> fileReader, StreamConsumerStateStore streamConsumerStateStore, StreamConsumerState streamConsumerState, @Nullable ReadFilter readFilter) {
        LOG.info("Create consumer {}, reader offsets: {}", consumerConfig, fileReader.getPosition());
        this.txTimeoutNano = TimeUnit.SECONDS.toNanos(cConfiguration.getInt("data.tx.timeout", 30));
        this.streamName = streamConfig.getStreamId();
        this.streamConfig = streamConfig;
        this.consumerConfig = consumerConfig;
        this.consumerStateStore = streamConsumerStateStore;
        this.reader = fileReader;
        this.readFilter = createReadFilter(consumerConfig, readFilter);
        this.entryStates = Maps.newTreeMap(ROW_PREFIX_COMPARATOR);
        this.entryStatesScanCompleted = Sets.newTreeSet(ROW_PREFIX_COMPARATOR);
        this.eventCache = Lists.newArrayList();
        this.consumerState = streamConsumerState;
        this.lastPersistedState = new StreamConsumerState(streamConsumerState);
        this.stateColumnName = Bytes.add(QueueEntryRow.STATE_COLUMN_PREFIX, Bytes.toBytes(consumerConfig.getGroupId()));
    }

    protected void doClose() throws IOException {
    }

    protected abstract boolean claimFifoEntry(byte[] bArr, byte[] bArr2, byte[] bArr3) throws IOException;

    protected abstract void updateState(Iterable<byte[]> iterable, int i, byte[] bArr) throws IOException;

    protected abstract void undoState(Iterable<byte[]> iterable, int i) throws IOException;

    protected abstract StateScanner scanStates(byte[] bArr, byte[] bArr2) throws IOException;

    @Override // co.cask.cdap.data2.transaction.stream.StreamConsumer
    public final Id.Stream getStreamId() {
        return this.streamName;
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamConsumer
    public final ConsumerConfig getConsumerConfig() {
        return this.consumerConfig;
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamConsumer
    public final DequeueResult<StreamEvent> poll(int i, long j, TimeUnit timeUnit) throws IOException, InterruptedException {
        byte[] bArr = null;
        if (this.consumerConfig.getDequeueStrategy() == DequeueStrategy.FIFO && this.consumerConfig.getGroupSize() > 1) {
            bArr = encodeStateColumn(ConsumerEntryState.CLAIMED);
        }
        if (!this.eventCache.isEmpty()) {
            getEvents(this.eventCache, this.polledEvents, i, bArr);
        }
        if (this.polledEvents.size() == i) {
            return new SimpleDequeueResult(this.polledEvents);
        }
        int groupSize = i * this.consumerConfig.getGroupSize();
        long nanos = timeUnit.toNanos(j);
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.start();
        this.consumerState.setState(this.reader.getPosition());
        while (this.polledEvents.size() < i) {
            int read = this.reader.read(this.eventCache, groupSize, nanos, TimeUnit.NANOSECONDS, this.readFilter);
            long elapsedTime = stopwatch.elapsedTime(TimeUnit.NANOSECONDS);
            nanos -= elapsedTime;
            if (read <= 0 || getEvents(this.eventCache, this.polledEvents, i - this.polledEvents.size(), bArr) != 0 || !this.polledEvents.isEmpty() || elapsedTime >= this.txTimeoutNano / 2) {
                if (nanos <= 0) {
                    break;
                }
            }
        }
        return this.polledEvents.isEmpty() ? EMPTY_RESULT : new SimpleDequeueResult(this.polledEvents);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public final void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        try {
            persistConsumerState();
            doClose();
            try {
                this.reader.close();
                this.consumerStateStore.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                this.reader.close();
                this.consumerStateStore.close();
                throw th;
            } finally {
            }
        }
    }

    public final void startTx(Transaction transaction) {
        this.transaction = transaction;
        if (this.polledEvents == null) {
            this.polledEvents = Lists.newArrayList();
        } else {
            this.polledEvents.clear();
        }
        this.committed = false;
    }

    public final void updateTx(Transaction transaction) {
        this.transaction = transaction;
    }

    public final Collection<byte[]> getTxChanges() {
        return ImmutableList.of();
    }

    public final boolean commitTx() throws Exception {
        if (this.polledEvents.isEmpty()) {
            return true;
        }
        updateState(Iterables.transform(this.polledEvents, EVENT_ROW_KEY), this.polledEvents.size(), encodeStateColumn(ConsumerEntryState.PROCESSED));
        this.committed = true;
        return true;
    }

    public void postTxCommit() {
        long nanoTime = System.nanoTime();
        if (nanoTime >= this.nextPersistStateTime) {
            this.nextPersistStateTime = nanoTime + STATE_PERSIST_MIN_INTERVAL;
            persistConsumerState();
        }
        for (PollStreamEvent pollStreamEvent : this.polledEvents) {
            SortedMap<byte[], byte[]> sortedMap = this.entryStates.get(pollStreamEvent.getStateRow());
            if (sortedMap != null) {
                sortedMap.headMap(pollStreamEvent.getStateRow()).clear();
            }
        }
    }

    public boolean rollbackTx() throws Exception {
        if (this.polledEvents.isEmpty()) {
            return true;
        }
        this.consumerState.setState(this.lastPersistedState.getState());
        this.eventCache.addAll(0, Lists.transform(this.polledEvents, CONVERT_STREAM_EVENT_OFFSET));
        byte[] bArr = null;
        if (this.consumerConfig.getDequeueStrategy() == DequeueStrategy.FIFO && this.consumerConfig.getGroupSize() > 1) {
            bArr = encodeStateColumn(ConsumerEntryState.CLAIMED);
            for (PollStreamEvent pollStreamEvent : this.polledEvents) {
                this.entryStates.get(pollStreamEvent.getStateRow()).put(pollStreamEvent.getStateRow(), bArr);
            }
        }
        if (!this.committed) {
            return true;
        }
        if (this.consumerConfig.getDequeueStrategy() != DequeueStrategy.FIFO || this.consumerConfig.getGroupSize() <= 1) {
            undoState(Iterables.transform(this.polledEvents, EVENT_ROW_KEY), this.polledEvents.size());
            return true;
        }
        updateState(Iterables.transform(this.polledEvents, EVENT_ROW_KEY), this.polledEvents.size(), bArr);
        return true;
    }

    public String getTransactionAwareName() {
        return toString();
    }

    public String toString() {
        return Objects.toStringHelper(this).add("stream", this.streamConfig).add("consumer", this.consumerConfig).toString();
    }

    private ReadFilter createReadFilter(ConsumerConfig consumerConfig, @Nullable ReadFilter readFilter) {
        ReadFilter createBaseReadFilter = createBaseReadFilter(consumerConfig);
        return readFilter != null ? ReadFilters.and(readFilter, createBaseReadFilter) : createBaseReadFilter;
    }

    private ReadFilter createBaseReadFilter(ConsumerConfig consumerConfig) {
        final int groupSize = consumerConfig.getGroupSize();
        final DequeueStrategy dequeueStrategy = consumerConfig.getDequeueStrategy();
        if (groupSize == 1 || dequeueStrategy == DequeueStrategy.FIFO) {
            return ReadFilter.ALWAYS_ACCEPT;
        }
        final int instanceId = consumerConfig.getInstanceId();
        return new ReadFilter() { // from class: co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer.5
            @Override // co.cask.cdap.data.file.ReadFilter
            public boolean acceptOffset(long j) {
                return instanceId == Math.abs(dequeueStrategy == DequeueStrategy.HASH ? 0 : AbstractStreamFileConsumer.ROUND_ROBIN_HASHER.hashLong(j).hashCode()) % groupSize;
            }
        };
    }

    private int getEvents(List<? extends StreamEventOffset> list, List<? super PollStreamEvent> list2, int i, byte[] bArr) throws IOException {
        Iterator consumingIterator = Iterators.consumingIterator(list.iterator());
        int i2 = 0;
        while (list2.size() < i && consumingIterator.hasNext()) {
            StreamEventOffset streamEventOffset = (StreamEventOffset) consumingIterator.next();
            byte[] claimEntry = claimEntry(streamEventOffset.getOffset(), bArr);
            if (claimEntry != null) {
                list2.add(new PollStreamEvent(streamEventOffset, claimEntry));
                i2++;
            }
        }
        return i2;
    }

    private void persistConsumerState() {
        try {
            if (this.lastPersistedState == null || !this.consumerState.equals(this.lastPersistedState)) {
                this.consumerStateStore.save(this.consumerState);
                this.lastPersistedState = new StreamConsumerState(this.consumerState);
            }
        } catch (IOException e) {
            LOG.error("Failed to persist consumer state for consumer {} of stream {}", new Object[]{this.consumerConfig, getStreamId(), e});
        }
    }

    private byte[] encodeStateColumn(ConsumerEntryState consumerEntryState) {
        byte[] bArr = new byte[13];
        Bytes.putLong(bArr, 0, this.transaction.getWritePointer());
        Bytes.putInt(bArr, 8, this.consumerConfig.getInstanceId());
        Bytes.putByte(bArr, 12, consumerEntryState.getState());
        return bArr;
    }

    private byte[] claimEntry(StreamFileOffset streamFileOffset, byte[] bArr) throws IOException {
        ByteArrayDataOutput newDataOutput = ByteStreams.newDataOutput(50);
        newDataOutput.writeLong(this.consumerConfig.getGroupId());
        StreamUtils.encodeOffset(newDataOutput, streamFileOffset);
        byte[] byteArray = newDataOutput.toByteArray();
        SortedMap<byte[], byte[]> initRowStates = getInitRowStates(byteArray);
        byte[] bArr2 = initRowStates.get(byteArray);
        if (initRowStates.containsKey(byteArray) && bArr2 == null) {
            return null;
        }
        if (this.consumerConfig.getDequeueStrategy() != DequeueStrategy.FIFO || this.consumerConfig.getGroupSize() <= 1) {
            return byteArray;
        }
        if (claimFifoEntry(byteArray, bArr, bArr2)) {
            return byteArray;
        }
        return null;
    }

    private SortedMap<byte[], byte[]> getInitRowStates(byte[] bArr) throws IOException {
        SortedMap<byte[], byte[]> sortedMap = this.entryStates.get(bArr);
        if (sortedMap == null) {
            sortedMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
            this.entryStates.put(bArr, sortedMap);
        } else if (this.entryStatesScanCompleted.contains(bArr) || !sortedMap.tailMap(bArr).isEmpty()) {
            return sortedMap;
        }
        byte[] copyOf = Arrays.copyOf(bArr, bArr.length);
        Bytes.putLong(copyOf, copyOf.length - 8, KeyValue.LATEST_TIMESTAMP);
        StateScanner scanStates = scanStates(bArr, copyOf);
        Throwable th = null;
        int i = 0;
        while (scanStates.nextStateRow() && i < MAX_SCAN_ROWS) {
            try {
                try {
                    if (storeInitState(scanStates.getRow(), scanStates.getState(), sortedMap)) {
                        i++;
                    }
                } finally {
                }
            } catch (Throwable th2) {
                if (scanStates != null) {
                    if (th != null) {
                        try {
                            scanStates.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        scanStates.close();
                    }
                }
                throw th2;
            }
        }
        if (i == 0) {
            this.entryStatesScanCompleted.add(bArr);
        }
        if (scanStates != null) {
            if (0 != 0) {
                try {
                    scanStates.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                scanStates.close();
            }
        }
        return sortedMap;
    }

    private boolean storeInitState(byte[] bArr, byte[] bArr2, Map<byte[], byte[]> map) {
        if (bArr2 == null) {
            return false;
        }
        long j = Bytes.toLong(bArr, bArr.length - 8);
        long stateWritePointer = QueueEntryRow.getStateWritePointer(bArr2);
        if (!this.readFilter.acceptOffset(j) || stateWritePointer >= this.transaction.getWritePointer()) {
            return false;
        }
        if (QueueEntryRow.getState(bArr2) == ConsumerEntryState.PROCESSED && this.transaction.isVisible(stateWritePointer)) {
            map.put(bArr, null);
            return true;
        }
        if (this.consumerConfig.getDequeueStrategy() != DequeueStrategy.FIFO || this.consumerConfig.getGroupSize() <= 1) {
            return false;
        }
        int stateInstanceId = QueueEntryRow.getStateInstanceId(bArr2);
        if (stateInstanceId >= this.consumerConfig.getGroupSize() || stateInstanceId == this.consumerConfig.getInstanceId()) {
            map.put(bArr, bArr2);
            return true;
        }
        map.put(bArr, null);
        return true;
    }
}
