package org.apache.gobblin.stream;

import io.reactivex.Flowable;
import java.io.Closeable;
import java.io.IOException;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.records.RecordStreamWithMetadata;

/* loaded from: input_file:WEB-INF/lib/gobblin-api-0.12.0.jar:org/apache/gobblin/stream/ControlMessageInjector.class */
public abstract class ControlMessageInjector<SI, DI> implements Closeable, RecordStreamProcessor<SI, SI, DI, DI> {
    protected ControlMessageInjector<SI, DI> init(WorkUnitState workUnitState) {
        return this;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    protected void setInputGlobalMetadata(GlobalMetadata<SI> globalMetadata, WorkUnitState workUnitState) {
    }

    protected abstract Iterable<ControlMessage<DI>> injectControlMessagesBefore(RecordEnvelope<DI> recordEnvelope, WorkUnitState workUnitState);

    protected abstract Iterable<ControlMessage<DI>> injectControlMessagesAfter(RecordEnvelope<DI> recordEnvelope, WorkUnitState workUnitState);

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.gobblin.records.RecordStreamProcessor
    public RecordStreamWithMetadata<DI, SI> processStream(RecordStreamWithMetadata<DI, SI> recordStreamWithMetadata, WorkUnitState workUnitState) throws RecordStreamProcessor.StreamProcessingException {
        init(workUnitState);
        setInputGlobalMetadata(recordStreamWithMetadata.getGlobalMetadata(), workUnitState);
        return (RecordStreamWithMetadata<DI, SI>) recordStreamWithMetadata.withRecordStream((Flowable) recordStreamWithMetadata.getRecordStream().flatMap(streamEntity -> {
            if (streamEntity instanceof ControlMessage) {
                if (streamEntity instanceof MetadataUpdateControlMessage) {
                    setInputGlobalMetadata(((MetadataUpdateControlMessage) streamEntity).getGlobalMetadata(), workUnitState);
                }
                getMessageHandler().handleMessage((ControlMessage) streamEntity);
                return Flowable.just(streamEntity);
            }
            if (!(streamEntity instanceof RecordEnvelope)) {
                throw new UnsupportedOperationException();
            }
            RecordEnvelope<DI> recordEnvelope = (RecordEnvelope) streamEntity;
            Iterable<ControlMessage<DI>> injectControlMessagesBefore = injectControlMessagesBefore(recordEnvelope, workUnitState);
            Iterable<ControlMessage<DI>> injectControlMessagesAfter = injectControlMessagesAfter(recordEnvelope, workUnitState);
            if (injectControlMessagesBefore == null && injectControlMessagesAfter == null) {
                return Flowable.just(recordEnvelope);
            }
            Flowable concatWith = injectControlMessagesBefore != null ? Flowable.fromIterable(injectControlMessagesBefore).concatWith(Flowable.just(recordEnvelope)) : Flowable.just(recordEnvelope);
            if (injectControlMessagesAfter != null) {
                concatWith.concatWith(Flowable.fromIterable(injectControlMessagesAfter));
            }
            return concatWith;
        }, 1).doOnComplete(this::close), (GlobalMetadata) recordStreamWithMetadata.getGlobalMetadata());
    }

    protected ControlMessageHandler getMessageHandler() {
        return ControlMessageHandler.NOOP;
    }
}
