package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.values.CodedTupleTag;
import com.google.cloud.dataflow.sdk.values.CodedTupleTagMap;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.joda.time.Instant;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/util/StreamingModeExecutionContext.class */
public class StreamingModeExecutionContext extends ExecutionContext {
    private String computation;
    private Windmill.WorkItem work;
    private StateFetcher stateFetcher;
    private Windmill.WorkItemCommitRequest.Builder outputBuilder;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/util/StreamingModeExecutionContext$StepContext.class */
    public class StepContext extends ExecutionContext.StepContext {
        private final String mangledPrefix;
        private Map<CodedTupleTag<?>, KV<?, ByteString>> stateCache;
        private Map<CodedTupleTag<?>, List<KV<ByteString, Instant>>> tagListUpdates;

        public StepContext(String str) {
            super(str);
            this.stateCache = new HashMap();
            this.tagListUpdates = new HashMap();
            int length = str.length();
            String valueOf = String.valueOf(String.valueOf(str));
            this.mangledPrefix = new StringBuilder(12 + valueOf.length()).append(length).append(":").append(valueOf).toString();
        }

        @Override // com.google.cloud.dataflow.sdk.util.ExecutionContext.StepContext, com.google.cloud.dataflow.sdk.transforms.DoFn.KeyedState
        public <T> void store(CodedTupleTag<T> codedTupleTag, T t) throws CoderException, IOException {
            ByteString.Output newOutput = ByteString.newOutput();
            codedTupleTag.getCoder().encode(t, newOutput, Coder.Context.OUTER);
            this.stateCache.put(codedTupleTag, KV.of(t, newOutput.toByteString()));
        }

        @Override // com.google.cloud.dataflow.sdk.util.ExecutionContext.StepContext, com.google.cloud.dataflow.sdk.transforms.DoFn.KeyedState
        public CodedTupleTagMap lookup(List<? extends CodedTupleTag<?>> list) throws CoderException, IOException {
            ArrayList arrayList = new ArrayList();
            ArrayList<CodedTupleTag<?>> arrayList2 = new ArrayList();
            for (CodedTupleTag<?> codedTupleTag : list) {
                if (this.stateCache.containsKey(codedTupleTag)) {
                    arrayList2.add(codedTupleTag);
                } else {
                    arrayList.add(codedTupleTag);
                }
            }
            Map<CodedTupleTag<?>, Object> lookupState = StreamingModeExecutionContext.this.lookupState(this.mangledPrefix, arrayList);
            for (CodedTupleTag<?> codedTupleTag2 : arrayList2) {
                lookupState.put(codedTupleTag2, this.stateCache.get(codedTupleTag2).getKey());
            }
            return CodedTupleTagMap.of(lookupState);
        }

        @Override // com.google.cloud.dataflow.sdk.util.ExecutionContext.StepContext
        public <T> void writeToTagList(CodedTupleTag<T> codedTupleTag, T t, Instant instant) throws IOException {
            List<KV<ByteString, Instant>> list = this.tagListUpdates.get(codedTupleTag);
            if (list == null) {
                list = new ArrayList();
                this.tagListUpdates.put(codedTupleTag, list);
            }
            ByteString.Output newOutput = ByteString.newOutput();
            codedTupleTag.getCoder().encode(t, newOutput, Coder.Context.OUTER);
            list.add(KV.of(newOutput.toByteString(), instant));
        }

        @Override // com.google.cloud.dataflow.sdk.util.ExecutionContext.StepContext
        public <T> Iterable<T> readTagList(CodedTupleTag<T> codedTupleTag) throws IOException {
            return StreamingModeExecutionContext.this.stateFetcher.fetchList(StreamingModeExecutionContext.this.computation, StreamingModeExecutionContext.this.getSerializedKey(), StreamingModeExecutionContext.this.getWorkToken(), this.mangledPrefix, codedTupleTag);
        }

        @Override // com.google.cloud.dataflow.sdk.util.ExecutionContext.StepContext
        public <T> void deleteTagList(CodedTupleTag<T> codedTupleTag) {
            StreamingModeExecutionContext.this.outputBuilder.addListUpdates(Windmill.TagList.newBuilder().setTag(serializeTag(codedTupleTag)).setEndTimestamp(Long.MAX_VALUE).build());
        }

        public void flushState() {
            for (Map.Entry<CodedTupleTag<?>, KV<?, ByteString>> entry : this.stateCache.entrySet()) {
                StreamingModeExecutionContext.this.outputBuilder.addValueUpdates(Windmill.TagValue.newBuilder().setTag(serializeTag(entry.getKey())).setValue(Windmill.Value.newBuilder().setData(entry.getValue().getValue()).setTimestamp(Long.MAX_VALUE).build()).build());
            }
            for (Map.Entry<CodedTupleTag<?>, List<KV<ByteString, Instant>>> entry2 : this.tagListUpdates.entrySet()) {
                Windmill.TagList.Builder tag = Windmill.TagList.newBuilder().setTag(serializeTag(entry2.getKey()));
                for (KV<ByteString, Instant> kv : entry2.getValue()) {
                    tag.addValues(Windmill.Value.newBuilder().setData(kv.getKey()).setTimestamp(TimeUnit.MILLISECONDS.toMicros(kv.getValue().getMillis())));
                }
                StreamingModeExecutionContext.this.outputBuilder.addListUpdates(tag.build());
            }
            this.stateCache.clear();
            this.tagListUpdates.clear();
        }

        private ByteString serializeTag(CodedTupleTag<?> codedTupleTag) {
            String str;
            String valueOf = String.valueOf(this.mangledPrefix);
            String valueOf2 = String.valueOf(codedTupleTag.getId());
            if (valueOf2.length() != 0) {
                str = valueOf.concat(valueOf2);
            } else {
                str = r1;
                String str2 = new String(valueOf);
            }
            return ByteString.copyFromUtf8(str);
        }
    }

    public StreamingModeExecutionContext(String str, StateFetcher stateFetcher) {
        this.computation = str;
        this.stateFetcher = stateFetcher;
    }

    public void start(Windmill.WorkItem workItem, Windmill.WorkItemCommitRequest.Builder builder) {
        this.work = workItem;
        this.outputBuilder = builder;
    }

    @Override // com.google.cloud.dataflow.sdk.util.ExecutionContext
    public ExecutionContext.StepContext createStepContext(String str) {
        return new StepContext(str);
    }

    @Override // com.google.cloud.dataflow.sdk.util.ExecutionContext
    public void setTimer(String str, Instant instant) {
        this.outputBuilder.addOutputTimers(Windmill.Timer.newBuilder().setTimestamp(TimeUnit.MILLISECONDS.toMicros(instant.getMillis())).setTag(ByteString.copyFromUtf8(str)).build());
    }

    @Override // com.google.cloud.dataflow.sdk.util.ExecutionContext
    public void deleteTimer(String str) {
        this.outputBuilder.addOutputTimers(Windmill.Timer.newBuilder().setTag(ByteString.copyFromUtf8(str)).build());
    }

    public ByteString getSerializedKey() {
        return this.work.getKey();
    }

    public long getWorkToken() {
        return this.work.getWorkToken();
    }

    public Windmill.WorkItem getWork() {
        return this.work;
    }

    public Windmill.WorkItemCommitRequest.Builder getOutputBuilder() {
        return this.outputBuilder;
    }

    public void flushState() {
        Iterator<ExecutionContext.StepContext> it = getAllStepContexts().iterator();
        while (it.hasNext()) {
            ((StepContext) it.next()).flushState();
        }
    }

    public Map<CodedTupleTag<?>, Object> lookupState(String str, List<? extends CodedTupleTag<?>> list) throws CoderException, IOException {
        return this.stateFetcher.fetch(this.computation, getSerializedKey(), getWorkToken(), str, list);
    }
}
