package org.apache.flink.table.store.file.operation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/store/file/operation/TestCommitThread.class */
public class TestCommitThread extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(TestCommitThread.class);
    private final RowType keyType;
    private final RowType valueType;
    private final boolean enableOverwrite;
    private final Map<BinaryRowData, List<KeyValue>> data;
    private final Map<BinaryRowData, List<KeyValue>> result = new HashMap();
    private final Map<BinaryRowData, MergeTreeWriter> writers = new HashMap();
    private final Set<BinaryRowData> writtenPartitions = new HashSet();
    private final AbstractFileStoreWrite<KeyValue> write;
    private final FileStoreCommit commit;
    private long commitIdentifier;

    public TestCommitThread(RowType rowType, RowType rowType2, boolean z, Map<BinaryRowData, List<KeyValue>> map, TestFileStore testFileStore, TestFileStore testFileStore2) {
        this.keyType = rowType;
        this.valueType = rowType2;
        this.enableOverwrite = z;
        this.data = map;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Test commit thread is given these data:\n" + ((String) map.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).map(keyValue -> {
                return keyValue.toString(rowType, rowType2);
            }).collect(Collectors.joining("\n"))));
        }
        String uuid = UUID.randomUUID().toString();
        this.write = testFileStore2.newWrite(uuid);
        this.commit = testFileStore.newCommit(uuid).withCreateEmptyCommit(true);
        this.commitIdentifier = 0L;
    }

    public List<KeyValue> getResult() {
        return (List) this.result.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (!this.data.isEmpty()) {
            try {
                if (this.enableOverwrite && ThreadLocalRandom.current().nextInt(5) == 0) {
                    doOverwrite();
                } else {
                    doCommit();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        doFinalCompact();
        Iterator<MergeTreeWriter> it = this.writers.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }
    }

    private void doCommit() throws Exception {
        int nextInt = ThreadLocalRandom.current().nextInt(3) + 1;
        for (int i = 0; i < nextInt && !this.data.isEmpty(); i++) {
            writeData();
        }
        long j = this.commitIdentifier;
        this.commitIdentifier = j + 1;
        ManifestCommittable manifestCommittable = new ManifestCommittable(j);
        for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : this.writers.entrySet()) {
            RecordWriter.CommitIncrement prepareCommit = entry.getValue().prepareCommit(true);
            manifestCommittable.addFileCommittable(new FileCommittable(entry.getKey(), 0, prepareCommit.newFilesIncrement(), prepareCommit.compactIncrement()));
        }
        runWithRetry(manifestCommittable, () -> {
            this.commit.commit(manifestCommittable, Collections.emptyMap());
        });
    }

    private void doOverwrite() throws Exception {
        BinaryRowData overwriteData = overwriteData();
        long j = this.commitIdentifier;
        this.commitIdentifier = j + 1;
        ManifestCommittable manifestCommittable = new ManifestCommittable(j);
        RecordWriter.CommitIncrement prepareCommit = this.writers.get(overwriteData).prepareCommit(true);
        manifestCommittable.addFileCommittable(new FileCommittable(overwriteData, 0, prepareCommit.newFilesIncrement(), prepareCommit.compactIncrement()));
        runWithRetry(manifestCommittable, () -> {
            this.commit.overwrite(TestKeyValueGenerator.toPartitionMap(overwriteData, TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), manifestCommittable, Collections.emptyMap());
        });
    }

    /*  JADX ERROR: Failed to decode insn: 0x0005: MOVE_MULTI, method: org.apache.flink.table.store.file.operation.TestCommitThread.doFinalCompact():void
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    private void doFinalCompact() {
        /*
            r8 = this;
            r0 = r8
            r1 = r0
            long r1 = r1.commitIdentifier
            // decode failed: arraycopy: source index -1 out of bounds for object array[8]
            r2 = 1
            long r1 = r1 + r2
            r0.commitIdentifier = r1
            r9 = r-1
            org.apache.flink.table.store.file.manifest.ManifestCommittable r-1 = new org.apache.flink.table.store.file.manifest.ManifestCommittable
            r0 = r-1
            r1 = r9
            r0.<init>(r1)
            r11 = r-1
            r-1 = r8
            java.util.Set<org.apache.flink.table.data.binary.BinaryRowData> r-1 = r-1.writtenPartitions
            r-1.iterator()
            r12 = r-1
            r-1 = r12
            r-1.hasNext()
            if (r-1 == 0) goto L79
            r-1 = r12
            r-1.next()
            org.apache.flink.table.data.binary.BinaryRowData r-1 = (org.apache.flink.table.data.binary.BinaryRowData) r-1
            r13 = r-1
            r-1 = r8
            java.util.Map<org.apache.flink.table.data.binary.BinaryRowData, org.apache.flink.table.store.file.mergetree.MergeTreeWriter> r-1 = r-1.writers
            r0 = r13
            r1 = r8
            void r1 = (v1) -> { // java.util.function.Function.apply(java.lang.Object):java.lang.Object
                return r1.lambda$doFinalCompact$3(v1);
            }
            r-1.computeIfAbsent(r0, r1)
            org.apache.flink.table.store.file.mergetree.MergeTreeWriter r-1 = (org.apache.flink.table.store.file.mergetree.MergeTreeWriter) r-1
            r14 = r-1
            r-1 = r14
            r0 = 1
            r-1.compact(r0)
            r-1 = r14
            r0 = 1
            r-1.prepareCommit(r0)
            r15 = r-1
            r-1 = r11
            org.apache.flink.table.store.table.sink.FileCommittable r0 = new org.apache.flink.table.store.table.sink.FileCommittable
            r1 = r0
            r2 = r13
            r3 = 0
            r4 = r15
            org.apache.flink.table.store.file.io.NewFilesIncrement r4 = r4.newFilesIncrement()
            r5 = r15
            org.apache.flink.table.store.file.io.CompactIncrement r5 = r5.compactIncrement()
            r1.<init>(r2, r3, r4, r5)
            r-1.addFileCommittable(r0)
            goto L20
            r-1 = r8
            org.apache.flink.table.store.file.operation.FileStoreCommit r-1 = r-1.commit
            r0 = r11
            java.util.Map r1 = java.util.Collections.emptyMap()
            r-1.commit(r0, r1)
            goto Lac
            r11 = move-exception
            org.slf4j.Logger r0 = org.apache.flink.table.store.file.operation.TestCommitThread.LOG
            boolean r0 = r0.isDebugEnabled()
            if (r0 == 0) goto La0
            org.slf4j.Logger r0 = org.apache.flink.table.store.file.operation.TestCommitThread.LOG
            java.lang.String r1 = "Failed to do final compact because of exception, try again"
            r2 = r11
            r0.debug(r1, r2)
            r0 = r8
            java.util.Map<org.apache.flink.table.data.binary.BinaryRowData, org.apache.flink.table.store.file.mergetree.MergeTreeWriter> r0 = r0.writers
            r0.clear()
            goto Lc
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.table.store.file.operation.TestCommitThread.doFinalCompact():void");
    }

    private void runWithRetry(ManifestCommittable manifestCommittable, Runnable runnable) {
        boolean z = false;
        while (true) {
            if (z) {
                try {
                    if (this.commit.filterCommitted(Collections.singletonList(manifestCommittable)).isEmpty()) {
                        return;
                    }
                } catch (Throwable th) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Failed to commit because of exception, try again", th);
                    }
                    this.writers.clear();
                    z = true;
                }
            }
            runnable.run();
            return;
        }
    }

    private void writeData() throws Exception {
        ArrayList arrayList = new ArrayList();
        BinaryRowData pickData = pickData(arrayList);
        this.result.computeIfAbsent(pickData, binaryRowData -> {
            return new ArrayList();
        }).addAll(arrayList);
        this.writtenPartitions.add(pickData);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Test commit thread will write data\n" + ((String) arrayList.stream().map(keyValue -> {
                return keyValue.toString(this.keyType, this.valueType);
            }).collect(Collectors.joining("\n"))));
        }
        MergeTreeWriter computeIfAbsent = this.writers.computeIfAbsent(pickData, binaryRowData2 -> {
            return createWriter(binaryRowData2, false);
        });
        Iterator<KeyValue> it = arrayList.iterator();
        while (it.hasNext()) {
            computeIfAbsent.write(it.next());
        }
    }

    private BinaryRowData overwriteData() throws Exception {
        ArrayList arrayList = new ArrayList();
        BinaryRowData pickData = pickData(arrayList);
        List<KeyValue> computeIfAbsent = this.result.computeIfAbsent(pickData, binaryRowData -> {
            return new ArrayList();
        });
        computeIfAbsent.clear();
        computeIfAbsent.addAll(arrayList);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Test commit thread will overwrite data\n" + ((String) arrayList.stream().map(keyValue -> {
                return keyValue.toString(this.keyType, this.valueType);
            }).collect(Collectors.joining("\n"))));
        }
        if (this.writers.containsKey(pickData)) {
            this.writers.get(pickData).close();
        }
        MergeTreeWriter createWriter = createWriter(pickData, true);
        this.writers.put(pickData, createWriter);
        Iterator<KeyValue> it = arrayList.iterator();
        while (it.hasNext()) {
            createWriter.write(it.next());
        }
        return pickData;
    }

    private BinaryRowData pickData(List<KeyValue> list) {
        ArrayList arrayList = new ArrayList(this.data.keySet());
        BinaryRowData binaryRowData = (BinaryRowData) arrayList.get(ThreadLocalRandom.current().nextInt(arrayList.size()));
        List<KeyValue> list2 = this.data.get(binaryRowData);
        int nextInt = ThreadLocalRandom.current().nextInt(Math.min(100, list2.size() + 1));
        list.addAll(list2.subList(0, nextInt));
        if (nextInt == list2.size()) {
            this.data.remove(binaryRowData);
        } else {
            list2.subList(0, nextInt).clear();
        }
        return binaryRowData;
    }

    private MergeTreeWriter createWriter(BinaryRowData binaryRowData, boolean z) {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(Thread.currentThread().getName() + "-writer-service-pool");
            return thread;
        });
        MergeTreeWriter mergeTreeWriter = z ? (MergeTreeWriter) this.write.createEmptyWriterContainer(binaryRowData, 0, newSingleThreadExecutor).writer : this.write.createWriterContainer(binaryRowData, 0, newSingleThreadExecutor).writer;
        mergeTreeWriter.setMemoryPool(new HeapMemorySegmentPool(TestFileStore.WRITE_BUFFER_SIZE.getBytes(), (int) TestFileStore.PAGE_SIZE.getBytes()));
        return mergeTreeWriter;
    }
}
