package org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator;

import com.google.cloud.Timestamp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.IOUtils;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.beam.sdk.coders.Coder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/BytesThroughputEstimatorTest.class */
public class BytesThroughputEstimatorTest {
    private static final double DELTA = 1.0E-10d;
    private static final int WINDOW_SIZE_SECONDS = 10;
    private BytesThroughputEstimator<byte[]> estimator;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/BytesThroughputEstimatorTest$TestCoder.class */
    private static class TestCoder extends Coder<byte[]> {
        private TestCoder() {
        }

        public void encode(byte[] bArr, OutputStream outputStream) throws IOException {
            outputStream.write(bArr);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public byte[] m141decode(InputStream inputStream) throws IOException {
            return IOUtils.toByteArray(inputStream);
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return Collections.emptyList();
        }

        public void verifyDeterministic() {
        }
    }

    @Before
    public void setup() {
        this.estimator = new BytesThroughputEstimator<>(WINDOW_SIZE_SECONDS, new SizeEstimator(new TestCoder()));
    }

    @Test
    public void testThroughputIsZeroWhenNothingsBeenRegistered() {
        Assert.assertEquals(0.0d, this.estimator.get(), DELTA);
        Assert.assertEquals(0.0d, this.estimator.getFrom(Timestamp.now()), DELTA);
    }

    @Test
    public void testThroughputCalculation() {
        this.estimator.update(Timestamp.ofTimeSecondsAndNanos(2L, 0), new byte[WINDOW_SIZE_SECONDS]);
        this.estimator.update(Timestamp.ofTimeSecondsAndNanos(3L, 0), new byte[20]);
        this.estimator.update(Timestamp.ofTimeSecondsAndNanos(5L, 0), new byte[30]);
        this.estimator.update(Timestamp.ofTimeSecondsAndNanos(10L, 0), new byte[40]);
        Assert.assertEquals(10.0d, this.estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(11L, 0)), DELTA);
        this.estimator.update(Timestamp.ofTimeSecondsAndNanos(20L, 0), new byte[WINDOW_SIZE_SECONDS]);
        this.estimator.update(Timestamp.ofTimeSecondsAndNanos(21L, 0), new byte[20]);
        this.estimator.update(Timestamp.ofTimeSecondsAndNanos(21L, 0), new byte[WINDOW_SIZE_SECONDS]);
        this.estimator.update(Timestamp.ofTimeSecondsAndNanos(29L, 0), new byte[40]);
        Assert.assertEquals(8.0d, this.estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(30L, 0)), DELTA);
        this.estimator.update(Timestamp.ofTimeSecondsAndNanos(31L, 0), new byte[WINDOW_SIZE_SECONDS]);
        this.estimator.update(Timestamp.ofTimeSecondsAndNanos(35L, 0), new byte[40]);
        Assert.assertEquals(5.0d, this.estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(41L, 0)), DELTA);
        Assert.assertEquals(0.0d, this.estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(50L, 0)), DELTA);
    }

    @Test
    public void testThroughputIsAccumulatedWithin60SecondsWindow() {
        List<ImmutablePair<Timestamp, byte[]>> generateTestData = generateTestData(100, 0, WINDOW_SIZE_SECONDS);
        generateTestData.sort(Comparator.comparing((v0) -> {
            return v0.getLeft();
        }));
        Timestamp timestamp = (Timestamp) generateTestData.get(generateTestData.size() - 1).getLeft();
        BigDecimal valueOf = BigDecimal.valueOf(0L);
        Iterator<ImmutablePair<Timestamp, byte[]>> it = generateTestData.iterator();
        while (it.hasNext()) {
            valueOf = valueOf.add(BigDecimal.valueOf(((byte[]) it.next().getRight()).length));
        }
        BigDecimal divide = valueOf.divide(BigDecimal.valueOf(10L), MathContext.DECIMAL128);
        for (ImmutablePair<Timestamp, byte[]> immutablePair : generateTestData) {
            this.estimator.update((Timestamp) immutablePair.getLeft(), (byte[]) immutablePair.getRight());
        }
        Assert.assertEquals(divide.doubleValue(), this.estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(10L, 0)), DELTA);
        Assert.assertEquals(0.0d, this.estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(timestamp.getSeconds() + 10 + 1, timestamp.getNanos())), DELTA);
    }

    @Test
    public void testThroughputIsAccumulatedWithin50SecondsWindow() {
        List<ImmutablePair<Timestamp, byte[]>> generateTestData = generateTestData(300, 0, WINDOW_SIZE_SECONDS);
        List<ImmutablePair<Timestamp, byte[]>> generateTestData2 = generateTestData(50, WINDOW_SIZE_SECONDS, 20);
        List<ImmutablePair> list = (List) Stream.concat(generateTestData.stream(), generateTestData2.stream()).sorted(Comparator.comparing((v0) -> {
            return v0.getLeft();
        })).collect(Collectors.toList());
        Timestamp timestamp = (Timestamp) ((ImmutablePair) list.get(list.size() - 1)).getLeft();
        BigDecimal valueOf = BigDecimal.valueOf(0L);
        Iterator<ImmutablePair<Timestamp, byte[]>> it = generateTestData2.iterator();
        while (it.hasNext()) {
            valueOf = valueOf.add(BigDecimal.valueOf(((byte[]) it.next().getRight()).length));
        }
        BigDecimal divide = valueOf.divide(BigDecimal.valueOf(10L), MathContext.DECIMAL128);
        for (ImmutablePair immutablePair : list) {
            this.estimator.update((Timestamp) immutablePair.getLeft(), (byte[]) immutablePair.getRight());
        }
        Assert.assertEquals(divide.doubleValue(), this.estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(20L, 0)), DELTA);
        Assert.assertEquals(0.0d, this.estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(timestamp.getSeconds() + 10 + 1, timestamp.getNanos())), DELTA);
    }

    private List<ImmutablePair<Timestamp, byte[]>> generateTestData(int i, int i2, int i3) {
        ThreadLocalRandom current = ThreadLocalRandom.current();
        ArrayList arrayList = new ArrayList();
        for (int i4 = 0; i4 < i; i4++) {
            arrayList.add(new ImmutablePair(Timestamp.ofTimeSecondsAndNanos(current.nextInt(i3 - i2) + i2, 0), new byte[current.nextInt(100)]));
        }
        return arrayList;
    }
}
