package org.apache.seatunnel.translation.spark.common.source.micro;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.spark.common.ReaderState;
import org.apache.seatunnel.translation.spark.common.source.batch.ParallelBatchPartitionReader;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;

/* loaded from: input_file:org/apache/seatunnel/translation/spark/common/source/micro/ParallelMicroBatchPartitionReader.class */
/**
 * A {@link ParallelBatchPartitionReader} that adds checkpoint persistence on a Hadoop
 * {@link FileSystem}.
 *
 * <p>During {@link #prepare()} the reader opens the file system, loads the state stored for the
 * previous checkpoint id (if a file for it exists) and schedules one delayed call to
 * {@link #virtualCheckpoint()}. That call snapshots the internal source, writes the snapshot to
 * {@code checkpointPath/subtaskId/checkpointId}, notifies the source and clears the
 * {@code running} flag.
 */
public class ParallelMicroBatchPartitionReader extends ParallelBatchPartitionReader {
    /** Sleep, in milliseconds, between polls while waiting for the handover to become empty. */
    protected static final Integer CHECKPOINT_SLEEP_INTERVAL = 10;

    /** Id of the next checkpoint to take; incremented by {@link #snapshotState()}. */
    protected volatile Integer checkpointId;
    /** Delay, in milliseconds, before the scheduled virtual checkpoint runs. */
    protected final Integer checkpointInterval;
    /** Root directory under which per-subtask checkpoint files are stored. */
    protected final String checkpointPath;
    /** URI of the file system; also used as {@code fs.defaultFS}. */
    protected final String hdfsRoot;
    /** User to access the file system as; blank means the default user is used. */
    protected final String hdfsUser;
    /** State loaded in {@link #prepare()}, or {@code null} when no checkpoint file was found. */
    protected Map<Integer, List<byte[]>> restoredState;
    protected ScheduledThreadPoolExecutor executor;
    protected FileSystem fileSystem;

    /**
     * Creates the reader.
     *
     * @param seaTunnelSource source passed to the parent reader
     * @param num first value forwarded to the parent constructor
     *     (presumably the parallelism; confirm against the parent class)
     * @param num2 second value forwarded to the parent constructor
     *     (presumably the subtask id; confirm against the parent class)
     * @param num3 initial checkpoint id
     * @param num4 checkpoint interval in milliseconds
     * @param str checkpoint root path
     * @param str2 file system root URI
     * @param str3 file system user, may be blank
     */
    public ParallelMicroBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource, Integer num, Integer num2, Integer num3, Integer num4, String str, String str2, String str3) {
        super(seaTunnelSource, num, num2);
        this.checkpointId = num3;
        this.checkpointInterval = num4;
        this.checkpointPath = str;
        this.hdfsRoot = str2;
        this.hdfsUser = str3;
    }

    /**
     * Builds the internal source, handing it the state restored in {@link #prepare()}.
     *
     * @return a new internal parallel source
     */
    @Override
    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
        return new ParallelBatchPartitionReader.InternalParallelSource(this.source, this.restoredState, this.parallelism.intValue(), this.subtaskId.intValue());
    }

    /**
     * Opens the file system, restores the state of checkpoint {@code checkpointId - 1}, runs the
     * parent preparation and schedules the virtual checkpoint.
     *
     * @throws RuntimeException wrapping any exception raised by one of those steps
     */
    @Override
    public void prepare() {
        try {
            this.fileSystem = getFileSystem();
            // State must be restored before super.prepare() so it is available to
            // createInternalSource().
            this.restoredState = restoreState(this.checkpointId.intValue() - 1);
            super.prepare();
            prepareCheckpoint();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Obtains a {@link FileSystem} for {@code hdfsRoot}, as {@code hdfsUser} when it is not blank.
     *
     * @return the file system handle
     * @throws URISyntaxException if {@code hdfsRoot} is not a valid URI
     * @throws IOException if the file system cannot be obtained
     * @throws InterruptedException if interrupted while obtaining the file system for a user
     */
    protected FileSystem getFileSystem() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", this.hdfsRoot);
        URI rootUri = new URI(this.hdfsRoot);
        if (StringUtils.isNotBlank(this.hdfsUser)) {
            return FileSystem.get(rootUri, configuration, this.hdfsUser);
        }
        return FileSystem.get(rootUri, configuration);
    }

    /**
     * Snapshots the internal source under the current checkpoint id and then advances the id.
     *
     * @return the snapshot, tagged with this subtask id and the id it was taken under
     * @throws RuntimeException wrapping any exception raised by the internal source
     */
    public ReaderState snapshotState() {
        try {
            Integer snapshotId = this.checkpointId;
            Map<Integer, List<byte[]>> state = this.internalSource.snapshotState(snapshotId.intValue());
            // Only advance the id once the snapshot succeeded.
            this.checkpointId = Integer.valueOf(snapshotId.intValue() + 1);
            return new ReaderState(state, this.subtaskId, snapshotId);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the single-threaded checkpoint executor and schedules one run of
     * {@link #virtualCheckpoint()} after {@code checkpointInterval} milliseconds.
     */
    public void prepareCheckpoint() {
        this.executor = ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1, String.format("parallel-reader-checkpoint-executor-%s", this.subtaskId));
        this.executor.schedule(this::virtualCheckpoint, this.checkpointInterval.intValue(), TimeUnit.MILLISECONDS);
    }

    /**
     * Waits until the handover is empty, then persists a snapshot, notifies the internal source
     * that the checkpoint completed and clears the {@code running} flag.
     *
     * @throws RuntimeException if the wait is interrupted or any checkpoint step fails
     */
    public void virtualCheckpoint() {
        try {
            synchronized (this.checkpointLock) {
                while (!this.handover.isEmpty()) {
                    Thread.sleep(CHECKPOINT_SLEEP_INTERVAL.intValue());
                }
                synchronized (this.handover) {
                    // Capture the id first: snapshotState() increments checkpointId.
                    int currentId = this.checkpointId.intValue();
                    saveState(snapshotState(), currentId);
                    this.internalSource.notifyCheckpointComplete(currentId);
                    this.running = false;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("An error occurred in virtual checkpoint execution.", e);
        } catch (Exception e) {
            throw new RuntimeException("An error occurred in virtual checkpoint execution.", e);
        }
    }

    /**
     * Loads the state stored for the given checkpoint id.
     *
     * @param id checkpoint id to load
     * @return the stored state, or {@code null} when no file exists for that id
     * @throws IOException if the existence check fails
     * @throws RuntimeException wrapping any failure while reading or deserializing the file
     */
    private Map<Integer, List<byte[]>> restoreState(int id) throws IOException {
        Path statePath = getCheckpointPathWithId(id);
        if (!this.fileSystem.exists(statePath)) {
            return null;
        }
        // try-with-resources closes both streams even when reading or deserializing fails.
        try (FSDataInputStream in = this.fileSystem.open(statePath);
                ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[1024];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return ((ReaderState) SerializationUtils.deserialize(out.toByteArray())).getBytes();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Serializes the reader state and writes it to the checkpoint file for the given id, creating
     * the file first when it does not exist.
     *
     * @param readerState state to persist
     * @param i checkpoint id the state belongs to
     * @throws IOException if the existence check or file creation fails
     * @throws RuntimeException wrapping any failure while opening or writing the file
     */
    public void saveState(ReaderState readerState, int i) throws IOException {
        byte[] payload = SerializationUtils.serialize(readerState);
        Path statePath = getCheckpointPathWithId(i);
        if (!this.fileSystem.exists(statePath)) {
            this.fileSystem.createNewFile(statePath);
        }
        // NOTE(review): append() adds to any bytes already in the file; if a file for this id can
        // pre-exist with content, confirm that appending (rather than overwriting) is intended.
        // try-with-resources closes the stream even when the write fails.
        try (FSDataOutputStream out = this.fileSystem.append(statePath)) {
            out.write(payload);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the checkpoint file path {@code checkpointPath/subtaskId/id}.
     *
     * @param id checkpoint id
     * @return path of the checkpoint file for this subtask and id
     */
    private Path getCheckpointPathWithId(int id) {
        return new Path(this.checkpointPath + File.separator + this.subtaskId + File.separator + id);
    }

    /**
     * Closes the file system, shuts down the checkpoint executor and closes the parent reader.
     * Each step runs even if an earlier one throws, and steps whose resource was never created
     * (for example because {@link #prepare()} did not run) are skipped.
     *
     * @throws IOException if closing the file system or the parent reader fails
     */
    @Override
    public void close() throws IOException {
        try {
            // NOTE(review): FileSystem.get may hand out a shared cached instance; confirm that
            // closing it here cannot affect other users in the same JVM.
            if (this.fileSystem != null) {
                this.fileSystem.close();
            }
        } finally {
            try {
                if (this.executor != null) {
                    this.executor.shutdown();
                }
            } finally {
                super.close();
            }
        }
    }
}
