08-disruptor

disruptor 英 [dɪsˈrʌptə] 破坏者;干扰者;干扰器;裂解枪;分裂者

grab 英[ɡræb] v. 攫取,抓住

argument 英 [ˈɑːɡjumənt] n. 论点;争论;论据;辩论;争吵;争辩;理由

简单理解:内存里,存放元素的高效率queue

  • 一个线程中每秒处理600万订单
  • 2011年duke奖
  • 单机速度最快MQ
  • 性能极高,无锁cas,单机支持高并发

1. 介绍

  1. 主页open in new window
  2. 源码open in new window
  3. GettingStartedopen in new window
  4. apiopen in new window
  5. mavenopen in new window

2. RingBuffer

  1. ConcurrentLinkedQueue:链表实现,遍历没有数组快,而且维护首尾指针
    • JDK中没有ConcurrentArrayQueue:复制数组效率太低
  2. Disruptor底层RingBuffer数组实现,没有首尾指针,无锁,高并发
  3. RingBuffer_sequence,指向下一个可用的元素。直接覆盖(不用清除)旧的数据,降低GC频率
  4. 实现了基于事件的生产者_消费者模式(观察者模式)
image-20230507203112566
  1. 假如长度为8,当添加到第12个元素的时候在哪个序号上呢?用(12 % 8)决定
  2. 当Buffer被填满的时候到底是覆盖还是等待,由Producer决定
  3. 长度设为2的n次幂,利于二进制计算
    • eg:12 % 8 = 12 & (8 - 1) => pos = num & (size -1)

3. 开发步骤

  1. Event。RingBuffer中需要处理的元素
  2. EventFactory(生产者)implements EventFactory<>。用于填充队列
    1. 效率问题:disruptor初始化时,会调用Event工厂,对RingBuffer进行内存的提前分配(饿汉式)
    2. 不用new,GC频率会降低
  3. EventHandler(消费者)implements EventHandler<>。处理容器中的元素

1. official

<dependency>
   <groupId>com.lmax</groupId>
   <artifactId>disruptor</artifactId>
   <version>3.4.2</version>
</dependency>
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;

/**
 * RingBuffer中需要处理的元素
 */
@ToString
@Setter
public class EventX {
    private long value;
}

/*
 * 1. producer
 *      1. 效率问题:disruptor初始化,会调用EventFactoryX,对RingBuffer进行内存的提前分配(饿汉式)
 *      2. 不用new,GC频率会降低
 */
class EventFactoryX implements EventFactory<EventX> {

    @Override
    public EventX newInstance() {
        return new EventX();
    }
}

/*
 * 2. consumer
 *      处理容器中的元素
 */
@Slf4j
class EventHandlerX implements EventHandler<EventX> {
    // 处理消息个数
    public static long count = 0;

    /**
     * onEvent
     *
     * @param event      消息对象
     * @param sequence   RingBuffer序号、位置
     * @param endOfBatch 是否是最后一个元素
     */
    @Override
    public void onEvent(EventX event, long sequence, boolean endOfBatch) {
        count++;
        log.warn(event + " 序号:" + sequence);
    }
}
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;

/**
 * 官方样例
 */
public class V1_Official {
    public static void main(String[] args) throws Exception {
        // 1. The factory for the event
        EventFactoryX factory = new EventFactoryX();

        // 2. Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // 3. Construct the Disruptor
        // threadFactory生产的thread,执行EventHandlerX.onEvent来消费EventFactoryX生产的eventX
        Disruptor<EventX> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());

        // 4. Connect the handler
        disruptor.handleEventsWith(new EventHandlerX());

        // 5. Start the Disruptor, starts all threads running
        disruptor.start();

        // 6. Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<EventX> ringBuffer = disruptor.getRingBuffer();

// ---------------------------------- Disruptor创建成功 ----------------------------------

        long sequence = ringBuffer.next();  // Grab the next sequence
        try {
            EventX event = ringBuffer.get(sequence); // Get the entry in the Disruptor
            // for the sequence
            event.setValue(8888L);  // Fill with data
        } finally {
            ringBuffer.publish(sequence); // 发布位置,等待消费
        }
    }
}


















 


 





 





 

 

 



4. EventTranslator

/**
 * EventTranslator
 */
public class V2_EventTranslator {
    RingBuffer<EventX> ringBuffer = null;

    /**
     * Consumer几乎没变。生产者变化了,为Java8_lambda写法做准备
     */
    @BeforeEach
    public void beforeEach() {
        EventFactoryX factory = new EventFactoryX();
        int bufferSize = 1024;

        // DaemonThreadFactory 后台线程
        Disruptor<EventX> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        disruptor.handleEventsWith(new EventHandlerX());
        disruptor.start();
        ringBuffer = disruptor.getRingBuffer();
    }

    @AfterEach
    public void afterEach() {
        LockSupport.park();
    }

    /*
     * new EventTranslator<LongEvent>
     *
     * <>中的内容不能省略
     * reason: '<>' with anonymous inner classes is not supported in -source 8
     * (use -source 9 or higher to enable '<>' with anonymous inner classes)
     */
    @Test
    public void T1_EventTranslator() {
        // 匿名内部类
        EventTranslator<EventX> translator = new EventTranslator<>() {
            // translateTo,填充LongEvent
            @Override
            public void translateTo(EventX event, long sequence) {
                event.setValue(8888L);
            }
        };
        ringBuffer.publishEvent(translator);
    }

    @Test
    public void T2_EventTranslatorOneArg() {
        EventTranslatorOneArg<EventX, Long> oneArg = new EventTranslatorOneArg<EventX, Long>() {
            @Override
            public void translateTo(EventX event, long sequence, Long l) {
                event.setValue(l);
            }
        };
        ringBuffer.publishEvent(oneArg, 7777L);
    }

    @Test
    public void T3_EventTranslatorTwoArg() {
        EventTranslatorTwoArg<EventX, Long, Long> translator3 = new EventTranslatorTwoArg<EventX, Long, Long>() {
            @Override
            public void translateTo(EventX event, long sequence, Long l1, Long l2) {
                event.setValue(l1 + l2);
            }
        };
        ringBuffer.publishEvent(translator3, 10L, 2L);
    }

    @Test
    public void T4_EventTranslatorThreeArg() {
        EventTranslatorThreeArg<EventX, Long, Long, Long> translator4 = new EventTranslatorThreeArg<EventX, Long, Long, Long>() {
            @Override
            public void translateTo(EventX event, long sequence, Long l1, Long l2, Long l3) {
                event.setValue(l1 + l2 + l3);
            }
        };
        ringBuffer.publishEvent(translator4, 100L, 20L, 3L);
    }

    @Test
    public void T5_EventTranslatorVararg() {
        EventTranslatorVararg<EventX> translator5 = new EventTranslatorVararg<EventX>() {
            @Override
            public void translateTo(EventX event, long sequence, Object... objects) {
                long result = 0;
                for (Object o : objects) {
                    long l = (Long) o;
                    result += l;
                }
                event.setValue(result);
            }
        };
        ringBuffer.publishEvent(translator5, 1000L, 200L, 30L, 4L);
    }

}











 



 

 

 

















 
 
 
 
 
 
 





 
 
 
 
 
 





 
 
 
 
 
 





 
 
 
 
 
 





 
 
 
 
 
 
 
 
 
 
 




5. lambda

public class V3_Lambda {
    public static void main(String[] args) throws Exception {
        int bufferSize = 1024;

        // 1. EventFactoryX => lambda
        Disruptor<EventX> disruptor = new Disruptor<>(EventX::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // 2. EventHandlerX => lambda
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
                System.out.println("[" + Thread.currentThread().getName() + "]" + event + " 序号:" + sequence));

        disruptor.start();

        RingBuffer<EventX> ringBuffer = disruptor.getRingBuffer();

// ---------------------------------- Disruptor创建成功 ----------------------------------

        // 3. EventTranslator => lambda
        ringBuffer.publishEvent((event, sequence) -> event.setValue(10000L));
        ringBuffer.publishEvent((event, sequence, l) -> event.setValue(l), 15000L);
        ringBuffer.publishEvent((event, sequence, l1, l2) -> event.setValue(l1 + l2), 10000L, 10000L);

        LockSupport.park();
    }
}





 


 
 








 
 
 




6. ProducerType

  1. Producer.MULTI:默认是MULTI。多线程模式下产生Event
  2. Producer.SINGLE:单线程生产者,指定SINGLE,效率会提升

如果是多个生产者(多线程),指定为SINGLE,出问题?

public class Z4_ProducerType {
    // consume计数统计
    public static long count = 0;
    Disruptor<EventX> disruptor;

    /**
     * 1. ProducerType.SINGLE 模式下不报错,发生并发错误
     */
    @Test
    public void T1_single() {
        int bufferSize = 1024;

        // 生产者:单线程模式
        disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new BlockingWaitStrategy());
        ooxx();
    }

    /**
     * 2. ProducerType.MULTI:默认
     */
    @Test
    public void T2_multi() {
        int bufferSize = 1024;

        // 生产者:多线程模式
        disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        ooxx();
    }

    public void ooxx() {
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
            count++;
            System.out.println("[" + Thread.currentThread().getName() + "]" + event + " 序号:" + sequence);
        });

        disruptor.start();

        RingBuffer<EventX> ringBuffer = disruptor.getRingBuffer();

// —---------------------------------- producer ----------------------------------

        final int threadCount = 50;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);
        ExecutorService es = Executors.newCachedThreadPool();
        for (long i = 0; i < threadCount; i++) {
            final long threadNum = i;
            es.submit(() -> {
                System.out.printf("Thread %s ready to start!\n", threadNum);
                try {
                    // 人满发车
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }

                for (int j = 0; j < 100; j++) {
                    ringBuffer.publishEvent((event, sequence) -> {
                        event.setValue(threadNum);
                        System.out.println("生产了 " + threadNum);
                    });
                }
            });
        }

        es.shutdown();
        // disruptor.shutdown();
        ThreadHelper.sleepSeconds(3);
        System.out.println("消费次数:" + Z4_ProducerType.count);
    }
}














 
 












 
 
























 





 
 
 
 










7. Consumer

1. WaitStrategy

producer > consumer 速率,进行WaitStrategy

  1. BlockingWaitStrategy(默认):线程阻塞,等待生产者唤醒。被唤醒后,再循环检查依赖的sequence是否已经消费
  2. BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu
  3. LiteBlockingWaitStrategy:线程阻塞,等待生产者唤醒
    • 与BWS相比,区别在signalNeeded.getAndSet。如果两个线程同时访问,一个访问waitFor,一个访问signalAll时,可以减少lock加锁次数
  4. LiteTimeoutBlockingWaitStrategy:与LBW相比,设置了阻塞时间,超过时间后抛异常
  5. PhasedBackoffWaitStrategy:根据时间参数和等待策略参数决定使用哪种等待策略
  6. TimeoutBlockingWaitStrategy:相比BWS,设置了等待时间。超时抛异常。经过异常逻辑,thread继续TBWS
  7. YieldingWaitStrategy(常用):尝试100次,然后Thread.yield()让出cpu
  8. SleepingWaitStrategy(常用):sleep

2. MultiConsumer

// 1. 重复消费
Disruptor.handleEventsWith()

// 2. 不重复消费
Disruptor.handleEventsWithWorkerPool()

3. Consumer_Exception

// 默认
Disruptor.setDefaultExceptionHandler()

// 自定义
Disruptor.handleExceptionFor().with()
/**
 * MultiConsumer
 * 多个消费者,位于多个线程
 */
@Slf4j
public class Z5_Consumer {
    Disruptor<EventX> disruptor;
    static int count = 0;

    /*
     * 1. WaitStrategy
     *
     * producer > consumer 速率,进行WaitStrategy
     * 1. BlockingWaitStrategy(默认):线程阻塞,等待生产者唤醒。被唤醒后,再循环检查依赖的sequence是否已经消费
     * 2. BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu
     * 3. LiteBlockingWaitStrategy:线程阻塞,等待生产者唤醒
     *    与BWS相比,区别在signalNeeded.getAndSet。如果两个线程同时访问,一个访问waitFor,一个访问signalAll时,可以减少lock加锁次数
     * 4. LiteTimeoutBlockingWaitStrategy:与LBW相比,设置了阻塞时间,超过时间后抛异常
     * 5. PhasedBackoffWaitStrategy:根据时间参数和等待策略参数决定使用哪种等待策略
     * 6. TimeoutBlockingWaitStrategy:相比BWS,设置了等待时间。超时抛异常。经过异常逻辑,thread继续TBWS
     * 7. YieldingWaitStrategy(常用):尝试100次,然后Thread.yield()让出cpu
     * 8. SleepingWaitStrategy(常用):sleep
     */
    @Test
    public void T1_WaitStrategy() throws Exception {
        /*
         * consumer
         * 1. EventHandler:consumer处理接口
         * 2. TimeoutHandler:超时处理接口
         */
        class EventHandlerY implements EventHandler<EventX>, TimeoutHandler {

            @Override
            public void onEvent(EventX event, long sequence, boolean endOfBatch) {
                count++;
                // ThreadHelper.sleepMilli(200);
                System.out.println("[" + Thread.currentThread().getName() + "]" + event + " 序号:" + sequence);
            }

            @Override
            public void onTimeout(long sequence) throws Exception {
                // debug。等待时间,超时抛异常,异常逻辑
                System.out.println("onTimeout: " + sequence);
            }
        }

        int bufferSize = 8;
        disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.MULTI,
                new TimeoutBlockingWaitStrategy(1, TimeUnit.SECONDS));

        EventHandlerY eventHandlerY = new EventHandlerY();
        disruptor.handleEventsWith(eventHandlerY);

        ooxx();
    }

    /*
     * 2. MultiConsumer,重复消费(handleEventsWith)
     *      handler(BatchEventProcessor)都自己维护一个sequence,互不干扰,重复消费
     *
     *      打印出不同consumer_thread的输出
     */
    @Test
    public void T2_MultiConsumer() throws Exception {
        int bufferSize = 1024;

        disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.MULTI,
                new SleepingWaitStrategy());

        // MultiConsumer,每个consumer可以采用不同的onEvent()处理方式
        disruptor.handleEventsWith(
                (event, sequence, endOfBatch) -> {
                    count++;
                    log.error("event: {}", event);
                },
                (event, sequence, endOfBatch) -> {
                    count++;
                    log.error("event: {}", event);
                }
        );
        ooxx();
    }

    /**
     * 3. MultiConsumer,不重复消费(handleEventsWithWorkerPool)
     *      handler(workerProcessor)同享一个workSequence,不会重复消费
     */
    @Test
    public void T3_MultiConsumer() throws Exception {
        int bufferSize = 1024;

        disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.MULTI,
                new SleepingWaitStrategy());

        // MultiConsumer,每个consumer可以采用不同的onEvent()处理方式
        disruptor.handleEventsWithWorkerPool(
                (event) -> {
                    count++;
                    log.error("event: {}", event);
                },
                (event) -> {
                    count++;
                    log.error("event: {}", event);
                }
        );
        ooxx();
    }

    @Test
    public void T4_Consumer_Exception() throws Exception {
        int bufferSize = 1024;

        disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.MULTI,
                new SleepingWaitStrategy());

        EventHandler<EventX> eventHandler = (event, sequence, endOfBatch) -> {
            count++;
            System.out.println(event);
            throw new Exception("消费者出异常");
        };

        disruptor.handleEventsWith(eventHandler);

        disruptor.handleExceptionsFor(eventHandler).with(new ExceptionHandler<>() {

            // event产生异常
            @Override
            public void handleEventException(Throwable throwable, long l, EventX longEvent) {
                throwable.printStackTrace();
            }

            // Start出异常
            @Override
            public void handleOnStartException(Throwable throwable) {
                System.out.println("Exception Start to Handle!");
            }

            // Shutdown出异常
            @Override
            public void handleOnShutdownException(Throwable throwable) {
                System.out.println("Exception Handled!");
            }
        });

        ooxx();
    }


    public void ooxx() throws Exception {
        disruptor.start();

        RingBuffer<EventX> ringBuffer = disruptor.getRingBuffer();

        final int threadCount = 10;
        CyclicBarrier cb = new CyclicBarrier(threadCount);
        ExecutorService es = Executors.newCachedThreadPool();
        for (long i = 0; i < threadCount; i++) {
            final long threadNum = i;
            es.submit(() -> {
                System.out.printf("Thread %s ready to start!\n", threadNum);
                try {
                    cb.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }

                for (int j = 0; j < 10; j++) {
                    ringBuffer.publishEvent((event, sequence) -> {
                        event.setValue(threadNum);
                        System.out.println("生产了 " + threadNum);
                    });
                }
            });
        }

        es.shutdown();
        // disruptor.shutdown();
        TimeUnit.SECONDS.sleep(3);
        System.out.println("count:" + count);
    }

}