package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
@ThreadSafe
/* loaded from: input_file:org/apache/flink/changelog/fs/FsStateChangelogStorage.class */
public class FsStateChangelogStorage implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogStorage.class);
    private final StateChangeUploadScheduler uploader;
    private final long preEmptivePersistThresholdInBytes;
    private final AtomicInteger logIdGenerator;

    public FsStateChangelogStorage(Configuration configuration, TaskManagerJobMetricGroup taskManagerJobMetricGroup) throws IOException {
        this(StateChangeUploadScheduler.fromConfig(configuration, new ChangelogStorageMetricGroup(taskManagerJobMetricGroup)), ((MemorySize) configuration.get(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD)).getBytes());
    }

    @VisibleForTesting
    public FsStateChangelogStorage(Path path, boolean z, int i, ChangelogStorageMetricGroup changelogStorageMetricGroup) throws IOException {
        this(StateChangeUploadScheduler.directScheduler(new StateChangeFsUploader(path, path.getFileSystem(), z, i, changelogStorageMetricGroup)), ((MemorySize) FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD.defaultValue()).getBytes());
    }

    @VisibleForTesting
    public FsStateChangelogStorage(StateChangeUploadScheduler stateChangeUploadScheduler, long j) {
        this.logIdGenerator = new AtomicInteger(0);
        this.uploader = stateChangeUploadScheduler;
        this.preEmptivePersistThresholdInBytes = j;
    }

    /* renamed from: createWriter, reason: merged with bridge method [inline-methods] */
    public FsStateChangelogWriter m3createWriter(String str, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
        UUID uuid = new UUID(0L, this.logIdGenerator.getAndIncrement());
        LOG.info("createWriter for operator {}/{}: {}", new Object[]{str, keyGroupRange, uuid});
        return new FsStateChangelogWriter(uuid, keyGroupRange, this.uploader, this.preEmptivePersistThresholdInBytes, mailboxExecutor);
    }

    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader() {
        return new StateChangelogHandleStreamHandleReader(new StateChangeFormat());
    }

    public void close() throws Exception {
        this.uploader.close();
    }

    public AvailabilityProvider getAvailabilityProvider() {
        return this.uploader.getAvailabilityProvider();
    }
}
