package org.apache.flink.statefun.flink.core.logger;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.statefun.flink.core.di.ObjectContainer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/logger/Loggers.class */
public final class Loggers {

    /**
     * Maps an element to a key group index: a key is extracted from the element with the supplied
     * selector and handed, together with the configured maximum parallelism, to
     * {@code KeyGroupRangeAssignment.assignToKeyGroup}.
     *
     * <p>NOTE(review): the decompiler reported that this type's access modifier was changed from
     * {@code private}; the {@code public} modifier is kept so the declaration matches the
     * decompiled class.
     *
     * @param <T> type of the elements a key is extracted from
     */
    public static final class KeyAssigner<T> implements ToIntFunction<T> {
        private final Function<T, ?> keySelector;
        private final int maxParallelism;

        /**
         * @param keySelector extracts the key from an element; must not be {@code null}
         * @param maxParallelism passed unchanged to the key group assignment
         * @throws NullPointerException if {@code keySelector} is {@code null}
         */
        private KeyAssigner(Function<T, ?> keySelector, int maxParallelism) {
            Objects.requireNonNull(keySelector);
            this.keySelector = keySelector;
            this.maxParallelism = maxParallelism;
        }

        /**
         * Extracts the key of {@code value} and returns the key group it is assigned to.
         *
         * @param value element whose key group is requested
         * @return result of the key group assignment for the extracted key
         */
        @Override
        public int applyAsInt(T value) {
            final Object key = keySelector.apply(value);
            return KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        }
    }

    /**
     * {@link CheckpointedStreamOperations} implementation that treats every supplied stream as a
     * {@link KeyedStateCheckpointOutputStream}. Each operation verifies the stream type first and
     * fails with an {@link IllegalStateException} when the stream is of any other type.
     *
     * <p>NOTE(review): the decompiler reported that this type's access modifier was changed from
     * {@code private}; the {@code public} modifier is kept so the declaration matches the
     * decompiled class.
     */
    public enum KeyedStateCheckpointOutputStreamOps implements CheckpointedStreamOperations {
        INSTANCE;

        /**
         * Verifies that the stream is a {@link KeyedStateCheckpointOutputStream}.
         *
         * @param outputStream stream to check
         * @throws IllegalStateException if the stream has a different type (including {@code null})
         */
        @Override
        public void requireKeyedStateCheckpointed(OutputStream outputStream) {
            cast(outputStream);
        }

        /**
         * Returns the key group list reported by the stream.
         *
         * @param outputStream a {@link KeyedStateCheckpointOutputStream}
         * @return the value of the stream's {@code getKeyGroupList()}
         * @throws IllegalStateException if the stream has a different type
         */
        @Override
        public Iterable<Integer> keyGroupList(OutputStream outputStream) {
            return cast(outputStream).getKeyGroupList();
        }

        /**
         * Delegates to the stream's {@code startNewKeyGroup}.
         *
         * @param outputStream a {@link KeyedStateCheckpointOutputStream}
         * @param i key group id handed to the stream
         * @throws IOException if the stream fails to start the key group
         * @throws IllegalStateException if the stream has a different type
         */
        @Override
        public void startNewKeyGroup(OutputStream outputStream, int i) throws IOException {
            cast(outputStream).startNewKeyGroup(i);
        }

        /**
         * Acquires a lease from the stream and exposes it as a {@link Closeable} whose
         * {@code close()} closes the lease.
         *
         * @param outputStream a {@link KeyedStateCheckpointOutputStream}
         * @return handle that closes the acquired lease
         * @throws IllegalStateException if the stream has a different type, or if acquiring the
         *     lease fails with an {@link IOException} (kept as the cause)
         */
        @Override
        public Closeable acquireLease(OutputStream outputStream) {
            // cast() already enforces the type precondition, no separate check is needed.
            final KeyedStateCheckpointOutputStream keyedStream = cast(outputStream);
            try {
                final ResourceGuard.Lease lease = keyedStream.acquireLease();
                // A bound method reference null-checks its receiver when it is created, so the
                // decompiler's explicit lease.getClass() probe is redundant.
                return lease::close;
            } catch (IOException e) {
                throw new IllegalStateException("Unable to obtain a lease for the output stream.", e);
            }
        }

        /**
         * Single place where the stream type is verified, so that every operation fails with the
         * same descriptive exception.
         */
        private static KeyedStateCheckpointOutputStream cast(OutputStream outputStream) {
            if (!(outputStream instanceof KeyedStateCheckpointOutputStream)) {
                throw new IllegalStateException("Not a KeyedStateCheckpointOutputStream");
            }
            return (KeyedStateCheckpointOutputStream) outputStream;
        }
    }

    /** Private constructor: this class only exposes static members. */
    private Loggers() {}

    /**
     * Creates an {@link UnboundedFeedbackLoggerFactory} by assembling the logger container from the
     * given arguments and asking it for the factory instance.
     *
     * @param iOManager I/O manager registered in the container under {@code "io-manager"}
     * @param i value registered under {@code "max-parallelism"}; also used for key group assignment
     * @param j value registered under {@code "in-memory-max-buffer-size"}
     * @param typeSerializer serializer registered under {@code "envelope-serializer"}
     * @param function key selector used to build the {@code "key-group-assigner"}
     * @return the factory obtained from the container
     */
    public static UnboundedFeedbackLoggerFactory<?> unboundedSpillableLoggerFactory(IOManager iOManager, int i, long j, TypeSerializer<?> typeSerializer, Function<?, ?> function) {
        final ObjectContainer container = unboundedSpillableLoggerContainer(iOManager, i, j, typeSerializer, function);
        final Object factory = container.get(UnboundedFeedbackLoggerFactory.class);
        return (UnboundedFeedbackLoggerFactory) factory;
    }

    /**
     * Assembles the {@link ObjectContainer} from which an {@link UnboundedFeedbackLoggerFactory}
     * can be obtained. The container is populated with the following labelled entries:
     *
     * <ul>
     *   <li>{@code "max-parallelism"} - {@code i}, registered as {@code int}
     *   <li>{@code "in-memory-max-buffer-size"} - {@code j}, registered as {@code long}
     *   <li>{@code "io-manager"} - {@code iOManager}
     *   <li>{@code "key-group-supplier"} - {@link Supplier} bound to {@code KeyGroupStreamFactory}
     *   <li>{@code "key-group-assigner"} - a {@link KeyAssigner} built from {@code function} and
     *       {@code i}
     *   <li>{@code "envelope-serializer"} - {@code typeSerializer}
     *   <li>{@code "checkpoint-stream-ops"} - {@link KeyedStateCheckpointOutputStreamOps#INSTANCE}
     * </ul>
     *
     * plus the {@link UnboundedFeedbackLoggerFactory} class itself.
     *
     * @param iOManager I/O manager made available to the container
     * @param i maximum parallelism
     * @param j in-memory buffer size limit
     * @param typeSerializer serializer made available to the container
     * @param function key selector; must not be {@code null}
     * @return the populated container
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @VisibleForTesting
    static ObjectContainer unboundedSpillableLoggerContainer(IOManager iOManager, int i, long j, TypeSerializer<?> typeSerializer, Function<?, ?> function) {
        final ObjectContainer container = new ObjectContainer();
        // Values are passed as-is (no casts to Class) so they are registered as instances; only
        // the "key-group-supplier" entry binds a type to an implementation class.
        container.add("max-parallelism", int.class, i);
        container.add("in-memory-max-buffer-size", long.class, j);
        container.add("io-manager", IOManager.class, iOManager);
        container.add("key-group-supplier", Supplier.class, KeyGroupStreamFactory.class);
        container.add("key-group-assigner", ToIntFunction.class, new KeyAssigner<>(function, i));
        container.add("envelope-serializer", TypeSerializer.class, typeSerializer);
        container.add("checkpoint-stream-ops", CheckpointedStreamOperations.class, KeyedStateCheckpointOutputStreamOps.INSTANCE);
        container.add(UnboundedFeedbackLoggerFactory.class);
        return container;
    }
}
