package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.services.dataflow.model.SideInputInfo;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Ascii;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Supplier;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableSet;
import com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat;
import com.google.cloud.dataflow.sdk.runners.worker.StateFetcher;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.BaseExecutionContext;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.TimeDomain;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.util.state.StateInternals;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.joda.time.Instant;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingModeExecutionContext.class */
public class StreamingModeExecutionContext extends DataflowExecutionContext {
    /** Stage name; appended to the backlog counter name emitted by {@code flushState()}. */
    private final String stageName;

    /**
     * Side-input values already resolved for the current work item, indexed first by the view's
     * tag and then by window. Emptied by every call to {@code start(...)}.
     */
    private final Map<TupleTag<?>, Map<BoundedWindow, Object>> sideInputCache = new HashMap<>();

    /** Caller-supplied reader cache, looked up and updated under the serialized work-item key. */
    private final ConcurrentMap<ByteString, StreamingDataflowWorker.ReaderCacheEntry> readerCache;

    /**
     * Caller-supplied name mapping. When it is non-empty, each {@code StepContext} looks up its
     * second constructor argument here to obtain its state prefix / state family.
     */
    private final ConcurrentMap<String, String> stateNameMap;

    // The fields below are (re)assigned by start(...) for each work item.
    private Windmill.WorkItem work;
    private Instant inputDataWatermark;
    private WindmillStateReader stateReader;
    private StateFetcher stateFetcher;
    private Windmill.WorkItemCommitRequest.Builder outputBuilder;

    /** Reader registered through {@code setActiveReader}; checkpointed by {@code flushState()} when non-null. */
    private UnboundedSource.UnboundedReader<?> activeReader;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext$2, reason: invalid class name */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingModeExecutionContext$2.class */
    /**
     * Compiler-style lookup table that maps {@link TimeDomain} ordinals to the small integers
     * used as {@code case} labels by {@code WindmillTimerInternals.timerType}.
     *
     * <p>A slot that is never written keeps the array default of 0, which matches none of the
     * labels 1-3.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$google$cloud$dataflow$sdk$util$TimeDomain;

        static {
            // Build the table in a local first, then publish it through the final field.
            final int[] table = new int[TimeDomain.values().length];
            // Each constant gets its own try so that one missing constant does not prevent the
            // remaining slots from being filled.
            try {
                table[TimeDomain.EVENT_TIME.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Slot stays 0.
            }
            try {
                table[TimeDomain.PROCESSING_TIME.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Slot stays 0.
            }
            try {
                table[TimeDomain.SYNCHRONIZED_PROCESSING_TIME.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Slot stays 0.
            }
            $SwitchMap$com$google$cloud$dataflow$sdk$util$TimeDomain = table;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingModeExecutionContext$StepContext.class */
    /**
     * Per-step view of the enclosing {@link StreamingModeExecutionContext}: owns the step's
     * Windmill-backed state and timer internals and routes side-input traffic through the
     * enclosing context's commit builder.
     */
    public class StepContext extends BaseExecutionContext.StepContext {
        private WindmillStateInternals stateInternals;
        private WindmillTimerInternals timerInternals;
        private final String prefix;
        private final String stateFamily;
        private final Supplier<StateSampler.ScopedState> scopedReadStateSupplier;

        /**
         * @param str forwarded to the superclass; also used to name the "-windmill-read" sampler state
         * @param str2 forwarded to the superclass; used as the state prefix, or as the lookup key
         *     into the enclosing context's state-name map when that map is non-empty
         * @param stateSampler sampler used for scoped read states; may be {@code null}, in which
         *     case the supplier yields {@code null}
         */
        public StepContext(final String str, String str2, final StateSampler stateSampler) {
            super(StreamingModeExecutionContext.this, str, str2);
            if (!StreamingModeExecutionContext.this.stateNameMap.isEmpty()) {
                // A name map is in use: prefix and state family are both the mapped name
                // (empty string when there is no mapping).
                final String mapped = StreamingModeExecutionContext.this.stateNameMap.get(str2);
                final String family = (mapped != null) ? mapped : "";
                this.prefix = family;
                this.stateFamily = family;
            } else {
                // No name map: the raw name is the prefix and there is no state family.
                this.prefix = str2;
                this.stateFamily = "";
            }
            this.scopedReadStateSupplier = new Supplier<StateSampler.ScopedState>() {
                // Sampler state id, resolved on first use; -1 means "not resolved yet".
                private int readState = -1;

                @Override
                public StateSampler.ScopedState get() {
                    if (stateSampler != null) {
                        if (this.readState == -1) {
                            this.readState = stateSampler.stateForName(str + "-windmill-read");
                        }
                        return stateSampler.scopedState(this.readState);
                    }
                    return null;
                }
            };
        }

        /**
         * Creates fresh state and timer internals for this step.
         *
         * @param windmillStateReader reader handed to the new state internals
         * @param instant input data watermark; must not be {@code null}
         */
        public void start(WindmillStateReader windmillStateReader, Instant instant) {
            final boolean nameMapInUse = !StreamingModeExecutionContext.this.stateNameMap.isEmpty();
            this.stateInternals = new WindmillStateInternals(this.prefix, nameMapInUse, windmillStateReader, this.scopedReadStateSupplier);
            final Instant watermark = Preconditions.checkNotNull(instant);
            this.timerInternals = new WindmillTimerInternals(this.stateFamily, watermark);
        }

        /** Writes this step's pending state and then its pending timers into the commit builder. */
        public void flushState() {
            final Windmill.WorkItemCommitRequest.Builder commit = StreamingModeExecutionContext.this.outputBuilder;
            this.stateInternals.persist(commit);
            this.timerInternals.persistTo(commit);
        }

        /**
         * Encodes the view contents and the window and records them as a global data update on
         * the commit builder.
         *
         * @throws IllegalStateException if the current work item has a non-empty key
         * @throws IOException if either coder fails to encode
         */
        @Override
        public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tupleTag, Iterable<WindowedValue<T>> iterable, Coder<Iterable<WindowedValue<T>>> coder, W w, Coder<W> coder2) throws IOException {
            if (StreamingModeExecutionContext.this.getSerializedKey().size() != 0) {
                throw new IllegalStateException("writePCollectionViewData must follow a Combine.globally");
            }

            final ByteString.Output encodedData = ByteString.newOutput();
            coder.encode(iterable, encodedData, Coder.Context.OUTER);

            final ByteString.Output encodedWindow = ByteString.newOutput();
            coder2.encode(w, encodedWindow, Coder.Context.OUTER);

            // The encoded window is stored as the "version" of the global data id.
            final Windmill.GlobalDataId dataId = Windmill.GlobalDataId.newBuilder()
                    .setTag(tupleTag.getId())
                    .setVersion(encodedWindow.toByteString())
                    .build();
            final Windmill.GlobalData.Builder update = Windmill.GlobalData.newBuilder()
                    .setDataId(dataId)
                    .setData(encodedData.toByteString());
            if (this.stateFamily != null) {
                update.setStateFamily(this.stateFamily);
            }
            StreamingModeExecutionContext.this.outputBuilder.addGlobalDataUpdates(update.build());
        }

        /**
         * Asks the enclosing context for the side input of the window that the view's window
         * function maps {@code boundedWindow} to.
         *
         * @return {@code true} when a non-null value was obtained
         */
        public boolean issueSideInputFetch(PCollectionView<?> pCollectionView, BoundedWindow boundedWindow, StateFetcher.SideInputState sideInputState) {
            final BoundedWindow sideInputWindow = pCollectionView.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(boundedWindow);
            final Object fetched = StreamingModeExecutionContext.this.fetchSideInput(pCollectionView, sideInputWindow, this.stateFamily, sideInputState, this.scopedReadStateSupplier);
            return fetched != null;
        }

        /**
         * Registers a blocking side-input request (tagged with this step's state family) and its
         * data id on the commit builder.
         */
        public void addBlockingSideInput(Windmill.GlobalDataRequest globalDataRequest) {
            Windmill.GlobalDataRequest request = globalDataRequest;
            if (this.stateFamily != null) {
                request = Windmill.GlobalDataRequest.newBuilder(globalDataRequest).setStateFamily(this.stateFamily).build();
            }
            final Windmill.WorkItemCommitRequest.Builder commit = StreamingModeExecutionContext.this.outputBuilder;
            commit.addGlobalDataRequests(request);
            commit.addGlobalDataIdRequests(request.getDataId());
        }

        /** Registers every request in {@code iterable} via {@link #addBlockingSideInput}. */
        public void addBlockingSideInputs(Iterable<Windmill.GlobalDataRequest> iterable) {
            for (Windmill.GlobalDataRequest request : iterable) {
                addBlockingSideInput(request);
            }
        }

        /** @return the state internals created by {@code start}; fails if {@code start} has not run */
        @Override
        public StateInternals stateInternals() {
            return Preconditions.checkNotNull(this.stateInternals);
        }

        /** @return the timer internals created by {@code start}; fails if {@code start} has not run */
        @Override
        public TimerInternals timerInternals() {
            return Preconditions.checkNotNull(this.timerInternals);
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingModeExecutionContext$StreamingModeSideInputReader.class */
    /**
     * {@link SideInputReader} over a fixed set of views that resolves values through a
     * {@link StreamingModeExecutionContext}.
     */
    public static class StreamingModeSideInputReader implements SideInputReader {
        private StreamingModeExecutionContext context;
        private Set<PCollectionView<?>> viewSet;

        private StreamingModeSideInputReader(Iterable<? extends PCollectionView<?>> iterable, StreamingModeExecutionContext streamingModeExecutionContext) {
            this.viewSet = ImmutableSet.copyOf(iterable);
            this.context = streamingModeExecutionContext;
        }

        /** Creates a reader for the given views, backed by the given context. */
        public static StreamingModeSideInputReader of(Iterable<? extends PCollectionView<?>> iterable, StreamingModeExecutionContext streamingModeExecutionContext) {
            return new StreamingModeSideInputReader(iterable, streamingModeExecutionContext);
        }

        /**
         * Returns the value for the view and window. The lookup is made with
         * {@code CACHED_IN_WORKITEM}, i.e. the context is expected to hold the value already.
         *
         * @throws RuntimeException if the view is not one this reader was created for
         */
        @Override
        public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
            if (!contains(pCollectionView)) {
                throw new RuntimeException("get() called with unknown view");
            }
            return this.context.fetchSideInput(pCollectionView, boundedWindow, null, StateFetcher.SideInputState.CACHED_IN_WORKITEM, null);
        }

        /** @return whether the view belongs to this reader's view set */
        @Override
        public <T> boolean contains(PCollectionView<T> pCollectionView) {
            final boolean known = this.viewSet.contains(pCollectionView);
            return known;
        }

        /** @return whether this reader was created with no views */
        @Override
        public boolean isEmpty() {
            final boolean noViews = this.viewSet.isEmpty();
            return noViews;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/StreamingModeExecutionContext$WindmillTimerInternals.class */
    /**
     * {@link TimerInternals} that buffers timer sets and deletions in memory and writes them to a
     * Windmill commit request when {@link #persistTo} is called.
     */
    public static class WindmillTimerInternals implements TimerInternals {
        /** Pending timer changes: {@code true} = set, {@code false} = delete. Last write wins. */
        private final Map<TimerInternals.TimerData, Boolean> timers = new HashMap<>();
        private final Instant inputDataWatermark;
        private final String stateFamily;

        /**
         * @param str state family attached to every persisted timer (skipped when {@code null})
         * @param instant value reported by {@link #currentWatermarkTime()}
         */
        public WindmillTimerInternals(String str, Instant instant) {
            this.inputDataWatermark = instant;
            this.stateFamily = str;
        }

        /** Buffers a request to set the timer. */
        @Override
        public void setTimer(TimerInternals.TimerData timerData) {
            this.timers.put(timerData, true);
        }

        /** Buffers a request to delete the timer. */
        @Override
        public void deleteTimer(TimerInternals.TimerData timerData) {
            this.timers.put(timerData, false);
        }

        @Override
        public Instant currentProcessingTime() {
            return Instant.now();
        }

        @Override
        public Instant currentWatermarkTime() {
            return this.inputDataWatermark;
        }

        /**
         * Builds the timer tag as {@code <namespace key>+<domain ordinal>:<timestamp millis>}.
         *
         * <p>The format is deliberately left untouched: it embeds the enum ordinal and is what
         * gets written to the commit, so changing it would change the tags of persisted timers.
         * NOTE(review): {@code String.format} uses the default locale here - confirm that is
         * acceptable for the digits in the tag.
         */
        private ByteString timerTag(TimerInternals.TimerData timerData) {
            return ByteString.copyFromUtf8(String.format(
                    "%s+%d:%d",
                    timerData.getNamespace().stringKey(),
                    timerData.getDomain().ordinal(),
                    timerData.getTimestamp().getMillis()));
        }

        /**
         * Appends one output timer per buffered change to {@code builder}, then clears the buffer.
         * A timestamp (in microseconds) is written only for timers being set; deletions are
         * emitted without one.
         */
        public void persistTo(Windmill.WorkItemCommitRequest.Builder builder) {
            for (Map.Entry<TimerInternals.TimerData, Boolean> entry : this.timers.entrySet()) {
                final TimerInternals.TimerData timer = entry.getKey();
                final Windmill.Timer.Builder output = builder.addOutputTimersBuilder()
                        .setTag(timerTag(timer))
                        .setType(timerType(timer.getDomain()));
                if (this.stateFamily != null) {
                    output.setStateFamily(this.stateFamily);
                }
                if (entry.getValue()) {
                    output.setTimestamp(TimeUnit.MILLISECONDS.toMicros(timer.getTimestamp().getMillis()));
                }
            }
            this.timers.clear();
        }

        /**
         * Maps a {@link TimeDomain} to the corresponding Windmill timer type.
         *
         * @throws IllegalArgumentException for a domain that has no mapping
         */
        private Windmill.Timer.Type timerType(TimeDomain timeDomain) {
            // Switch on the enum itself instead of the decompiler's synthetic ordinal table.
            switch (timeDomain) {
                case EVENT_TIME:
                    return Windmill.Timer.Type.WATERMARK;
                case PROCESSING_TIME:
                    return Windmill.Timer.Type.REALTIME;
                case SYNCHRONIZED_PROCESSING_TIME:
                    return Windmill.Timer.Type.DEPENDENT_REALTIME;
                default:
                    throw new IllegalArgumentException("Unrecognized TimeDomain: " + timeDomain);
            }
        }
    }

    /**
     * Creates a context; per-work-item fields stay unset until {@code start(...)} is called.
     *
     * @param str stage name
     * @param concurrentMap reader cache, used under the serialized work-item key
     * @param concurrentMap2 state-name map consulted by step contexts
     */
    public StreamingModeExecutionContext(String str, ConcurrentMap<ByteString, StreamingDataflowWorker.ReaderCacheEntry> concurrentMap, ConcurrentMap<String, String> concurrentMap2) {
        this.stateNameMap = concurrentMap2;
        this.readerCache = concurrentMap;
        this.stageName = str;
    }

    /**
     * Binds this context to a new work item: stores the arguments, empties the side-input cache
     * and restarts every existing step context with the new state reader and watermark.
     *
     * @param workItem work item being processed
     * @param instant input data watermark
     * @param windmillStateReader state reader for the work item
     * @param stateFetcher fetcher used for side inputs that are not cached yet
     * @param builder commit request that collects this work item's outputs
     */
    public void start(Windmill.WorkItem workItem, Instant instant, WindmillStateReader windmillStateReader, StateFetcher stateFetcher, Windmill.WorkItemCommitRequest.Builder builder) {
        this.outputBuilder = builder;
        this.stateFetcher = stateFetcher;
        this.stateReader = windmillStateReader;
        this.inputDataWatermark = instant;
        this.work = workItem;
        this.sideInputCache.clear();
        for (Iterator<ExecutionContext.StepContext> steps = getAllStepContexts().iterator(); steps.hasNext(); ) {
            final StepContext step = (StepContext) steps.next();
            step.start(windmillStateReader, instant);
        }
    }

    /**
     * Creates a step context and immediately starts it with this context's current state reader
     * and input data watermark.
     */
    @Override
    public ExecutionContext.StepContext createStepContext(String str, String str2, StateSampler stateSampler) {
        final StepContext created = new StepContext(str, str2, stateSampler);
        created.start(this.stateReader, this.inputDataWatermark);
        return created;
    }

    /**
     * Not supported in streaming mode.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public SideInputReader getSideInputReader(Iterable<? extends SideInputInfo> iterable) {
        final String message = "Cannot call getSideInputReader for StreamingDataflowWorker: the MapTask specification should not have had any SideInputInfo descriptors since the streaming runner does not yet support them.";
        throw new UnsupportedOperationException(message);
    }

    /** Returns a side-input reader over the given views that resolves values through this context. */
    @Override
    public SideInputReader getSideInputReaderForViews(Iterable<? extends PCollectionView<?>> iterable) {
        final SideInputReader reader = StreamingModeSideInputReader.of(iterable, this);
        return reader;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the side-input value for a view and window, consulting the per-work-item cache
     * before delegating to the state fetcher.
     *
     * @param pCollectionView view whose value is wanted
     * @param boundedWindow window to look up
     * @param str state family forwarded to the state fetcher
     * @param sideInputState lookup mode; {@code CACHED_IN_WORKITEM} forbids going to the fetcher
     * @param supplier scoped-state supplier forwarded to the state fetcher
     * @return the value, or {@code null} when the state fetcher returned none
     * @throws IllegalStateException if {@code CACHED_IN_WORKITEM} was requested but the value is
     *     not in the cache
     */
    public <T> T fetchSideInput(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow, String str, StateFetcher.SideInputState sideInputState, Supplier<StateSampler.ScopedState> supplier) {
        final TupleTag<?> tag = pCollectionView.getTagInternal();
        Map<BoundedWindow, Object> valuesByWindow = this.sideInputCache.get(tag);
        if (valuesByWindow == null) {
            valuesByWindow = new HashMap<>();
            this.sideInputCache.put(tag, valuesByWindow);
        }

        // The cache stores values as Object; entries under this tag were put by this method for
        // the same view, hence the unchecked narrowing.
        @SuppressWarnings("unchecked")
        final T cached = (T) valuesByWindow.get(boundedWindow);
        if (cached != null) {
            return cached;
        }

        if (sideInputState == StateFetcher.SideInputState.CACHED_IN_WORKITEM) {
            // The decompiled branch referenced an undefined variable here; always build the
            // message with the tag id.
            throw new IllegalStateException("Expected side input to be cached. Tag: " + tag.getId());
        }

        @SuppressWarnings("unchecked")
        final T fetched = (T) this.stateFetcher.fetchSideInput(pCollectionView, boundedWindow, str, sideInputState, supplier);
        if (fetched != null) {
            // Only non-null results are cached, so a missing value is looked up again next time.
            valuesByWindow.put(boundedWindow, fetched);
        }
        return fetched;
    }

    /** @return the global data id notifications carried by the current work item */
    public Iterable<Windmill.GlobalDataId> getSideInputNotifications() {
        final Iterable<Windmill.GlobalDataId> notifications = this.work.getGlobalDataIdNotificationsList();
        return notifications;
    }

    /** @return the key of the current work item, as stored in the work item */
    public ByteString getSerializedKey() {
        final ByteString key = this.work.getKey();
        return key;
    }

    /** @return the work token of the current work item */
    public long getWorkToken() {
        final long token = this.work.getWorkToken();
        return token;
    }

    /** @return the work item most recently passed to {@code start(...)} */
    public Windmill.WorkItem getWork() {
        return work;
    }

    /** @return the commit request builder most recently passed to {@code start(...)} */
    public Windmill.WorkItemCommitRequest.Builder getOutputBuilder() {
        return outputBuilder;
    }

    /**
     * Looks up the reader cached under the current work item's key.
     *
     * @return the cached reader when its token equals the work item's cache token; otherwise
     *     {@code null} (an entry with a different token is removed from the cache first)
     */
    public UnboundedSource.UnboundedReader<?> getCachedReader() {
        final StreamingDataflowWorker.ReaderCacheEntry cached = this.readerCache.get(getSerializedKey());
        if (cached != null) {
            if (cached.token == getWork().getCacheToken()) {
                return cached.reader;
            }
            // Token mismatch: drop the entry rather than hand out its reader.
            this.readerCache.remove(getSerializedKey());
        }
        return null;
    }

    /** Records the reader that {@code flushState()} should checkpoint and put into the reader cache. */
    public void setActiveReader(UnboundedSource.UnboundedReader<?> unboundedReader) {
        activeReader = unboundedReader;
    }

    /**
     * Decodes the checkpoint stored in the current work item's source state.
     *
     * @param coder coder used to decode the stored bytes
     * @return the decoded checkpoint, or {@code null} when the stored state is empty
     * @throws RuntimeException wrapping the {@link IOException} if decoding fails
     */
    public UnboundedSource.CheckpointMark getReaderCheckpoint(Coder<? extends UnboundedSource.CheckpointMark> coder) {
        try {
            final ByteString stored = this.work.getSourceState().getState();
            return stored.isEmpty() ? null : coder.decode(stored.newInput(), Coder.Context.OUTER);
        } catch (IOException e) {
            throw new RuntimeException("Exception while decoding checkpoint", e);
        }
    }

    /**
     * Flushes every step context into the commit builder and, when a reader is active, records
     * its checkpoint, watermark and backlog and stores the reader in the reader cache.
     *
     * @return callbacks keyed by the finalize id added to the commit; each callback finalizes
     *     the checkpoint taken here. Empty when no reader is active.
     * @throws RuntimeException wrapping the {@link IOException} if the checkpoint cannot be encoded
     */
    public Map<Long, Runnable> flushState() {
        final Map<Long, Runnable> callbacks = new HashMap<>();
        for (Iterator<ExecutionContext.StepContext> steps = getAllStepContexts().iterator(); steps.hasNext(); ) {
            ((StepContext) steps.next()).flushState();
        }
        if (this.activeReader == null) {
            return callbacks;
        }

        final Windmill.SourceState.Builder sourceState = this.outputBuilder.getSourceStateUpdatesBuilder();
        final UnboundedSource.CheckpointMark checkpointMark = this.activeReader.getCheckpointMark();
        final Instant watermark = this.activeReader.getWatermark();

        // A random id ties the finalize request in the commit to the callback returned here.
        final long finalizeId = ThreadLocalRandom.current().nextLong();
        sourceState.addFinalizeIds(finalizeId);
        callbacks.put(finalizeId, new Runnable() {
            @Override
            public void run() {
                try {
                    checkpointMark.finalizeCheckpoint();
                } catch (IOException e) {
                    throw new RuntimeException("Exception while finalizing checkpoint", e);
                }
            }
        });

        // The source's coder is typed with a wildcard here, so it has to be narrowed before it
        // can encode the mark that the same reader just produced.
        @SuppressWarnings("unchecked")
        final Coder<UnboundedSource.CheckpointMark> checkpointCoder =
                (Coder<UnboundedSource.CheckpointMark>) (Coder<?>) this.activeReader.getCurrentSource().getCheckpointMarkCoder();
        if (checkpointCoder != null) {
            final ByteString.Output encoded = ByteString.newOutput();
            try {
                checkpointCoder.encode(checkpointMark, encoded, Coder.Context.OUTER);
            } catch (IOException e) {
                throw new RuntimeException("Exception while encoding checkpoint", e);
            }
            sourceState.setState(encoded.toByteString());
        }

        this.outputBuilder.setSourceWatermark(TimeUnit.MILLISECONDS.toMicros(watermark.getMillis()));

        // -1 is handled as "no value": fall back to the total backlog for the first split, and
        // emit no counter if there is still no value.
        long backlogBytes = this.activeReader.getSplitBacklogBytes();
        if (backlogBytes == -1 && BasicSerializableSourceFormat.isFirstUnboundedSourceSplit(getSerializedKey())) {
            backlogBytes = this.activeReader.getTotalBacklogBytes();
        }
        if (backlogBytes != -1) {
            // The decompiled code referenced an undefined variable when building this name;
            // plain concatenation yields the same text for every stage name.
            final String counterName = "dataflow_backlog_size-" + this.stageName;
            this.outputBuilder.addCounterUpdates(Windmill.Counter.newBuilder()
                    .setName(counterName)
                    .setKind(Windmill.Counter.Kind.SUM)
                    .setIntScalar(backlogBytes)
                    .setCumulative(true)
                    .build());
        }

        // Keep the reader under the work item's key and cache token for later reuse.
        this.readerCache.put(getSerializedKey(), new StreamingDataflowWorker.ReaderCacheEntry(this.activeReader, getWork().getCacheToken()));
        return callbacks;
    }

    /** @return the finalize ids listed in the current work item's source state */
    public List<Long> getReadyCommitCallbackIds() {
        final List<Long> finalizeIds = this.work.getSourceState().getFinalizeIdsList();
        return finalizeIds;
    }
}
