package org.apache.flink.table.store.connector.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.connector.sink.Committable;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.log.LogWriteCallback;
import org.apache.flink.table.store.sink.SinkRecord;
import org.apache.flink.table.store.sink.SinkRecordConverter;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/StoreSinkWriter.class */
/**
 * Sink writer that converts each incoming {@link RowData} into a {@link SinkRecord}, writes it to
 * a {@link RecordWriter} selected by the record's partition and bucket, and, when a log writer is
 * configured, also forwards the record to that log writer.
 *
 * <p>Record writers are created lazily through {@link FileStoreWrite} and cached per partition and
 * bucket. On {@link #prepareCommit()} a writer whose increment reports no new files is closed and
 * dropped from the cache.
 *
 * @param <WriterStateT> state type returned by {@link #snapshotState(long)}; it is produced solely
 *     by the optional log writer
 */
public class StoreSinkWriter<WriterStateT> implements StatefulSink.StatefulSinkWriter<RowData, WriterStateT>, TwoPhaseCommittingSink.PrecommittingSinkWriter<RowData, Committable> {
    private final FileStoreWrite fileStoreWrite;
    private final SinkRecordConverter recordConverter;

    /** When true, writers are obtained via {@code createEmptyWriter} instead of {@code createWriter}. */
    private final boolean overwrite;

    @Nullable
    private final SinkWriter<SinkRecord> logWriter;

    @Nullable
    private final LogWriteCallback logCallback;

    /** Executor handed to every record writer created by this sink; its threads are named "compaction-thread". */
    private final ExecutorService compactExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("compaction-thread"));

    /** Cached record writers: partition -> (bucket -> writer). */
    private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();

    /**
     * Creates the writer.
     *
     * @param fileStoreWrite factory used to create the per-partition, per-bucket record writers
     * @param recordConverter converts incoming rows to sink records and to log sink records
     * @param overwrite selects {@code createEmptyWriter} (true) or {@code createWriter} (false)
     * @param logWriter optional writer that additionally receives every record; may be null
     * @param logCallback supplies the offsets emitted at commit time; must be non-null whenever
     *     {@code logWriter} is non-null (checked in {@link #prepareCommit()})
     */
    public StoreSinkWriter(FileStoreWrite fileStoreWrite, SinkRecordConverter recordConverter, boolean overwrite, @Nullable SinkWriter<SinkRecord> logWriter, @Nullable LogWriteCallback logCallback) {
        this.fileStoreWrite = fileStoreWrite;
        this.recordConverter = recordConverter;
        this.overwrite = overwrite;
        this.logWriter = logWriter;
        this.logCallback = logCallback;
    }

    /**
     * Returns the cached writer for the given partition and bucket, creating it on first use.
     *
     * @param partition partition of the record being written
     * @param bucket bucket of the record being written
     * @return the writer responsible for that partition and bucket
     */
    private RecordWriter getWriter(BinaryRowData partition, int bucket) {
        Map<Integer, RecordWriter> buckets = this.writers.get(partition);
        if (buckets == null) {
            buckets = new HashMap<>();
            // The map key is a copy, so it does not alias the caller's row instance.
            this.writers.put(partition.copy(), buckets);
        }
        // NOTE(review): the writer factory receives the caller's row, not the copy; if the created
        // writer retains the partition and the row instance is reused upstream, this may need
        // partition.copy() as well - confirm against FileStoreWrite.
        return buckets.computeIfAbsent(
                bucket,
                ignored ->
                        this.overwrite
                                ? this.fileStoreWrite.createEmptyWriter(partition, bucket, this.compactExecutor)
                                : this.fileStoreWrite.createWriter(partition, bucket, this.compactExecutor));
    }

    /**
     * Converts the row, writes it to the matching record writer and, when a log writer is
     * configured, forwards the converted log record to it.
     *
     * @param rowData the row to write
     * @param context sink context passed through to the log writer
     * @throws IOException if writing fails; exceptions that are neither {@link IOException} nor
     *     {@link InterruptedException} are wrapped
     * @throws InterruptedException if the underlying write was interrupted
     */
    @Override
    public void write(RowData rowData, SinkWriter.Context context) throws IOException, InterruptedException {
        SinkRecord record = this.recordConverter.convert(rowData);
        try {
            writeToFileStore(getWriter(record.partition(), record.bucket()), record);
            if (this.logWriter != null) {
                this.logWriter.write(this.recordConverter.convertToLogSinkRecord(record), context);
            }
        } catch (Exception e) {
            // Keep declared exception types intact instead of hiding them inside a new IOException.
            throw toIOException(e);
        }
    }

    /**
     * Writes one record to the given writer according to its row kind.
     *
     * <p>For INSERT and UPDATE_AFTER the record is written with {@code ValueKind.ADD}; for
     * UPDATE_BEFORE and DELETE with {@code ValueKind.DELETE}. When the record's primary key has
     * arity 0, the full row is passed in the key position instead, always with
     * {@code ValueKind.ADD}, and the value is a single-field row holding {@code 1L} (insert-like
     * kinds) or {@code -1L} (retract-like kinds) - presumably an occurrence count.
     *
     * @param writer target writer
     * @param record record to write
     * @throws Exception whatever the writer throws
     */
    private void writeToFileStore(RecordWriter writer, SinkRecord record) throws Exception {
        switch (record.row().getRowKind()) {
            case INSERT:
            case UPDATE_AFTER:
                if (record.primaryKey().getArity() == 0) {
                    writer.write(ValueKind.ADD, record.row(), GenericRowData.of(1L));
                } else {
                    writer.write(ValueKind.ADD, record.primaryKey(), record.row());
                }
                break;
            case UPDATE_BEFORE:
            case DELETE:
                if (record.primaryKey().getArity() == 0) {
                    writer.write(ValueKind.ADD, record.row(), GenericRowData.of(-1L));
                } else {
                    writer.write(ValueKind.DELETE, record.primaryKey(), record.row());
                }
                break;
            default:
                // Any other row kind is ignored.
                break;
        }
    }

    /**
     * Flushes the log writer, if one is configured; otherwise does nothing.
     *
     * @param endOfInput passed through unchanged to the log writer
     */
    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        if (this.logWriter != null) {
            this.logWriter.flush(endOfInput);
        }
    }

    /**
     * Returns the log writer's state when the log writer is stateful, otherwise an empty list.
     *
     * @param checkpointId checkpoint id passed through to the log writer
     * @return the log writer's snapshot, or an empty list
     */
    @Override
    @SuppressWarnings("unchecked") // the state type of the log writer is WriterStateT by construction
    public List<WriterStateT> snapshotState(long checkpointId) throws IOException {
        // instanceof is false for null, so no separate null check is needed.
        if (this.logWriter instanceof StatefulSink.StatefulSinkWriter) {
            return ((StatefulSink.StatefulSinkWriter<SinkRecord, WriterStateT>) this.logWriter).snapshotState(checkpointId);
        }
        return Collections.emptyList();
    }

    /**
     * Collects the committables of this checkpoint.
     *
     * <p>One FILE committable is produced per cached record writer. A writer whose increment
     * contains no new files is synced, closed and removed from the cache, and a partition left
     * without writers is removed as well. When a log writer is configured, its own committables
     * (if it is a pre-committing writer) are added as LOG committables, followed by one LOG_OFFSET
     * committable per entry reported by the log callback.
     *
     * @return the committables, in the order described above
     * @throws IOException if preparing or closing a writer fails
     * @throws InterruptedException if preparing a commit was interrupted
     * @throws NullPointerException if a log writer is configured but the log callback is null
     */
    @Override
    public List<Committable> prepareCommit() throws IOException, InterruptedException {
        List<Committable> committables = new ArrayList<>();
        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter>>> partitionIter = this.writers.entrySet().iterator();
        while (partitionIter.hasNext()) {
            Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> partitionEntry = partitionIter.next();
            BinaryRowData partition = partitionEntry.getKey();
            Map<Integer, RecordWriter> buckets = partitionEntry.getValue();

            Iterator<Map.Entry<Integer, RecordWriter>> bucketIter = buckets.entrySet().iterator();
            while (bucketIter.hasNext()) {
                Map.Entry<Integer, RecordWriter> bucketEntry = bucketIter.next();
                int bucket = bucketEntry.getKey().intValue();
                RecordWriter writer = bucketEntry.getValue();

                FileCommittable fileCommittable;
                try {
                    fileCommittable = new FileCommittable(partition, bucket, writer.prepareCommit());
                } catch (Exception e) {
                    throw toIOException(e);
                }
                committables.add(new Committable(Committable.Kind.FILE, fileCommittable));

                // Drop writers that produced nothing, so the cache does not grow without bound.
                if (fileCommittable.increment().newFiles().isEmpty()) {
                    closeWriter(writer);
                    bucketIter.remove();
                }
            }
            if (buckets.isEmpty()) {
                partitionIter.remove();
            }
        }

        if (this.logWriter != null) {
            if (this.logWriter instanceof TwoPhaseCommittingSink.PrecommittingSinkWriter) {
                TwoPhaseCommittingSink.PrecommittingSinkWriter<SinkRecord, ?> committingLogWriter =
                        (TwoPhaseCommittingSink.PrecommittingSinkWriter<SinkRecord, ?>) this.logWriter;
                for (Object logCommittable : committingLogWriter.prepareCommit()) {
                    committables.add(new Committable(Committable.Kind.LOG, logCommittable));
                }
            }
            Objects.requireNonNull(this.logCallback, "logCallback should not be null.");
            this.logCallback.offsets().forEach((bucket, offset) -> {
                committables.add(new Committable(Committable.Kind.LOG_OFFSET, new LogOffsetCommittable(bucket.intValue(), offset.longValue())));
            });
        }
        return committables;
    }

    /**
     * Alias of {@link #prepareCommit()} kept under the name the decompiler assigned to it, so that
     * code referring to that name keeps working.
     *
     * @return the result of {@link #prepareCommit()}
     */
    public List<Committable> m13prepareCommit() throws IOException, InterruptedException {
        return prepareCommit();
    }

    /**
     * Syncs and then closes the given writer. {@code close()} is attempted even when
     * {@code sync()} fails; if both fail, the close failure is attached as suppressed.
     *
     * @param writer writer to shut down
     * @throws IOException wrapping the first failure, if any
     */
    private void closeWriter(RecordWriter writer) throws IOException {
        Exception failure = null;
        try {
            writer.sync();
        } catch (Exception e) {
            failure = e;
        }
        try {
            writer.close();
        } catch (Exception e) {
            failure = chain(failure, e);
        }
        if (failure != null) {
            throw new IOException(failure);
        }
    }

    /**
     * Shuts down the executor, closes every cached record writer, clears the cache and closes the
     * log writer. A failure while closing one resource does not prevent the remaining ones from
     * being closed; the first failure is thrown at the end with later ones attached as suppressed.
     *
     * @throws Exception the first failure encountered while closing
     */
    @Override
    public void close() throws Exception {
        this.compactExecutor.shutdownNow();

        Exception failure = null;
        for (Map<Integer, RecordWriter> buckets : this.writers.values()) {
            for (RecordWriter writer : buckets.values()) {
                try {
                    closeWriter(writer);
                } catch (Exception e) {
                    failure = chain(failure, e);
                }
            }
        }
        this.writers.clear();

        if (this.logWriter != null) {
            try {
                this.logWriter.close();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Exposes the live writer cache (partition -> bucket -> writer) for tests. */
    @VisibleForTesting
    Map<BinaryRowData, Map<Integer, RecordWriter>> writers() {
        return this.writers;
    }

    /**
     * Rethrows {@link InterruptedException} unchanged, returns an {@link IOException} unchanged and
     * wraps anything else in a new {@link IOException}.
     */
    private static IOException toIOException(Exception e) throws InterruptedException {
        if (e instanceof InterruptedException) {
            throw (InterruptedException) e;
        }
        if (e instanceof IOException) {
            return (IOException) e;
        }
        return new IOException(e);
    }

    /** Returns {@code first} with {@code next} added as suppressed, or {@code next} if there is no first. */
    private static Exception chain(@Nullable Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }
}
