/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.heartbeat;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieHeartbeatException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Emits and inspects per-instant heartbeats stored as files on a Hadoop {@link FileSystem}.
 *
 * <p>Each started instant owns a {@link Timer} that periodically (re)creates an empty file named
 * after the instant inside the table's heartbeat folder. The modification time of that file is
 * what {@link #getLastHeartbeatTime(FileSystem, String, String)} reports as the last heartbeat.
 *
 * <p>The class is annotated {@link NotThreadSafe}: the internal map is a plain {@link HashMap}
 * that is touched both by callers and by the timer task, with no synchronization.
 *
 * <p>The {@code fs} field is {@code transient}, so it is not part of the serialized form.
 */
@NotThreadSafe
public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
    private static final Logger LOG = LogManager.getLogger(HoodieHeartbeatClient.class);
    private final transient FileSystem fs;
    private final String basePath;
    private String heartbeatFolderPath;
    private final Long heartbeatIntervalInMs;
    private Integer numTolerableHeartbeatMisses;
    private final Long maxAllowableHeartbeatIntervalInMs;
    private Map<String, Heartbeat> instantToHeartbeatMap;

    /**
     * Creates a heartbeat client for the table rooted at {@code basePath}.
     *
     * @param fs                          file system the heartbeat files are written to
     * @param basePath                    table base path; the heartbeat folder is derived from it
     * @param heartbeatIntervalInMs       period between two heartbeats, must be at least 1000 ms
     * @param numTolerableHeartbeatMisses number of intervals that may elapse before a heartbeat
     *                                    is considered expired
     */
    public HoodieHeartbeatClient(FileSystem fs, String basePath, Long heartbeatIntervalInMs, Integer numTolerableHeartbeatMisses) {
        ValidationUtils.checkArgument(heartbeatIntervalInMs >= 1000L, "Cannot set heartbeat lower than 1 second");
        this.fs = fs;
        this.basePath = basePath;
        this.heartbeatFolderPath = HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
        this.heartbeatIntervalInMs = heartbeatIntervalInMs;
        this.numTolerableHeartbeatMisses = numTolerableHeartbeatMisses;
        this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs * this.numTolerableHeartbeatMisses;
        this.instantToHeartbeatMap = new HashMap<>();
    }

    /**
     * Starts heartbeating for {@code instantTime}: writes the first heartbeat synchronously and
     * then schedules a fixed-rate timer task for the following ones.
     *
     * <p>Calling this for an instant that is already started is a no-op. A heartbeat that has
     * been stopped cannot be started again; the argument check fails in that case.
     *
     * @param instantTime instant to heartbeat for
     */
    public void start(String instantTime) {
        LOG.info("Received request to start heartbeat for instant time " + instantTime);
        Heartbeat heartbeat = this.instantToHeartbeatMap.get(instantTime);
        ValidationUtils.checkArgument(heartbeat == null || !heartbeat.isHeartbeatStopped(),
            "Cannot restart a stopped heartbeat for " + instantTime);
        if (heartbeat != null && heartbeat.isHeartbeatStarted()) {
            // Already running, nothing to do.
            return;
        }
        Heartbeat newHeartbeat = new Heartbeat();
        newHeartbeat.setHeartbeatStarted(true);
        // The entry must be registered before updateHeartbeat, which looks it up in the map.
        this.instantToHeartbeatMap.put(instantTime, newHeartbeat);
        this.updateHeartbeat(instantTime);
        newHeartbeat.getTimer().scheduleAtFixedRate(new HeartbeatTask(instantTime), this.heartbeatIntervalInMs, this.heartbeatIntervalInMs);
    }

    /**
     * Stops the heartbeat of {@code instantTime} if it is currently running: cancels its timer,
     * marks it stopped and deletes the heartbeat file via {@link HeartbeatUtils}.
     *
     * @param instantTime instant whose heartbeat should be stopped
     * @throws HoodieException propagated from the heartbeat file deletion
     */
    public void stop(String instantTime) throws HoodieException {
        Heartbeat heartbeat = this.instantToHeartbeatMap.get(instantTime);
        if (heartbeat == null || !heartbeat.isHeartbeatStarted() || heartbeat.isHeartbeatStopped()) {
            return;
        }
        LOG.info("Stopping heartbeat for instant " + instantTime);
        heartbeat.getTimer().cancel();
        heartbeat.setHeartbeatStopped(true);
        LOG.info("Stopped heartbeat for instant " + instantTime);
        HeartbeatUtils.deleteHeartbeatFile(this.fs, this.basePath, instantTime);
        LOG.info("Deleted heartbeat file for instant " + instantTime);
    }

    /**
     * Stops every heartbeat known to this client.
     *
     * @throws HoodieException propagated from {@link #stop(String)}
     */
    public void stop() throws HoodieException {
        // Iterate the keys rather than Heartbeat#getInstantTime(): the latter is only populated
        // after the first successful heartbeat write, so it can be null for a registered entry.
        // stop(String) does not modify the map, so iterating the live key set is safe.
        for (String instantTime : this.instantToHeartbeatMap.keySet()) {
            this.stop(instantTime);
        }
    }

    /**
     * Returns the modification time of the heartbeat file of {@code instantTime}.
     *
     * @param fs          file system holding the heartbeat folder
     * @param basePath    table base path
     * @param instantTime instant to look up
     * @return the file's modification time, or {@code 0} when no heartbeat file exists
     * @throws IOException on file system errors
     */
    public static Long getLastHeartbeatTime(FileSystem fs, String basePath, String instantTime) throws IOException {
        Path heartbeatFilePath = heartbeatFilePath(HoodieTableMetaClient.getHeartbeatFolderPath(basePath), instantTime);
        if (fs.exists(heartbeatFilePath)) {
            return fs.getFileStatus(heartbeatFilePath).getModificationTime();
        }
        return 0L;
    }

    /**
     * Tells whether a heartbeat file exists for {@code instantTime}.
     *
     * @param fs          file system holding the heartbeat folder
     * @param basePath    table base path
     * @param instantTime instant to look up
     * @return {@code true} when the heartbeat file exists
     * @throws IOException on file system errors
     */
    public static Boolean heartbeatExists(FileSystem fs, String basePath, String instantTime) throws IOException {
        return fs.exists(heartbeatFilePath(HoodieTableMetaClient.getHeartbeatFolderPath(basePath), instantTime));
    }

    /**
     * Checks whether the last heartbeat of {@code instantTime} is older than
     * {@code heartbeatIntervalInMs * numTolerableHeartbeatMisses}.
     *
     * <p>The last heartbeat time comes from this client's in-memory state when the instant is
     * known to it, otherwise from the heartbeat file's modification time.
     *
     * @param instantTime instant to check
     * @return {@code true} when the heartbeat is considered expired
     * @throws IOException on file system errors during the fallback lookup
     */
    public boolean isHeartbeatExpired(String instantTime) throws IOException {
        Long currentTime = System.currentTimeMillis();
        Heartbeat lastHeartbeatForWriter = this.instantToHeartbeatMap.get(instantTime);
        if (lastHeartbeatForWriter == null) {
            LOG.info("Heartbeat not found in internal map, falling back to reading from DFS");
            long lastHeartbeatForWriterTime = HoodieHeartbeatClient.getLastHeartbeatTime(this.fs, this.basePath, instantTime);
            // Throw-away holder, never registered in the map. It owns no timer thread because
            // Heartbeat creates its Timer lazily.
            lastHeartbeatForWriter = new Heartbeat();
            lastHeartbeatForWriter.setLastHeartbeatTime(lastHeartbeatForWriterTime);
            lastHeartbeatForWriter.setInstantTime(instantTime);
        }
        if (currentTime - lastHeartbeatForWriter.getLastHeartbeatTime() > this.maxAllowableHeartbeatIntervalInMs) {
            LOG.warn("Heartbeat expired, currentTime = " + currentTime + ", last heartbeat = " + lastHeartbeatForWriter + ", heartbeat interval = " + this.heartbeatIntervalInMs);
            return true;
        }
        return false;
    }

    /**
     * Lists the names of all entries in the heartbeat folder.
     *
     * @return the entry names, or an empty list when the heartbeat folder does not exist
     * @throws IOException on file system errors
     */
    public List<String> getAllExistingHeartbeatInstants() throws IOException {
        Path heartbeatFolder = new Path(this.heartbeatFolderPath);
        if (!this.fs.exists(heartbeatFolder)) {
            return Collections.emptyList();
        }
        FileStatus[] statuses = this.fs.listStatus(heartbeatFolder);
        return Arrays.stream(statuses).map(status -> status.getPath().getName()).collect(Collectors.toList());
    }

    /**
     * Writes one heartbeat for {@code instantTime} by (re)creating its heartbeat file with
     * overwrite enabled, then records the heartbeat in the in-memory entry.
     *
     * <p>If a previous heartbeat exists and is already expired, an error is logged and the
     * current thread's interrupt flag is set; the bookkeeping below still runs afterwards.
     *
     * @param instantTime instant to heartbeat for; must already be present in the map
     * @throws HoodieHeartbeatException when the file system operation fails
     */
    private void updateHeartbeat(String instantTime) throws HoodieHeartbeatException {
        try {
            Long newHeartbeatTime = System.currentTimeMillis();
            // Only the file's existence and modification time matter; no content is written.
            FSDataOutputStream outputStream = this.fs.create(heartbeatFilePath(this.heartbeatFolderPath, instantTime), true);
            outputStream.close();
            Heartbeat heartbeat = this.instantToHeartbeatMap.get(instantTime);
            if (heartbeat.getLastHeartbeatTime() != null && this.isHeartbeatExpired(instantTime)) {
                LOG.error("Aborting, missed generating heartbeat within allowable interval " + this.maxAllowableHeartbeatIntervalInMs);
                Thread.currentThread().interrupt();
            }
            heartbeat.setInstantTime(instantTime);
            heartbeat.setLastHeartbeatTime(newHeartbeatTime);
            heartbeat.setNumHeartbeats(heartbeat.getNumHeartbeats() + 1);
        } catch (IOException io) {
            throw new HoodieHeartbeatException("Unable to generate heartbeat ", io);
        }
    }

    /**
     * Builds the path of an instant's heartbeat file inside {@code heartbeatFolder}.
     * Uses the Hadoop path separator rather than the local platform's file separator.
     */
    private static Path heartbeatFilePath(String heartbeatFolder, String instantTime) {
        return new Path(heartbeatFolder + Path.SEPARATOR + instantTime);
    }

    /** @return the folder the heartbeat files are written to */
    public String getHeartbeatFolderPath() {
        return this.heartbeatFolderPath;
    }

    /**
     * @param instantTime instant to look up
     * @return the in-memory heartbeat state of {@code instantTime}, or {@code null} if unknown
     */
    public Heartbeat getHeartbeat(String instantTime) {
        return this.instantToHeartbeatMap.get(instantTime);
    }

    /** Stops all heartbeats and forgets their in-memory state. */
    @Override
    public void close() {
        this.stop();
        this.instantToHeartbeatMap.clear();
    }

    /** Timer task that emits one heartbeat for a fixed instant on every run. */
    class HeartbeatTask extends TimerTask {
        private final String instantTime;

        HeartbeatTask(String instantTime) {
            this.instantTime = instantTime;
        }

        @Override
        public void run() {
            HoodieHeartbeatClient.this.updateHeartbeat(this.instantTime);
        }
    }

    /** Mutable in-memory state of one instant's heartbeat. */
    class Heartbeat {
        private String instantTime;
        private Boolean isHeartbeatStarted = false;
        private Boolean isHeartbeatStopped = false;
        private Long lastHeartbeatTime;
        private Integer numHeartbeats = 0;
        // Created on first use: constructing a Timer starts a background thread, and many
        // Heartbeat objects (e.g. the DFS fallback holder) never schedule anything.
        private Timer timer;

        Heartbeat() {
        }

        public String getInstantTime() {
            return this.instantTime;
        }

        public void setInstantTime(String instantTime) {
            this.instantTime = instantTime;
        }

        public Boolean isHeartbeatStarted() {
            return this.isHeartbeatStarted;
        }

        public void setHeartbeatStarted(Boolean heartbeatStarted) {
            this.isHeartbeatStarted = heartbeatStarted;
        }

        public Boolean isHeartbeatStopped() {
            return this.isHeartbeatStopped;
        }

        public void setHeartbeatStopped(Boolean heartbeatStopped) {
            this.isHeartbeatStopped = heartbeatStopped;
        }

        public Long getLastHeartbeatTime() {
            return this.lastHeartbeatTime;
        }

        public void setLastHeartbeatTime(Long lastHeartbeatTime) {
            this.lastHeartbeatTime = lastHeartbeatTime;
        }

        public Integer getNumHeartbeats() {
            return this.numHeartbeats;
        }

        public void setNumHeartbeats(Integer numHeartbeats) {
            this.numHeartbeats = numHeartbeats;
        }

        /**
         * @return this heartbeat's timer, creating it on first access; never {@code null}
         */
        public Timer getTimer() {
            if (this.timer == null) {
                this.timer = new Timer();
            }
            return this.timer;
        }

        public void setTimer(Timer timer) {
            this.timer = timer;
        }

        @Override
        public String toString() {
            return "Heartbeat{instantTime='" + this.instantTime + '\'' + ", isHeartbeatStarted=" + this.isHeartbeatStarted + ", isHeartbeatStopped=" + this.isHeartbeatStopped + ", lastHeartbeatTime=" + this.lastHeartbeatTime + ", numHeartbeats=" + this.numHeartbeats + ", timer=" + this.timer + '}';
        }
    }
}

