package org.apache.giraph.block_app.framework.api.local;

import com.google.common.collect.AbstractIterator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.MessageClasses;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.types.ops.TypeOps;
import org.apache.giraph.types.ops.TypeOpsUtils;
import org.apache.giraph.utils.ExtendedDataInput;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/giraph/block_app/framework/api/local/InternalMessageStore.class */
public interface InternalMessageStore<I extends WritableComparable, M extends Writable> {

    /* loaded from: input_file:org/apache/giraph/block_app/framework/api/local/InternalMessageStore$InternalByteMessageStore.class */
    /**
     * Message store that keeps, for every target id, one
     * {@link ExtendedDataOutput} buffer into which all messages sent to that
     * target are serialized back to back.
     *
     * @param <I> vertex id type
     * @param <M> message type
     */
    public static class InternalByteMessageStore<I extends WritableComparable, M extends Writable> extends InternalConcurrentMessageStore<I, M, ExtendedDataOutput> {
        private final MessageValueFactory<M> messageFactory;
        private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;

        /**
         * @param cls vertex id class, forwarded to the base store
         * @param messageValueFactory factory for the message instance that is
         *        filled while iterating over taken messages
         * @param immutableClassesGiraphConfiguration configuration used to
         *        create the data input/output buffers
         */
        public InternalByteMessageStore(Class<I> cls, MessageValueFactory<M> messageValueFactory, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration) {
            super(cls);
            this.messageFactory = messageValueFactory;
            this.conf = immutableClassesGiraphConfiguration;
        }

        /**
         * Removes the buffer stored for {@code i} and exposes its content as
         * messages.
         *
         * <p>Each call to {@code iterator()} on the result re-reads the buffer
         * from offset 0. Within one iterator a single message instance is
         * reused: every step deserializes into the same object and returns it,
         * so callers must not hold on to a returned message across steps.
         *
         * @param i target id
         * @return the messages for {@code i}, or {@code null} when nothing is
         *         stored for it
         */
        @Override
        public Iterable<M> takeMessages(I i) {
            final ExtendedDataOutput buffer = removeFor(i);
            if (buffer == null) {
                return null;
            }
            return new Iterable<M>() {
                @Override
                public Iterator<M> iterator() {
                    final ExtendedDataInput input = conf.createExtendedDataInput(buffer.getByteArray(), 0, buffer.getPos());
                    final M message = messageFactory.newInstance();
                    return new AbstractIterator<M>() {
                        // Must be named computeNext(): this is the hook that
                        // AbstractIterator invokes from hasNext()/next().
                        @Override
                        protected M computeNext() {
                            if (input.available() == 0) {
                                return endOfData();
                            }
                            try {
                                message.readFields(input);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            return message;
                        }
                    };
                }
            };
        }

        /**
         * Serializes {@code m} onto the end of the buffer for {@code i},
         * holding the buffer's monitor while writing.
         *
         * @param i target id
         * @param m message to append
         * @throws RuntimeException wrapping any {@link IOException} raised
         *         while writing
         */
        @Override
        public void sendMessage(I i, M m) {
            ExtendedDataOutput buffer = getReceiverFor(i);
            synchronized (buffer) {
                try {
                    m.write(buffer);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * @return a new output buffer obtained from the configuration
         */
        @Override
        public ExtendedDataOutput createNewReceiver() {
            return this.conf.createExtendedDataOutput();
        }
    }

    /* loaded from: input_file:org/apache/giraph/block_app/framework/api/local/InternalMessageStore$InternalChecksMessageStore.class */
    /**
     * Wrapper around another {@link InternalMessageStore} that performs extra
     * copy operations before delegating: every id that passes through is
     * copied once (the copy is discarded), and every sent message is replaced
     * by a copy with the probability of a coin flip.
     *
     * @param <I> vertex id type
     * @param <M> message type
     */
    public static class InternalChecksMessageStore<I extends WritableComparable, M extends Writable> implements InternalMessageStore<I, M> {
        private final InternalMessageStore<I, M> messageStore;
        private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
        private final MessageValueFactory<M> messageFactory;

        /**
         * @param internalMessageStore store that all calls are delegated to
         * @param immutableClassesGiraphConfiguration configuration used for
         *        the id and message copies
         * @param messageValueFactory factory used when copying messages
         */
        public InternalChecksMessageStore(InternalMessageStore<I, M> internalMessageStore, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration, MessageValueFactory<M> messageValueFactory) {
            this.messageStore = internalMessageStore;
            this.conf = immutableClassesGiraphConfiguration;
            this.messageFactory = messageValueFactory;
        }

        /**
         * Returns either {@code m} itself or a fresh copy of it, decided by
         * {@link ThreadLocalRandom#nextBoolean()}.
         */
        private M maybeMessageCopy(M m) {
            if (ThreadLocalRandom.current().nextBoolean()) {
                return (M) WritableUtils.createCopy(m, this.messageFactory, this.conf);
            }
            return m;
        }

        /**
         * Creates a copy of {@code i} and throws it away; only the act of
         * copying matters here.
         *
         * @param i id to copy
         */
        public void checkIdCopy(I i) {
            WritableUtils.createCopy(i, this.conf.getVertexIdFactory(), this.conf);
        }

        /**
         * Copy-checks the id, then forwards the (possibly copied) message.
         */
        @Override
        public void sendMessage(I i, M m) {
            checkIdCopy(i);
            M outgoing = maybeMessageCopy(m);
            this.messageStore.sendMessage(i, outgoing);
        }

        /**
         * Forwards to the wrapped store using an iterator that copy-checks
         * every id as it is handed out; the message is (possibly) copied once
         * for the whole call.
         */
        @Override
        public void sendMessageToMultipleEdges(final Iterator<I> it, M m) {
            Iterator<I> checkedTargets = new Iterator<I>() {
                @Override
                public boolean hasNext() {
                    return it.hasNext();
                }

                @Override
                public I next() {
                    I target = it.next();
                    checkIdCopy(target);
                    return target;
                }

                @Override
                public void remove() {
                    it.remove();
                }
            };
            M outgoing = maybeMessageCopy(m);
            this.messageStore.sendMessageToMultipleEdges(checkedTargets, outgoing);
        }

        /**
         * Copy-checks the id, then returns whatever the wrapped store returns.
         */
        @Override
        public Iterable<M> takeMessages(I i) {
            checkIdCopy(i);
            return this.messageStore.takeMessages(i);
        }

        /**
         * @return the wrapped store's target set, unchanged
         */
        @Override
        public Set<I> targetsSet() {
            return this.messageStore.targetsSet();
        }
    }

    /* loaded from: input_file:org/apache/giraph/block_app/framework/api/local/InternalMessageStore$InternalCombinerMessageStore.class */
    /**
     * Message store that keeps a single combined message per target id,
     * folding every incoming message into it with a {@link MessageCombiner}.
     *
     * @param <I> vertex id type
     * @param <M> message type
     */
    public static class InternalCombinerMessageStore<I extends WritableComparable, M extends Writable> extends InternalConcurrentMessageStore<I, M, M> {
        private final MessageCombiner<? super I, M> messageCombiner;

        /**
         * @param cls vertex id class, forwarded to the base store
         * @param messageCombiner combiner that supplies the initial message
         *        and merges incoming messages into it
         */
        public InternalCombinerMessageStore(Class<I> cls, MessageCombiner<? super I, M> messageCombiner) {
            super(cls);
            this.messageCombiner = messageCombiner;
        }

        /**
         * Removes the combined message stored for {@code i}.
         *
         * @param i target id
         * @return a single-element collection holding the combined message, or
         *         {@code null} when nothing is stored for {@code i}
         */
        @Override
        public Iterable<M> takeMessages(I i) {
            M combined = removeFor(i);
            if (combined == null) {
                return null;
            }
            return Collections.singleton(combined);
        }

        /**
         * Combines {@code m} into the message held for {@code i}, holding that
         * message's monitor for the duration of the combine call.
         *
         * @param i target id
         * @param m message to fold in
         */
        @Override
        public void sendMessage(I i, M m) {
            M accumulated = getReceiverFor(i);
            synchronized (accumulated) {
                this.messageCombiner.combine(i, accumulated, m);
            }
        }

        /**
         * @return the combiner's initial message, used as the starting value
         *         for a target that has no entry yet
         */
        @Override
        public M createNewReceiver() {
            return this.messageCombiner.createInitialMessage();
        }
    }

    /* loaded from: input_file:org/apache/giraph/block_app/framework/api/local/InternalMessageStore$InternalConcurrentMessageStore.class */
    /**
     * Base class for stores that keep one receiver object of type {@code R}
     * per target id in a {@link ConcurrentHashMap}. Subclasses decide what a
     * receiver is and how messages are written to and read from it.
     *
     * @param <I> vertex id type
     * @param <M> message type
     * @param <R> per-target receiver type
     */
    public static abstract class InternalConcurrentMessageStore<I extends WritableComparable, M extends Writable, R> implements InternalMessageStore<I, M> {
        private final ConcurrentHashMap<I, R> received = new ConcurrentHashMap<>();
        private final Class<I> idClass;
        private final TypeOps<I> idTypeOps;

        /**
         * @param cls vertex id class; also used to look up optional
         *        {@link TypeOps} for copying ids
         */
        InternalConcurrentMessageStore(Class<I> cls) {
            this.idClass = cls;
            this.idTypeOps = TypeOpsUtils.getTypeOpsOrNull(cls);
        }

        /**
         * Copies an id, through {@link TypeOps} when they were found for the
         * id class and through {@link WritableUtils} otherwise.
         *
         * @param i id to copy
         * @return a copy of {@code i}
         */
        public I copyId(I i) {
            if (this.idTypeOps != null) {
                return (I) this.idTypeOps.createCopy(i);
            }
            return WritableUtils.createCopy(i, this.idClass, (ImmutableClassesGiraphConfiguration) null);
        }

        /**
         * Returns the receiver registered for {@code i}, registering a new one
         * under a copy of the id when none exists. If another thread registers
         * one first, that thread's receiver is returned instead.
         */
        R getReceiverFor(I i) {
            R existing = this.received.get(i);
            if (existing != null) {
                return existing;
            }
            // The id is copied before it becomes a map key, so the map never
            // holds the caller's instance.
            I ownedId = copyId(i);
            R fresh = createNewReceiver();
            R winner = this.received.putIfAbsent(ownedId, fresh);
            return winner != null ? winner : fresh;
        }

        /**
         * Removes and returns the receiver for {@code i}, or {@code null} when
         * there is none.
         */
        R removeFor(I i) {
            return this.received.remove(i);
        }

        /** Creates an empty receiver for a target that has no entry yet. */
        abstract R createNewReceiver();

        /**
         * @return the key set of the receiver map
         */
        @Override
        public Set<I> targetsSet() {
            return this.received.keySet();
        }

        /**
         * Sends {@code m} to every id produced by {@code it}, one
         * {@code sendMessage} call per id.
         */
        @Override
        public void sendMessageToMultipleEdges(Iterator<I> it, M m) {
            while (it.hasNext()) {
                I target = it.next();
                sendMessage(target, m);
            }
        }

        /**
         * Picks a store implementation for the given message classes: a
         * combiner store when a combiner is available, a shared-byte store for
         * {@code POINTER_LIST_PER_VERTEX} encoding, and a byte store otherwise.
         *
         * @param immutableClassesGiraphConfiguration job configuration
         * @param messageClasses description of the message type in use
         * @return a new, empty message store
         */
        public static <I extends WritableComparable, M extends Writable> InternalMessageStore<I, M> createMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration, MessageClasses<I, M> messageClasses) {
            MessageCombiner<? super I, M> combiner = messageClasses.createMessageCombiner(immutableClassesGiraphConfiguration);
            if (combiner != null) {
                return new InternalCombinerMessageStore<>(immutableClassesGiraphConfiguration.getVertexIdClass(), combiner);
            }
            if (messageClasses.getMessageEncodeAndStoreType().equals(MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
                return new InternalSharedByteMessageStore<>(immutableClassesGiraphConfiguration.getVertexIdClass(), messageClasses.createMessageValueFactory(immutableClassesGiraphConfiguration));
            }
            return new InternalByteMessageStore<>(immutableClassesGiraphConfiguration.getVertexIdClass(), messageClasses.createMessageValueFactory(immutableClassesGiraphConfiguration), immutableClassesGiraphConfiguration);
        }

        /**
         * Creates a store for the outgoing message classes of
         * {@code blockWorkerPieces}, optionally wrapped in an
         * {@link InternalChecksMessageStore}.
         *
         * @param immutableClassesGiraphConfiguration job configuration
         * @param blockWorkerPieces source of the outgoing message classes
         * @param z when {@code true}, the store is wrapped in the checking
         *        decorator
         * @return a new, empty message store
         */
        @SuppressWarnings("unchecked")
        public static <I extends WritableComparable, M extends Writable> InternalMessageStore<I, M> createMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration, BlockWorkerPieces blockWorkerPieces, boolean z) {
            MessageClasses<I, M> outgoing = blockWorkerPieces.getOutgoingMessageClasses(immutableClassesGiraphConfiguration);
            InternalMessageStore<I, M> store = createMessageStore(immutableClassesGiraphConfiguration, outgoing);
            if (!z) {
                return store;
            }
            return new InternalChecksMessageStore<>(store, immutableClassesGiraphConfiguration, outgoing.createMessageValueFactory(immutableClassesGiraphConfiguration));
        }
    }

    /* loaded from: input_file:org/apache/giraph/block_app/framework/api/local/InternalMessageStore$InternalSharedByteMessageStore.class */
    /**
     * Message store that keeps, per target id, a list of serialized messages
     * as {@code byte[]}. A message sent to several targets is serialized once
     * and the same array is added to every target's list.
     *
     * @param <I> vertex id type
     * @param <M> message type
     */
    public static class InternalSharedByteMessageStore<I extends WritableComparable, M extends Writable> extends InternalConcurrentMessageStore<I, M, List<byte[]>> {
        private final MessageValueFactory<M> messageFactory;

        /**
         * @param cls vertex id class, forwarded to the base store
         * @param messageValueFactory factory for the message instance that is
         *        filled while iterating over taken messages
         */
        public InternalSharedByteMessageStore(Class<I> cls, MessageValueFactory<M> messageValueFactory) {
            super(cls);
            this.messageFactory = messageValueFactory;
        }

        /**
         * Removes the list stored for {@code i} and exposes it as messages.
         *
         * <p>Within one iterator a single message instance is reused: each
         * {@code next()} deserializes the next array into the same object and
         * returns it.
         *
         * @param i target id
         * @return the messages for {@code i}, or {@code null} when nothing is
         *         stored for it
         */
        @Override
        public Iterable<M> takeMessages(I i) {
            final List<byte[]> serialized = removeFor(i);
            if (serialized == null) {
                return null;
            }
            return new Iterable<M>() {
                @Override
                public Iterator<M> iterator() {
                    final Iterator<byte[]> chunks = serialized.iterator();
                    final M message = messageFactory.newInstance();
                    final UnsafeReusableByteArrayInput reusableInput = new UnsafeReusableByteArrayInput();
                    return new Iterator<M>() {
                        @Override
                        public boolean hasNext() {
                            return chunks.hasNext();
                        }

                        @Override
                        public M next() {
                            byte[] bytes = chunks.next();
                            WritableUtils.fromByteArrayUnsafe(bytes, message, reusableInput);
                            return message;
                        }

                        @Override
                        public void remove() {
                            chunks.remove();
                        }
                    };
                }
            };
        }

        /**
         * Appends an already serialized message to the list for {@code i},
         * holding the list's monitor while adding.
         */
        private void storeMessage(I i, byte[] bArr) {
            List<byte[]> target = getReceiverFor(i);
            synchronized (target) {
                target.add(bArr);
            }
        }

        /**
         * @return a new empty list
         */
        @Override
        public List<byte[]> createNewReceiver() {
            return new ArrayList<>();
        }

        /**
         * Serializes {@code m} and stores the bytes for {@code i}.
         */
        @Override
        public void sendMessage(I i, M m) {
            byte[] bytes = WritableUtils.toByteArrayUnsafe(m);
            storeMessage(i, bytes);
        }

        /**
         * Serializes {@code m} once and stores that same array for every id
         * produced by {@code it}.
         */
        @Override
        public void sendMessageToMultipleEdges(Iterator<I> it, M m) {
            final byte[] shared = WritableUtils.toByteArrayUnsafe(m);
            while (it.hasNext()) {
                I target = it.next();
                storeMessage(target, shared);
            }
        }
    }

    /**
     * Returns the ids that currently have something stored for them.
     *
     * <p>The map-backed implementations in this file return the key set of
     * their internal map.
     *
     * @return set of target ids
     */
    Set<I> targetsSet();

    /**
     * Returns the messages stored for the given id.
     *
     * <p>The map-backed implementations in this file remove the entry for
     * {@code i} as part of this call and return {@code null} when no entry
     * exists, so callers should be prepared for a {@code null} result.
     *
     * @param i target id
     * @return messages for {@code i}; may be {@code null}
     */
    Iterable<M> takeMessages(I i);

    /**
     * Stores one message for one target id.
     *
     * @param i target id
     * @param m message to store
     */
    void sendMessage(I i, M m);

    /**
     * Stores the same message for every id produced by the iterator.
     *
     * @param it iterator over target ids
     * @param m message to store for each of them
     */
    void sendMessageToMultipleEdges(Iterator<I> it, M m);
}
