package com.cloudera.cmon.pipeline;

import com.cloudera.cmon.pipeline.PipelineStage;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cmon/pipeline/TestPipelineStage.class */
public class TestPipelineStage {
    // NOTE(review): the three size constants below are not referenced by name anywhere in this
    // file; the literals 2, 3 and 4 used in setup() and the tests presumably correspond to them
    // (compile-time constants are inlined) -- confirm against the original source.

    /** Presumably the number of worker threads of the stage under test -- TODO confirm. */
    public static final int NTHREADS = 2;
    /** Presumably the input-queue capacity of the stage under test -- TODO confirm. */
    public static final int QUEUE_SZ = 4;
    /** Presumably the queue depth below which the stage is not considered congested -- TODO confirm. */
    public static final int UNCONGESTED_SZ = 3;
    /** Logger shared by this test class and the nested {@link TestReceiver}. */
    static Logger LOG = LoggerFactory.getLogger(TestPipelineStage.class);
    /** Timeout, in milliseconds, passed to every latch/countdown wait in this file. */
    static long TIMEOUT_MS = 10000;
    /**
     * Queue handed to the {@link TestReceiver}: the receiver adds each item it is given and
     * removes it again once it has been woken up. Created fresh in {@link #setup()}.
     */
    LinkedBlockingQueue<String> signalList = null;
    /** Stage under test; created in {@link #setup()} and stopped in {@link #cleanup()}. */
    PipelineStage<String> upstream = null;
    /** Stage installed as the next stage of {@link #upstream}; the tests wait on its countdown. */
    NotificationStage<String> downstream = null;
    /** Receiver installed on {@link #upstream}; tests switch its overflow policy. */
    TestReceiver receiver = null;

    /* loaded from: input_file:com/cloudera/cmon/pipeline/TestPipelineStage$TestReceiver.class */
    static class TestReceiver extends PipelineStage.ItemReceiver<String> {
        PipelineStage.ItemReceiver.OverflowPolicy policy = PipelineStage.ItemReceiver.OverflowPolicy.SKIP;
        final BlockingQueue<String> signalList;
        final String name;
        static CountDownLatch receiversReady = new CountDownLatch(2);

        public static void awaitReceipt() {
            try {
                receiversReady.await(TestPipelineStage.TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                TestPipelineStage.LOG.error("Receivers did not process their items in time");
            }
            receiversReady = new CountDownLatch(2);
        }

        public TestReceiver(String str, BlockingQueue<String> blockingQueue) {
            this.name = str;
            this.signalList = blockingQueue;
        }

        public String receiveItem(String str) {
            if (!this.signalList.add(str)) {
                TestPipelineStage.LOG.error(this.name + " has no space to hold new item: " + str);
                return null;
            }
            synchronized (str) {
                try {
                    receiversReady.countDown();
                    str.wait();
                    this.signalList.remove(str);
                } catch (InterruptedException e) {
                }
            }
            TestPipelineStage.LOG.info(this.name + " processed '" + str + "'");
            return str;
        }

        public PipelineStage.ItemReceiver.OverflowPolicy overflowPolicy(String str) {
            return this.policy;
        }

        public void setPolicy(PipelineStage.ItemReceiver.OverflowPolicy overflowPolicy) {
            this.policy = overflowPolicy;
        }
    }

    /**
     * Builds a fresh two-stage pipeline for each test: the stage under test with a
     * {@link TestReceiver} attached, followed by a {@link NotificationStage}.
     */
    @Before
    public void setup() {
        signalList = new LinkedBlockingQueue<>();
        // NOTE(review): the numeric arguments presumably are thread counts and queue capacity
        // (NTHREADS / QUEUE_SZ) -- confirm against the PipelineStage constructor.
        upstream = new PipelineStage<>("upstream", 2, 2, 4);
        downstream = new NotificationStage<>();

        final TestReceiver itemReceiver = new TestReceiver("upstream", signalList);
        receiver = itemReceiver;

        upstream.setItemReceiver(itemReceiver);
        upstream.setNextStage(downstream);
    }

    /**
     * Stops both stages after each test.
     *
     * <p>The downstream stage is stopped even if stopping the upstream stage throws, and either
     * stage is skipped if {@link #setup()} failed before creating it.
     */
    @After
    public void cleanup() {
        try {
            if (this.upstream != null) {
                this.upstream.stopService();
            }
        } finally {
            if (this.downstream != null) {
                this.downstream.stopService();
            }
        }
    }

    /**
     * With the SKIP overflow policy, enqueues {@code NTHREADS + UNCONGESTED_SZ} items followed by
     * one more, and expects that extra item to be forwarded downstream without being processed.
     * Then releases the held items and expects {@code NTHREADS} items to count as processed.
     *
     * <p>A countdown that does not complete within {@code TIMEOUT_MS} fails the test immediately
     * instead of only being logged.
     */
    @Test
    @Ignore
    public void testOverflowSkip() throws InterruptedException {
        this.receiver.setPolicy(PipelineStage.ItemReceiver.OverflowPolicy.SKIP);
        for (int i = 0; i < NTHREADS; i++) {
            this.upstream.enqueue("submitted " + i);
        }
        for (int i = 0; i < UNCONGESTED_SZ; i++) {
            this.upstream.enqueue("internal queued " + i);
        }

        // Expect exactly one downstream notification: the item enqueued next.
        this.downstream.reset(1);
        this.upstream.enqueue("skipped");
        Assert.assertTrue("Upstream did not skip item within allowed time",
                this.downstream.awaitCountdown(TIMEOUT_MS));
        Assert.assertEquals(0L, this.upstream.getEventsProcessed());
        Assert.assertEquals(UNCONGESTED_SZ, this.upstream.getInputQueueSize());
        Assert.assertEquals(1L, this.upstream.getEventsForwarded());

        // Wait until the receivers hold their items, then wake every held item.
        TestReceiver.awaitReceipt();
        this.downstream.reset(this.signalList.size());
        for (String held : this.signalList.toArray(new String[0])) {
            synchronized (held) {
                held.notifyAll();
            }
        }
        Assert.assertTrue("Pipeline did not finish within allowed time",
                this.downstream.awaitCountdown(TIMEOUT_MS));
        TestReceiver.awaitReceipt();
        Assert.assertEquals(NTHREADS, this.upstream.getEventsProcessed());
    }

    /**
     * With the QUEUE overflow policy, fills the input queue to {@code QUEUE_SZ}, expects one
     * further item to be dropped, then releases a single held item and expects the input queue
     * to shrink to {@code UNCONGESTED_SZ}.
     *
     * <p>NOTE(review): the queue-size assertions right after the enqueue loops assume the first
     * {@code NTHREADS} items have already left the input queue -- confirm that enqueue()
     * guarantees this before re-enabling the test.
     */
    @Test
    @Ignore
    public void testOverflowQueue() throws InterruptedException {
        this.receiver.setPolicy(PipelineStage.ItemReceiver.OverflowPolicy.QUEUE);
        for (int i = 0; i < NTHREADS; i++) {
            this.upstream.enqueue("submitted " + i);
        }
        for (int i = 0; i < UNCONGESTED_SZ; i++) {
            this.upstream.enqueue("internal queued " + i);
        }
        for (int i = 0; i < QUEUE_SZ - UNCONGESTED_SZ; i++) {
            this.upstream.enqueue("congestion queued " + i);
        }
        Assert.assertEquals("Congestion items queued?", QUEUE_SZ, this.upstream.getInputQueueSize());

        this.upstream.enqueue("dropped");
        Assert.assertEquals("Item dropped?", 1L, this.upstream.getEventsDropped());
        Assert.assertEquals("Queue size unchanged", QUEUE_SZ, this.upstream.getInputQueueSize());

        letItemFinish();

        // Wait for the next item to be picked up, but never longer than TIMEOUT_MS so that a
        // stalled pipeline fails the test instead of hanging the run.
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(TIMEOUT_MS);
        while (this.signalList.size() != NTHREADS) {
            if (System.nanoTime() - deadline > 0) {
                Assert.fail("Next item was not picked up within allowed time");
            }
            Thread.yield();
        }
        Assert.assertEquals("Internal queue shortened?", UNCONGESTED_SZ, this.upstream.getInputQueueSize());
        Assert.assertEquals(1L, this.upstream.getEventsForwarded());
    }

    /**
     * Releases one held item: takes the next entry from {@code signalList} (blocking until one is
     * available), wakes the thread waiting on it, and waits up to {@code TIMEOUT_MS} for a single
     * downstream countdown. A timeout is logged, not thrown.
     *
     * @throws InterruptedException if interrupted while taking the item or awaiting the countdown
     */
    void letItemFinish() throws InterruptedException {
        downstream.reset(1);

        final String released = signalList.take();
        synchronized (released) {
            released.notifyAll();
        }

        final boolean finishedInTime = downstream.awaitCountdown(TIMEOUT_MS);
        if (!finishedInTime) {
            LOG.error("Item '" + released + "' didn't finish within allowed time");
        }
    }
}
