package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/changelog/fs/StateChangeUploadScheduler.class */
public interface StateChangeUploadScheduler extends AutoCloseable {

    @ThreadSafe
    /* loaded from: input_file:org/apache/flink/changelog/fs/StateChangeUploadScheduler$UploadTask.class */
    public static final class UploadTask {
        final Collection<StateChangeSet> changeSets;
        final Consumer<List<UploadResult>> successCallback;
        final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;
        final AtomicBoolean finished = new AtomicBoolean();

        public UploadTask(Collection<StateChangeSet> collection, Consumer<List<UploadResult>> consumer, BiConsumer<List<SequenceNumber>, Throwable> biConsumer) {
            this.changeSets = new ArrayList(collection);
            this.successCallback = consumer;
            this.failureCallback = biConsumer;
        }

        public void complete(List<UploadResult> list) {
            if (this.finished.compareAndSet(false, true)) {
                this.successCallback.accept(list);
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void fail(Throwable th) {
            if (this.finished.compareAndSet(false, true)) {
                this.failureCallback.accept(this.changeSets.stream().map((v0) -> {
                    return v0.getSequenceNumber();
                }).collect(Collectors.toList()), th);
            }
        }

        public long getSize() {
            long j = 0;
            Iterator<StateChangeSet> it = this.changeSets.iterator();
            while (it.hasNext()) {
                j += it.next().getSize();
            }
            return j;
        }

        public Collection<StateChangeSet> getChangeSets() {
            return this.changeSets;
        }

        public String toString() {
            return "changeSets=" + this.changeSets;
        }

        public boolean isFinished() {
            return this.finished.get();
        }
    }

    void upload(UploadTask uploadTask) throws IOException;

    static StateChangeUploadScheduler directScheduler(final StateChangeUploader stateChangeUploader) {
        return new StateChangeUploadScheduler() { // from class: org.apache.flink.changelog.fs.StateChangeUploadScheduler.1
            @Override // org.apache.flink.changelog.fs.StateChangeUploadScheduler
            public void upload(UploadTask uploadTask) throws IOException {
                StateChangeUploader.this.upload(Collections.singletonList(uploadTask)).complete();
            }

            @Override // java.lang.AutoCloseable
            public void close() throws Exception {
                StateChangeUploader.this.close();
            }
        };
    }

    static StateChangeUploadScheduler fromConfig(JobID jobID, ReadableConfig readableConfig, ChangelogStorageMetricGroup changelogStorageMetricGroup, TaskChangelogRegistry taskChangelogRegistry, LocalRecoveryConfig localRecoveryConfig) throws IOException {
        Path path = new Path((String) readableConfig.get(FsStateChangelogOptions.BASE_PATH));
        long bytes = ((MemorySize) readableConfig.get(FsStateChangelogOptions.UPLOAD_BUFFER_SIZE)).getBytes();
        Preconditions.checkArgument(bytes <= 2147483647L);
        int i = (int) bytes;
        return new BatchingStateChangeUploadScheduler(((Duration) readableConfig.get(FsStateChangelogOptions.PERSIST_DELAY)).toMillis(), ((MemorySize) readableConfig.get(FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD)).getBytes(), RetryPolicy.fromConfig(readableConfig), localRecoveryConfig.isLocalRecoveryEnabled() ? new DuplicatingStateChangeFsUploader(jobID, path, path.getFileSystem(), ((Boolean) readableConfig.get(FsStateChangelogOptions.COMPRESSION_ENABLED)).booleanValue(), i, changelogStorageMetricGroup, taskChangelogRegistry, (LocalRecoveryDirectoryProvider) localRecoveryConfig.getLocalStateDirectoryProvider().orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled())) : new StateChangeFsUploader(jobID, path, path.getFileSystem(), ((Boolean) readableConfig.get(FsStateChangelogOptions.COMPRESSION_ENABLED)).booleanValue(), i, changelogStorageMetricGroup, taskChangelogRegistry), ((Integer) readableConfig.get(FsStateChangelogOptions.NUM_UPLOAD_THREADS)).intValue(), ((MemorySize) readableConfig.get(FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT)).getBytes(), changelogStorageMetricGroup);
    }

    default AvailabilityProvider getAvailabilityProvider() {
        return () -> {
            return AvailabilityProvider.AVAILABLE;
        };
    }
}
