package org.apache.flink.table.store.connector.sink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.sink.global.GlobalCommitter;
import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink;
import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.log.LogInitContext;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogWriteCallback;
import org.apache.flink.table.store.sink.SinkRecordConverter;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/StoreSink.class */
/**
 * Sink that writes {@link RowData} through a {@link FileStore} and, when a {@link LogSinkProvider}
 * is configured, additionally forwards writes to the sink created by that provider.
 *
 * <p>Committing is split in two: a local {@link Committer} (see {@link #createCommitter()}) that
 * wraps the optional log committer, and a {@link GlobalCommitter} (see
 * {@link #createGlobalCommitter2()}) that commits {@link ManifestCommittable}s against the file
 * store, optionally guarded by a {@link CatalogLock}.
 *
 * <p>NOTE(review): several method names carry decompiler-generated prefixes/suffixes
 * ({@code m12createWriter}, {@code m9restoreWriter}, {@code createGlobalCommitter2}); they are
 * kept unchanged so existing references keep resolving.
 *
 * @param <WriterStateT> writer state type of the underlying log sink
 * @param <LogCommT> committable type of the underlying log sink
 */
public class StoreSink<WriterStateT, LogCommT> implements StatefulSink<RowData, WriterStateT>, GlobalCommittingSink<RowData, Committable, ManifestCommittable> {
    private static final long serialVersionUID = 1L;

    private final ObjectIdentifier tableIdentifier;
    private final FileStore fileStore;
    private final int[] partitions;
    private final int[] primaryKeys;
    private final int[] logPrimaryKeys;
    private final int numBucket;

    /** Factory for the catalog lock; {@code null} means commits run without a lock. */
    @Nullable
    private final CatalogLock.Factory lockFactory;

    /** Non-null switches the writer into overwrite mode; the map is handed to the global committer. */
    @Nullable
    private final Map<String, String> overwritePartition;

    /** Provider of the optional log sink; {@code null} means only the file store is written. */
    @Nullable
    private final LogSinkProvider logSinkProvider;

    /**
     * Placeholder serializer returned when no log sink is configured. It reports version 1 and
     * rejects every attempt to serialize or deserialize.
     */
    private static class NoOutputSerializer<T> implements SimpleVersionedSerializer<T> {
        private NoOutputSerializer() {
        }

        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(T t) {
            throw new IllegalStateException("Should not serialize anything");
        }

        @Override
        public T deserialize(int i, byte[] bArr) {
            throw new IllegalStateException("Should not deserialize anything");
        }
    }

    /**
     * Creates the sink. All arguments are stored as-is; no validation or copying is performed.
     *
     * @param tableIdentifier identifier whose database and object name are passed to the catalog lock
     * @param fileStore store used to create the write, commit and expire operations
     * @param partitions partition field indexes handed to the record converter
     * @param primaryKeys primary-key field indexes; an empty array selects the key type instead of
     *     the value type for the record converter
     * @param logPrimaryKeys primary-key field indexes handed to the record converter for the log
     * @param numBucket bucket count handed to the record converter
     * @param lockFactory optional catalog lock factory
     * @param overwritePartition optional overwrite specification; non-null enables overwrite
     * @param logSinkProvider optional provider of the log sink
     */
    public StoreSink(ObjectIdentifier tableIdentifier, FileStore fileStore, int[] partitions, int[] primaryKeys, int[] logPrimaryKeys, int numBucket, @Nullable CatalogLock.Factory lockFactory, @Nullable Map<String, String> overwritePartition, @Nullable LogSinkProvider logSinkProvider) {
        this.tableIdentifier = tableIdentifier;
        this.fileStore = fileStore;
        this.partitions = partitions;
        this.primaryKeys = primaryKeys;
        this.logPrimaryKeys = logPrimaryKeys;
        this.numBucket = numBucket;
        this.lockFactory = lockFactory;
        this.overwritePartition = overwritePartition;
        this.logSinkProvider = logSinkProvider;
    }

    /**
     * Creates a fresh writer by delegating to {@link #m9restoreWriter} with no recovered state.
     *
     * @param initContext runtime context supplied by the framework
     * @return a new writer
     * @throws IOException if the underlying log writer cannot be created
     */
    /* renamed from: createWriter, reason: merged with bridge method [inline-methods] */
    public StoreSinkWriter<WriterStateT> m12createWriter(Sink.InitContext initContext) throws IOException {
        return m9restoreWriter(initContext, null);
    }

    /**
     * Creates a writer, optionally restoring the log writer from previously snapshotted state.
     *
     * @param initContext runtime context supplied by the framework
     * @param collection recovered writer states, or {@code null} to create the log writer from
     *     scratch
     * @return a writer combining the file store write with the optional log writer
     * @throws IOException if the underlying log writer cannot be created or restored
     */
    /* renamed from: restoreWriter, reason: merged with bridge method [inline-methods] */
    // Raw types: the element/state type parameters of the provider's sink are not visible here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public StoreSinkWriter<WriterStateT> m9restoreWriter(Sink.InitContext initContext, Collection<WriterStateT> collection) throws IOException {
        SinkWriter logWriter = null;
        // Kept at the concrete type so it can be passed wherever either the callback interface
        // or the concrete class is expected.
        LogWriteCallback logCallback = null;
        if (this.logSinkProvider != null) {
            logCallback = new LogWriteCallback();
            LogInitContext logInitContext = new LogInitContext(initContext, this.logSinkProvider.createMetadataConsumer(logCallback));
            StatefulSink logSink = this.logSinkProvider.createSink();
            logWriter = collection == null ? logSink.createWriter(logInitContext) : logSink.restoreWriter(logInitContext, collection);
        }
        SinkRecordConverter recordConverter = new SinkRecordConverter(
                this.numBucket,
                // With primary keys the value type is used; without them the key type is used.
                this.primaryKeys.length > 0 ? this.fileStore.valueType() : this.fileStore.keyType(),
                this.partitions,
                this.primaryKeys,
                this.logPrimaryKeys);
        return new StoreSinkWriter<>(this.fileStore.newWrite(), recordConverter, this.overwritePartition != null, logWriter, logCallback);
    }

    /**
     * Returns the serializer for writer state: the log sink's serializer when a log sink is
     * configured, otherwise a placeholder that refuses to (de)serialize.
     */
    @Override
    @SuppressWarnings("unchecked")
    public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
        if (this.logSinkProvider == null) {
            return new NoOutputSerializer<>();
        }
        return (SimpleVersionedSerializer<WriterStateT>) this.logSinkProvider.createSink().getWriterStateSerializer();
    }

    /**
     * Creates the log sink and returns it as a two-phase-committing sink, or {@code null} when no
     * provider is configured or the created sink does not support two-phase commit.
     *
     * <p>The sink is held under its general type first so the {@code instanceof} test is a real
     * check rather than a test on an already narrowed reference.
     */
    @Nullable
    @SuppressWarnings("unchecked")
    private TwoPhaseCommittingSink<?, LogCommT> twoPhaseLogSink() {
        if (this.logSinkProvider == null) {
            return null;
        }
        Sink<?> logSink = this.logSinkProvider.createSink();
        if (!(logSink instanceof TwoPhaseCommittingSink)) {
            return null;
        }
        return (TwoPhaseCommittingSink<?, LogCommT>) logSink;
    }

    /**
     * Returns the committer of the log sink, or {@code null} if there is no two-phase-committing
     * log sink.
     *
     * @throws UncheckedIOException wrapping any {@link IOException} raised while creating it
     */
    @Nullable
    private Committer<LogCommT> logCommitter() {
        TwoPhaseCommittingSink<?, LogCommT> logSink = twoPhaseLogSink();
        if (logSink == null) {
            return null;
        }
        try {
            return logSink.createCommitter();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Returns the committable serializer of the log sink, or {@code null} if there is no
     * two-phase-committing log sink.
     */
    @Nullable
    private SimpleVersionedSerializer<LogCommT> logCommitSerializer() {
        TwoPhaseCommittingSink<?, LogCommT> logSink = twoPhaseLogSink();
        return logSink == null ? null : logSink.getCommittableSerializer();
    }

    /** Creates the local committer, wrapping the (possibly {@code null}) log committer. */
    public Committer<Committable> createCommitter() {
        return new StoreLocalCommitter(logCommitter());
    }

    /**
     * Creates the global committer. When a lock factory is configured, a catalog lock is created
     * and both the commit and the expire operation run under it, keyed by this table's database
     * and object name; otherwise both operations receive a {@code null} lock.
     */
    @Override // org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink
    /* renamed from: createGlobalCommitter, reason: merged with bridge method [inline-methods] */
    public GlobalCommitter<Committable, ManifestCommittable> createGlobalCommitter2() {
        final CatalogLock catalogLock;
        final Lock lock;
        if (this.lockFactory == null) {
            catalogLock = null;
            lock = null;
        } else {
            catalogLock = this.lockFactory.create();
            lock = new Lock() { // from class: org.apache.flink.table.store.connector.sink.StoreSink.1
                public <T> T runWithLock(Callable<T> callable) throws Exception {
                    return (T) catalogLock.runWithLock(StoreSink.this.tableIdentifier.getDatabaseName(), StoreSink.this.tableIdentifier.getObjectName(), callable);
                }
            };
        }
        // The catalog lock itself is also handed to the committer, alongside the wrapped operations.
        return new StoreGlobalCommitter(this.fileStore.newCommit().withLock(lock), this.fileStore.newExpire().withLock(lock), catalogLock, this.overwritePartition);
    }

    /** Returns the serializer for local committables, combining file and log serializers. */
    public SimpleVersionedSerializer<Committable> getCommittableSerializer() {
        return new CommittableSerializer(fileCommitSerializer(), logCommitSerializer());
    }

    /** Returns the serializer for global committables, built from the file store's row types. */
    @Override // org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink
    public ManifestCommittableSerializer getGlobalCommittableSerializer() {
        return new ManifestCommittableSerializer(this.fileStore.partitionType(), this.fileStore.keyType(), this.fileStore.valueType());
    }

    /** Builds the serializer for file committables from the file store's row types. */
    private FileCommittableSerializer fileCommitSerializer() {
        return new FileCommittableSerializer(this.fileStore.partitionType(), this.fileStore.keyType(), this.fileStore.valueType());
    }
}
