package io.confluent.connect.hdfs.wal;

import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.storage.Storage;
import io.confluent.connect.hdfs.wal.WALFile;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/hdfs/wal/FSWAL.class */
public class FSWAL implements WAL {
    private static final Logger log = LoggerFactory.getLogger(FSWAL.class);
    private static final String leaseException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";
    private WALFile.Writer writer = null;
    private WALFile.Reader reader = null;
    private String logFile;
    private Configuration conf;
    private Storage storage;

    public FSWAL(String str, TopicPartition topicPartition, Storage storage) throws ConnectException {
        this.logFile = null;
        this.conf = null;
        this.storage = null;
        this.storage = storage;
        this.conf = storage.conf();
        this.logFile = FileUtils.logFileName(storage.url(), str, topicPartition);
    }

    @Override // io.confluent.connect.hdfs.wal.WAL
    public void append(String str, String str2) throws ConnectException {
        try {
            acquireLease();
            this.writer.append(new WALEntry(str), new WALEntry(str2));
            this.writer.hsync();
        } catch (IOException e) {
            throw new ConnectException(e);
        }
    }

    @Override // io.confluent.connect.hdfs.wal.WAL
    public void acquireLease() throws ConnectException {
        long j = 1000;
        while (j < 16000) {
            try {
                if (this.writer == null) {
                    this.writer = WALFile.createWriter(this.conf, WALFile.Writer.file(new Path(this.logFile)), WALFile.Writer.appendIfExists(true));
                    log.info("Successfully acquired lease for {}", this.logFile);
                }
                break;
            } catch (RemoteException e) {
                if (!e.getClassName().equals(leaseException)) {
                    throw new ConnectException(e);
                }
                log.info("Cannot acquire lease on WAL {}", this.logFile);
                try {
                    Thread.sleep(j);
                    j *= 2;
                } catch (InterruptedException e2) {
                    throw new ConnectException(e2);
                }
            } catch (IOException e3) {
                throw new ConnectException("Error creating writer for log file " + this.logFile, e3);
            }
        }
        if (j >= 16000) {
            throw new ConnectException("Cannot acquire lease after timeout, will retry.");
        }
    }

    @Override // io.confluent.connect.hdfs.wal.WAL
    public void apply() throws ConnectException {
        try {
            if (this.storage.exists(this.logFile)) {
                acquireLease();
                if (this.reader == null) {
                    this.reader = new WALFile.Reader(this.conf, WALFile.Reader.file(new Path(this.logFile)));
                }
                HashMap hashMap = new HashMap();
                WALEntry wALEntry = new WALEntry();
                WALEntry wALEntry2 = new WALEntry();
                while (this.reader.next(wALEntry, wALEntry2)) {
                    String name = wALEntry.getName();
                    if (name.equals(WAL.beginMarker)) {
                        hashMap.clear();
                    } else if (name.equals(WAL.endMarker)) {
                        for (Map.Entry entry : hashMap.entrySet()) {
                            String name2 = ((WALEntry) entry.getKey()).getName();
                            String name3 = ((WALEntry) entry.getValue()).getName();
                            if (!this.storage.exists(name3)) {
                                this.storage.commit(name2, name3);
                            }
                        }
                    } else {
                        hashMap.put(new WALEntry(wALEntry.getName()), new WALEntry(wALEntry2.getName()));
                    }
                }
            }
        } catch (IOException e) {
            throw new ConnectException(e);
        }
    }

    @Override // io.confluent.connect.hdfs.wal.WAL
    public void truncate() throws ConnectException {
        try {
            String str = this.logFile + ".1";
            this.storage.delete(str);
            this.storage.commit(this.logFile, str);
            close();
        } catch (IOException e) {
            throw new ConnectException(e);
        }
    }

    @Override // io.confluent.connect.hdfs.wal.WAL
    public void close() throws ConnectException {
        try {
            if (this.writer != null) {
                this.writer.close();
                this.writer = null;
            }
            if (this.reader != null) {
                this.reader.close();
                this.reader = null;
            }
        } catch (IOException e) {
            throw new ConnectException("Error closing " + this.logFile, e);
        }
    }

    @Override // io.confluent.connect.hdfs.wal.WAL
    public String getLogFile() {
        return this.logFile;
    }
}
