package org.apache.flink.streaming.connectors.fs.bucketing;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.class */
public class BucketingSink<T> extends RichSinkFunction<T> implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
    private static final long serialVersionUID = 1;
    // Default roll-over threshold for the writer position: 402653184 = 384 * 1024 * 1024.
    private static final long DEFAULT_BATCH_SIZE = 402653184;
    // Default period (ms) between two inactivity checks scheduled through the processing-time service.
    private static final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60000;
    // Default time (ms) without writes after which a bucket's part file is closed.
    private static final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60000;
    // Default prefixes/suffixes used to derive the in-progress, pending and valid-length file names.
    private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
    private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";
    private static final String DEFAULT_PENDING_SUFFIX = ".pending";
    private static final String DEFAULT_PENDING_PREFIX = "_";
    private static final String DEFAULT_VALID_SUFFIX = ".valid-length";
    private static final String DEFAULT_VALID_PREFIX = "_";
    // Default first component of every part file name (see assemblePartPath).
    private static final String DEFAULT_PART_PREFIX = "part";
    // Default upper bound (ms) for the lease-recovery and truncate wait loops during restore.
    private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60000;
    // Long.MAX_VALUE: with this default the age-based roll-over comparisons never become true.
    private static final long DEFAULT_BATCH_ROLLOVER_INTERVAL = Long.MAX_VALUE;
    // Base path given to the constructor; handed to the bucketer and used to create the file system.
    private final String basePath;
    // Reflective handle on FileSystem#truncate, or null when reflectTruncate() could not provide one.
    private transient Method refTruncate;
    // Per-bucket bookkeeping of this sink instance; created in open().
    private transient State<T> state;
    // Operator list state registered under "bucket-states" in initializeState().
    private transient ListState<State<T>> restoredBucketStates;

    // Optional configuration passed to createHadoopFileSystem(); only set through setFSConfig().
    @Nullable
    private Configuration fsConfig;
    // File system for basePath, created lazily by initFileSystem().
    private transient FileSystem fs;
    // Clock passed to the bucketer; open() backs it with the processing-time service.
    private transient Clock clock;
    // Source of the current processing time and of the periodic inactivity timer.
    private transient ProcessingTimeService processingTimeService;
    private static final Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
    // No part suffix by default; assemblePartPath() treats null as the empty string.
    private static final String DEFAULT_PART_SUFFIX = null;
    // Configurable settings. Several are initialised with literals that equal the DEFAULT_* constants above.
    private long batchSize = DEFAULT_BATCH_SIZE;
    private long inactiveBucketCheckInterval = 60000;
    private long inactiveBucketThreshold = 60000;
    private long batchRolloverInterval = DEFAULT_BATCH_ROLLOVER_INTERVAL;
    private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
    private String inProgressPrefix = "_";
    private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
    private String pendingPrefix = "_";
    private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
    private String validLengthPrefix = "_";
    private String partPrefix = DEFAULT_PART_PREFIX;
    private String partSuffix = DEFAULT_PART_SUFFIX;
    // When false, reflectTruncate() returns null and restore writes a valid-length file instead of truncating.
    private boolean useTruncate = true;
    private long asyncTimeout = 60000;
    // Decides the bucket directory for each element (see invoke()).
    private Bucketer<T> bucketer = new DateTimeBucketer();
    // Template writer; openNewPartFile() duplicates it for a bucket that has no writer yet.
    private Writer<T> writerTemplate = new StringWriter();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink$BucketState.class */
    public static final class BucketState<T> implements Serializable {
        private static final long serialVersionUID = 1;

        /** Final path of the part file currently being written, or {@code null} if there is none. */
        String currentFile;

        /** Processing time of the most recent write into this bucket. */
        long lastWrittenToTime;

        /** Processing time at which the current part file was opened. */
        long creationTime;

        /** Running part-file index of this bucket; transient, so it is not serialized. */
        private transient int partCounter;

        /** Whether {@link #writer} currently has a file open; transient. */
        private transient boolean isWriterOpen;

        /** Writer used for this bucket's part files; transient. */
        private transient Writer<T> writer;

        /** Valid length of {@link #currentFile} recorded at snapshot time; {@code -1} when unset. */
        long currentFileValidLength = -1;

        /** Files moved to pending since the last snapshot. */
        List<String> pendingFiles = new ArrayList<>();

        /** Pending files keyed by the id of the checkpoint whose snapshot recorded them. */
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();

        BucketState(long lastWrittenToTime) {
            this.lastWrittenToTime = lastWrittenToTime;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("In-progress=").append(this.currentFile);
            text.append(" validLength=").append(this.currentFileValidLength);
            text.append(" pendingForNextCheckpoint=").append(this.pendingFiles);
            text.append(" pendingForPrevCheckpoints=").append(this.pendingFilesPerCheckpoint);
            text.append(" lastModified@").append(this.lastWrittenToTime);
            return text.toString();
        }

        /** Compiler-generated accessor: post-increments the part counter and returns the previous value. */
        static /* synthetic */ int access$308(BucketState bucketState) {
            return bucketState.partCounter++;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink$State.class */
    public static final class State<T> implements Serializable {
        private static final long serialVersionUID = 1;

        /** Bucket states keyed by the string form of the bucket path; also used as the lock object. */
        final Map<String, BucketState<T>> bucketStates = new HashMap<>();

        State() {
        }

        /** Registers (or replaces) the state kept for the given bucket path. */
        void addBucketState(Path bucketPath, BucketState<T> bucketState) {
            synchronized (this.bucketStates) {
                final String key = bucketPath.toString();
                this.bucketStates.put(key, bucketState);
            }
        }

        /** Returns the state kept for the given bucket path, or {@code null} if there is none. */
        BucketState<T> getBucketState(Path bucketPath) {
            synchronized (this.bucketStates) {
                return this.bucketStates.get(bucketPath.toString());
            }
        }

        @Override
        public String toString() {
            return String.valueOf(this.bucketStates);
        }
    }

    /**
     * Creates a sink that writes below the given base path.
     *
     * @param basePath base path; it is later wrapped in a Hadoop {@code Path}, handed to the
     *     bucketer and used to create the file system
     */
    public BucketingSink(String basePath) {
        this.basePath = basePath;
    }

    /**
     * Stores a copy of the given Flink configuration for later file-system creation.
     *
     * @param config configuration whose entries are copied into a fresh instance
     * @return this sink, for call chaining
     */
    public BucketingSink<T> setFSConfig(Configuration config) {
        final Configuration copy = new Configuration();
        this.fsConfig = copy;
        copy.addAll(config);
        return this;
    }

    /**
     * Stores the entries of the given Hadoop configuration as string properties of a fresh
     * Flink configuration, for later file-system creation.
     *
     * @param hadoopConfig Hadoop configuration to copy entry by entry
     * @return this sink, for call chaining
     */
    public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration hadoopConfig) {
        final Configuration copy = new Configuration();
        this.fsConfig = copy;
        for (Map.Entry<String, String> property : hadoopConfig) {
            copy.setString(property.getKey(), property.getValue());
        }
        return this;
    }

    /**
     * Forwards the input type to the writer template when that writer is itself
     * {@link InputTypeConfigurable}; otherwise does nothing.
     *
     * @param typeInformation type information of the sink's input
     * @param executionConfig execution configuration forwarded unchanged
     */
    public void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
        if (this.writerTemplate instanceof InputTypeConfigurable) {
            // The call must go through the InputTypeConfigurable view of the writer: the
            // instanceof guard is what makes this cast safe.
            ((InputTypeConfigurable) this.writerTemplate).setInputType(typeInformation, executionConfig);
        }
    }

    /**
     * Creates the file system, probes for truncate support, registers the "bucket-states"
     * operator list state and, when the context reports a restore, processes every restored
     * {@link State}.
     *
     * @param context initialization context providing the operator state store
     * @throws Exception if the operator was already initialized or state access fails; an
     *     {@link IOException} raised inside the method is rethrown as a {@link RuntimeException}
     */
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
        try {
            initFileSystem();
            if (this.refTruncate == null) {
                this.refTruncate = reflectTruncate(this.fs);
            }
            this.restoredBucketStates = context.getOperatorStateStore().getSerializableListState("bucket-states");
            final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
            if (context.isRestored()) {
                LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
                for (State<T> restored : this.restoredBucketStates.get()) {
                    handleRestoredBucketState(restored);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, restored);
                    }
                }
            } else {
                LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
            }
        } catch (IOException e) {
            LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
            throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
        }
    }

    /**
     * Creates a fresh {@link State}, schedules the first inactivity check and installs a clock
     * backed by the processing-time service.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the superclass {@code open} fails
     */
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.state = new State<>();
        // NOTE(review): getProcessingTimeService() is called directly on the runtime context here;
        // a cast to a streaming-specific context may have been lost - confirm against the API.
        this.processingTimeService = getRuntimeContext().getProcessingTimeService();
        final long firstCheckAt = this.processingTimeService.getCurrentProcessingTime() + this.inactiveBucketCheckInterval;
        this.processingTimeService.registerTimer(firstCheckAt, this);
        // The clock reads the field on every call rather than capturing the service instance.
        this.clock = new Clock() {
            @Override
            public long currentTimeMillis() {
                return BucketingSink.this.processingTimeService.getCurrentProcessingTime();
            }
        };
    }

    /**
     * Creates the file system for the base path on first use; later calls are no-ops.
     *
     * @throws IOException if the file system cannot be created
     */
    private void initFileSystem() throws IOException {
        if (this.fs != null) {
            return;
        }
        final Path base = new Path(this.basePath);
        this.fs = createHadoopFileSystem(base, this.fsConfig);
    }

    /**
     * Closes the current part file of every known bucket. Does nothing when the sink has no
     * state yet.
     *
     * @throws Exception if closing a part file fails
     */
    public void close() throws Exception {
        if (this.state == null) {
            return;
        }
        for (BucketState<T> bucketState : this.state.bucketStates.values()) {
            closeCurrentPartFile(bucketState);
        }
    }

    /**
     * Writes one element into the bucket chosen by the bucketer, creating the bucket state and
     * rolling to a new part file first when required.
     *
     * @param value element to write
     * @throws Exception if opening a part file or writing fails
     */
    public void invoke(T value) throws Exception {
        final Path bucketPath = this.bucketer.getBucketPath(this.clock, new Path(this.basePath), value);
        final long now = this.processingTimeService.getCurrentProcessingTime();

        BucketState<T> bucketState = this.state.getBucketState(bucketPath);
        if (bucketState == null) {
            // First element for this bucket: start tracking it.
            bucketState = new BucketState<>(now);
            this.state.addBucketState(bucketPath, bucketState);
        }

        if (shouldRoll(bucketState, now)) {
            openNewPartFile(bucketPath, bucketState);
        }

        bucketState.writer.write(value);
        bucketState.lastWrittenToTime = now;
    }

    /**
     * Decides whether a new part file must be opened for the bucket.
     *
     * @param bucketState state of the bucket about to be written
     * @param currentProcessingTime current processing time
     * @return {@code true} if no writer is open, the writer position exceeds the batch size, or
     *     the current file is older than the roll-over interval
     * @throws IOException if the writer position cannot be read
     */
    private boolean shouldRoll(BucketState<T> bucketState, long currentProcessingTime) throws IOException {
        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

        if (!bucketState.isWriterOpen) {
            LOG.debug("BucketingSink {} starting new bucket.", subtaskIndex);
            return true;
        }

        final long writePosition = bucketState.writer.getPos();
        if (writePosition > this.batchSize) {
            LOG.debug("BucketingSink {} starting new bucket because file position {} is above batch size {}.", subtaskIndex, writePosition, this.batchSize);
            return true;
        }

        if (currentProcessingTime - bucketState.creationTime > this.batchRolloverInterval) {
            LOG.debug("BucketingSink {} starting new bucket because file is older than roll over interval {}.", subtaskIndex, this.batchRolloverInterval);
            return true;
        }

        return false;
    }

    /**
     * Timer callback: closes part files that are due and schedules the next check.
     *
     * @param timestamp timestamp supplied with the callback; not used, the current processing
     *     time is read from the service instead
     * @throws Exception if closing a part file fails
     */
    public void onProcessingTime(long timestamp) throws Exception {
        final long now = this.processingTimeService.getCurrentProcessingTime();
        closePartFilesByTime(now);
        final long nextCheckAt = now + this.inactiveBucketCheckInterval;
        this.processingTimeService.registerTimer(nextCheckAt, this);
    }

    /**
     * Closes the current part file of every bucket that has not been written to within the
     * inactivity threshold, or whose current file is older than the roll-over interval.
     *
     * @param currentProcessingTime time against which both limits are evaluated
     * @throws Exception if closing a part file fails
     */
    private void closePartFilesByTime(long currentProcessingTime) throws Exception {
        synchronized (this.state.bucketStates) {
            for (BucketState<T> bucketState : this.state.bucketStates.values()) {
                final boolean inactive = bucketState.lastWrittenToTime < currentProcessingTime - this.inactiveBucketThreshold;
                final boolean overdue = bucketState.creationTime < currentProcessingTime - this.batchRolloverInterval;
                if (inactive || overdue) {
                    LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.", getRuntimeContext().getIndexOfThisSubtask(), this.inactiveBucketThreshold);
                    closeCurrentPartFile(bucketState);
                }
            }
        }
    }

    /**
     * Closes the bucket's current part file, picks the next unused part path and opens the
     * bucket's writer on the corresponding in-progress file.
     *
     * @param bucketPath directory of the bucket; created if it does not exist
     * @param bucketState state of the bucket that receives the new part file
     * @throws Exception if closing the previous file, probing the file system or opening the
     *     writer fails
     */
    private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception {
        closeCurrentPartFile(bucketState);

        if (!this.fs.exists(bucketPath)) {
            try {
                if (this.fs.mkdirs(bucketPath)) {
                    LOG.debug("Created new bucket directory: {}", bucketPath);
                }
            } catch (IOException e) {
                throw new RuntimeException("Could not create new bucket path.", e);
            }
        }

        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

        // Advance the counter until neither a final, a pending nor an in-progress file exists.
        Path partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
        while (this.fs.exists(partPath)
                || this.fs.exists(getPendingPathFor(partPath))
                || this.fs.exists(getInProgressPathFor(partPath))) {
            bucketState.partCounter++;
            partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
        }

        bucketState.creationTime = this.processingTimeService.getCurrentProcessingTime();
        // Move past the index just taken so the next roll starts from a fresh one.
        bucketState.partCounter++;
        LOG.debug("Next part path is {}", partPath.toString());
        bucketState.currentFile = partPath.toString();

        final Path inProgressPath = getInProgressPathFor(partPath);
        if (bucketState.writer == null) {
            final Writer<T> duplicate = this.writerTemplate.duplicate();
            bucketState.writer = duplicate;
            if (duplicate == null) {
                throw new UnsupportedOperationException("Could not duplicate writer. Class '" + this.writerTemplate.getClass().getCanonicalName() + "' must implement the 'Writer.duplicate()' method.");
            }
        }
        bucketState.writer.open(this.fs, inProgressPath);
        bucketState.isWriterOpen = true;
    }

    /**
     * Closes the bucket's writer if it is open and, when a current file is recorded, renames
     * its in-progress file to the pending name and remembers it as pending.
     *
     * @param bucketState state of the bucket whose part file is closed
     * @throws Exception if closing the writer or renaming fails
     */
    private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception {
        if (bucketState.isWriterOpen) {
            bucketState.writer.close();
            bucketState.isWriterOpen = false;
        }

        if (bucketState.currentFile == null) {
            return;
        }

        final Path finalPath = new Path(bucketState.currentFile);
        final Path inProgressPath = getInProgressPathFor(finalPath);
        final Path pendingPath = getPendingPathFor(finalPath);
        this.fs.rename(inProgressPath, pendingPath);
        LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, pendingPath);
        bucketState.pendingFiles.add(finalPath.toString());
        bucketState.currentFile = null;
    }

    /**
     * Looks up {@code truncate(Path, long)} on the file system's class and verifies that it can
     * be invoked by truncating a small probe file created under the base path.
     *
     * @param fileSystem file system to probe; may be {@code null}
     * @return the reflective {@code truncate} method, or {@code null} when truncate is disabled
     *     via {@code useTruncate}, the file system is {@code null}, the method does not exist,
     *     or invoking it on the probe file failed
     * @throws RuntimeException if the probe file cannot be created or deleted
     */
    private Method reflectTruncate(FileSystem fileSystem) {
        if (!this.useTruncate || fileSystem == null) {
            return null;
        }

        Method truncate;
        try {
            truncate = fileSystem.getClass().getMethod("truncate", Path.class, Long.TYPE);
        } catch (NoSuchMethodException e) {
            LOG.debug("Truncate not found. Will write a file with suffix '{}'  and prefix '{}' to specify how many bytes in a bucket are valid.", this.validLengthSuffix, this.validLengthPrefix);
            return null;
        }

        // The method exists; check on a throw-away file that it can actually be invoked.
        final Path probePath = new Path(this.basePath, UUID.randomUUID().toString());
        try {
            try (FSDataOutputStream probeOut = fileSystem.create(probePath)) {
                probeOut.writeUTF("hello");
            } catch (IOException e) {
                LOG.error("Could not create file for checking if truncate works.", e);
                throw new RuntimeException("Could not create file for checking if truncate works. You can disable support for truncate() completely via BucketingSink.setUseTruncate(false).", e);
            }

            try {
                truncate.invoke(fileSystem, probePath, 2);
            } catch (IllegalAccessException | InvocationTargetException e) {
                LOG.debug("Truncate is not supported.", e);
                truncate = null;
            }
        } finally {
            // Single cleanup point: the probe file is removed on success and on every failure path.
            try {
                fileSystem.delete(probePath, false);
            } catch (IOException e) {
                LOG.error("Could not delete truncate test file.", e);
                throw new RuntimeException("Could not delete truncate test file. You can disable support for truncate() completely via BucketingSink.setUseTruncate(false).", e);
            }
        }
        return truncate;
    }

    /**
     * Builds the final path of a part file as {@code <prefix>-<subtask>-<part><suffix>} inside
     * the bucket directory; a {@code null} part suffix is treated as empty.
     *
     * @param bucket bucket directory
     * @param subtaskIndex index of the writing subtask
     * @param partIndex part counter value
     * @return path of the part file
     */
    private Path assemblePartPath(Path bucket, int subtaskIndex, int partIndex) {
        final String suffix = this.partSuffix == null ? "" : this.partSuffix;
        final String fileName = String.format("%s-%s-%s%s", this.partPrefix, subtaskIndex, partIndex, suffix);
        return new Path(bucket, fileName);
    }

    /**
     * Derives the pending-file path for a final part path: same parent, with the pending prefix
     * prepended to the file name and the pending suffix appended.
     */
    private Path getPendingPathFor(Path path) {
        final String prefixedName = this.pendingPrefix + path.getName();
        final Path prefixedPath = new Path(path.getParent(), prefixedName);
        return prefixedPath.suffix(this.pendingSuffix);
    }

    /**
     * Derives the in-progress-file path for a final part path: same parent, with the in-progress
     * prefix prepended to the file name and the in-progress suffix appended.
     */
    private Path getInProgressPathFor(Path path) {
        final String prefixedName = this.inProgressPrefix + path.getName();
        final Path prefixedPath = new Path(path.getParent(), prefixedName);
        return prefixedPath.suffix(this.inProgressSuffix);
    }

    /**
     * Derives the valid-length-file path for a final part path: same parent, with the
     * valid-length prefix prepended to the file name and the valid-length suffix appended.
     */
    private Path getValidLengthPathFor(Path path) {
        final String prefixedName = this.validLengthPrefix + path.getName();
        final Path prefixedPath = new Path(path.getParent(), prefixedName);
        return prefixedPath.suffix(this.validLengthSuffix);
    }

    /**
     * Renames the pending files recorded for every checkpoint up to and including the completed
     * one to their final names, and drops bucket states that have nothing left to track.
     *
     * @param checkpointId id of the checkpoint that completed
     * @throws Exception if a rename fails
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        synchronized (this.state.bucketStates) {
            final Iterator<Map.Entry<String, BucketState<T>>> bucketIterator = this.state.bucketStates.entrySet().iterator();
            while (bucketIterator.hasNext()) {
                final BucketState<T> bucketState = bucketIterator.next().getValue();
                synchronized (bucketState.pendingFilesPerCheckpoint) {
                    final Iterator<Map.Entry<Long, List<String>>> pendingIterator = bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
                    while (pendingIterator.hasNext()) {
                        final Map.Entry<Long, List<String>> pending = pendingIterator.next();
                        final Long pastCheckpointId = pending.getKey();
                        if (pastCheckpointId.longValue() > checkpointId) {
                            // Belongs to a later checkpoint; keep it pending.
                            continue;
                        }
                        LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
                        for (String fileName : pending.getValue()) {
                            final Path finalPath = new Path(fileName);
                            final Path pendingPath = getPendingPathFor(finalPath);
                            this.fs.rename(pendingPath, finalPath);
                            LOG.debug("Moving pending file {} to final location having completed checkpoint {}.", pendingPath, pastCheckpointId);
                        }
                        pendingIterator.remove();
                    }

                    final boolean nothingLeft = !bucketState.isWriterOpen
                            && bucketState.pendingFiles.isEmpty()
                            && bucketState.pendingFilesPerCheckpoint.isEmpty();
                    if (nothingLeft) {
                        bucketIterator.remove();
                    }
                }
            }
        }
    }

    /**
     * Snapshots the sink: flushes open writers to record the valid length of their files, files
     * the pending list of each bucket under the checkpoint id, and stores the whole
     * {@link State} as the only element of the operator list state.
     *
     * @param context snapshot context supplying the checkpoint id
     * @throws Exception if the operator was not initialized, or flushing or state access fails
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkNotNull(this.restoredBucketStates, "The operator has not been properly initialized.");
        this.restoredBucketStates.clear();

        synchronized (this.state.bucketStates) {
            final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

            for (BucketState<T> bucketState : this.state.bucketStates.values()) {
                if (bucketState.isWriterOpen) {
                    bucketState.currentFileValidLength = bucketState.writer.flush();
                }
                synchronized (bucketState.pendingFilesPerCheckpoint) {
                    bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
                }
                // Start a new list; the previous one is now owned by the per-checkpoint map.
                bucketState.pendingFiles = new ArrayList<>();
            }

            this.restoredBucketStates.add(this.state);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIndex, this.state);
            }
        }
    }

    /**
     * Processes every bucket of a restored {@link State}: discards its not-yet-checkpointed
     * pending list, finalizes the recorded in-progress file, moves the files pending for earlier
     * checkpoints to their final location, and resets the bucket's bookkeeping.
     *
     * @param restoredState restored state; must not be {@code null}
     */
    private void handleRestoredBucketState(State<T> restoredState) {
        Preconditions.checkNotNull(restoredState);
        for (BucketState<T> restoredBucket : restoredState.bucketStates.values()) {
            restoredBucket.pendingFiles.clear();

            handlePendingInProgressFile(restoredBucket.currentFile, restoredBucket.currentFileValidLength);
            // The in-progress file has been dealt with; forget it.
            restoredBucket.currentFile = null;
            restoredBucket.currentFileValidLength = -1L;
            restoredBucket.isWriterOpen = false;

            handlePendingFilesForPreviousCheckpoints(restoredBucket.pendingFilesPerCheckpoint);
            restoredBucket.pendingFilesPerCheckpoint.clear();
        }
    }

    /**
     * Applies the restore steps to a {@link RollingSink.BucketState}: clears its pending list,
     * finalizes its in-progress file, moves files pending for earlier checkpoints to their final
     * location and resets the bookkeeping fields.
     *
     * @param rollingSinkState restored rolling-sink bucket state
     */
    private void handleRestoredRollingSinkState(RollingSink.BucketState rollingSinkState) {
        rollingSinkState.pendingFiles.clear();

        handlePendingInProgressFile(rollingSinkState.currentFile, rollingSinkState.currentFileValidLength);
        rollingSinkState.currentFile = null;
        rollingSinkState.currentFileValidLength = -1L;

        handlePendingFilesForPreviousCheckpoints(rollingSinkState.pendingFilesPerCheckpoint);
        rollingSinkState.pendingFilesPerCheckpoint.clear();
    }

    /**
     * Finalizes the part file that was in progress when the restored snapshot was taken.
     *
     * <p>The file is moved to its final name if it is still found under its pending or
     * in-progress name. Afterwards it is either truncated to the valid length (when a reflective
     * {@code truncate} method is available) or a valid-length marker file is written next to it.
     *
     * @param partFilePath final path of the part file as recorded in the state; {@code null}
     *     means there is nothing to do
     * @param validLength number of bytes of the file that were valid at snapshot time
     * @throws RuntimeException if a file-system operation fails, truncate cannot be invoked, or
     *     the file does not reach the valid length within the async timeout
     */
    private void handlePendingInProgressFile(String partFilePath, long validLength) {
        if (partFilePath == null) {
            return;
        }

        final Path partPath = new Path(partFilePath);
        try {
            final Path pendingPath = getPendingPathFor(partPath);
            final Path inProgressPath = getInProgressPathFor(partPath);

            if (this.fs.exists(pendingPath)) {
                LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
                this.fs.rename(pendingPath, partPath);
            } else if (this.fs.exists(inProgressPath)) {
                LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
                this.fs.rename(inProgressPath, partPath);
            } else if (this.fs.exists(partPath)) {
                LOG.debug("In-Progress file {} was already moved to final location {}.", partFilePath, partPath);
            } else {
                LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, it was moved to final location by a previous snapshot restore", partFilePath);
            }

            if (this.refTruncate == null) {
                this.refTruncate = reflectTruncate(this.fs);
            }

            if (this.refTruncate != null) {
                LOG.debug("Truncating {} to valid length {}", partPath, validLength);

                if (this.fs instanceof DistributedFileSystem) {
                    // Lease recovery is only available on the DistributedFileSystem type.
                    final DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
                    LOG.debug("Trying to recover file lease {}", partPath);
                    dfs.recoverLease(partPath);

                    final StopWatch leaseTimer = new StopWatch();
                    leaseTimer.start();
                    boolean fileClosed = dfs.isFileClosed(partPath);
                    while (!fileClosed && leaseTimer.getTime() <= this.asyncTimeout) {
                        try {
                            Thread.sleep(500L);
                        } catch (InterruptedException ignored) {
                            // NOTE(review): interruption is ignored and polling continues until the
                            // timeout; confirm whether cancellation should abort the wait.
                        }
                        fileClosed = dfs.isFileClosed(partPath);
                    }
                }

                final Boolean truncated = (Boolean) this.refTruncate.invoke(this.fs, partPath, validLength);
                if (!truncated.booleanValue()) {
                    LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);

                    // Poll the file length until it matches or the timeout elapses.
                    final StopWatch truncateTimer = new StopWatch();
                    truncateTimer.start();
                    long currentLength = this.fs.getFileStatus(partPath).getLen();
                    while (currentLength != validLength && truncateTimer.getTime() <= this.asyncTimeout) {
                        try {
                            Thread.sleep(500L);
                        } catch (InterruptedException ignored) {
                            // NOTE(review): interruption is ignored here as well; see above.
                        }
                        currentLength = this.fs.getFileStatus(partPath).getLen();
                    }
                    if (currentLength != validLength) {
                        throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + currentLength + ".");
                    }
                }
            } else {
                // No truncate available: record the valid length in a marker file, unless one
                // already exists or the part file itself is missing.
                final Path validLengthPath = getValidLengthPathFor(partPath);
                if (!this.fs.exists(validLengthPath) && this.fs.exists(partPath)) {
                    LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
                    try (FSDataOutputStream lengthOut = this.fs.create(validLengthPath)) {
                        lengthOut.writeUTF(Long.toString(validLength));
                    }
                }
            }
        } catch (IOException e) {
            LOG.error("Error while restoring BucketingSink state.", e);
            throw new RuntimeException("Error while restoring BucketingSink state.", e);
        } catch (IllegalAccessException | InvocationTargetException e) {
            LOG.error("Could not invoke truncate.", e);
            throw new RuntimeException("Could not invoke truncate.", e);
        }
    }

    /**
     * On restore, moves every still-existing pending file recorded for earlier checkpoints to
     * its final name.
     *
     * @param pendingFilesPerCheckpoint final file paths grouped by checkpoint id
     * @throws RuntimeException if checking for or renaming a pending file fails
     */
    private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pendingFilesPerCheckpoint) {
        LOG.debug("Moving pending files to final location on restore.");
        for (Map.Entry<Long, List<String>> pending : pendingFilesPerCheckpoint.entrySet()) {
            final Long pastCheckpointId = pending.getKey();
            for (String fileName : pending.getValue()) {
                final Path finalPath = new Path(fileName);
                final Path pendingPath = getPendingPathFor(finalPath);
                try {
                    if (this.fs.exists(pendingPath)) {
                        LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
                        this.fs.rename(pendingPath, finalPath);
                    }
                } catch (IOException e) {
                    LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
                    throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
                }
            }
        }
    }

    /**
     * Sets the writer-position threshold above which a new part file is started.
     *
     * @param batchSize new threshold, compared against the position reported by the writer
     * @return this sink, for call chaining
     */
    public BucketingSink<T> setBatchSize(long batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    /**
     * Sets the maximum age of a part file before it is rolled. Values that are not strictly
     * positive are ignored and leave the current setting unchanged.
     *
     * @param batchRolloverInterval new interval, compared against processing-time differences
     * @return this sink, for call chaining
     */
    public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
        if (batchRolloverInterval <= 0) {
            return this;
        }
        this.batchRolloverInterval = batchRolloverInterval;
        return this;
    }

    /**
     * Stores the given value as the inactive-bucket check interval.
     *
     * @param j the new check interval (unit is not visible here; presumably
     *     milliseconds, confirm against the field's usage)
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setInactiveBucketCheckInterval(long j) {
        inactiveBucketCheckInterval = j;
        return this;
    }

    /**
     * Stores the given value as the inactive-bucket threshold.
     *
     * @param j the new threshold (unit is not visible here; presumably milliseconds,
     *     confirm against the field's usage)
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setInactiveBucketThreshold(long j) {
        inactiveBucketThreshold = j;
        return this;
    }

    /**
     * Replaces the {@link Bucketer} held by this sink.
     *
     * @param bucketer the bucketer to store; no null check is performed here
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setBucketer(Bucketer<T> bucketer) {
        BucketingSink<T> self = this;
        self.bucketer = bucketer;
        return self;
    }

    /**
     * Stores the given {@link Writer} in the {@code writerTemplate} field.
     *
     * @param writer the writer to store; no null check is performed here
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setWriter(Writer<T> writer) {
        writerTemplate = writer;
        return this;
    }

    /**
     * Stores the given string as the in-progress suffix.
     *
     * @param str the new suffix; stored as-is without validation
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setInProgressSuffix(String str) {
        inProgressSuffix = str;
        return this;
    }

    /**
     * Stores the given string as the in-progress prefix.
     *
     * @param str the new prefix; stored as-is without validation
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setInProgressPrefix(String str) {
        inProgressPrefix = str;
        return this;
    }

    /**
     * Stores the given string as the pending suffix.
     *
     * @param str the new suffix; stored as-is without validation
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setPendingSuffix(String str) {
        pendingSuffix = str;
        return this;
    }

    /**
     * Stores the given string as the pending prefix.
     *
     * @param str the new prefix; stored as-is without validation
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setPendingPrefix(String str) {
        pendingPrefix = str;
        return this;
    }

    /**
     * Stores the given string as the valid-length suffix.
     *
     * @param str the new suffix; stored as-is without validation
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setValidLengthSuffix(String str) {
        validLengthSuffix = str;
        return this;
    }

    /**
     * Stores the given string as the valid-length prefix.
     *
     * @param str the new prefix; stored as-is without validation
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setValidLengthPrefix(String str) {
        validLengthPrefix = str;
        return this;
    }

    /**
     * Stores the given string as the part suffix.
     *
     * @param str the new suffix; stored as-is without validation
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setPartSuffix(String str) {
        partSuffix = str;
        return this;
    }

    /**
     * Stores the given string as the part prefix.
     *
     * @param str the new prefix; stored as-is without validation
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setPartPrefix(String str) {
        partPrefix = str;
        return this;
    }

    /**
     * Stores the given flag in the {@code useTruncate} field.
     *
     * @param z the new value of the flag
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setUseTruncate(boolean z) {
        useTruncate = z;
        return this;
    }

    /**
     * Does nothing and returns this sink unchanged.
     *
     * @return this sink, so that configuration calls can be chained
     * @deprecated this method has no effect; it changes no state of the sink
     */
    @Deprecated
    public BucketingSink<T> disableCleanupOnOpen() {
        return this;
    }

    /**
     * Stores the given value as the async timeout.
     *
     * @param j the new timeout (unit is not visible here; presumably milliseconds,
     *     confirm against the field's usage)
     * @return this sink, so that configuration calls can be chained
     */
    public BucketingSink<T> setAsyncTimeout(long j) {
        asyncTimeout = j;
        return this;
    }

    /**
     * Exposes the sink's current {@code state} object; annotated as visible for tests.
     *
     * @return the state instance held by this sink (the same reference, not a copy)
     */
    @VisibleForTesting
    public State<T> getState() {
        return state;
    }

    /**
     * Creates a Hadoop {@link FileSystem} for the given path, optionally applying extra
     * configuration entries.
     *
     * <p>Resolution steps:
     * <ol>
     *   <li>The Flink file system for the path's URI is looked up. If it is a
     *       {@link HadoopFileSystem} and no extra configuration was supplied, the Hadoop
     *       file system it wraps is returned directly.</li>
     *   <li>Otherwise a base Hadoop configuration is obtained: from the wrapped Hadoop
     *       file system when there is one, else from the Flink file system registered
     *       for the {@code hdfs://} scheme.</li>
     *   <li>If extra configuration was supplied, its entries are written onto a copy of
     *       the base configuration.</li>
     *   <li>A new file system instance is created reflectively for the resolved URI's
     *       scheme and initialized with the final configuration. A
     *       {@link LocalFileSystem} is replaced by its raw file system before being
     *       returned.</li>
     * </ol>
     *
     * @param path the path whose URI selects the file system
     * @param configuration extra entries to apply on top of the Hadoop configuration, or
     *     {@code null} to use the Hadoop configuration unchanged
     * @return the Hadoop file system for the path
     * @throws IOException if no Hadoop-backed file system is available for {@code hdfs://},
     *     or if instantiating or initializing the file system fails (the original
     *     exception is attached as the cause)
     */
    public static FileSystem createHadoopFileSystem(Path path, @Nullable Configuration configuration) throws IOException {
        // Keep the general Flink file system type here: the lookup may return a file system
        // that is not Hadoop-backed, and the instanceof check below must be able to see that.
        final org.apache.flink.core.fs.FileSystem flinkFs =
                org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(path.toUri());
        final FileSystem wrappedHadoopFs = flinkFs instanceof HadoopFileSystem
                ? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
                : null;

        // Fast path: already Hadoop-backed and nothing extra to configure.
        if (configuration == null && wrappedHadoopFs != null) {
            return wrappedHadoopFs;
        }

        final org.apache.hadoop.conf.Configuration baseConf;
        if (wrappedHadoopFs != null) {
            baseConf = wrappedHadoopFs.getConf();
        } else {
            // The path did not resolve to a Hadoop-backed file system. This URI is used only
            // to look up the file system registered for the hdfs scheme so that its Hadoop
            // configuration can be read.
            final org.apache.flink.core.fs.FileSystem hdfsAccessor =
                    org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(URI.create("hdfs://localhost:12345/"));
            if (!(hdfsAccessor instanceof HadoopFileSystem)) {
                throw new IOException("Cannot instantiate a Hadoop file system to access the Hadoop configuration. FS for hdfs:// is " + hdfsAccessor.getClass().getName());
            }
            baseConf = ((HadoopFileSystem) hdfsAccessor).getHadoopFileSystem().getConf();
        }

        final org.apache.hadoop.conf.Configuration finalConf;
        if (configuration == null) {
            finalConf = baseConf;
        } else {
            // Copy first so the configuration owned by the existing file system is not mutated.
            finalConf = new org.apache.hadoop.conf.Configuration(baseConf);
            for (String key : configuration.keySet()) {
                finalConf.set(key, configuration.getString(key, (String) null));
            }
        }

        // Fill in the default URI when the path leaves out scheme and/or authority.
        URI fsUri = path.toUri();
        final String scheme = fsUri.getScheme();
        final String authority = fsUri.getAuthority();
        if (scheme == null && authority == null) {
            fsUri = FileSystem.getDefaultUri(finalConf);
        } else if (scheme != null && authority == null) {
            final URI defaultUri = FileSystem.getDefaultUri(finalConf);
            if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
                fsUri = defaultUri;
            }
        }

        try {
            // Hold the new instance as the general Hadoop type; narrow only after instanceof.
            final FileSystem fs = FileSystem.getFileSystemClass(fsUri.getScheme(), finalConf).newInstance();
            fs.initialize(fsUri, finalConf);
            // NOTE(review): presumably the raw file system is used to bypass the local
            // checksumming wrapper; confirm the intent.
            return fs instanceof LocalFileSystem ? ((LocalFileSystem) fs).getRaw() : fs;
        } catch (Exception e) {
            throw new IOException("Cannot instantiate the Hadoop file system", e);
        }
    }
}
