package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.Structs;
import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillSink.class */
public class WindmillSink<T> extends Sink<WindowedValue<T>> {
    private WindmillSink<T>.WindmillStreamWriter writer;
    private final Coder<T> valueCoder;
    private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
    private StreamingModeExecutionContext context;

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillSink$WindmillStreamWriter.class */
    class WindmillStreamWriter implements Sink.SinkWriter<WindowedValue<T>> {
        private Map<ByteString, Windmill.KeyedMessageBundle.Builder> productionMap;
        private final String destinationName;

        private WindmillStreamWriter(String str) {
            this.destinationName = str;
            this.productionMap = new HashMap();
        }

        private <T> ByteString encode(Coder<T> coder, T t) throws IOException {
            ByteString.Output newOutput = ByteString.newOutput();
            coder.encode(t, newOutput, Coder.Context.OUTER);
            return newOutput.toByteString();
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Sink.SinkWriter
        public long add(WindowedValue<T> windowedValue) throws IOException {
            ByteString serializedKey;
            ByteString encode;
            ByteString byteString = ByteString.EMPTY;
            ByteString encodeMetadata = WindmillSink.encodeMetadata(WindmillSink.this.windowsCoder, windowedValue.getWindows(), windowedValue.getPane());
            if (WindmillSink.this.valueCoder instanceof KvCoder) {
                KvCoder kvCoder = (KvCoder) WindmillSink.this.valueCoder;
                KV kv = (KV) windowedValue.getValue();
                serializedKey = encode(kvCoder.getKeyCoder(), kv.getKey());
                Coder valueCoder = kvCoder.getValueCoder();
                if (valueCoder instanceof ValueWithRecordId.ValueWithRecordIdCoder) {
                    ValueWithRecordId valueWithRecordId = (ValueWithRecordId) kv.getValue();
                    encode = encode(((ValueWithRecordId.ValueWithRecordIdCoder) valueCoder).getValueCoder(), valueWithRecordId.getValue());
                    byteString = ByteString.copyFrom(valueWithRecordId.getId());
                } else {
                    encode = encode(valueCoder, kv.getValue());
                }
            } else {
                serializedKey = WindmillSink.this.context.getSerializedKey();
                encode = encode(WindmillSink.this.valueCoder, windowedValue.getValue());
            }
            Windmill.KeyedMessageBundle.Builder builder = this.productionMap.get(serializedKey);
            if (builder == null) {
                builder = Windmill.KeyedMessageBundle.newBuilder().setKey(serializedKey);
                this.productionMap.put(serializedKey, builder);
            }
            builder.addMessages(Windmill.Message.newBuilder().setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(windowedValue.getTimestamp())).setData(encode).setMetadata(encodeMetadata).build());
            builder.addMessagesIds(byteString);
            return serializedKey.size() + encode.size() + encodeMetadata.size() + byteString.size();
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Sink.SinkWriter, java.lang.AutoCloseable
        public void close() throws IOException {
            Windmill.OutputMessageBundle.Builder destinationStreamId = Windmill.OutputMessageBundle.newBuilder().setDestinationStreamId(this.destinationName);
            Iterator<Windmill.KeyedMessageBundle.Builder> it = this.productionMap.values().iterator();
            while (it.hasNext()) {
                destinationStreamId.addBundles(it.next().build());
            }
            if (destinationStreamId.getBundlesCount() > 0) {
                WindmillSink.this.context.getOutputBuilder().addOutputMessages(destinationStreamId.build());
            }
            this.productionMap.clear();
        }
    }

    WindmillSink(String str, Coder<WindowedValue<T>> coder, StreamingModeExecutionContext streamingModeExecutionContext) {
        this.writer = new WindmillStreamWriter(str);
        WindowedValue.FullWindowedValueCoder fullWindowedValueCoder = (WindowedValue.FullWindowedValueCoder) coder;
        this.valueCoder = fullWindowedValueCoder.getValueCoder();
        this.windowsCoder = fullWindowedValueCoder.getWindowsCoder();
        this.context = streamingModeExecutionContext;
    }

    public static ByteString encodeMetadata(Coder<Collection<? extends BoundedWindow>> coder, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) throws IOException {
        OutputStream newOutput = ByteString.newOutput();
        PaneInfo.PaneInfoCoder.INSTANCE.encode(paneInfo, newOutput, Coder.Context.NESTED);
        coder.encode(collection, newOutput, Coder.Context.OUTER);
        return newOutput.toByteString();
    }

    public static PaneInfo decodeMetadataPane(ByteString byteString) throws IOException {
        return PaneInfo.PaneInfoCoder.INSTANCE.decode(byteString.newInput(), Coder.Context.NESTED);
    }

    public static Collection<? extends BoundedWindow> decodeMetadataWindows(Coder<Collection<? extends BoundedWindow>> coder, ByteString byteString) throws IOException {
        InputStream newInput = byteString.newInput();
        PaneInfo.PaneInfoCoder.INSTANCE.decode(newInput, Coder.Context.NESTED);
        return coder.decode(newInput, Coder.Context.OUTER);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public static <T> WindmillSink<T> create(PipelineOptions pipelineOptions, CloudObject cloudObject, Coder<WindowedValue<T>> coder, ExecutionContext executionContext, CounterSet.AddCounterMutator addCounterMutator) throws Exception {
        return new WindmillSink<>(Structs.getString(cloudObject, "stream_id"), coder, (StreamingModeExecutionContext) executionContext);
    }

    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Sink
    public Sink.SinkWriter<WindowedValue<T>> writer() {
        return this.writer;
    }

    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Sink
    public boolean supportsRestart() {
        return true;
    }
}
