package org.apache.kylin.cube.inmemcubing;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.gridtable.GTRecord;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.class */
public class ConsumeBlockingQueueControllerTest {
    private static final Logger logger = LoggerFactory.getLogger(ConsumeBlockingQueueControllerTest.class);

    /* loaded from: input_file:org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest$InputConverterUnitTest.class */
    private static class InputConverterUnitTest implements InputConverterUnit<String> {
        public static final String END_ROW = new String();
        public static final String CUT_ROW = "0";

        private InputConverterUnitTest() {
        }

        public void convert(String str, GTRecord gTRecord) {
            throw new UnsupportedOperationException();
        }

        public boolean ifEnd(String str) {
            return str == END_ROW;
        }

        public boolean ifCut(String str) {
            return str == CUT_ROW;
        }

        /* renamed from: getEndRow, reason: merged with bridge method [inline-methods] */
        public String m4getEndRow() {
            return END_ROW;
        }

        /* renamed from: getCutRow, reason: merged with bridge method [inline-methods] */
        public String m3getCutRow() {
            return CUT_ROW;
        }

        public boolean ifChange() {
            throw new UnsupportedOperationException();
        }
    }

    @Test
    public void testIterator() {
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        final RecordConsumeBlockingQueueController queueController = RecordConsumeBlockingQueueController.getQueueController(new InputConverterUnitTest(), linkedBlockingQueue, 60);
        new Thread(new Runnable() { // from class: org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueControllerTest.1
            @Override // java.lang.Runnable
            public void run() {
                for (int i = 1; i <= 4345; i++) {
                    try {
                        linkedBlockingQueue.put("test");
                        if (i % 2000 == 0) {
                            linkedBlockingQueue.put(InputConverterUnitTest.CUT_ROW);
                        }
                    } catch (InterruptedException e) {
                        ConsumeBlockingQueueControllerTest.logger.warn("Fail to produce records into BlockingQueue due to: " + e);
                        return;
                    }
                }
                linkedBlockingQueue.put(InputConverterUnitTest.END_ROW);
            }
        }).start();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        while (!queueController.ifEnd()) {
            atomicInteger2.incrementAndGet();
            Thread thread = new Thread(new Runnable() { // from class: org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueControllerTest.2
                @Override // java.lang.Runnable
                public void run() {
                    while (queueController.hasNext()) {
                        queueController.next();
                        atomicInteger.incrementAndGet();
                    }
                    System.out.println(atomicInteger.get() + " records consumed when finished split " + atomicInteger2.get());
                }
            });
            thread.start();
            try {
                thread.join();
            } catch (InterruptedException e) {
                logger.warn("Fail to join consumer thread: " + e);
            }
        }
        Assert.assertEquals(4345L, atomicInteger.get());
    }
}
