package org.apache.flume.channel.file;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.channel.file.encryption.EncryptionConfiguration;
import org.apache.flume.channel.file.encryption.KeyProvider;
import org.apache.flume.channel.file.encryption.KeyProviderFactory;
import org.apache.flume.instrumentation.ChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Stable
/* loaded from: input_file:org/apache/flume/channel/file/FileChannel.class */
public class FileChannel extends BasicChannelSemantics {
    private static final Logger LOG = LoggerFactory.getLogger(FileChannel.class);
    private int capacity;
    private int keepAlive;
    private int transactionCapacity;
    private long checkpointInterval;
    private long maxFileSize;
    private File checkpointDir;
    private File[] dataDirs;
    private Log log;
    private volatile boolean open;
    private volatile Throwable startupError;
    private Semaphore queueRemaining;
    private int logWriteTimeout;
    private int checkpointWriteTimeout;
    private ChannelCounter channelCounter;
    private boolean useLogReplayV1;
    private KeyProvider encryptionKeyProvider;
    private String encryptionActiveKey;
    private String encryptionCipherProvider;
    private final ThreadLocal<FileBackedTransaction> transactions = new ThreadLocal<>();
    private String channelNameDescriptor = "[channel=unknown]";
    private boolean useFastReplay = false;

    /* loaded from: input_file:org/apache/flume/channel/file/FileChannel$FileBackedTransaction.class */
    static class FileBackedTransaction extends BasicTransactionSemantics {
        private final LinkedBlockingDeque<FlumeEventPointer> takeList;
        private final LinkedBlockingDeque<FlumeEventPointer> putList;
        private final long transactionID;
        private final int keepAlive;
        private final Log log;
        private final FlumeEventQueue queue;
        private final Semaphore queueRemaining;
        private final String channelNameDescriptor;
        private final ChannelCounter channelCounter;

        public FileBackedTransaction(Log log, long j, int i, int i2, Semaphore semaphore, String str, ChannelCounter channelCounter) {
            this.log = log;
            this.queue = log.getFlumeEventQueue();
            this.transactionID = j;
            this.keepAlive = i2;
            this.queueRemaining = semaphore;
            this.putList = new LinkedBlockingDeque<>(i);
            this.takeList = new LinkedBlockingDeque<>(i);
            this.channelNameDescriptor = "[channel=" + str + "]";
            this.channelCounter = channelCounter;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isClosed() {
            return BasicTransactionSemantics.State.CLOSED.equals(getState());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getStateAsString() {
            return String.valueOf(getState());
        }

        protected void doPut(Event event) throws InterruptedException {
            this.channelCounter.incrementEventPutAttemptCount();
            if (this.putList.remainingCapacity() == 0) {
                throw new ChannelException("Put queue for FileBackedTransaction of capacity " + this.putList.size() + " full, consider committing more frequently, increasing capacity or increasing thread count. " + this.channelNameDescriptor);
            }
            if (!this.queueRemaining.tryAcquire(this.keepAlive, TimeUnit.SECONDS)) {
                throw new ChannelException("The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. " + this.channelNameDescriptor);
            }
            boolean tryLockShared = this.log.tryLockShared();
            try {
                try {
                    if (!tryLockShared) {
                        throw new ChannelException("Failed to obtain lock for writing to the log. Try increasing the log write timeout value. " + this.channelNameDescriptor);
                    }
                    FlumeEventPointer put = this.log.put(this.transactionID, event);
                    Preconditions.checkState(this.putList.offer(put), "putList offer failed " + this.channelNameDescriptor);
                    this.queue.addWithoutCommit(put, this.transactionID);
                    if (tryLockShared) {
                        this.log.unlockShared();
                    }
                    if (1 == 0) {
                        this.queueRemaining.release();
                    }
                } catch (IOException e) {
                    throw new ChannelException("Put failed due to IO error " + this.channelNameDescriptor, e);
                }
            } catch (Throwable th) {
                if (tryLockShared) {
                    this.log.unlockShared();
                }
                if (0 == 0) {
                    this.queueRemaining.release();
                }
                throw th;
            }
        }

        protected Event doTake() throws InterruptedException {
            this.channelCounter.incrementEventTakeAttemptCount();
            if (this.takeList.remainingCapacity() == 0) {
                throw new ChannelException("Take list for FileBackedTransaction, capacity " + this.takeList.size() + " full, consider committing more frequently, increasing capacity, or increasing thread count. " + this.channelNameDescriptor);
            }
            if (!this.log.tryLockShared()) {
                throw new ChannelException("Failed to obtain lock for writing to the log. Try increasing the log write timeout value. " + this.channelNameDescriptor);
            }
            try {
                FlumeEventPointer removeHead = this.queue.removeHead(this.transactionID);
                if (removeHead == null) {
                    return null;
                }
                try {
                    Preconditions.checkState(this.takeList.offer(removeHead), "takeList offer failed " + this.channelNameDescriptor);
                    this.log.take(this.transactionID, removeHead);
                    FlumeEvent flumeEvent = this.log.get(removeHead);
                    this.log.unlockShared();
                    return flumeEvent;
                } catch (IOException e) {
                    throw new ChannelException("Take failed due to IO error " + this.channelNameDescriptor, e);
                }
            } finally {
                this.log.unlockShared();
            }
        }

        protected void doCommit() throws InterruptedException {
            int size = this.putList.size();
            int size2 = this.takeList.size();
            if (size > 0) {
                Preconditions.checkState(size2 == 0, "nonzero puts and takes " + this.channelNameDescriptor);
                if (!this.log.tryLockShared()) {
                    throw new ChannelException("Failed to obtain lock for writing to the log. Try increasing the log write timeout value. " + this.channelNameDescriptor);
                }
                try {
                    try {
                        this.log.commitPut(this.transactionID);
                        this.channelCounter.addToEventPutSuccessCount(size);
                        synchronized (this.queue) {
                            while (!this.putList.isEmpty()) {
                                if (!this.queue.addTail(this.putList.removeFirst())) {
                                    StringBuilder sb = new StringBuilder();
                                    sb.append("Queue add failed, this shouldn't be able to ");
                                    sb.append("happen. A portion of the transaction has been ");
                                    sb.append("added to the queue but the remaining portion ");
                                    sb.append("cannot be added. Those messages will be consumed ");
                                    sb.append("despite this transaction failing. Please report.");
                                    sb.append(this.channelNameDescriptor);
                                    FileChannel.LOG.error(sb.toString());
                                    Preconditions.checkState(false, sb.toString());
                                }
                            }
                            this.queue.completeTransaction(this.transactionID);
                        }
                        this.log.unlockShared();
                    } catch (IOException e) {
                        throw new ChannelException("Commit failed due to IO error " + this.channelNameDescriptor, e);
                    }
                } finally {
                }
            } else if (size2 > 0) {
                try {
                    if (!this.log.tryLockShared()) {
                        throw new ChannelException("Failed to obtain lock for writing to the log. Try increasing the log write timeout value. " + this.channelNameDescriptor);
                    }
                    try {
                        this.log.commitTake(this.transactionID);
                        this.queue.completeTransaction(this.transactionID);
                        this.channelCounter.addToEventTakeSuccessCount(size2);
                        this.log.unlockShared();
                        this.queueRemaining.release(size2);
                    } catch (IOException e2) {
                        throw new ChannelException("Commit failed due to IO error " + this.channelNameDescriptor, e2);
                    }
                } finally {
                }
            }
            this.putList.clear();
            this.takeList.clear();
            this.channelCounter.setChannelSize(this.queue.getSize());
        }

        protected void doRollback() throws InterruptedException {
            int size = this.putList.size();
            int size2 = this.takeList.size();
            boolean tryLockShared = this.log.tryLockShared();
            try {
                try {
                    if (!tryLockShared) {
                        throw new ChannelException("Failed to obtain lock for writing to the log. Try increasing the log write timeout value. " + this.channelNameDescriptor);
                    }
                    if (size2 > 0) {
                        Preconditions.checkState(size == 0, "nonzero puts and takes " + this.channelNameDescriptor);
                        synchronized (this.queue) {
                            while (!this.takeList.isEmpty()) {
                                Preconditions.checkState(this.queue.addHead(this.takeList.removeLast()), "Queue add failed, this shouldn't be able to happen " + this.channelNameDescriptor);
                            }
                        }
                    }
                    this.putList.clear();
                    this.takeList.clear();
                    this.queue.completeTransaction(this.transactionID);
                    this.channelCounter.setChannelSize(this.queue.getSize());
                    this.log.rollback(this.transactionID);
                    if (tryLockShared) {
                        this.log.unlockShared();
                    }
                    this.queueRemaining.release(size);
                } catch (IOException e) {
                    throw new ChannelException("Commit failed due to IO error " + this.channelNameDescriptor, e);
                }
            } catch (Throwable th) {
                if (tryLockShared) {
                    this.log.unlockShared();
                }
                this.queueRemaining.release(size);
                throw th;
            }
        }
    }

    public synchronized void setName(String str) {
        this.channelNameDescriptor = "[channel=" + str + "]";
        super.setName(str);
    }

    public void configure(Context context) {
        String replace = System.getProperty("user.home").replace('\\', '/');
        String string = context.getString(FileChannelConfiguration.CHECKPOINT_DIR, replace + "/.flume/file-channel/checkpoint");
        String[] split = context.getString(FileChannelConfiguration.DATA_DIRS, replace + "/.flume/file-channel/data").split(",");
        if (this.checkpointDir == null) {
            this.checkpointDir = new File(string);
        } else if (!this.checkpointDir.getAbsolutePath().equals(new File(string).getAbsolutePath())) {
            LOG.warn("An attempt was made to change the checkpoint directory after start, this is not supported.");
        }
        if (this.dataDirs == null) {
            this.dataDirs = new File[split.length];
            for (int i = 0; i < split.length; i++) {
                this.dataDirs[i] = new File(split[i]);
            }
        } else {
            boolean z = false;
            if (this.dataDirs.length != split.length) {
                z = true;
            } else {
                int i2 = 0;
                while (true) {
                    if (i2 >= split.length) {
                        break;
                    }
                    if (!this.dataDirs[i2].getAbsolutePath().equals(new File(split[i2]).getAbsolutePath())) {
                        z = true;
                        break;
                    }
                    i2++;
                }
            }
            if (z) {
                LOG.warn("An attempt was made to change the data directories after start, this is not supported.");
            }
        }
        int intValue = context.getInteger(FileChannelConfiguration.CAPACITY, Integer.valueOf(FileChannelConfiguration.DEFAULT_CAPACITY)).intValue();
        if (this.capacity <= 0 || intValue == this.capacity) {
            this.capacity = intValue;
        } else {
            LOG.warn("Capacity of this channel cannot be sized on the fly due the requirement we have enough DirectMemory for the queue and downsizing of the queue cannot be guranteed due to the fact there maybe more items on the queue than the new capacity.");
        }
        this.keepAlive = context.getInteger(FileChannelConfiguration.KEEP_ALIVE, 3).intValue();
        this.transactionCapacity = context.getInteger(FileChannelConfiguration.TRANSACTION_CAPACITY, Integer.valueOf(FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY)).intValue();
        this.checkpointInterval = context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL, Long.valueOf(FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL)).longValue();
        this.maxFileSize = Math.min(context.getLong(FileChannelConfiguration.MAX_FILE_SIZE, Long.valueOf(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)).longValue(), FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
        this.logWriteTimeout = context.getInteger(FileChannelConfiguration.LOG_WRITE_TIMEOUT, 10).intValue();
        if (this.logWriteTimeout < 0) {
            LOG.warn("Log write time out is invalid: " + this.logWriteTimeout + ", using default: 10");
            this.logWriteTimeout = 10;
        }
        this.checkpointWriteTimeout = context.getInteger(FileChannelConfiguration.CHECKPOINT_WRITE_TIMEOUT, Integer.valueOf(FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT)).intValue();
        if (this.checkpointWriteTimeout < 0) {
            LOG.warn("Checkpoint write time out is invalid: " + this.checkpointWriteTimeout + ", using default: " + FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT);
            this.checkpointWriteTimeout = FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT;
        }
        this.useLogReplayV1 = context.getBoolean(FileChannelConfiguration.USE_LOG_REPLAY_V1, false).booleanValue();
        this.useFastReplay = context.getBoolean(FileChannelConfiguration.USE_FAST_REPLAY, false).booleanValue();
        Context context2 = new Context(context.getSubProperties("encryption."));
        String string2 = context2.getString(EncryptionConfiguration.KEY_PROVIDER);
        this.encryptionActiveKey = context2.getString(EncryptionConfiguration.ACTIVE_KEY);
        this.encryptionCipherProvider = context2.getString(EncryptionConfiguration.CIPHER_PROVIDER);
        if (string2 != null) {
            Preconditions.checkState(!Strings.isNullOrEmpty(this.encryptionActiveKey), "Encryption configuration problem: activeKey is missing");
            Preconditions.checkState(!Strings.isNullOrEmpty(this.encryptionCipherProvider), "Encryption configuration problem: cipherProvider is missing");
            this.encryptionKeyProvider = KeyProviderFactory.getInstance(string2, new Context(context2.getSubProperties("keyProvider.")));
        } else {
            Preconditions.checkState(this.encryptionActiveKey == null, "Encryption configuration problem: activeKey is present while key provider name is not.");
            Preconditions.checkState(this.encryptionCipherProvider == null, "Encryption configuration problem: cipherProvider is present while key provider name is not.");
        }
        if (this.queueRemaining == null) {
            this.queueRemaining = new Semaphore(this.capacity, true);
        }
        if (this.log != null) {
            this.log.setCheckpointInterval(this.checkpointInterval);
            this.log.setMaxFileSize(this.maxFileSize);
        }
        if (this.channelCounter == null) {
            this.channelCounter = new ChannelCounter(getName());
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:6:0x0132  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public synchronized void start() {
        /*
            Method dump skipped, instructions count: 342
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flume.channel.file.FileChannel.start():void");
    }

    public synchronized void stop() {
        LOG.info("Stopping {}...", this);
        this.startupError = null;
        int depth = getDepth();
        close();
        if (!this.open) {
            this.channelCounter.setChannelSize(depth);
            this.channelCounter.stop();
        }
        super.stop();
    }

    public String toString() {
        return "FileChannel " + getName() + " { dataDirs: " + Arrays.toString(this.dataDirs) + " }";
    }

    protected BasicTransactionSemantics createTransaction() {
        if (!this.open) {
            String str = "Channel closed " + this.channelNameDescriptor;
            if (this.startupError != null) {
                throw new IllegalStateException(str + ". Due to " + this.startupError.getClass().getName() + ": " + this.startupError.getMessage(), this.startupError);
            }
            throw new IllegalStateException(str);
        }
        FileBackedTransaction fileBackedTransaction = this.transactions.get();
        if (fileBackedTransaction != null && !fileBackedTransaction.isClosed()) {
            Preconditions.checkState(false, "Thread has transaction which is still open: " + fileBackedTransaction.getStateAsString() + this.channelNameDescriptor);
        }
        FileBackedTransaction fileBackedTransaction2 = new FileBackedTransaction(this.log, TransactionIDOracle.next(), this.transactionCapacity, this.keepAlive, this.queueRemaining, getName(), this.channelCounter);
        this.transactions.set(fileBackedTransaction2);
        return fileBackedTransaction2;
    }

    int getDepth() {
        Preconditions.checkState(this.open, "Channel closed" + this.channelNameDescriptor);
        Preconditions.checkNotNull(this.log, "log");
        FlumeEventQueue flumeEventQueue = this.log.getFlumeEventQueue();
        Preconditions.checkNotNull(flumeEventQueue, "queue");
        return flumeEventQueue.getSize();
    }

    void close() {
        if (this.open) {
            this.open = false;
            this.log.close();
            this.log = null;
            this.queueRemaining = null;
        }
    }

    public boolean isOpen() {
        return this.open;
    }
}
