package dev.responsive.kafka.internal.db.rs3.client.grpc;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import dev.responsive.kafka.internal.db.rs3.client.CurrentOffsets;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3Exception;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.rs3.RS3Grpc;
import dev.responsive.rs3.Rs3;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.class */
public class GrpcRS3Client implements RS3Client {
    // Shared logger; the nested Stats class also writes through it.
    static final Logger LOG = LoggerFactory.getLogger(GrpcRS3Client.class);
    // Sentinel exchanged with the server in place of "no offset": responses carrying
    // this value are surfaced as Optional.empty(), and an absent expected written
    // offset is sent as this value on streaming WAL writes.
    static final long WAL_OFFSET_NONE = Long.MAX_VALUE;
    // Channel handed in at construction; close() calls shutdownNow() on it.
    private final ManagedChannel channel;
    // Blocking stub used by getCurrentOffsets() and get().
    private final RS3Grpc.RS3BlockingStub stub;
    // Async stub used to open the WAL segment write stream.
    private final RS3Grpc.RS3Stub asyncStub;
    // Read counters updated (and periodically logged) by get().
    private final Stats stats = new Stats();

    /* loaded from: input_file:dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client$Stats.class */
    /**
     * Running totals for reads issued by the client, plus a rate-limited log line.
     * Both public methods are {@code synchronized} on the instance.
     */
    private static class Stats {
        /** Minimum spacing between two statistics log lines. */
        private static final Duration LOG_INTERVAL = Duration.ofSeconds(10L);

        private long totalReads = 0;
        private long totalReadsElapsedUs = 0;
        private Instant lastLog = Instant.EPOCH;

        private Stats() {
        }

        /**
         * Counts one read and adds its latency to the running total.
         *
         * @param duration elapsed time of the read; truncated to whole microseconds
         */
        public synchronized void recordRead(Duration duration) {
            totalReads += 1;
            totalReadsElapsedUs += duration.toNanos() / 1000;
        }

        /**
         * Logs the totals at INFO level, unless a line was already logged less than
         * {@link #LOG_INTERVAL} ago, in which case this call does nothing.
         */
        public synchronized void log() {
            final Instant now = Instant.now();
            final Instant earliestNextLog = lastLog.plus(LOG_INTERVAL);
            if (!now.isBefore(earliestNextLog)) {
                lastLog = now;
                GrpcRS3Client.LOG.info("rs3 client read statistics: {} {}", totalReads, totalReadsElapsedUs);
            }
        }
    }

    /**
     * Wires the client around an already-built channel and its two stubs.
     *
     * @param managedChannel  channel that {@code close()} shuts down
     * @param rS3BlockingStub stub used for the blocking calls
     * @param rS3Stub         stub used for the streaming WAL segment write
     * @throws NullPointerException if any argument is {@code null}
     */
    @VisibleForTesting
    GrpcRS3Client(ManagedChannel managedChannel, RS3Grpc.RS3BlockingStub rS3BlockingStub, RS3Grpc.RS3Stub rS3Stub) {
        // requireNonNull returns its argument, so no cast is needed on assignment.
        channel = Objects.requireNonNull(managedChannel);
        stub = Objects.requireNonNull(rS3BlockingStub);
        asyncStub = Objects.requireNonNull(rS3Stub);
    }

    /**
     * Releases the client by calling {@code shutdownNow()} on the underlying channel.
     * The call is not followed by any wait for the channel to terminate.
     */
    @Override
    public void close() {
        channel.shutdownNow();
    }

    /**
     * Creates a client for the given gRPC target.
     *
     * <p>The channel is built with {@link InsecureChannelCredentials}; the blocking and
     * async stubs both share that single channel.
     *
     * @param str gRPC target string passed to {@code Grpc.newChannelBuilder}
     * @return a client owning the newly created channel
     */
    public static GrpcRS3Client connect(String str) {
        final ManagedChannel newChannel =
            Grpc.newChannelBuilder(str, InsecureChannelCredentials.create()).build();
        final RS3Grpc.RS3BlockingStub blockingStub = RS3Grpc.newBlockingStub(newChannel);
        final RS3Grpc.RS3Stub streamingStub = RS3Grpc.newStub(newChannel);
        return new GrpcRS3Client(newChannel, blockingStub, streamingStub);
    }

    /**
     * Fetches the written and flushed offsets for one PSS of a store via the blocking stub.
     *
     * @param uuid  store id
     * @param lssId LSS id
     * @param i     PSS id
     * @return the two offsets; each is empty when the server reports {@code WAL_OFFSET_NONE}
     * @throws RS3Exception     wrapping any {@link StatusRuntimeException} raised by the call
     * @throws RuntimeException if the response lacks {@code writtenOffset} or {@code flushedOffset}
     */
    @Override
    public CurrentOffsets getCurrentOffsets(UUID uuid, LssId lssId, int i) {
        try {
            final Rs3.GetOffsetsRequest request = Rs3.GetOffsetsRequest.newBuilder()
                .setStoreId(uuidProto(uuid))
                .setLssId(lssIdProto(lssId))
                .setPssId(i)
                .m122build();
            final Rs3.GetOffsetsResult offsets = this.stub.getOffsets(request);

            // Binding a method reference already null-checks its receiver.
            checkField(offsets::hasWrittenOffset, "writtenOffset");
            checkField(offsets::hasFlushedOffset, "flushedOffset");

            final long written = offsets.getWrittenOffset();
            final Optional<Long> writtenOffset;
            if (written == WAL_OFFSET_NONE) {
                writtenOffset = Optional.empty();
            } else {
                writtenOffset = Optional.of(written);
            }

            final long flushed = offsets.getFlushedOffset();
            final Optional<Long> flushedOffset;
            if (flushed == WAL_OFFSET_NONE) {
                flushedOffset = Optional.empty();
            } else {
                flushedOffset = Optional.of(flushed);
            }

            return new CurrentOffsets(writtenOffset, flushedOffset);
        } catch (StatusRuntimeException e) {
            throw new RS3Exception(e);
        }
    }

    /**
     * Opens a streaming WAL segment write and returns its two ends: a sender that
     * accepts {@link WalEntry} values and a receiver that yields the flushed offset.
     *
     * <p>Every entry handed to the sender is converted into its own
     * {@code WriteWALSegmentRequest}; each request repeats the store id, LSS id, PSS id,
     * end offset and expected written offset alongside the entry itself.
     *
     * @param uuid     store id
     * @param lssId    LSS id
     * @param i        PSS id
     * @param optional expected written offset; when empty, {@code WAL_OFFSET_NONE} is sent
     * @param j        end offset set on every request
     * @return sender/receiver pair; the receiver's value is empty when the server reports
     *         {@code WAL_OFFSET_NONE} as the flushed offset
     */
    @Override // dev.responsive.kafka.internal.db.rs3.client.RS3Client
    public StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsync(UUID uuid, LssId lssId, int i, Optional<Long> optional, long j) {
        // Receives the server's result; its message() stage is mapped to the flushed offset below.
        GrpcMessageReceiver grpcMessageReceiver = new GrpcMessageReceiver();
        return new StreamSenderMessageReceiver<>(new GrpcStreamSender(walEntry -> {
            // NOTE: despite its name, this local is the request builder, not an offset.
            Rs3.WriteWALSegmentRequest.Builder expectedWrittenOffset = Rs3.WriteWALSegmentRequest.newBuilder().setStoreId(uuidProto(uuid)).setLssId(lssIdProto(lssId)).setPssId(i).setEndOffset(j).setExpectedWrittenOffset(((Long) optional.orElse(Long.valueOf(WAL_OFFSET_NONE))).longValue());
            addWalEntryToSegment(walEntry, expectedWrittenOffset);
            return expectedWrittenOffset.m332build();
        }, this.asyncStub.writeWALSegmentStream(grpcMessageReceiver)), grpcMessageReceiver.message().thenApply(writeWALSegmentResult -> {
            Objects.requireNonNull(writeWALSegmentResult);
            // checkField throws RuntimeException when the field is absent; presumably that
            // surfaces as an exceptional completion of the returned stage (confirm against
            // the type returned by GrpcMessageReceiver.message()).
            checkField(writeWALSegmentResult::hasFlushedOffset, "flushedOffset");
            return writeWALSegmentResult.getFlushedOffset() == WAL_OFFSET_NONE ? Optional.empty() : Optional.of(Long.valueOf(writeWALSegmentResult.getFlushedOffset()));
        }));
    }

    /**
     * Writes a whole WAL segment synchronously: opens the stream through
     * {@code writeWalSegmentAsync}, sends every entry in order, finishes the stream and
     * blocks until the result is available.
     *
     * @param uuid     store id
     * @param lssId    LSS id
     * @param i        PSS id
     * @param optional expected written offset, if any
     * @param j        end offset of the segment
     * @param list     entries to send, in iteration order
     * @return the flushed offset reported by the server, or empty if it reported none
     * @throws RuntimeException if the wait is interrupted (the thread's interrupt flag is
     *         restored first) or if the write fails; a failure that is already a
     *         {@code RuntimeException} is rethrown as-is, anything else is wrapped
     */
    @Override
    public Optional<Long> writeWalSegment(UUID uuid, LssId lssId, int i, Optional<Long> optional, long j, List<WalEntry> list) {
        final StreamSenderMessageReceiver<WalEntry, Optional<Long>> stream =
            writeWalSegmentAsync(uuid, lssId, i, optional, j);
        for (final WalEntry entry : list) {
            stream.sender().sendNext(entry);
        }
        stream.sender().finish();
        try {
            return stream.receiver().toCompletableFuture().get();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            final Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        }
    }

    /**
     * Looks up a single key through the blocking stub.
     *
     * <p>Each successful call is timed and added to the read statistics, which are then
     * logged (rate-limited by {@code Stats.log()}).
     *
     * @param uuid     store id
     * @param lssId    LSS id
     * @param i        PSS id
     * @param optional expected written offset; set on the request only when present
     * @param bArr     key bytes
     * @return the value bytes, or empty when the response carries no result
     * @throws RS3Exception     wrapping any {@link StatusRuntimeException} raised by the call
     * @throws RuntimeException if the response has a result that lacks a value
     */
    @Override
    public Optional<byte[]> get(UUID uuid, LssId lssId, int i, Optional<Long> optional, byte[] bArr) {
        final Instant start = Instant.now();
        final Rs3.GetRequest.Builder request = Rs3.GetRequest.newBuilder()
            .setStoreId(uuidProto(uuid))
            .setLssId(lssIdProto(lssId))
            .setPssId(i)
            .setKey(ByteString.copyFrom(bArr));
        // Bound method reference on the builder; the previous lambda referred to an
        // undefined variable and did not compile.
        optional.ifPresent(request::setExpectedWrittenOffset);
        try {
            final Rs3.GetResult getResult = this.stub.get(request.m182build());
            this.stats.recordRead(Duration.between(start, Instant.now()));
            this.stats.log();
            if (!getResult.hasResult()) {
                return Optional.empty();
            }
            final Rs3.KeyValue result = getResult.getResult();
            checkField(result::hasValue, "value");
            return Optional.of(result.getValue().toByteArray());
        } catch (StatusRuntimeException e) {
            throw new RS3Exception(e);
        }
    }

    /**
     * Converts a {@link UUID} to its protobuf form: most significant bits go into
     * {@code high}, least significant bits into {@code low}.
     */
    private Rs3.UUID uuidProto(UUID uuid) {
        final long high = uuid.getMostSignificantBits();
        final long low = uuid.getLeastSignificantBits();
        return Rs3.UUID.newBuilder()
            .setHigh(high)
            .setLow(low)
            .m302build();
    }

    /** Converts an {@link LssId} to its protobuf form by copying its {@code id()}. */
    private Rs3.LSSId lssIdProto(LssId lssId) {
        final Rs3.LSSId.Builder proto = Rs3.LSSId.newBuilder();
        proto.setId(lssId.id());
        return proto.m272build();
    }

    /**
     * Attaches a WAL entry to a request under construction.
     *
     * <p>Only {@link Put} entries are handled: the key is always set and the value only
     * when the put carries one. Any other entry type leaves the builder untouched.
     *
     * @param walEntry entry to encode
     * @param builder  request builder that receives the encoded put
     */
    private void addWalEntryToSegment(WalEntry walEntry, Rs3.WriteWALSegmentRequest.Builder builder) {
        if (!(walEntry instanceof Put)) {
            return;
        }
        final Put put = (Put) walEntry;
        final Rs3.WriteWALSegmentRequest.Put.Builder putProto =
            Rs3.WriteWALSegmentRequest.Put.newBuilder();
        putProto.setKey(ByteString.copyFrom(put.key()));
        put.value().ifPresent(value -> putProto.setValue(ByteString.copyFrom(value)));
        builder.setPut(putProto.m362build());
    }

    /**
     * Asserts that a response field is present.
     *
     * @param supplier presence check for the field (typically a {@code hasX} method reference)
     * @param str      field name used in the error message
     * @throws RuntimeException if the supplier reports the field as absent
     */
    private void checkField(Supplier<Boolean> supplier, String str) {
        final boolean present = supplier.get();
        if (present) {
            return;
        }
        throw new RuntimeException("rs3 resp proto missing field " + str);
    }
}
