08-disruptor
disruptor 英 [dɪsˈrʌptə] 破坏者;干扰者;干扰器;裂解枪;分裂者
grab 英[ɡræb] v. 攫取,抓住
argument 英 [ˈɑːɡjumənt] n. 论点;争论;论据;辩论;争吵;争辩;理由
简单理解:内存里,存放元素的高效率queue
- 一个线程中每秒处理600万订单
- 2011年duke奖
- 单机速度最快MQ
- 性能极高,无锁cas,单机支持高并发
1. 介绍
2. RingBuffer
ConcurrentLinkedQueue
:链表实现,遍历没有数组快,而且维护首尾指针- JDK中没有
ConcurrentArrayQueue
:复制数组效率太低
- JDK中没有
- Disruptor底层RingBuffer数组实现,没有首尾指针,无锁,高并发
- RingBuffer_sequence,指向下一个可用的元素。直接覆盖(不用清除)旧的数据,降低GC频率
- 实现了基于事件的生产者_消费者模式(观察者模式)
- 假如长度为8,当添加到第12个元素的时候在哪个序号上呢?用(12 % 8)决定
- 当Buffer被填满的时候到底是覆盖还是等待,由Producer决定
- 长度设为2的n次幂,利于二进制计算
- eg:
12 % 8 = 12 & (8 - 1)
=> pos =num & (size -1)
- eg:
3. 开发步骤
- Event。RingBuffer中需要处理的元素
- EventFactory(生产者)
implements EventFactory<>
。用于填充队列- 效率问题:disruptor初始化时,会调用Event工厂,对RingBuffer进行内存的提前分配(饿汉式)
- 不用new,GC频率会降低
- EventHandler(消费者)
implements EventHandler<>
。处理容器中的元素
1. official
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
/**
* RingBuffer中需要处理的元素
*/
@ToString
@Setter
public class EventX {
private long value;
}
/*
* 1. producer
* 1. 效率问题:disruptor初始化,会调用EventFactoryX,对RingBuffer进行内存的提前分配(饿汉式)
* 2. 不用new,GC频率会降低
*/
class EventFactoryX implements EventFactory<EventX> {
@Override
public EventX newInstance() {
return new EventX();
}
}
/*
* 2. consumer
* 处理容器中的元素
*/
@Slf4j
class EventHandlerX implements EventHandler<EventX> {
// 处理消息个数
public static long count = 0;
/**
* onEvent
*
* @param event 消息对象
* @param sequence RingBuffer序号、位置
* @param endOfBatch 是否是最后一个元素
*/
@Override
public void onEvent(EventX event, long sequence, boolean endOfBatch) {
count++;
log.warn(event + " 序号:" + sequence);
}
}
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
/**
* 官方样例
*/
public class V1_Official {
public static void main(String[] args) throws Exception {
// 1. The factory for the event
EventFactoryX factory = new EventFactoryX();
// 2. Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// 3. Construct the Disruptor
// threadFactory生产的thread,执行EventHandlerX.onEvent来消费EventFactoryX生产的eventX
Disruptor<EventX> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());
// 4. Connect the handler
disruptor.handleEventsWith(new EventHandlerX());
// 5. Start the Disruptor, starts all threads running
disruptor.start();
// 6. Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<EventX> ringBuffer = disruptor.getRingBuffer();
// ---------------------------------- Disruptor创建成功 ----------------------------------
long sequence = ringBuffer.next(); // Grab the next sequence
try {
EventX event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.setValue(8888L); // Fill with data
} finally {
ringBuffer.publish(sequence); // 发布位置,等待消费
}
}
}
4. EventTranslator
/**
* EventTranslator
*/
public class V2_EventTranslator {
RingBuffer<EventX> ringBuffer = null;
/**
* Consumer几乎没变。生产者变化了,为Java8_lambda写法做准备
*/
@BeforeEach
public void beforeEach() {
EventFactoryX factory = new EventFactoryX();
int bufferSize = 1024;
// DaemonThreadFactory 后台线程
Disruptor<EventX> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith(new EventHandlerX());
disruptor.start();
ringBuffer = disruptor.getRingBuffer();
}
@AfterEach
public void afterEach() {
LockSupport.park();
}
/*
* new EventTranslator<LongEvent>
*
* <>中的内容不能省略
* reason: '<>' with anonymous inner classes is not supported in -source 8
* (use -source 9 or higher to enable '<>' with anonymous inner classes)
*/
@Test
public void T1_EventTranslator() {
// 匿名内部类
EventTranslator<EventX> translator = new EventTranslator<>() {
// translateTo,填充LongEvent
@Override
public void translateTo(EventX event, long sequence) {
event.setValue(8888L);
}
};
ringBuffer.publishEvent(translator);
}
@Test
public void T2_EventTranslatorOneArg() {
EventTranslatorOneArg<EventX, Long> oneArg = new EventTranslatorOneArg<EventX, Long>() {
@Override
public void translateTo(EventX event, long sequence, Long l) {
event.setValue(l);
}
};
ringBuffer.publishEvent(oneArg, 7777L);
}
@Test
public void T3_EventTranslatorTwoArg() {
EventTranslatorTwoArg<EventX, Long, Long> translator3 = new EventTranslatorTwoArg<EventX, Long, Long>() {
@Override
public void translateTo(EventX event, long sequence, Long l1, Long l2) {
event.setValue(l1 + l2);
}
};
ringBuffer.publishEvent(translator3, 10L, 2L);
}
@Test
public void T4_EventTranslatorThreeArg() {
EventTranslatorThreeArg<EventX, Long, Long, Long> translator4 = new EventTranslatorThreeArg<EventX, Long, Long, Long>() {
@Override
public void translateTo(EventX event, long sequence, Long l1, Long l2, Long l3) {
event.setValue(l1 + l2 + l3);
}
};
ringBuffer.publishEvent(translator4, 100L, 20L, 3L);
}
@Test
public void T5_EventTranslatorVararg() {
EventTranslatorVararg<EventX> translator5 = new EventTranslatorVararg<EventX>() {
@Override
public void translateTo(EventX event, long sequence, Object... objects) {
long result = 0;
for (Object o : objects) {
long l = (Long) o;
result += l;
}
event.setValue(result);
}
};
ringBuffer.publishEvent(translator5, 1000L, 200L, 30L, 4L);
}
}
5. lambda
public class V3_Lambda {
public static void main(String[] args) throws Exception {
int bufferSize = 1024;
// 1. EventFactoryX => lambda
Disruptor<EventX> disruptor = new Disruptor<>(EventX::new, bufferSize, DaemonThreadFactory.INSTANCE);
// 2. EventHandlerX => lambda
disruptor.handleEventsWith((event, sequence, endOfBatch) ->
System.out.println("[" + Thread.currentThread().getName() + "]" + event + " 序号:" + sequence));
disruptor.start();
RingBuffer<EventX> ringBuffer = disruptor.getRingBuffer();
// ---------------------------------- Disruptor创建成功 ----------------------------------
// 3. EventTranslator => lambda
ringBuffer.publishEvent((event, sequence) -> event.setValue(10000L));
ringBuffer.publishEvent((event, sequence, l) -> event.setValue(l), 15000L);
ringBuffer.publishEvent((event, sequence, l1, l2) -> event.setValue(l1 + l2), 10000L, 10000L);
LockSupport.park();
}
}
6. ProducerType
Producer.MULTI
:默认是MULTI。多线程模式下产生EventProducer.SINGLE
:单线程生产者,指定SINGLE,效率会提升
如果是多个生产者(多线程),指定为SINGLE,出问题?
public class Z4_ProducerType {
// consume计数统计
public static long count = 0;
Disruptor<EventX> disruptor;
/**
* 1. ProducerType.SINGLE 模式下不报错,发生并发错误
*/
@Test
public void T1_single() {
int bufferSize = 1024;
// 生产者:单线程模式
disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
ProducerType.SINGLE,
new BlockingWaitStrategy());
ooxx();
}
/**
* 2. ProducerType.MULTI:默认
*/
@Test
public void T2_multi() {
int bufferSize = 1024;
// 生产者:多线程模式
disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
ProducerType.MULTI,
new BlockingWaitStrategy());
ooxx();
}
public void ooxx() {
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
count++;
System.out.println("[" + Thread.currentThread().getName() + "]" + event + " 序号:" + sequence);
});
disruptor.start();
RingBuffer<EventX> ringBuffer = disruptor.getRingBuffer();
// —---------------------------------- producer ----------------------------------
final int threadCount = 50;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
ExecutorService es = Executors.newCachedThreadPool();
for (long i = 0; i < threadCount; i++) {
final long threadNum = i;
es.submit(() -> {
System.out.printf("Thread %s ready to start!\n", threadNum);
try {
// 人满发车
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
for (int j = 0; j < 100; j++) {
ringBuffer.publishEvent((event, sequence) -> {
event.setValue(threadNum);
System.out.println("生产了 " + threadNum);
});
}
});
}
es.shutdown();
// disruptor.shutdown();
ThreadHelper.sleepSeconds(3);
System.out.println("消费次数:" + Z4_ProducerType.count);
}
}
7. Consumer
1. WaitStrategy
producer > consumer 速率,进行WaitStrategy
BlockingWaitStrategy
(默认):线程阻塞,等待生产者唤醒。被唤醒后,再循环检查依赖的sequence是否已经消费BusySpinWaitStrategy
:线程一直自旋等待,可能比较耗cpuLiteBlockingWaitStrategy
:线程阻塞,等待生产者唤醒- 与BWS相比,区别在signalNeeded.getAndSet。如果两个线程同时访问,一个访问waitFor,一个访问signalAll时,可以减少lock加锁次数
LiteTimeoutBlockingWaitStrategy
:与LBW相比,设置了阻塞时间,超过时间后抛异常PhasedBackoffWaitStrategy
:根据时间参数和等待策略参数决定使用哪种等待策略TimeoutBlockingWaitStrategy
:相比BWS,设置了等待时间。超时抛异常。经过异常逻辑,thread继续TBWSYieldingWaitStrategy
(常用):尝试100次,然后Thread.yield()
让出cpuSleepingWaitStrategy
(常用):sleep
2. MultiConsumer
// 1. 重复消费
Disruptor.handleEventsWith()
// 2. 不重复消费
Disruptor.handleEventsWithWorkerPool()
3. Consumer_Exception
// 默认
Disruptor.setDefaultExceptionHandler()
// 自定义
Disruptor.handleExceptionFor().with()
/**
* MultiConsumer
* 多个消费者,位于多个线程
*/
@Slf4j
public class Z5_Consumer {
Disruptor<EventX> disruptor;
static int count = 0;
/*
* 1. WaitStrategy
*
* producer > consumer 速率,进行WaitStrategy
* 1. BlockingWaitStrategy(默认):线程阻塞,等待生产者唤醒。被唤醒后,再循环检查依赖的sequence是否已经消费
* 2. BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu
* 3. LiteBlockingWaitStrategy:线程阻塞,等待生产者唤醒
* 与BWS相比,区别在signalNeeded.getAndSet。如果两个线程同时访问,一个访问waitFor,一个访问signalAll时,可以减少lock加锁次数
* 4. LiteTimeoutBlockingWaitStrategy:与LBW相比,设置了阻塞时间,超过时间后抛异常
* 5. PhasedBackoffWaitStrategy:根据时间参数和等待策略参数决定使用哪种等待策略
* 6. TimeoutBlockingWaitStrategy:相比BWS,设置了等待时间。超时抛异常。经过异常逻辑,thread继续TBWS
* 7. YieldingWaitStrategy(常用):尝试100次,然后Thread.yield()让出cpu
* 8. SleepingWaitStrategy(常用):sleep
*/
@Test
public void T1_WaitStrategy() throws Exception {
/*
* consumer
* 1. EventHandler:consumer处理接口
* 2. TimeoutHandler:超时处理接口
*/
class EventHandlerY implements EventHandler<EventX>, TimeoutHandler {
@Override
public void onEvent(EventX event, long sequence, boolean endOfBatch) {
count++;
// ThreadHelper.sleepMilli(200);
System.out.println("[" + Thread.currentThread().getName() + "]" + event + " 序号:" + sequence);
}
@Override
public void onTimeout(long sequence) throws Exception {
// debug。等待时间,超时抛异常,异常逻辑
System.out.println("onTimeout: " + sequence);
}
}
int bufferSize = 8;
disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
ProducerType.MULTI,
new TimeoutBlockingWaitStrategy(1, TimeUnit.SECONDS));
EventHandlerY eventHandlerY = new EventHandlerY();
disruptor.handleEventsWith(eventHandlerY);
ooxx();
}
/*
* 2. MultiConsumer,重复消费(handleEventsWith)
* handler(BatchEventProcessor)都自己维护一个sequence,互不干扰,重复消费
*
* 打印出不同consumer_thread的输出
*/
@Test
public void T2_MultiConsumer() throws Exception {
int bufferSize = 1024;
disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
ProducerType.MULTI,
new SleepingWaitStrategy());
// MultiConsumer,每个consumer可以采用不同的onEvent()处理方式
disruptor.handleEventsWith(
(event, sequence, endOfBatch) -> {
count++;
log.error("event: {}", event);
},
(event, sequence, endOfBatch) -> {
count++;
log.error("event: {}", event);
}
);
ooxx();
}
/**
* 3. MultiConsumer,不重复消费(handleEventsWithWorkerPool)
* handler(workerProcessor)同享一个workSequence,不会重复消费
*/
@Test
public void T3_MultiConsumer() throws Exception {
int bufferSize = 1024;
disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
ProducerType.MULTI,
new SleepingWaitStrategy());
// MultiConsumer,每个consumer可以采用不同的onEvent()处理方式
disruptor.handleEventsWithWorkerPool(
(event) -> {
count++;
log.error("event: {}", event);
},
(event) -> {
count++;
log.error("event: {}", event);
}
);
ooxx();
}
@Test
public void T4_Consumer_Exception() throws Exception {
int bufferSize = 1024;
disruptor = new Disruptor<>(EventX::new, bufferSize, Executors.defaultThreadFactory(),
ProducerType.MULTI,
new SleepingWaitStrategy());
EventHandler<EventX> eventHandler = (event, sequence, endOfBatch) -> {
count++;
System.out.println(event);
throw new Exception("消费者出异常");
};
disruptor.handleEventsWith(eventHandler);
disruptor.handleExceptionsFor(eventHandler).with(new ExceptionHandler<>() {
// event产生异常
@Override
public void handleEventException(Throwable throwable, long l, EventX longEvent) {
throwable.printStackTrace();
}
// Start出异常
@Override
public void handleOnStartException(Throwable throwable) {
System.out.println("Exception Start to Handle!");
}
// Shutdown出异常
@Override
public void handleOnShutdownException(Throwable throwable) {
System.out.println("Exception Handled!");
}
});
ooxx();
}
public void ooxx() throws Exception {
disruptor.start();
RingBuffer<EventX> ringBuffer = disruptor.getRingBuffer();
final int threadCount = 10;
CyclicBarrier cb = new CyclicBarrier(threadCount);
ExecutorService es = Executors.newCachedThreadPool();
for (long i = 0; i < threadCount; i++) {
final long threadNum = i;
es.submit(() -> {
System.out.printf("Thread %s ready to start!\n", threadNum);
try {
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
for (int j = 0; j < 10; j++) {
ringBuffer.publishEvent((event, sequence) -> {
event.setValue(threadNum);
System.out.println("生产了 " + threadNum);
});
}
});
}
es.shutdown();
// disruptor.shutdown();
TimeUnit.SECONDS.sleep(3);
System.out.println("count:" + count);
}
}