package com.oracle.coherence.grpc.client.common.topics;

import com.google.protobuf.BoolValue;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Message;
import com.google.protobuf.util.Timestamps;
import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.grpc.BinaryHelper;
import com.oracle.coherence.grpc.ErrorsHelper;
import com.oracle.coherence.grpc.MessageHelper;
import com.oracle.coherence.grpc.TopicHelper;
import com.oracle.coherence.grpc.client.common.GrpcConnection;
import com.oracle.coherence.grpc.client.common.SimpleStreamObserver;
import com.oracle.coherence.grpc.client.common.v1.GrpcConnectionV1;
import com.oracle.coherence.grpc.messages.common.v1.CollectionOfInt32;
import com.oracle.coherence.grpc.messages.topic.v1.ChannelAndPosition;
import com.oracle.coherence.grpc.messages.topic.v1.CommitResponse;
import com.oracle.coherence.grpc.messages.topic.v1.CommitResponseStatus;
import com.oracle.coherence.grpc.messages.topic.v1.EnsureSubscriptionRequest;
import com.oracle.coherence.grpc.messages.topic.v1.InitializeSubscriptionRequest;
import com.oracle.coherence.grpc.messages.topic.v1.InitializeSubscriptionResponse;
import com.oracle.coherence.grpc.messages.topic.v1.MapOfChannelAndPosition;
import com.oracle.coherence.grpc.messages.topic.v1.ReceiveRequest;
import com.oracle.coherence.grpc.messages.topic.v1.ReceiveResponse;
import com.oracle.coherence.grpc.messages.topic.v1.ReceiveStatus;
import com.oracle.coherence.grpc.messages.topic.v1.SeekRequest;
import com.oracle.coherence.grpc.messages.topic.v1.SeekResponse;
import com.oracle.coherence.grpc.messages.topic.v1.SeekedPositions;
import com.oracle.coherence.grpc.messages.topic.v1.TopicElement;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceRequestType;
import com.oracle.coherence.grpc.messages.topic.v1.TopicServiceResponse;
import com.tangosol.internal.net.topic.BaseRemoteSubscriber;
import com.tangosol.internal.net.topic.ReceiveResult;
import com.tangosol.internal.net.topic.SeekResult;
import com.tangosol.internal.net.topic.SimpleReceiveResult;
import com.tangosol.internal.net.topic.SubscriberConnector;
import com.tangosol.internal.net.topic.TopicSubscription;
import com.tangosol.internal.net.topic.impl.paged.agent.SeekProcessor;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
import com.tangosol.io.Serializer;
import com.tangosol.net.RequestIncompleteException;
import com.tangosol.net.RequestTimeoutException;
import com.tangosol.net.topic.Position;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.net.topic.TopicDependencies;
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

/* loaded from: input_file:com/oracle/coherence/grpc/client/common/topics/GrpcSubscriberConnector.class */
public class GrpcSubscriberConnector<V> extends BaseRemoteSubscriber<V> implements GrpcConnection.ConnectionListener {
    private final GrpcNamedTopicConnector<?> f_connector;
    private final TopicServiceGrpcConnection f_connection;
    private final GrpcConnection.Listener<TopicServiceResponse> f_listener;
    private final int f_nProxyId;
    private final Serializer f_serializer;
    private SubscriberConnector.ConnectedSubscriber<V> m_subscriber;

    /* renamed from: com.oracle.coherence.grpc.client.common.topics.GrpcSubscriberConnector$1, reason: invalid class name */
    /* loaded from: input_file:com/oracle/coherence/grpc/client/common/topics/GrpcSubscriberConnector$1.class */
    /*
     * Holder for two lookup tables, one per protobuf status enum, each indexed by the enum
     * constant's ordinal() and yielding a small positive case number:
     *
     *   CommitResponseStatus: Committed=1, AlreadyCommitted=2, Rejected=3, Unowned=4,
     *                         NothingToCommit=5, UNRECOGNIZED=6
     *   ReceiveStatus:        ReceiveSuccess=1, ChannelExhausted=2, ChannelNotAllocatedChannel=3,
     *                         UnknownSubscriber=4, UNRECOGNIZED=5
     *
     * Slots that are never assigned keep the int[] default of 0, which matches none of the
     * numbers above.
     *
     * NOTE(review): the synthetic markers and the "$1" name suggest this is a compiler-generated
     * switch map recovered by a decompiler rather than hand-written code - confirm before editing.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$ReceiveStatus;
        static final /* synthetic */ int[] $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$CommitResponseStatus = new int[CommitResponseStatus.values().length];

        static {
            // Every constant is registered inside its own try block: a NoSuchFieldError raised for
            // one constant is deliberately ignored so the remaining entries are still populated.
            try {
                $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$CommitResponseStatus[CommitResponseStatus.Committed.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$CommitResponseStatus[CommitResponseStatus.AlreadyCommitted.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$CommitResponseStatus[CommitResponseStatus.Rejected.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$CommitResponseStatus[CommitResponseStatus.Unowned.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$CommitResponseStatus[CommitResponseStatus.NothingToCommit.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$CommitResponseStatus[CommitResponseStatus.UNRECOGNIZED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            // The ReceiveStatus table is allocated here, after the commit table has been filled.
            $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$ReceiveStatus = new int[ReceiveStatus.values().length];
            try {
                $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$ReceiveStatus[ReceiveStatus.ReceiveSuccess.ordinal()] = 1;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$ReceiveStatus[ReceiveStatus.ChannelExhausted.ordinal()] = 2;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$ReceiveStatus[ReceiveStatus.ChannelNotAllocatedChannel.ordinal()] = 3;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$ReceiveStatus[ReceiveStatus.UnknownSubscriber.ordinal()] = 4;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$com$oracle$coherence$grpc$messages$topic$v1$ReceiveStatus[ReceiveStatus.UNRECOGNIZED.ordinal()] = 5;
            } catch (NoSuchFieldError e11) {
            }
        }
    }

    /**
     * Creates a connector that talks to one server-side subscriber proxy over a shared
     * topic-service connection.
     *
     * <p>The constructor registers two callbacks on the connection: a response observer that
     * only accepts responses whose proxy id equals {@code i} and forwards them to
     * {@link #onEvent(TopicServiceResponse)}, and this instance as a connection listener.
     *
     * @param grpcNamedTopicConnector   the parent topic connector; also supplies the serializer
     * @param i                         the proxy id used to address requests and filter responses
     * @param topicServiceGrpcConnection the connection requests are sent over
     * @param str                       passed to the superclass (presumably the topic name - confirm)
     * @param subscriberId              passed to the superclass
     * @param subscriberGroupId         passed to the superclass
     */
    public GrpcSubscriberConnector(GrpcNamedTopicConnector<?> grpcNamedTopicConnector, int i, TopicServiceGrpcConnection topicServiceGrpcConnection, String str, SubscriberId subscriberId, SubscriberGroupId subscriberGroupId) {
        super(str, subscriberId, subscriberGroupId);
        f_connector = grpcNamedTopicConnector;
        f_nProxyId = i;
        f_connection = topicServiceGrpcConnection;
        f_serializer = grpcNamedTopicConnector.getTopicService().getSerializer();
        f_listener = new GrpcConnection.Listener<>(
                new SimpleStreamObserver(this::onEvent),
                response -> response.getProxyId() == i);
        topicServiceGrpcConnection.addResponseObserver(f_listener);
        topicServiceGrpcConnection.addConnectionListener(this);
    }

    /**
     * Stores the subscriber this connector serves; it is later used to build elements.
     *
     * @param connectedSubscriber the subscriber to remember
     */
    public void postConstruct(SubscriberConnector.ConnectedSubscriber<V> connectedSubscriber) {
        m_subscriber = connectedSubscriber;
    }

    /**
     * Reports whether the underlying connection is currently connected.
     *
     * @return the connection's {@code isConnected()} value
     */
    public boolean isActive() {
        return f_connection.isConnected();
    }

    /**
     * Does nothing in this implementation.
     */
    public void ensureConnected() {
    }

    /**
     * Detaches this connector from the connection by removing the connection listener and
     * the response observer that were registered in the constructor.
     */
    public void close() {
        f_connection.removeConnectionListener(this);
        if (f_listener == null) {
            return;
        }
        f_connection.removeResponseObserver(f_listener);
    }

    /**
     * Initializes the subscription on the server and records the identifiers it returns.
     *
     * <p>The subscription id and the connection timestamp (in milliseconds) from the response
     * are stored on this instance before the head positions are returned.
     *
     * @param connectedSubscriber not used by this implementation
     * @param z                   sent as the request's force-reconnect flag
     * @param z2                  sent as the request's reconnect flag
     * @param z3                  sent as the request's disconnected flag
     * @return the head positions from the response, in response order
     */
    public Position[] initialize(SubscriberConnector.ConnectedSubscriber<V> connectedSubscriber, boolean z, boolean z2, boolean z3) {
        InitializeSubscriptionRequest request = InitializeSubscriptionRequest.newBuilder()
                .setDisconnected(z3)
                .setForceReconnect(z)
                .setReconnect(z2)
                .build();
        InitializeSubscriptionResponse response =
                send(TopicServiceRequestType.InitializeSubscription, request, InitializeSubscriptionResponse.class);
        m_subscriptionId = response.getSubscriptionId();
        m_connectionTimestamp = Timestamps.toMillis(response.getTimestamp());
        return response.getHeadsList().stream()
                .map(TopicHelper::fromProtobufPosition)
                .toArray(Position[]::new);
    }

    /**
     * Asks the server to ensure the given subscription and waits for the answer.
     *
     * @param connectedSubscriber not used by this implementation
     * @param j                   the subscription id placed in the request
     * @param z                   sent as the request's force-reconnect flag
     * @return the boolean carried by the reply, or {@code false} when the reply is {@code null}
     */
    public boolean ensureSubscription(SubscriberConnector.ConnectedSubscriber<V> connectedSubscriber, long j, boolean z) {
        EnsureSubscriptionRequest request = EnsureSubscriptionRequest.newBuilder()
                .setSubscriptionId(j)
                .setForceReconnect(z)
                .build();
        BoolValue reply = send(TopicServiceRequestType.EnsureSubscription, request, BoolValue.class);
        if (reply == null) {
            return false;
        }
        return reply.getValue();
    }

    /**
     * Sends a subscriber heartbeat whose payload is the flag itself.
     *
     * @param z when {@code true} the request is issued and its future is discarded; when
     *          {@code false} the call goes through {@code send} and waits for the response
     */
    protected void sendHeartbeat(boolean z) {
        Message payload = BoolValue.of(z);
        if (!z) {
            send(TopicServiceRequestType.SubscriberHeartbeat, payload);
            return;
        }
        poll(TopicServiceRequestType.SubscriberHeartbeat, payload);
    }

    /**
     * Requests messages from a channel and blocks until the server replies.
     *
     * <p>The request carries only the channel and the maximum message count; the reply is
     * converted by {@link #onReceiveResponse(ReceiveResponse)}.
     *
     * @param i        the channel to receive from
     * @param position not used by this implementation
     * @param j        not used by this implementation
     * @param i2       the maximum number of messages to request
     * @return the converted receive result
     * @throws RequestTimeoutException if the wait is interrupted or exceeds the request timeout
     */
    protected SimpleReceiveResult receiveInternal(int i, Position position, long j, int i2) {
        ReceiveRequest request = ReceiveRequest.newBuilder()
                .setChannel(i)
                .setMaxMessages(i2)
                .build();
        CompletableFuture<SimpleReceiveResult> future =
                poll(TopicServiceRequestType.Receive, request, ReceiveResponse.class)
                        .thenApply(this::onReceiveResponse);
        try {
            return future.get(getRequestTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException(e);
        } catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Handles a response delivered by the response observer: converts it into a subscriber
     * event and dispatches that event.
     *
     * @param topicServiceResponse the response received for this connector's proxy
     */
    protected void onEvent(TopicServiceResponse topicServiceResponse) {
        dispatchEvent(
                TopicHelper.fromProtobufSubscriberEvent(this, topicServiceResponse));
    }

    /**
     * Converts a protobuf receive response into a {@link SimpleReceiveResult}.
     *
     * <p>The head position is converted first, then the wire status is mapped to the
     * corresponding {@link ReceiveResult.Status}, and finally the values are converted to
     * binaries and collected into a {@link LinkedList} in response order.
     *
     * @param receiveResponse the response to convert
     * @return the result holding the values, remaining count, status and head position
     * @throws IllegalArgumentException if the status is {@code UNRECOGNIZED}
     */
    protected SimpleReceiveResult onReceiveResponse(ReceiveResponse receiveResponse) {
        Position head = TopicHelper.fromProtobufPosition(receiveResponse.getHeadPosition());
        ReceiveResult.Status status;
        // Switch on the enum itself rather than on a hand-maintained ordinal table.
        switch (receiveResponse.getStatus()) {
            case ReceiveSuccess:
                status = ReceiveResult.Status.Success;
                break;
            case ChannelExhausted:
                status = ReceiveResult.Status.Exhausted;
                break;
            case ChannelNotAllocatedChannel:
                status = ReceiveResult.Status.NotAllocatedChannel;
                break;
            case UnknownSubscriber:
                status = ReceiveResult.Status.UnknownSubscriber;
                break;
            case UNRECOGNIZED:
                throw new IllegalArgumentException("Unknown subscriber status: " + String.valueOf(receiveResponse.getStatus()));
            default:
                // Only reachable if the enum gained a constant this class was not compiled against.
                throw new IncompatibleClassChangeError();
        }
        return new SimpleReceiveResult(
                (Queue) receiveResponse.getValuesList().stream()
                        .map(BinaryHelper::toBinary)
                        .collect(Collectors.toCollection(LinkedList::new)),
                receiveResponse.getRemainingValues(),
                status,
                head);
    }

    /**
     * Commits a position on a channel and reports the outcome to the supplied handler.
     *
     * <p>The call blocks until the server responds. The wire status is mapped to the
     * corresponding {@link Subscriber.CommitResultStatus}; if the response carries an error it
     * is converted to an exception and attached to the commit result.
     *
     * @param i             the channel to commit on
     * @param position      the position to commit
     * @param commitHandler receives the commit result and the position returned by the server
     * @throws IllegalArgumentException if the status is {@code UNRECOGNIZED}
     */
    protected void commitInternal(int i, Position position, BaseRemoteSubscriber.CommitHandler commitHandler) {
        ChannelAndPosition request = ChannelAndPosition.newBuilder()
                .setChannel(i)
                .setPosition(TopicHelper.toProtobufPosition(position))
                .build();
        CommitResponse response = send(TopicServiceRequestType.CommitPosition, request, CommitResponse.class);
        Position returnedPosition = TopicHelper.fromProtobufPosition(response.getPosition());

        Subscriber.CommitResultStatus status;
        // Switch on the enum itself rather than on a hand-maintained ordinal table.
        switch (response.getStatus()) {
            case Committed:
                status = Subscriber.CommitResultStatus.Committed;
                break;
            case AlreadyCommitted:
                status = Subscriber.CommitResultStatus.AlreadyCommitted;
                break;
            case Rejected:
                status = Subscriber.CommitResultStatus.Rejected;
                break;
            case Unowned:
                status = Subscriber.CommitResultStatus.Unowned;
                break;
            case NothingToCommit:
                status = Subscriber.CommitResultStatus.NothingToCommit;
                break;
            case UNRECOGNIZED:
                throw new IllegalArgumentException("Unknown subscriber status: " + String.valueOf(response.getStatus()));
            default:
                // Only reachable if the enum gained a constant this class was not compiled against.
                throw new IncompatibleClassChangeError();
        }

        RequestIncompleteException error = null;
        if (response.hasError()) {
            error = ErrorsHelper.createException(response.getError(), this.f_serializer);
        }
        commitHandler.committed(new Subscriber.CommitResult(i, position, status, error), returnedPosition);
    }

    /**
     * Fetches the element at a position on a channel and blocks until the server replies.
     *
     * @param i        the channel to look at
     * @param position the position to look at
     * @return the element built by {@link #createElement(TopicElement)} from the reply
     * @throws RequestTimeoutException if the wait is interrupted or exceeds the request timeout
     */
    public Subscriber.Element<V> peek(int i, Position position) {
        ChannelAndPosition request = ChannelAndPosition.newBuilder()
                .setChannel(i)
                .setPosition(TopicHelper.toProtobufPosition(position))
                .build();
        CompletableFuture<Subscriber.Element<V>> future =
                poll(TopicServiceRequestType.PeekAtPosition, request, TopicElement.class)
                        .thenApply(this::createElement);
        try {
            return future.get(getRequestTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException(e);
        } catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Builds a subscriber element from a protobuf topic element by handing its binary value
     * and channel to the subscriber stored by {@code postConstruct}.
     *
     * @param topicElement the protobuf element to convert
     * @return the element created by the subscriber
     */
    protected Subscriber.Element<V> createElement(TopicElement topicElement) {
        return m_subscriber.createElement(
                BinaryHelper.toBinary(topicElement.getValue()),
                topicElement.getChannel());
    }

    /**
     * Delegates to the parent topic connector to obtain the remaining message count for a
     * subscriber group.
     *
     * @param subscriberGroupId the group whose name is passed to the connector
     * @param iArr              passed through to the connector (presumably channel ids - confirm)
     * @return the count reported by the connector
     */
    public int getRemainingMessages(SubscriberGroupId subscriberGroupId, int[] iArr) {
        return f_connector.getRemainingMessages(
                subscriberGroupId.getGroupName(),
                iArr);
    }

    /**
     * Asks the server whether a position on a channel has been committed and blocks until it
     * replies.
     *
     * @param subscriberGroupId not used by this implementation
     * @param i                 the channel to check
     * @param position          the position to check
     * @return the boolean carried by the reply, or {@code false} when the reply is {@code null}
     * @throws RequestTimeoutException if the wait is interrupted or exceeds the request timeout
     */
    public boolean isCommitted(SubscriberGroupId subscriberGroupId, int i, Position position) {
        ChannelAndPosition request = ChannelAndPosition.newBuilder()
                .setChannel(i)
                .setPosition(TopicHelper.toProtobufPosition(position))
                .build();
        CompletableFuture<Boolean> future =
                poll(TopicServiceRequestType.IsPositionCommitted, request, BoolValue.class)
                        .thenApply(answer -> answer != null && answer.getValue());
        try {
            return future.get(getRequestTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException(e);
        } catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Not supported by this connector.
     *
     * @param connectedSubscriber ignored
     * @param j                   ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public TopicSubscription getSubscription(SubscriberConnector.ConnectedSubscriber<V> connectedSubscriber, long j) {
        throw new UnsupportedOperationException();
    }

    /**
     * Fetches the channels owned by this subscriber and blocks until the server replies.
     *
     * @param connectedSubscriber not used by this implementation
     * @return the channel ids from the reply, copied into a new sorted set
     * @throws RequestTimeoutException if the wait is interrupted or exceeds the request timeout
     */
    public SortedSet<Integer> getOwnedChannels(SubscriberConnector.ConnectedSubscriber<V> connectedSubscriber) {
        CompletableFuture<SortedSet<Integer>> future =
                poll(TopicServiceRequestType.GetOwnedChannels, CollectionOfInt32.class)
                        .thenApply(channels -> new TreeSet<Integer>(channels.getValuesList()));
        try {
            return future.get(getRequestTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException(e);
        } catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Fetches the subscriber head positions for the given channels and blocks until the
     * server replies.
     *
     * @param iArr the channel ids sent in the request
     * @return the channel-to-position map converted from the reply
     * @throws RequestTimeoutException if the wait is interrupted or exceeds the request timeout
     */
    public Map<Integer, Position> getTopicHeads(int[] iArr) {
        CompletableFuture<Map<Integer, Position>> future =
                poll(TopicServiceRequestType.GetSubscriberHeads, MessageHelper.toCollectionOfInt32(iArr), MapOfChannelAndPosition.class)
                        .thenApply(TopicHelper::fromProtobufChannelAndPosition);
        try {
            return future.get(getRequestTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException(e);
        } catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Fetches the tail positions and blocks until the server replies.
     *
     * @return the channel-to-position map converted from the reply
     * @throws RequestTimeoutException if the wait is interrupted or exceeds the request timeout
     */
    public Map<Integer, Position> getTopicTails() {
        CompletableFuture<Map<Integer, Position>> future =
                poll(TopicServiceRequestType.GetTails, MapOfChannelAndPosition.class)
                        .thenApply(TopicHelper::fromProtobufChannelAndPosition);
        try {
            return future.get(getRequestTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException(e);
        } catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Fetches the last committed positions and blocks until the server replies.
     *
     * @param subscriberGroupId not used by this implementation; the request carries no payload
     * @return the channel-to-position map converted from the reply
     * @throws RequestTimeoutException if the wait is interrupted or exceeds the request timeout
     */
    public Map<Integer, Position> getLastCommittedInGroup(SubscriberGroupId subscriberGroupId) {
        CompletableFuture<Map<Integer, Position>> future =
                poll(TopicServiceRequestType.GetLastCommited, MapOfChannelAndPosition.class)
                        .thenApply(TopicHelper::fromProtobufChannelAndPosition);
        try {
            return future.get(getRequestTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException(e);
        } catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Seeks the subscriber to the given per-channel positions and blocks until the server
     * replies.
     *
     * @param connectedSubscriber not used by this implementation
     * @param map                 the target position for each channel
     * @return the per-channel seek results built by {@link #handleSeekResponse(SeekResponse)}
     * @throws RequestTimeoutException if the wait is interrupted or exceeds the request timeout
     */
    public Map<Integer, SeekResult> seekToPosition(SubscriberConnector.ConnectedSubscriber<V> connectedSubscriber, Map<Integer, Position> map) {
        SeekRequest request = SeekRequest.newBuilder()
                .setByPosition(TopicHelper.toProtobufChannelAndPosition(map))
                .build();
        CompletableFuture<Map<Integer, SeekResult>> future =
                poll(TopicServiceRequestType.SeekSubscriber, request, SeekResponse.class)
                        .thenApply(this::handleSeekResponse);
        try {
            return future.get(getRequestTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException(e);
        } catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Seeks the subscriber to the given per-channel timestamps and blocks until the server
     * replies.
     *
     * @param connectedSubscriber not used by this implementation
     * @param map                 the target timestamp for each channel
     * @return the per-channel seek results built by {@link #handleSeekResponse(SeekResponse)}
     * @throws RequestTimeoutException if the wait is interrupted or exceeds the request timeout
     */
    public Map<Integer, SeekResult> seekToTimestamp(SubscriberConnector.ConnectedSubscriber<V> connectedSubscriber, Map<Integer, Instant> map) {
        SeekRequest request = SeekRequest.newBuilder()
                .setByTimestamp(TopicHelper.toProtobufChannelAndTimestamp(map))
                .build();
        CompletableFuture<Map<Integer, SeekResult>> future =
                poll(TopicServiceRequestType.SeekSubscriber, request, SeekResponse.class)
                        .thenApply(this::handleSeekResponse);
        try {
            return future.get(getRequestTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException(e);
        } catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Converts a protobuf seek response into a map of seek results keyed by channel.
     *
     * <p>For each entry a {@link SeekProcessor.Result} is created from the converted head
     * position and the converted seeked-to position, in that argument order.
     *
     * @param seekResponse the response to convert
     * @return a new {@link HashMap} with one result per entry in the response
     */
    protected Map<Integer, SeekResult> handleSeekResponse(SeekResponse seekResponse) {
        Map<Integer, SeekResult> results = new HashMap<>();
        for (Map.Entry<Integer, SeekedPositions> entry : seekResponse.getPositionsMap().entrySet()) {
            SeekedPositions seeked = entry.getValue();
            results.put(entry.getKey(), new SeekProcessor.Result(TopicHelper.fromProtobufPosition(seeked.getHead()), TopicHelper.fromProtobufPosition(seeked.getSeekedTo())));
        }
        return results;
    }

    /**
     * Tells the server to destroy this subscriber's proxy and then closes the connection.
     * Nothing happens when the connection is not connected.
     *
     * @param connectedSubscriber not used by this implementation
     * @param z                   not used by this implementation
     */
    public void closeSubscription(SubscriberConnector.ConnectedSubscriber<V> connectedSubscriber, boolean z) {
        if (!f_connection.isConnected()) {
            return;
        }
        // The destroy request is addressed to proxy id 0 and carries this proxy's id as payload.
        f_connection.send(0, TopicServiceRequestType.DestroySubscriber, Int32Value.of(f_nProxyId));
        f_connection.close();
    }

    /**
     * Looks up the dependencies for this subscriber's topic through the topic service's
     * backing map manager.
     *
     * @return the dependencies registered under {@code f_sTopicName}
     */
    public TopicDependencies getTopicDependencies() {
        return f_connector.getTopicService()
                .getTopicBackingMapManager()
                .getTopicDependencies(f_sTopicName);
    }

    /**
     * Reacts to connection events: a {@code Disconnected} event is re-published as a
     * subscriber {@code Disconnected} event; every other event type is ignored.
     *
     * @param connectionEvent the event raised by the connection
     */
    @Override // com.oracle.coherence.grpc.client.common.GrpcConnection.ConnectionListener
    public void onConnectionEvent(GrpcConnection.ConnectionEvent connectionEvent) {
        if (connectionEvent.getType() != GrpcConnection.ConnectionEvent.Type.Disconnected) {
            return;
        }
        dispatchEvent(new SubscriberConnector.SubscriberEvent(this, SubscriberConnector.SubscriberEvent.Type.Disconnected));
    }

    /**
     * Reads the request timeout from the topic service's dependencies.
     *
     * @return the timeout in milliseconds, as reported by the dependencies
     */
    protected long getRequestTimeoutMillis() {
        return f_connector.getTopicService()
                .getDependencies()
                .getRequestTimeoutMillis();
    }

    /**
     * Issues a request without a payload to this connector's proxy and returns immediately.
     *
     * @param topicServiceRequestType the kind of request to issue
     * @return the future supplied by the connection for the raw response
     */
    protected CompletableFuture<TopicServiceResponse> poll(TopicServiceRequestType topicServiceRequestType) {
        return f_connection.poll(f_nProxyId, topicServiceRequestType);
    }

    /**
     * Issues a request with a payload to this connector's proxy and returns immediately.
     *
     * @param topicServiceRequestType the kind of request to issue
     * @param message                 the payload to send
     * @return the future supplied by the connection for the raw response
     */
    protected CompletableFuture<TopicServiceResponse> poll(TopicServiceRequestType topicServiceRequestType, Message message) {
        return f_connection.poll(f_nProxyId, topicServiceRequestType, message);
    }

    /**
     * Issues a request without a payload and unpacks the response into the requested
     * message type once it arrives.
     *
     * @param topicServiceRequestType the kind of request to issue
     * @param cls                     the message class the response is unpacked into
     * @param <M>                     the unpacked message type
     * @return a future for the unpacked message
     */
    protected <M extends Message> CompletableFuture<M> poll(TopicServiceRequestType topicServiceRequestType, Class<M> cls) {
        return (CompletableFuture<M>) this.f_connection
                .poll(this.f_nProxyId, topicServiceRequestType)
                .thenApply(response -> this.f_connection.unpackMessage(response, cls));
    }

    /**
     * Issues a request with a payload and unpacks the response into the requested message
     * type once it arrives.
     *
     * @param topicServiceRequestType the kind of request to issue
     * @param message                 the payload to send
     * @param cls                     the message class the response is unpacked into
     * @param <M>                     the unpacked message type
     * @return a future for the unpacked message
     */
    protected <M extends Message> CompletableFuture<M> poll(TopicServiceRequestType topicServiceRequestType, Message message, Class<M> cls) {
        return (CompletableFuture<M>) this.f_connection
                .poll(this.f_nProxyId, topicServiceRequestType, message)
                .thenApply(response -> this.f_connection.unpackMessage(response, cls));
    }

    /**
     * Issues a request with a payload and blocks until the raw response arrives.
     *
     * <p>The wait is bounded by {@link #getRequestTimeoutMillis()}. If the wait is interrupted
     * or times out the pending future is cancelled.
     *
     * @param topicServiceRequestType the kind of request to issue
     * @param message                 the payload to send
     * @return the raw response
     * @throws RequestTimeoutException if the wait is interrupted or exceeds the request timeout
     */
    protected TopicServiceResponse send(TopicServiceRequestType topicServiceRequestType, Message message) {
        CompletableFuture<TopicServiceResponse> future = poll(topicServiceRequestType, message);
        try {
            return future.get(getRequestTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException(e);
        } catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }

    /**
     * Issues a request with a payload and blocks until the response arrives, unpacked into the
     * requested message type.
     *
     * <p>The wait is bounded by {@link #getRequestTimeoutMillis()}. If the wait is interrupted
     * or times out the pending future is cancelled.
     *
     * @param topicServiceRequestType the kind of request to issue
     * @param message                 the payload to send
     * @param cls                     the message class the response is unpacked into
     * @param <M>                     the unpacked message type
     * @return the unpacked response message
     * @throws RequestTimeoutException if the wait is interrupted or exceeds the request timeout
     */
    protected <M extends Message> M send(TopicServiceRequestType topicServiceRequestType, Message message, Class<M> cls) {
        CompletableFuture<M> future = poll(topicServiceRequestType, message, cls);
        try {
            return future.get(getRequestTimeoutMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            future.cancel(true);
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RequestTimeoutException(e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RequestTimeoutException(e);
        } catch (ExecutionException e) {
            throw Exceptions.ensureRuntimeException(e);
        }
    }
}
