package co.cask.cdap.data2.transaction.stream;

import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.DequeueResult;
import co.cask.tephra.Transaction;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/CombineStreamConsumer.class */
public final class CombineStreamConsumer implements StreamConsumer {
    private final StreamConsumer firstConsumer;
    private final StreamConsumer secondConsumer;
    private StreamConsumer activeConsumer;
    private boolean emptyResult;

    public CombineStreamConsumer(StreamConsumer streamConsumer, StreamConsumer streamConsumer2) {
        Preconditions.checkArgument(streamConsumer != streamConsumer2, "First and second consumers cannot be the same instance");
        Preconditions.checkArgument(streamConsumer.getStreamName().equals(streamConsumer2.getStreamName()), "Stream not match between %s and %s", new Object[]{streamConsumer, streamConsumer2});
        Preconditions.checkArgument(streamConsumer.getConsumerConfig().equals(streamConsumer2.getConsumerConfig()), "Consumer config not match between %s and %s", new Object[]{streamConsumer, streamConsumer2});
        this.firstConsumer = streamConsumer;
        this.secondConsumer = streamConsumer2;
        this.activeConsumer = streamConsumer;
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamConsumer
    public QueueName getStreamName() {
        return this.activeConsumer.getStreamName();
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamConsumer
    public ConsumerConfig getConsumerConfig() {
        return this.activeConsumer.getConsumerConfig();
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamConsumer
    public DequeueResult<StreamEvent> poll(int i, long j, TimeUnit timeUnit) throws IOException, InterruptedException {
        DequeueResult<StreamEvent> poll = this.activeConsumer.poll(i, j, timeUnit);
        if (this.activeConsumer == this.firstConsumer) {
            this.emptyResult = poll.isEmpty();
        }
        return poll;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.activeConsumer != this.firstConsumer) {
            this.activeConsumer.close();
        } else {
            Closeables.closeQuietly(this.firstConsumer);
            this.secondConsumer.close();
        }
    }

    public void startTx(Transaction transaction) {
        this.activeConsumer.startTx(transaction);
    }

    public Collection<byte[]> getTxChanges() {
        return this.activeConsumer.getTxChanges();
    }

    public boolean commitTx() throws Exception {
        return this.activeConsumer.commitTx();
    }

    public void postTxCommit() {
        this.activeConsumer.postTxCommit();
        if (this.activeConsumer == this.firstConsumer && this.emptyResult) {
            this.activeConsumer = this.secondConsumer;
            Closeables.closeQuietly(this.firstConsumer);
        }
    }

    public boolean rollbackTx() throws Exception {
        return this.activeConsumer.rollbackTx();
    }

    public String getTransactionAwareName() {
        return this.activeConsumer.getTransactionAwareName();
    }

    public String toString() {
        return Objects.toStringHelper(this).add("active", this.activeConsumer).add("first", this.firstConsumer).add("second", this.secondConsumer).toString();
    }
}
