package org.apache.rya.streams.kafka.processors.join;

import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.streams.kafka.processors.ProcessorResult;
import org.eclipse.rdf4j.query.impl.MapBindingSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link JoinStateStore} that keeps its state in a Kafka Streams {@link KeyValueStore}.
 *
 * <p>Every stored binding set is written under a comma delimited key of the form
 * {@code id,side,joinValue1,...,joinValueN<JOIN_VAR_END_MARKER>,otherValue1,...}. Alongside each value,
 * two marker entries are written whose keys are the join-variable-only key followed by
 * {@link #START_RANGE_SUFFIX} and {@link #END_RANGE_SUFFIX}. {@link #getJoinedValues} performs a range
 * scan between those two marker keys to find the values stored for the opposite side of the join.
 */
@DefaultAnnotation({NonNull.class})
public class KeyValueJoinStateStore implements JoinStateStore {
    private static final Logger log = LoggerFactory.getLogger(KeyValueJoinStateStore.class);

    /** Appended to a join key to build the key of the entry that opens that join key's range. */
    private static final String START_RANGE_SUFFIX = new String(new byte[]{0}, Charsets.UTF_8);

    /**
     * Appended to a join key to build the key of the entry that closes that join key's range.
     * NOTE(review): 0xFF is not a valid UTF-8 byte, so this decodes to the charset's replacement
     * character rather than a literal 0xFF; presumably it is only required to sort after stored values.
     */
    private static final String END_RANGE_SUFFIX = new String(new byte[]{-1}, Charsets.UTF_8);

    /** Appended to the value of the last join variable within a key. */
    private static final String JOIN_VAR_END_MARKER = new String(new byte[]{6, 16, 3, 4}, Charsets.UTF_8);

    /** Placeholder value stored under the start/end of range marker keys. */
    private static final VisibilityBindingSet RANGE_MARKER_VALUE = new VisibilityBindingSet(new MapBindingSet(), "");

    private final KeyValueStore<String, VisibilityBindingSet> store;
    private final String id;
    private final List<String> joinVars;
    private final List<String> allVars;

    /**
     * Constructs an instance of {@link KeyValueJoinStateStore}.
     *
     * @param keyValueStore the state store that backs this join store. (not null)
     * @param str the id that prefixes every key written by this store. (not null)
     * @param list the join variables, in key order. (not null)
     * @param list2 all variables, which must begin with the join variables in the same order. (not null)
     * @throws IllegalArgumentException if {@code list2} does not start with the elements of {@code list}.
     */
    public KeyValueJoinStateStore(KeyValueStore<String, VisibilityBindingSet> keyValueStore, String str, List<String> list, List<String> list2) throws IllegalArgumentException {
        this.store = Objects.requireNonNull(keyValueStore);
        this.id = Objects.requireNonNull(str);
        this.joinVars = Objects.requireNonNull(list);
        this.allVars = Objects.requireNonNull(list2);

        // A shorter "all vars" list can never be led by the join vars. Reject it here so the caller
        // gets the documented IllegalArgumentException instead of an IndexOutOfBoundsException below.
        boolean ledByJoinVars = list2.size() >= list.size();
        for (int i = 0; ledByJoinVars && i < list.size(); i++) {
            ledByJoinVars = list.get(i).equals(list2.get(i));
        }
        if (!ledByJoinVars) {
            throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. Join Vars: " + list + ", All Vars: " + list2);
        }
    }

    /**
     * Writes the result's binding set to the state store together with the start and end of range
     * markers for its join key.
     *
     * @param binaryResult the result whose binding set will be stored. (not null)
     */
    @Override // org.apache.rya.streams.kafka.processors.join.JoinStateStore
    public void store(ProcessorResult.BinaryResult binaryResult) {
        Objects.requireNonNull(binaryResult);

        final ProcessorResult.BinaryResult.Side side = binaryResult.getSide();
        final VisibilityBindingSet bindingSet = binaryResult.getResult();
        final int joinVarCount = this.joinVars.size();

        // The key built from only the join variables is the common prefix of both range markers.
        final String joinKey = makeCommaDelimitedValues(side, this.joinVars, bindingSet, joinVarCount);
        final String valueKey = makeCommaDelimitedValues(side, this.allVars, bindingSet, joinVarCount);

        final List<KeyValue<String, VisibilityBindingSet>> entries = new ArrayList<>(3);
        entries.add(new KeyValue<>(joinKey + START_RANGE_SUFFIX, RANGE_MARKER_VALUE));
        entries.add(new KeyValue<>(valueKey, bindingSet));
        entries.add(new KeyValue<>(joinKey + END_RANGE_SUFFIX, RANGE_MARKER_VALUE));

        log.debug("\nStoring the following values: {}\n", entries);
        this.store.putAll(entries);
    }

    /**
     * Looks up the binding sets that were stored for the side opposite to the provided result and
     * that share its join variable values.
     *
     * @param binaryResult the result whose join partners are wanted. (not null)
     * @return an iterator over the matching binding sets. Closing it closes the underlying range scan.
     */
    @Override // org.apache.rya.streams.kafka.processors.join.JoinStateStore
    public CloseableIterator<VisibilityBindingSet> getJoinedValues(ProcessorResult.BinaryResult binaryResult) {
        Objects.requireNonNull(binaryResult);

        // Values are joined against whatever was stored for the other side.
        final ProcessorResult.BinaryResult.Side otherSide =
                binaryResult.getSide() == ProcessorResult.BinaryResult.Side.LEFT
                        ? ProcessorResult.BinaryResult.Side.RIGHT
                        : ProcessorResult.BinaryResult.Side.LEFT;

        final String joinKey = makeCommaDelimitedValues(otherSide, this.joinVars, binaryResult.getResult(), this.joinVars.size());
        final KeyValueIterator<String, VisibilityBindingSet> range =
                this.store.range(joinKey + START_RANGE_SUFFIX, joinKey + END_RANGE_SUFFIX);

        return new CloseableIterator<VisibilityBindingSet>() {
            // null until the first read; afterwards holds the look-ahead value, or empty when exhausted.
            private Optional<VisibilityBindingSet> next = null;

            @Override
            public boolean hasNext() {
                if (this.next == null) {
                    this.next = readNext();
                }
                return this.next.isPresent();
            }

            @Override
            public VisibilityBindingSet next() {
                if (this.next == null) {
                    this.next = readNext();
                }
                if (!this.next.isPresent()) {
                    throw new IllegalStateException("May not invoke next() when there is nothing left in the Iterator.");
                }
                final VisibilityBindingSet current = this.next.get();
                KeyValueJoinStateStore.log.debug("\nReturning: {}", current);
                this.next = readNext();
                return current;
            }

            /**
             * Reads the next value from the range scan, skipping the start of range marker on the
             * first read and stopping at the end of range marker on later reads.
             */
            private Optional<VisibilityBindingSet> readNext() {
                if (!range.hasNext()) {
                    return Optional.empty();
                }

                KeyValue<String, VisibilityBindingSet> entry = range.next();
                if (this.next == null) {
                    // First read: the scan has to open with the start of range marker, which is skipped.
                    if (!entry.key.endsWith(KeyValueJoinStateStore.START_RANGE_SUFFIX)) {
                        throw new IllegalStateException("The first key encountered must be a start of range key.");
                    }
                    KeyValueJoinStateStore.log.debug("Read the start of range markers.\n");
                    if (!range.hasNext()) {
                        throw new IllegalStateException("There must be another entry after the start of range key.");
                    }
                    entry = range.next();
                } else if (entry.key.endsWith(KeyValueJoinStateStore.END_RANGE_SUFFIX)) {
                    KeyValueJoinStateStore.log.debug("Read the end of range marker.\n");
                    if (range.hasNext()) {
                        throw new IllegalStateException("The end of range marker must be the last key in the iterator.");
                    }
                    return Optional.empty();
                }
                return Optional.of(entry.value);
            }

            @Override
            public void close() throws Exception {
                range.close();
            }
        };
    }

    /**
     * Builds a state store key: the store id, the side, and then the value of each variable (an
     * empty string when the binding set has no binding for it), all joined with commas.
     *
     * @param side the side of the join the key is for. (not null)
     * @param vars the variables whose values make up the key, in order. (not null)
     * @param bindingSet the binding set the values are read from. (not null)
     * @param joinVarSize the 1-based position of the variable whose value gets
     *   {@link #JOIN_VAR_END_MARKER} appended; no marker is appended if no variable has that position.
     * @return the comma delimited key.
     */
    private String makeCommaDelimitedValues(ProcessorResult.BinaryResult.Side side, List<String> vars, VisibilityBindingSet bindingSet, int joinVarSize) {
        Objects.requireNonNull(side);
        Objects.requireNonNull(vars);
        Objects.requireNonNull(bindingSet);

        final List<String> parts = new ArrayList<>(vars.size() + 2);
        parts.add(this.id);
        parts.add(side.toString());

        int position = 0;
        for (final String varName : vars) {
            position++;
            String value = bindingSet.hasBinding(varName) ? bindingSet.getBinding(varName).getValue().toString() : "";
            if (position == joinVarSize) {
                value = value + JOIN_VAR_END_MARKER;
            }
            parts.add(value);
        }
        return Joiner.on(",").join(parts);
    }

    /**
     * Debugging aid that logs every entry of the state store whose key is within the given range.
     *
     * @param str the first key of the range.
     * @param str2 the last key of the range.
     */
    private void printStateStoreRange(String str, String str2) {
        printStateStoreKeyValueIterator(this.store.range(str, str2));
    }

    /** Debugging aid that logs every entry of the state store. */
    private void printStateStoreAll() {
        printStateStoreKeyValueIterator(this.store.all());
    }

    /**
     * Logs every remaining entry of the iterator at info level and then closes it.
     *
     * @param keyValueIterator the iterator to drain and close. (not null)
     */
    private static void printStateStoreKeyValueIterator(KeyValueIterator<String, VisibilityBindingSet> keyValueIterator) {
        Objects.requireNonNull(keyValueIterator);
        try {
            log.info("----------------");
            while (keyValueIterator.hasNext()) {
                final KeyValue<String, VisibilityBindingSet> entry = keyValueIterator.next();
                log.info("{} :::: {}", entry.key, entry.value);
            }
            log.info("----------------\n\n");
        } finally {
            // Always release the iterator, even when iterating or logging fails part way through.
            keyValueIterator.close();
        }
    }
}
