package io.confluent.connect.hdfs.wal;

import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.wal.WALFile;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/hdfs/wal/FSWAL.class */
public class FSWAL implements WAL {
    private static final Logger log = LoggerFactory.getLogger(FSWAL.class);
    private WALFile.Writer writer = null;
    private WALFile.Reader reader = null;
    private String logFile;
    private HdfsSinkConnectorConfig conf;
    private HdfsStorage storage;

    public FSWAL(String str, TopicPartition topicPartition, HdfsStorage hdfsStorage) throws ConnectException {
        this.logFile = null;
        this.conf = null;
        this.storage = null;
        this.storage = hdfsStorage;
        this.conf = hdfsStorage.m16conf();
        this.logFile = FileUtils.logFileName(hdfsStorage.url(), str, topicPartition);
    }

    public void append(String str, String str2) throws ConnectException {
        try {
            acquireLease();
            this.writer.append(new WALEntry(str), new WALEntry(str2));
            this.writer.hsync();
        } catch (IOException e) {
            log.error("Error appending WAL file: {}, {}", this.logFile, e);
            close();
            throw new DataException(e);
        }
    }

    public void acquireLease() throws ConnectException {
        long j;
        long j2 = 1000;
        while (true) {
            j = j2;
            if (j >= 16000) {
                break;
            }
            try {
                if (this.writer != null) {
                    break;
                }
                this.writer = WALFile.createWriter(this.conf, WALFile.Writer.file(new Path(this.logFile)), WALFile.Writer.appendIfExists(true));
                log.info("Successfully acquired lease, {}-{}, file {}", new Object[]{this.conf.name(), Integer.valueOf(this.conf.getTaskId()), this.logFile});
                break;
            } catch (IOException e) {
                throw new DataException(String.format("Error creating writer for log file, %s-%s, file %s", this.conf.name(), Integer.valueOf(this.conf.getTaskId()), this.logFile), e);
            } catch (RemoteException e2) {
                if (!e2.getClassName().equals("org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException")) {
                    throw new ConnectException(e2);
                }
                log.warn("Cannot acquire lease on WAL, {}-{}, file {}", new Object[]{this.conf.name(), Integer.valueOf(this.conf.getTaskId()), this.logFile});
                try {
                    Thread.sleep(j);
                    j2 = j * 2;
                } catch (InterruptedException e3) {
                    throw new ConnectException(e3);
                }
            }
        }
        if (j >= 16000) {
            throw new ConnectException("Cannot acquire lease after timeout, will retry.");
        }
    }

    public void apply() throws ConnectException {
        try {
            if (!this.storage.exists(this.logFile)) {
                log.debug("Storage does not exist");
                return;
            }
            acquireLease();
            log.debug("Lease acquired");
            if (this.reader == null) {
                this.reader = new WALFile.Reader(this.conf.getHadoopConfiguration(), WALFile.Reader.file(new Path(this.logFile)));
            }
            HashMap hashMap = new HashMap();
            WALEntry wALEntry = new WALEntry();
            WALEntry wALEntry2 = new WALEntry();
            while (this.reader.next(wALEntry, wALEntry2)) {
                String name = wALEntry.getName();
                if (name.equals("BEGIN")) {
                    hashMap.clear();
                } else if (name.equals("END")) {
                    for (Map.Entry entry : hashMap.entrySet()) {
                        String name2 = ((WALEntry) entry.getKey()).getName();
                        String name3 = ((WALEntry) entry.getValue()).getName();
                        if (!this.storage.exists(name3)) {
                            this.storage.commit(name2, name3);
                        }
                    }
                } else {
                    hashMap.put(new WALEntry(wALEntry.getName()), new WALEntry(wALEntry2.getName()));
                }
            }
            log.debug("Finished applying WAL");
        } catch (IOException e) {
            log.error("Error applying WAL file: {}, {}", this.logFile, e);
            close();
            throw new DataException(e);
        }
    }

    public void truncate() throws ConnectException {
        try {
            String str = this.logFile + ".1";
            this.storage.delete(str);
            this.storage.commit(this.logFile, str);
        } finally {
            close();
        }
    }

    public void close() throws ConnectException {
        log.info("Closing WAL, {}-{}, file: {}", new Object[]{this.conf.name(), Integer.valueOf(this.conf.getTaskId()), this.logFile});
        try {
            try {
                if (this.writer != null) {
                    this.writer.close();
                }
                if (this.reader != null) {
                    this.reader.close();
                }
            } catch (IOException e) {
                throw new DataException("Error closing " + this.logFile, e);
            }
        } finally {
            this.writer = null;
            this.reader = null;
        }
    }

    public String getLogFile() {
        return this.logFile;
    }
}
