package org.apache.seatunnel.api.sink.multitablesink;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

/* loaded from: input_file:org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.class */
public class MultiTableSink implements SeaTunnelSink<SeaTunnelRow, MultiTableState, MultiTableCommitInfo, MultiTableAggregatedCommitInfo> {
    private final Map<String, SeaTunnelSink> sinks;
    private final int replicaNum;

    public MultiTableSink(MultiTableFactoryContext multiTableFactoryContext) {
        this.sinks = multiTableFactoryContext.getSinks();
        this.replicaNum = ((Integer) multiTableFactoryContext.getOptions().get(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)).intValue();
    }

    @Override // org.apache.seatunnel.api.common.PluginIdentifierInterface
    public String getPluginName() {
        return "MultiTableSink";
    }

    @Override // org.apache.seatunnel.api.sink.SeaTunnelSink
    public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> createWriter(SinkWriter.Context context) throws IOException {
        HashMap hashMap = new HashMap();
        for (int i = 0; i < this.replicaNum; i++) {
            for (String str : this.sinks.keySet()) {
                SeaTunnelSink seaTunnelSink = this.sinks.get(str);
                int indexOfSubtask = (context.getIndexOfSubtask() * this.replicaNum) + i;
                hashMap.put(SinkIdentifier.of(str, indexOfSubtask), seaTunnelSink.createWriter(new SinkContextProxy(indexOfSubtask, context)));
            }
        }
        return new MultiTableSinkWriter(hashMap, this.replicaNum);
    }

    @Override // org.apache.seatunnel.api.sink.SeaTunnelSink
    public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWriter(SinkWriter.Context context, List<MultiTableState> list) throws IOException {
        HashMap hashMap = new HashMap();
        for (int i = 0; i < this.replicaNum; i++) {
            for (String str : this.sinks.keySet()) {
                SeaTunnelSink seaTunnelSink = this.sinks.get(str);
                int indexOfSubtask = (context.getIndexOfSubtask() * this.replicaNum) + i;
                SinkIdentifier of = SinkIdentifier.of(str, indexOfSubtask);
                List list2 = (List) list.stream().map(multiTableState -> {
                    return multiTableState.getStates().get(of);
                }).filter((v0) -> {
                    return Objects.nonNull(v0);
                }).flatMap((v0) -> {
                    return v0.stream();
                }).collect(Collectors.toList());
                if (list2.isEmpty()) {
                    hashMap.put(of, seaTunnelSink.createWriter(new SinkContextProxy(indexOfSubtask, context)));
                } else {
                    hashMap.put(of, seaTunnelSink.restoreWriter(new SinkContextProxy(indexOfSubtask, context), list2));
                }
            }
        }
        return new MultiTableSinkWriter(hashMap, this.replicaNum);
    }

    @Override // org.apache.seatunnel.api.sink.SeaTunnelSink
    public Optional<Serializer<MultiTableState>> getWriterStateSerializer() {
        return Optional.of(new DefaultSerializer());
    }

    @Override // org.apache.seatunnel.api.sink.SeaTunnelSink
    public Optional<SinkCommitter<MultiTableCommitInfo>> createCommitter() throws IOException {
        HashMap hashMap = new HashMap();
        for (String str : this.sinks.keySet()) {
            this.sinks.get(str).createCommitter().ifPresent(obj -> {
            });
        }
        return hashMap.isEmpty() ? Optional.empty() : Optional.of(new MultiTableSinkCommitter(hashMap));
    }

    @Override // org.apache.seatunnel.api.sink.SeaTunnelSink
    public Optional<Serializer<MultiTableCommitInfo>> getCommitInfoSerializer() {
        return Optional.of(new DefaultSerializer());
    }

    @Override // org.apache.seatunnel.api.sink.SeaTunnelSink
    public Optional<SinkAggregatedCommitter<MultiTableCommitInfo, MultiTableAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
        HashMap hashMap = new HashMap();
        for (String str : this.sinks.keySet()) {
            this.sinks.get(str).createAggregatedCommitter().ifPresent(sinkAggregatedCommitter -> {
            });
        }
        return hashMap.isEmpty() ? Optional.empty() : Optional.of(new MultiTableSinkAggregatedCommitter(hashMap));
    }

    public List<TablePath> getSinkTables() {
        return (List) this.sinks.keySet().stream().map(TablePath::of).collect(Collectors.toList());
    }

    @Override // org.apache.seatunnel.api.sink.SeaTunnelSink
    public Optional<Serializer<MultiTableAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
        return Optional.of(new DefaultSerializer());
    }

    @Override // org.apache.seatunnel.api.source.SeaTunnelJobAware
    public void setJobContext(JobContext jobContext) {
        this.sinks.values().forEach(seaTunnelSink -> {
            seaTunnelSink.setJobContext(jobContext);
        });
    }

    public Map<String, SeaTunnelSink> getSinks() {
        return this.sinks;
    }
}
