package co.cask.cdap.data2.transaction.stream;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data.stream.StreamFileOffset;
import co.cask.cdap.data.stream.StreamUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/StreamConsumerStateStore.class */
public abstract class StreamConsumerStateStore implements ConsumerStateStore<StreamConsumerState, Iterable<StreamFileOffset>> {
    protected final StreamConfig streamConfig;
    protected final QueueName name;

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamConsumerStateStore(StreamConfig streamConfig) {
        this.streamConfig = streamConfig;
        this.name = QueueName.fromStream(streamConfig.getName());
    }

    @Override // co.cask.cdap.data2.transaction.stream.ConsumerStateStore
    public final void getAll(Collection<? super StreamConsumerState> collection) throws IOException {
        TreeMap newTreeMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        fetchAll(this.name.toBytes(), newTreeMap);
        for (Map.Entry entry : newTreeMap.entrySet()) {
            byte[] bArr = (byte[]) entry.getKey();
            byte[] bArr2 = (byte[]) entry.getValue();
            if (bArr2 != null) {
                collection.add(new StreamConsumerState(getGroupId(bArr), getInstanceId(bArr), decodeOffsets(bArr2)));
            }
        }
    }

    @Override // co.cask.cdap.data2.transaction.stream.ConsumerStateStore
    public final void getByGroup(long j, Collection<? super StreamConsumerState> collection) throws IOException {
        byte[] bArr;
        TreeMap newTreeMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        fetchAll(this.name.toBytes(), Bytes.toBytes(j), newTreeMap);
        for (Map.Entry entry : newTreeMap.entrySet()) {
            byte[] bArr2 = (byte[]) entry.getKey();
            if (getGroupId(bArr2) == j && (bArr = (byte[]) entry.getValue()) != null) {
                collection.add(new StreamConsumerState(j, getInstanceId(bArr2), decodeOffsets(bArr)));
            }
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // co.cask.cdap.data2.transaction.stream.ConsumerStateStore
    public final StreamConsumerState get(long j, int i) throws IOException {
        byte[] fetch = fetch(this.name.toBytes(), getColumn(j, i));
        if (fetch == null) {
            return null;
        }
        return new StreamConsumerState(j, i, decodeOffsets(fetch));
    }

    @Override // co.cask.cdap.data2.transaction.stream.ConsumerStateStore
    public final void save(StreamConsumerState streamConsumerState) throws IOException {
        store(this.name.toBytes(), getColumn(streamConsumerState.getGroupId(), streamConsumerState.getInstanceId()), encodeOffsets(streamConsumerState.getState()));
    }

    @Override // co.cask.cdap.data2.transaction.stream.ConsumerStateStore
    public final void save(Iterable<? extends StreamConsumerState> iterable) throws IOException {
        ImmutableSortedMap.Builder orderedBy = ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        for (StreamConsumerState streamConsumerState : iterable) {
            byteArrayOutputStream.reset();
            encodeOffsets(streamConsumerState.getState(), dataOutputStream);
            orderedBy.put(getColumn(streamConsumerState.getGroupId(), streamConsumerState.getInstanceId()), byteArrayOutputStream.toByteArray());
        }
        store(this.name.toBytes(), orderedBy.build());
    }

    @Override // co.cask.cdap.data2.transaction.stream.ConsumerStateStore
    public final void remove(Iterable<? extends StreamConsumerState> iterable) throws IOException {
        TreeSet newTreeSet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
        for (StreamConsumerState streamConsumerState : iterable) {
            newTreeSet.add(getColumn(streamConsumerState.getGroupId(), streamConsumerState.getInstanceId()));
        }
        delete(this.name.toBytes(), newTreeSet);
    }

    protected abstract byte[] fetch(byte[] bArr, byte[] bArr2) throws IOException;

    protected abstract void fetchAll(byte[] bArr, Map<byte[], byte[]> map) throws IOException;

    protected abstract void fetchAll(byte[] bArr, byte[] bArr2, Map<byte[], byte[]> map) throws IOException;

    protected abstract void store(byte[] bArr, byte[] bArr2, byte[] bArr3) throws IOException;

    protected abstract void store(byte[] bArr, Map<byte[], byte[]> map) throws IOException;

    protected abstract void delete(byte[] bArr, Set<byte[]> set) throws IOException;

    private byte[] encodeOffsets(Iterable<StreamFileOffset> iterable) throws IOException {
        ByteArrayDataOutput newDataOutput = ByteStreams.newDataOutput(320);
        encodeOffsets(iterable, newDataOutput);
        return newDataOutput.toByteArray();
    }

    private void encodeOffsets(Iterable<StreamFileOffset> iterable, DataOutput dataOutput) throws IOException {
        Iterator<StreamFileOffset> it = iterable.iterator();
        while (it.hasNext()) {
            StreamUtils.encodeOffset(dataOutput, it.next());
        }
    }

    private Iterable<StreamFileOffset> decodeOffsets(byte[] bArr) throws IOException {
        ImmutableList.Builder builder = ImmutableList.builder();
        if (bArr != null && bArr.length > 0) {
            DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bArr));
            while (dataInputStream.available() > 0) {
                builder.add(StreamUtils.decodeOffset(this.streamConfig, dataInputStream));
            }
        }
        return builder.build();
    }

    private byte[] getColumn(long j, int i) {
        byte[] bArr = new byte[12];
        Bytes.putLong(bArr, 0, j);
        Bytes.putInt(bArr, 8, i);
        return bArr;
    }

    private long getGroupId(byte[] bArr) {
        return Bytes.toLong(bArr);
    }

    private int getInstanceId(byte[] bArr) {
        return Bytes.toInt(bArr, 8);
    }
}
