package org.apache.seatunnel.translation.flink.sink;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.flink.serialization.CommitWrapperSerializer;
import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
import org.apache.seatunnel.translation.flink.serialization.FlinkWriterStateSerializer;

/* loaded from: input_file:org/apache/seatunnel/translation/flink/sink/FlinkSink.class */
public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>, GlobalCommT> {
    private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink;
    private final CatalogTable catalogTable;

    public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> seaTunnelSink, CatalogTable catalogTable) {
        this.sink = seaTunnelSink;
        this.catalogTable = catalogTable;
    }

    public SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> createWriter(Sink.InitContext initContext, List<FlinkWriterState<WriterStateT>> list) throws IOException {
        FlinkSinkWriterContext flinkSinkWriterContext = new FlinkSinkWriterContext(initContext);
        if (list == null || list.isEmpty()) {
            return new FlinkSinkWriter(this.sink.createWriter(flinkSinkWriterContext), 1L, this.catalogTable.getSeaTunnelRowType(), flinkSinkWriterContext.getMetricsContext());
        }
        return new FlinkSinkWriter(this.sink.restoreWriter(flinkSinkWriterContext, (List) list.stream().map((v0) -> {
            return v0.getState();
        }).collect(Collectors.toList())), list.get(0).getCheckpointId() + 1, this.catalogTable.getSeaTunnelRowType(), flinkSinkWriterContext.getMetricsContext());
    }

    public Optional<Committer<CommitWrapper<CommT>>> createCommitter() throws IOException {
        return this.sink.createCommitter().map(FlinkCommitter::new);
    }

    public Optional<GlobalCommitter<CommitWrapper<CommT>, GlobalCommT>> createGlobalCommitter() throws IOException {
        return this.sink.createAggregatedCommitter().map(FlinkGlobalCommitter::new);
    }

    public Optional<SimpleVersionedSerializer<CommitWrapper<CommT>>> getCommittableSerializer() {
        return this.sink.getCommitInfoSerializer().map(CommitWrapperSerializer::new);
    }

    public Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer() {
        return this.sink.getAggregatedCommitInfoSerializer().map(FlinkSimpleVersionedSerializer::new);
    }

    public Optional<SimpleVersionedSerializer<FlinkWriterState<WriterStateT>>> getWriterStateSerializer() {
        return this.sink.getWriterStateSerializer().map(FlinkWriterStateSerializer::new);
    }
}
