/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.changelog.fs.BatchingStateChangeUploader;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.RetryPolicy;
import org.apache.flink.changelog.fs.StateChangeFsUploader;
import org.apache.flink.changelog.fs.StateChangeSet;
import org.apache.flink.changelog.fs.UploadResult;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.util.Preconditions;

interface StateChangeUploader
extends AutoCloseable {
    public void upload(UploadTask var1) throws IOException;

    default public void upload(Collection<UploadTask> tasks) throws IOException {
        for (UploadTask task : tasks) {
            this.upload(task);
        }
    }

    public static StateChangeUploader fromConfig(ReadableConfig config) throws IOException {
        Path basePath = new Path((String)config.get(FsStateChangelogOptions.BASE_PATH));
        long bytes = ((MemorySize)config.get(FsStateChangelogOptions.UPLOAD_BUFFER_SIZE)).getBytes();
        Preconditions.checkArgument((bytes <= Integer.MAX_VALUE ? 1 : 0) != 0);
        int bufferSize = (int)bytes;
        StateChangeFsUploader store = new StateChangeFsUploader(basePath, basePath.getFileSystem(), (Boolean)config.get(FsStateChangelogOptions.COMPRESSION_ENABLED), bufferSize);
        BatchingStateChangeUploader batchingStore = new BatchingStateChangeUploader(((Duration)config.get(FsStateChangelogOptions.PERSIST_DELAY)).toMillis(), ((MemorySize)config.get(FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD)).getBytes(), RetryPolicy.fromConfig(config), store, (Integer)config.get(FsStateChangelogOptions.NUM_UPLOAD_THREADS), ((MemorySize)config.get(FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT)).getBytes());
        return batchingStore;
    }

    @ThreadSafe
    public static final class UploadTask {
        final Collection<StateChangeSet> changeSets;
        final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;
        final Consumer<List<UploadResult>> successCallback;
        final AtomicBoolean finished = new AtomicBoolean();

        public UploadTask(Collection<StateChangeSet> changeSets, Consumer<List<UploadResult>> successCallback, BiConsumer<List<SequenceNumber>, Throwable> failureCallback) {
            this.changeSets = new ArrayList<StateChangeSet>(changeSets);
            this.failureCallback = failureCallback;
            this.successCallback = successCallback;
        }

        public void complete(List<UploadResult> results) {
            if (this.finished.compareAndSet(false, true)) {
                this.successCallback.accept(results);
            }
        }

        public void fail(Throwable error) {
            if (this.finished.compareAndSet(false, true)) {
                this.failureCallback.accept(this.changeSets.stream().map(StateChangeSet::getSequenceNumber).collect(Collectors.toList()), error);
            }
        }

        public long getSize() {
            long size = 0L;
            for (StateChangeSet set : this.changeSets) {
                size = set.getSize();
            }
            return size;
        }

        public String toString() {
            return "changeSets=" + this.changeSets;
        }
    }
}

