package org.apache.flink.statefun.flink.core.logger;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.class */
public final class UnboundedFeedbackLogger<T> implements FeedbackLogger<T> {
    private final Supplier<KeyGroupStream<T>> supplier;
    private final ToIntFunction<T> keyGroupAssigner;
    private final Map<Integer, KeyGroupStream<T>> keyGroupStreams = new TreeMap();
    private final CheckpointedStreamOperations checkpointedStreamOperations;

    @Nullable
    private OutputStream keyedStateOutputStream;
    private TypeSerializer<T> serializer;
    private Closeable snapshotLease;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger$Header.class */
    public static final class Header {
        private static final int STATEFUN_VERSION = 0;
        private static final int STATEFUN_MAGIC = 710818519;
        private static final byte[] HEADER_BYTES = headerBytes();

        Header() {
        }

        public static void writeHeader(DataOutputView dataOutputView) throws IOException {
            dataOutputView.write(HEADER_BYTES);
        }

        public static InputStream skipHeaderSilently(InputStream inputStream) throws IOException {
            byte[] bArr = new byte[HEADER_BYTES.length];
            PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, bArr.length);
            int tryReadFully = InputStreamUtils.tryReadFully(pushbackInputStream, bArr);
            if (tryReadFully > 0 && !Arrays.equals(bArr, HEADER_BYTES)) {
                pushbackInputStream.unread(bArr, 0, tryReadFully);
            }
            return pushbackInputStream;
        }

        private static byte[] headerBytes() {
            DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(8);
            try {
                dataOutputSerializer.writeInt(0);
                dataOutputSerializer.writeInt(STATEFUN_MAGIC);
                return dataOutputSerializer.getCopyOfBuffer();
            } catch (IOException e) {
                throw new IllegalStateException("Unable to compute the header bytes");
            }
        }
    }

    public UnboundedFeedbackLogger(Supplier<KeyGroupStream<T>> supplier, ToIntFunction<T> toIntFunction, CheckpointedStreamOperations checkpointedStreamOperations, TypeSerializer<T> typeSerializer) {
        this.supplier = (Supplier) Objects.requireNonNull(supplier);
        this.keyGroupAssigner = (ToIntFunction) Objects.requireNonNull(toIntFunction);
        this.serializer = (TypeSerializer) Objects.requireNonNull(typeSerializer);
        this.checkpointedStreamOperations = (CheckpointedStreamOperations) Objects.requireNonNull(checkpointedStreamOperations);
    }

    @Override // org.apache.flink.statefun.flink.core.logger.FeedbackLogger
    public void startLogging(OutputStream outputStream) {
        this.checkpointedStreamOperations.requireKeyedStateCheckpointed(outputStream);
        this.keyedStateOutputStream = (OutputStream) Objects.requireNonNull(outputStream);
        this.snapshotLease = this.checkpointedStreamOperations.acquireLease(outputStream);
    }

    @Override // org.apache.flink.statefun.flink.core.logger.FeedbackLogger
    public void append(T t) {
        if (this.keyedStateOutputStream == null) {
            return;
        }
        keyGroupStreamFor(t).append(t);
    }

    @Override // org.apache.flink.statefun.flink.core.logger.FeedbackLogger
    public void commit() {
        try {
            try {
                flushToKeyedStateOutputStream();
                this.keyGroupStreams.clear();
                IOUtils.closeQuietly(this.snapshotLease);
                this.snapshotLease = null;
                this.keyedStateOutputStream = null;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            this.keyGroupStreams.clear();
            IOUtils.closeQuietly(this.snapshotLease);
            this.snapshotLease = null;
            this.keyedStateOutputStream = null;
            throw th;
        }
    }

    private void flushToKeyedStateOutputStream() throws IOException {
        Preconditions.checkState(this.keyedStateOutputStream != null, "Trying to flush envelopes not in a logging state");
        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(this.keyedStateOutputStream);
        for (Integer num : this.checkpointedStreamOperations.keyGroupList(this.keyedStateOutputStream)) {
            this.checkpointedStreamOperations.startNewKeyGroup(this.keyedStateOutputStream, num.intValue());
            Header.writeHeader(dataOutputViewStreamWrapper);
            KeyGroupStream<T> keyGroupStream = this.keyGroupStreams.get(num);
            if (keyGroupStream == null) {
                KeyGroupStream.writeEmptyTo(dataOutputViewStreamWrapper);
            } else {
                keyGroupStream.writeTo(dataOutputViewStreamWrapper);
            }
        }
    }

    public void replyLoggedEnvelops(InputStream inputStream, FeedbackConsumer<T> feedbackConsumer) throws Exception {
        KeyGroupStream.readFrom(new DataInputViewStreamWrapper(Header.skipHeaderSilently(inputStream)), this.serializer, feedbackConsumer);
    }

    @Nonnull
    private KeyGroupStream<T> keyGroupStreamFor(T t) {
        int applyAsInt = this.keyGroupAssigner.applyAsInt(t);
        KeyGroupStream<T> keyGroupStream = this.keyGroupStreams.get(Integer.valueOf(applyAsInt));
        if (keyGroupStream == null) {
            Map<Integer, KeyGroupStream<T>> map = this.keyGroupStreams;
            Integer valueOf = Integer.valueOf(applyAsInt);
            KeyGroupStream<T> keyGroupStream2 = this.supplier.get();
            keyGroupStream = keyGroupStream2;
            map.put(valueOf, keyGroupStream2);
        }
        return keyGroupStream;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        IOUtils.closeQuietly(this.snapshotLease);
        this.snapshotLease = null;
        this.keyedStateOutputStream = null;
        this.keyGroupStreams.clear();
    }
}
