package org.apache.flink.streaming.connectors.fs;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/fs/RollingSink.class */
/**
 * Sink that writes elements through a {@link Writer} into part files below a base path.
 *
 * <p>Life cycle of a part file, as implemented here:
 * <ol>
 *   <li>While being written, the file carries the in-progress prefix/suffix.</li>
 *   <li>When the sink rolls (no open stream yet, the {@link Bucketer} requests a new bucket, or
 *       the stream position exceeds the batch size) or is closed, the file is renamed to its
 *       pending name and remembered in {@link BucketState#pendingFiles}.</li>
 *   <li>On snapshot, the pending files collected so far are associated with the checkpoint id.</li>
 *   <li>On {@link #notifyCheckpointComplete(long)}, pending files of all checkpoints up to the
 *       notified id are renamed to their final name.</li>
 * </ol>
 *
 * <p>On restore, the file that was in progress at snapshot time is moved to its final name and
 * either truncated to the snapshotted length (when the file system offers a working
 * {@code truncate(Path, long)}) or accompanied by a valid-length marker file.
 *
 * @param <T> type of the elements written by this sink
 */
public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConfigurable, Checkpointed<RollingSink.BucketState>, CheckpointListener {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(RollingSink.class);

    /** Default roll threshold in bytes (384 MiB). */
    private static final long DEFAULT_BATCH_SIZE = 384L * 1024L * 1024L;
    private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
    private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";
    private static final String DEFAULT_PENDING_SUFFIX = ".pending";
    private static final String DEFAULT_PENDING_PREFIX = "_";
    private static final String DEFAULT_VALID_SUFFIX = ".valid-length";
    private static final String DEFAULT_VALID_PREFIX = "_";
    private static final String DEFAULT_PART_PREFIX = "part";
    /** Default upper bound, in milliseconds, for the polling loops used during restore. */
    private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60000;
    /** Sleep between two polls while waiting for lease recovery or truncate to finish. */
    private static final long RESTORE_POLL_INTERVAL_MS = 500L;

    private final String basePath;
    private Writer<T> writer;
    private transient Path currentPartPath;
    private transient Path currentBucketDirectory;
    private transient FSDataOutputStream outStream;
    private transient int subtaskIndex;
    private transient int partCounter;
    /** Reflectively resolved {@code hflush()} (or {@code sync()}) of the output stream class. */
    private transient Method refHflushOrSync;
    /** Reflectively resolved {@code truncate(Path, long)}; {@code null} when unavailable or unsupported. */
    private transient Method refTruncate;
    private transient BucketState bucketState;

    private boolean cleanupOnOpen = true;
    private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
    private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
    private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
    private String pendingPrefix = DEFAULT_PENDING_PREFIX;
    private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
    private String validLengthPrefix = DEFAULT_VALID_PREFIX;
    private String partPrefix = DEFAULT_PART_PREFIX;
    private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;
    private Bucketer bucketer = new DateTimeBucketer();
    private long batchSize = DEFAULT_BATCH_SIZE;
    private Writer<T> writerTemplate = new StringWriter<>();

    /**
     * Checkpointed state of the sink: the file being written at snapshot time, how many of its
     * bytes are valid, and the pending files that still have to be moved to their final name.
     */
    public static final class BucketState implements Serializable {
        private static final long serialVersionUID = 1;

        /** Final path of the part file that was open at snapshot time, or {@code null}. */
        String currentFile = null;
        /** Stream position of {@link #currentFile} at snapshot time; {@code -1} when unset. */
        long currentFileValidLength = -1;
        /** Final paths of files moved to pending since the last snapshot. */
        List<String> pendingFiles = new ArrayList<>();
        /** Pending files per checkpoint id; also used as the lock guarding its own mutation. */
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();

        BucketState() {
        }
    }

    /**
     * Creates a sink writing below the given base path.
     *
     * @param str base path, handed to Hadoop's {@link Path}
     */
    public RollingSink(String str) {
        this.basePath = str;
    }

    /**
     * Forwards the input type to the writer template when that template is itself
     * {@link InputTypeConfigurable}; otherwise does nothing.
     */
    public void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
        if (this.writerTemplate instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) this.writerTemplate).setInputType(typeInformation, executionConfig);
        }
    }

    /**
     * Initializes per-subtask state, duplicates the writer template, probes the file system for
     * truncate support and, unless disabled, deletes leftover pending/in-progress files of this
     * subtask below the base path.
     *
     * @throws RuntimeException if listing or deleting leftover files fails
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        this.partCounter = 0;
        this.writer = this.writerTemplate.duplicate();
        if (this.bucketState == null) {
            this.bucketState = new BucketState();
        }
        Path base = new Path(this.basePath);
        FileSystem fileSystem = base.getFileSystem(new org.apache.hadoop.conf.Configuration());
        this.refTruncate = reflectTruncate(fileSystem);
        try {
            if (fileSystem.exists(base) && this.cleanupOnOpen) {
                deleteLeftoverFiles(
                        fileSystem,
                        this.subtaskIndex,
                        "(OPEN) Deleting leftover pending file {}",
                        "(OPEN) Deleting leftover in-progress file {}");
            }
        } catch (IOException e) {
            LOG.error("Error while deleting leftover pending/in-progress files: {}", e);
            throw new RuntimeException("Error while deleting leftover pending/in-progress files.", e);
        }
    }

    /** Closes the current part file and moves it to its pending name. */
    public void close() throws Exception {
        closeCurrentPartFile();
    }

    /** Writes one element, rolling to a new part file first when {@code shouldRoll()} says so. */
    public void invoke(T t) throws Exception {
        if (shouldRoll()) {
            openNewPartFile();
        }
        this.writer.write(t);
    }

    /**
     * Decides whether a new part file has to be opened before the next write.
     *
     * @return {@code true} if there is no open stream, the bucketer requests a new bucket, or
     *         the current stream position is above the batch size
     */
    private boolean shouldRoll() throws IOException {
        boolean roll = false;
        if (this.outStream == null) {
            roll = true;
            LOG.debug("RollingSink {} starting new initial bucket. ", Integer.valueOf(this.subtaskIndex));
        }
        if (this.bucketer.shouldStartNewBucket(new Path(this.basePath), this.currentBucketDirectory)) {
            roll = true;
            LOG.debug("RollingSink {} starting new bucket because {} said we should. ", Integer.valueOf(this.subtaskIndex), this.bucketer);
            // Part numbering restarts with every new bucket.
            this.partCounter = 0;
        }
        if (this.outStream != null) {
            long pos = this.outStream.getPos();
            if (pos > this.batchSize) {
                roll = true;
                LOG.debug("RollingSink {} starting new bucket because file position {} is above batch size {}.", new Object[]{Integer.valueOf(this.subtaskIndex), Long.valueOf(pos), Long.valueOf(this.batchSize)});
            }
        }
        return roll;
    }

    /**
     * Closes the current part file, creates the bucket directory if the bucket changed, picks
     * the next part name that exists neither as final nor as pending file, and opens its
     * in-progress file for the writer.
     */
    private void openNewPartFile() throws Exception {
        closeCurrentPartFile();
        FileSystem fileSystem = new Path(this.basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
        Path nextBucketPath = this.bucketer.getNextBucketPath(new Path(this.basePath));
        if (!nextBucketPath.equals(this.currentBucketDirectory)) {
            this.currentBucketDirectory = nextBucketPath;
            try {
                if (fileSystem.mkdirs(this.currentBucketDirectory)) {
                    LOG.debug("Created new bucket directory: {}", this.currentBucketDirectory);
                }
            } catch (IOException e) {
                throw new RuntimeException("Could not create base path for new rolling file.", e);
            }
        }
        this.currentPartPath = partPathFor(this.partCounter);
        // Skip part numbers already taken by a final or a pending file.
        while (fileSystem.exists(this.currentPartPath) || fileSystem.exists(pendingPathFor(this.currentPartPath))) {
            this.partCounter++;
            this.currentPartPath = partPathFor(this.partCounter);
        }
        this.partCounter++;
        LOG.debug("Next part path is {}", this.currentPartPath.toString());
        this.outStream = fileSystem.create(inProgressPathFor(this.currentPartPath), false);
        if (this.refHflushOrSync == null) {
            this.refHflushOrSync = reflectHflushOrSync(this.outStream);
        }
        this.writer.open(this.outStream);
    }

    /**
     * Closes writer and stream, renames the in-progress file to its pending name and records
     * the part file as pending. Does nothing for the rename step when no part file is current.
     */
    private void closeCurrentPartFile() throws Exception {
        if (this.writer != null) {
            this.writer.close();
        }
        if (this.outStream != null) {
            hflushOrSync(this.outStream);
            this.outStream.close();
            this.outStream = null;
        }
        if (this.currentPartPath != null) {
            Path inProgressPath = inProgressPathFor(this.currentPartPath);
            Path pendingPath = pendingPathFor(this.currentPartPath);
            inProgressPath.getFileSystem(new org.apache.hadoop.conf.Configuration()).rename(inProgressPath, pendingPath);
            LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, pendingPath);
            this.bucketState.pendingFiles.add(this.currentPartPath.toString());
            // The file has been handed over to the pending list; forget it so that a repeated
            // close does not try to move and register the same file a second time.
            this.currentPartPath = null;
        }
    }

    /**
     * Invokes the reflectively resolved flush method on the given stream.
     *
     * @throws IOException if the invoked method failed with an {@code IOException}
     * @throws RuntimeException for every other failure of the reflective call
     */
    protected void hflushOrSync(FSDataOutputStream fSDataOutputStream) throws IOException {
        try {
            this.refHflushOrSync.invoke(fSDataOutputStream, new Object[0]);
        } catch (InvocationTargetException e) {
            LOG.error("Error while trying to hflushOrSync! " + e.getCause());
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw ((IOException) cause);
            }
            throw new RuntimeException("Error while trying to hflushOrSync!", e);
        } catch (Exception e2) {
            LOG.error("Error while trying to hflushOrSync! " + e2);
            throw new RuntimeException("Error while trying to hflushOrSync!", e2);
        }
    }

    /**
     * Looks up {@code hflush()} on the stream's class, falling back to {@code sync()}.
     *
     * @return the resolved method, or {@code null} if the stream is {@code null}
     * @throws RuntimeException if neither method exists
     */
    private Method reflectHflushOrSync(FSDataOutputStream fSDataOutputStream) {
        if (fSDataOutputStream == null) {
            return null;
        }
        Class<?> streamClass = fSDataOutputStream.getClass();
        try {
            return streamClass.getMethod("hflush", new Class[0]);
        } catch (NoSuchMethodException e) {
            LOG.debug("HFlush not found. Will use sync() instead");
            try {
                return streamClass.getMethod("sync", new Class[0]);
            } catch (Exception e2) {
                LOG.error("Neither hflush not sync were found. That seems to be a problem!");
                throw new RuntimeException("Neither hflush not sync were found. That seems to be a problem!", e2);
            }
        }
    }

    /**
     * Looks up {@code truncate(Path, long)} on the file system's class and verifies that it can
     * actually be invoked by truncating a small, randomly named test file.
     *
     * @return the truncate method, or {@code null} if the file system is {@code null}, has no
     *         such method, or the trial invocation failed
     * @throws RuntimeException if the test file cannot be created or deleted
     */
    private Method reflectTruncate(FileSystem fileSystem) {
        if (fileSystem == null) {
            return null;
        }
        Method truncate;
        try {
            truncate = fileSystem.getClass().getMethod("truncate", Path.class, Long.TYPE);
        } catch (NoSuchMethodException e) {
            LOG.debug("Truncate not found. Will write a file with suffix '{}'  and prefix '{}' to specify how many bytes in a bucket are valid.", this.validLengthSuffix, this.validLengthPrefix);
            return null;
        }

        Path testPath = new Path(UUID.randomUUID().toString());
        // try-with-resources: the probe stream is closed even when the write fails.
        try (FSDataOutputStream probe = fileSystem.create(testPath)) {
            probe.writeUTF("hello");
        } catch (IOException e) {
            LOG.error("Could not create file for checking if truncate works.", e);
            throw new RuntimeException("Could not create file for checking if truncate works.", e);
        }

        try {
            truncate.invoke(fileSystem, testPath, 2L);
        } catch (IllegalAccessException | InvocationTargetException e) {
            LOG.debug("Truncate is not supported.", e);
            truncate = null;
        }

        try {
            fileSystem.delete(testPath, false);
        } catch (IOException e) {
            LOG.error("Could not delete truncate test file.", e);
            throw new RuntimeException("Could not delete truncate test file.", e);
        }
        return truncate;
    }

    /**
     * Renames the pending files of every checkpoint with an id up to and including {@code j} to
     * their final name and drops those checkpoints from the bookkeeping.
     *
     * @param j id of the completed checkpoint
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        synchronized (this.bucketState.pendingFilesPerCheckpoint) {
            // Collected first and removed afterwards to avoid mutating the map while iterating.
            Set<Long> handledCheckpoints = new HashSet<>();
            for (Map.Entry<Long, List<String>> entry : this.bucketState.pendingFilesPerCheckpoint.entrySet()) {
                Long checkpointId = entry.getKey();
                if (checkpointId.longValue() > j) {
                    continue;
                }
                LOG.debug("Moving pending files to final location for checkpoint {}", checkpointId);
                for (String fileName : entry.getValue()) {
                    Path finalPath = new Path(fileName);
                    Path pendingPath = pendingPathFor(finalPath);
                    pendingPath.getFileSystem(new org.apache.hadoop.conf.Configuration()).rename(pendingPath, finalPath);
                    LOG.debug("Moving pending file {} to final location after complete checkpoint {}.", pendingPath, checkpointId);
                }
                handledCheckpoints.add(checkpointId);
            }
            for (Long checkpointId : handledCheckpoints) {
                this.bucketState.pendingFilesPerCheckpoint.remove(checkpointId);
            }
        }
    }

    /**
     * Flushes writer and stream, records the current file and its valid length, and associates
     * the pending files gathered so far with checkpoint {@code j}.
     *
     * <p>NOTE(review): the method name is a decompiler rename of {@code snapshotState}; it is
     * kept as found so the visible interface of this file does not change. Confirm against the
     * {@code Checkpointed} interface before relying on it.
     *
     * @param j  checkpoint id used as key for the pending files
     * @param j2 not used by this implementation
     * @return the live state object of this sink (not a copy)
     */
    public BucketState m206snapshotState(long j, long j2) throws Exception {
        if (this.writer != null) {
            this.writer.flush();
        }
        if (this.outStream != null) {
            this.outStream.flush();
            hflushOrSync(this.outStream);
            this.bucketState.currentFile = this.currentPartPath.toString();
            this.bucketState.currentFileValidLength = this.outStream.getPos();
        }
        synchronized (this.bucketState.pendingFilesPerCheckpoint) {
            this.bucketState.pendingFilesPerCheckpoint.put(Long.valueOf(j), this.bucketState.pendingFiles);
        }
        // Start a fresh list; the previous one is now owned by the per-checkpoint map.
        this.bucketState.pendingFiles = new ArrayList<>();
        return this.bucketState;
    }

    /**
     * Restores the sink from a snapshot: finalizes the file that was in progress at snapshot
     * time, moves all checkpointed pending files to their final name, clears the pending
     * bookkeeping and deletes remaining pending/in-progress files of this subtask.
     *
     * @throws RuntimeException if any file system operation or the truncate invocation fails
     */
    public void restoreState(BucketState bucketState) {
        this.bucketState = bucketState;
        this.bucketState.pendingFiles.clear();

        FileSystem fileSystem;
        try {
            fileSystem = new Path(this.basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
        } catch (IOException e) {
            LOG.error("Error while creating FileSystem in checkpoint restore.", e);
            throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
        }

        if (this.bucketState.currentFile != null) {
            try {
                restoreCurrentFile(fileSystem, new Path(this.bucketState.currentFile));
                // Only reached when the file was finalized without error.
                this.bucketState.currentFile = null;
                this.bucketState.currentFileValidLength = -1L;
            } catch (IOException e) {
                LOG.error("Error while restoring RollingSink state.", e);
                throw new RuntimeException("Error while restoring RollingSink state.", e);
            } catch (IllegalAccessException | InvocationTargetException e) {
                LOG.error("Cound not invoke truncate.", e);
                throw new RuntimeException("Could not invoke truncate.", e);
            }
        }

        LOG.debug("Clearing pending/in-progress files.");
        LOG.debug("Moving pending files to final location on restore.");
        for (Map.Entry<Long, List<String>> entry : this.bucketState.pendingFilesPerCheckpoint.entrySet()) {
            Long checkpointId = entry.getKey();
            for (String fileName : entry.getValue()) {
                Path finalPath = new Path(fileName);
                Path pendingPath = pendingPathFor(finalPath);
                try {
                    if (fileSystem.exists(pendingPath)) {
                        LOG.debug("(RESTORE) Moving pending file {} to final location after complete checkpoint {}.", pendingPath, checkpointId);
                        fileSystem.rename(pendingPath, finalPath);
                    }
                } catch (IOException e) {
                    LOG.error("(RESTORE) Error while renaming pending file {} to final path {}: {}", new Object[]{pendingPath, finalPath, e});
                    throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
                }
            }
        }
        this.bucketState.pendingFiles.clear();
        synchronized (this.bucketState.pendingFilesPerCheckpoint) {
            this.bucketState.pendingFilesPerCheckpoint.clear();
        }

        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        try {
            deleteLeftoverFiles(
                    fileSystem,
                    indexOfThisSubtask,
                    "(RESTORE) Deleting pending file {}",
                    "(RESTORE) Deleting in-progress file {}");
        } catch (IOException e) {
            LOG.error("Error while deleting old pending files: {}", e);
            throw new RuntimeException("Error while deleting old pending files.", e);
        }
    }

    /**
     * Moves the snapshotted current file to its final name (from pending or in-progress,
     * whichever exists) and then limits it to the snapshotted valid length, either by truncating
     * or by writing a valid-length marker file.
     */
    private void restoreCurrentFile(FileSystem fileSystem, Path finalPath)
            throws IOException, IllegalAccessException, InvocationTargetException {
        Path pendingPath = pendingPathFor(finalPath);
        Path inProgressPath = inProgressPathFor(finalPath);
        if (fileSystem.exists(pendingPath)) {
            LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", finalPath);
            fileSystem.rename(pendingPath, finalPath);
        } else if (fileSystem.exists(inProgressPath)) {
            LOG.debug("In-progress file {} is still in-progress, moving to final location.", finalPath);
            fileSystem.rename(inProgressPath, finalPath);
        } else if (fileSystem.exists(finalPath)) {
            LOG.debug("In-Progress file {} was already moved to final location {}.", this.bucketState.currentFile, finalPath);
        } else {
            LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, it was moved to final location by a previous snapshot restore", this.bucketState.currentFile);
        }

        this.refTruncate = reflectTruncate(fileSystem);
        if (this.refTruncate != null) {
            truncateToValidLength(fileSystem, finalPath);
        } else {
            writeValidLengthFile(fileSystem, finalPath);
        }
    }

    /**
     * Truncates the file to the snapshotted valid length. On a {@link DistributedFileSystem} the
     * file lease is recovered first. If truncate reports that it has not completed yet, the file
     * length is polled until it matches or the async timeout elapses.
     *
     * @throws RuntimeException if the file does not reach the expected length
     */
    private void truncateToValidLength(FileSystem fileSystem, Path finalPath)
            throws IOException, IllegalAccessException, InvocationTargetException {
        long validLength = this.bucketState.currentFileValidLength;
        LOG.debug("Truncating {} to valid length {}", finalPath, Long.valueOf(validLength));

        if (fileSystem instanceof DistributedFileSystem) {
            DistributedFileSystem distributedFileSystem = (DistributedFileSystem) fileSystem;
            LOG.debug("Trying to recover file lease {}", finalPath);
            distributedFileSystem.recoverLease(finalPath);
            StopWatch leaseWatch = new StopWatch();
            leaseWatch.start();
            boolean fileClosed = distributedFileSystem.isFileClosed(finalPath);
            while (!fileClosed && leaseWatch.getTime() <= this.asyncTimeout) {
                if (!sleepBetweenPolls()) {
                    break;
                }
                fileClosed = distributedFileSystem.isFileClosed(finalPath);
            }
        }

        boolean truncated = ((Boolean) this.refTruncate.invoke(fileSystem, finalPath, Long.valueOf(validLength))).booleanValue();
        if (truncated) {
            return;
        }

        LOG.debug("Truncate did not immediately complete for {}, waiting...", finalPath);
        StopWatch truncateWatch = new StopWatch();
        truncateWatch.start();
        long len = fileSystem.getFileStatus(finalPath).getLen();
        while (len != validLength && truncateWatch.getTime() <= this.asyncTimeout) {
            if (!sleepBetweenPolls()) {
                break;
            }
            len = fileSystem.getFileStatus(finalPath).getLen();
        }
        if (len != validLength) {
            throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + len + ".");
        }
    }

    /**
     * Sleeps for one poll interval.
     *
     * @return {@code false} if the thread was interrupted; the interrupt flag is restored so the
     *         caller can stop waiting instead of spinning until the timeout
     */
    private static boolean sleepBetweenPolls() {
        try {
            Thread.sleep(RESTORE_POLL_INTERVAL_MS);
            return true;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Writes the snapshotted valid length of the file into its valid-length marker file, unless
     * that marker already exists.
     */
    private void writeValidLengthFile(FileSystem fileSystem, Path finalPath) throws IOException {
        LOG.debug("Writing valid-length file for {} to specify valid length {}", finalPath, Long.valueOf(this.bucketState.currentFileValidLength));
        Path validLengthPath = validLengthPathFor(finalPath);
        if (!fileSystem.exists(validLengthPath)) {
            try (FSDataOutputStream lengthOut = fileSystem.create(validLengthPath)) {
                lengthOut.writeUTF(Long.toString(this.bucketState.currentFileValidLength));
            }
        }
    }

    /**
     * Recursively lists the base path and deletes every file that ends with the pending or the
     * in-progress suffix and whose path contains {@code <partPrefix>-<subtask>-}.
     *
     * @param pendingLogMessage    debug message (one placeholder) logged per deleted pending file
     * @param inProgressLogMessage debug message (one placeholder) logged per deleted in-progress file
     */
    private void deleteLeftoverFiles(FileSystem fileSystem, int subtask, String pendingLogMessage, String inProgressLogMessage) throws IOException {
        String subtaskMarker = this.partPrefix + "-" + subtask + "-";
        RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path(this.basePath), true);
        while (files.hasNext()) {
            Path file = files.next().getPath();
            String fileName = file.toString();
            if (!fileName.contains(subtaskMarker)) {
                continue;
            }
            if (fileName.endsWith(this.pendingSuffix)) {
                LOG.debug(pendingLogMessage, fileName);
                fileSystem.delete(file, true);
            }
            if (fileName.endsWith(this.inProgressSuffix)) {
                LOG.debug(inProgressLogMessage, fileName);
                fileSystem.delete(file, true);
            }
        }
    }

    /** Final path of the part file with the given counter inside the current bucket directory. */
    private Path partPathFor(int counter) {
        return new Path(this.currentBucketDirectory, this.partPrefix + "-" + this.subtaskIndex + "-" + counter);
    }

    /** Path a part file carries while it is being written. */
    private Path inProgressPathFor(Path partPath) {
        return new Path(partPath.getParent(), this.inProgressPrefix + partPath.getName()).suffix(this.inProgressSuffix);
    }

    /** Path a part file carries after it was closed but before it is moved to its final name. */
    private Path pendingPathFor(Path partPath) {
        return new Path(partPath.getParent(), this.pendingPrefix + partPath.getName()).suffix(this.pendingSuffix);
    }

    /** Path of the marker file holding the valid length of a part file. */
    private Path validLengthPathFor(Path partPath) {
        return new Path(partPath.getParent(), this.validLengthPrefix + partPath.getName()).suffix(this.validLengthSuffix);
    }

    /**
     * Sets the stream position, in bytes, above which the sink rolls to a new part file.
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> setBatchSize(long j) {
        this.batchSize = j;
        return this;
    }

    /**
     * Sets the bucketer that decides when a new bucket starts and where it is located.
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> setBucketer(Bucketer bucketer) {
        this.bucketer = bucketer;
        return this;
    }

    /**
     * Sets the writer template; it is duplicated in {@link #open(Configuration)} to obtain the
     * writer actually used.
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> setWriter(Writer<T> writer) {
        this.writerTemplate = writer;
        return this;
    }

    /**
     * Sets the suffix appended to files that are being written.
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> setInProgressSuffix(String str) {
        this.inProgressSuffix = str;
        return this;
    }

    /**
     * Sets the prefix prepended to the name of files that are being written.
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> setInProgressPrefix(String str) {
        this.inProgressPrefix = str;
        return this;
    }

    /**
     * Sets the suffix appended to pending files.
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> setPendingSuffix(String str) {
        this.pendingSuffix = str;
        return this;
    }

    /**
     * Sets the prefix prepended to the name of pending files.
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> setPendingPrefix(String str) {
        this.pendingPrefix = str;
        return this;
    }

    /**
     * Sets the suffix appended to valid-length marker files.
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> setValidLengthSuffix(String str) {
        this.validLengthSuffix = str;
        return this;
    }

    /**
     * Sets the prefix prepended to the name of valid-length marker files.
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> setValidLengthPrefix(String str) {
        this.validLengthPrefix = str;
        return this;
    }

    /**
     * Sets the prefix of part file names ({@code <prefix>-<subtask>-<counter>}).
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> setPartPrefix(String str) {
        this.partPrefix = str;
        return this;
    }

    /**
     * Disables the deletion of leftover pending/in-progress files in {@link #open(Configuration)}.
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> disableCleanupOnOpen() {
        this.cleanupOnOpen = false;
        return this;
    }

    /**
     * Sets the maximum time, in milliseconds, the restore logic polls for lease recovery and for
     * an asynchronous truncate to finish.
     *
     * @return this sink, for chaining
     */
    public RollingSink<T> setAsyncTimeout(long j) {
        this.asyncTimeout = j;
        return this;
    }
}
