package org.apache.seatunnel.translation.flink.sink;

import java.io.IOException;
import java.io.InvalidClassException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.seatunnel.api.common.metrics.Counter;
import org.apache.seatunnel.api.common.metrics.Meter;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SupportResourceShare;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.class */
public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> {
    private static final Logger log = LoggerFactory.getLogger(FlinkSinkWriter.class);
    private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter;
    private final Counter sinkWriteCount;
    private final Counter sinkWriteBytes;
    private final Meter sinkWriterQPS;
    private long checkpointId;
    private MultiTableResourceManager resourceManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlinkSinkWriter(org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter, long j, SeaTunnelDataType<?> seaTunnelDataType, MetricsContext metricsContext) {
        this.sinkWriter = sinkWriter;
        this.checkpointId = j;
        this.sinkWriteCount = metricsContext.counter("SinkWriteCount");
        this.sinkWriteBytes = metricsContext.counter("SinkWriteBytes");
        this.sinkWriterQPS = metricsContext.meter("SinkWriteQPS");
        if (sinkWriter instanceof SupportResourceShare) {
            this.resourceManager = ((SupportResourceShare) sinkWriter).initMultiTableResourceManager(1, 1);
            ((SupportResourceShare) sinkWriter).setMultiTableResourceManager(this.resourceManager, 0);
        }
    }

    public void write(InputT inputt, SinkWriter.Context context) throws IOException {
        if (inputt == null) {
            return;
        }
        if (!(inputt instanceof SeaTunnelRow)) {
            throw new InvalidClassException("only support SeaTunnelRow at now, the element Class is " + inputt.getClass());
        }
        this.sinkWriter.write((SeaTunnelRow) inputt);
        this.sinkWriteCount.inc();
        this.sinkWriteBytes.inc(((SeaTunnelRow) inputt).getBytesSize());
        this.sinkWriterQPS.markEvent();
    }

    public List<CommitWrapper<CommT>> prepareCommit(boolean z) throws IOException {
        return (List) this.sinkWriter.prepareCommit().map(CommitWrapper::new).map((v0) -> {
            return Collections.singletonList(v0);
        }).orElse(Collections.emptyList());
    }

    public List<FlinkWriterState<WriterStateT>> snapshotState() throws IOException {
        List<FlinkWriterState<WriterStateT>> list = (List) this.sinkWriter.snapshotState(this.checkpointId).stream().map(obj -> {
            return new FlinkWriterState(this.checkpointId, obj);
        }).collect(Collectors.toList());
        this.checkpointId++;
        return list;
    }

    public void close() throws Exception {
        this.sinkWriter.close();
        try {
            if (this.resourceManager != null) {
                this.resourceManager.close();
            }
        } catch (Throwable th) {
            log.error("close resourceManager error", th);
        }
    }
}
