disruptor 线程安全吗
disruptor一个生产者和多个生产者的区别
一个生产者的时候用的是SingleProducerSequencer,多个的时候用的是MultiProducerSequencer.
SingleProducerSequencer直接用cursor去判断,应为只有一个生产者的cursor,所以是没问题的.
public boolean isAvailable(long sequence)
{
return sequence <= cursor.get();
}
但是MultiProducerSequencer时候,每一个生产者都有cursor,disruptor采取的做法是用一个数组来存.
而且这个数组的大小和RingBuffer的大小是一样的.因为数组的一个位置只会由一个生产者获取,而且一个位置只能被某一圈的时候获取.
(获取一个位置两次,表示一个生产者从后面追上另外一个生产者,这个是不允许的.)
这里说说他们next方法的不同:
SingleProducerSequencer,完全不同sequence来表示位置,而是用了Padding类,缓存同样也用的是Padding类
private static class Padding
{
public long nextValue = Sequence.INITIAL_VALUE, cachedValue = Sequence.INITIAL_VALUE, p2, p3, p4, p5, p6, p7;
}
可以看到Padding的p2到p7是没用的,只是为了凑够64字节.因为cpu的缓存是64字节.这样做是为了避免缓存失效.Sequence类也做了同样的事情
可以看到Padding不是线程安全的,但是对于SingleProducerSequencer来说,只有一个生产者,足够了.
MultiProducerSequencer的next方法用cursor(Sequence类)来表示位置信息.用gatingSequenceCache(Sequence类)来缓存位置信息,这是为什么呢.
来看看Sequence类吧
public long get()
{
return UNSAFE.getLongVolatile(paddedValue, VALUE_OFFSET);
}
public void set(final long value)
{
UNSAFE.putOrderedLong(paddedValue, VALUE_OFFSET, value);
}
可以看到Sequence类是线程安全的,在MultiProducerSequencer 多个生产者的情况下必须要保障线程安全的.
但是说了这么多,感觉disruptor不可能会出现多个生产者.为什么呢.
在RingBuffer的构造函数中只能传入一个Sequencer对象,一个对象怎么多个生产者.
而且Sequencer不是线程类,完全没办法多线程.
Disruptor 源码分析 一个生产者和多个生产者的区别
disruptor一个生产者和多个生产者的区别
一个生产者的时候用的是SingleProducerSequencer,多个的时候用的是MultiProducerSequencer.
SingleProducerSequencer直接用cursor去判断,应为只有一个生产者的cursor,所以是没问题的.
public boolean isAvailable(long sequence)
{
return sequence <= cursor.get();
}
但是MultiProducerSequencer时候,每一个生产者都有cursor,disruptor采取的做法是用一个数组来存.
而且这个数组的大小和RingBuffer的大小是一样的.因为数组的一个位置只会由一个生产者获取,而且一个位置只能被某一圈的时候获取.
(获取一个位置两次,表示一个生产者从后面追上另外一个生产者,这个是不允许的.)
这里说说他们next方法的不同:
SingleProducerSequencer,完全不同sequence来表示位置,而是用了Padding类,缓存同样也用的是Padding类
private static class Padding
{
public long nextValue = Sequence.INITIAL_VALUE, cachedValue = Sequence.INITIAL_VALUE, p2, p3, p4, p5, p6, p7;
}
可以看到Padding的p2到p7是没用的,只是为了凑够64字节.因为cpu的缓存是64字节.这样做是为了避免缓存失效.Sequence类也做了同样的事情
可以看到Padding不是线程安全的,但是对于SingleProducerSequencer来说,只有一个生产者,足够了.
MultiProducerSequencer的next方法用cursor(Sequence类)来表示位置信息.用gatingSequenceCache(Sequence类)来缓存位置信息,这是为什么呢.
来看看Sequence类吧
public long get()
{
return UNSAFE.getLongVolatile(paddedValue, VALUE_OFFSET);
}
public void set(final long value)
{
UNSAFE.putOrderedLong(paddedValue, VALUE_OFFSET, value);
}
可以看到Sequence类是线程安全的,在MultiProducerSequencer 多个生产者的情况下必须要保障线程安全的.
但是说了这么多,感觉disruptor不可能会出现多个生产者.为什么呢.
在RingBuffer的构造函数中只能传入一个Sequencer对象,一个对象怎么多个生产者.
而且Sequencer不是线程类,完全没办法多线程.
new producer;是线程安全的吗
Spring框架里的bean,或者说组件,获取实例的时候都是默认的单例模式。
单例模式的意思就是只有一个实例。单例模式确保某一个类只有一个实例,而且自行实例化并向整个系统提供这个实例。这个类称为单例类。
当多用户同时请求一个服务时,容器会给每一个请求分配一个线程,这是多个线程会并发执行该请求多对应的业务逻辑(成员方法)。
public class PersonController{
private PersonService personService;
public void setFirstName(HttpServletRequest request){
personService.setFirstName(request.getParameter("firstname"));
}
public String getFirstName(){
return personService.getFirstName();
}
}
disruptor ringbuffer 缓冲区大小的单位是字节吗
我对Disruptor的最初印象就是ringbuffer。但是后来我意识到尽管ringbuffer是整个模式(Disruptor)的核心,但是Disruptor对ringbuffer的访问控制策略才是真正的关键点所在。
ringbuffer到底是什么?
嗯,正如名字所说的一样,它是一个环(首尾相接的环),你可以把它用做在不同上下文(线程)间传递数据的buffer。
基本来说,ringbuffer拥有一个序号,这个序号指向数组中下一个可用的元素。(校对注:如下图右边的图片表示序号,这个序号指向数组的索引4的位置。)
随着你不停地填充这个buffer(可能也会有相应的读取),这个序号会一直增长,直到绕过这个环。
要找到数组中当前序号指向的元素,可以通过mod操作:
以上面的ringbuffer为例(java的mod语法):12 % 10 = 2。很简单吧。
事实上,上图中的ringbuffer只有10个槽完全是个意外。如果槽的个数是2的N次方更有利于基于二进制的计算机进行计算。
(校对注:2的N次方换成二进制就是1000,100,10,1这样的数字, sequence & (array length-1) = array index,比如一共有8槽,3&(8-1)=3,HashMap就是用这个方式来定位数组元素的,这种方式比取模的速度更快。)
如何让Java以光的速度跨线程通信
一个比Disruptor吞吐量等性能指标更好的框架,使用Railway算法,将线程之间的消费发送参考现实生活中火车在站点之间搬运货物。
目标起始于一个简单的想法:创建一个开发人员友好的,简单的,轻量级线程间的通信框架,无需使用任何锁,同步器,信号量,等待,通知以及没有队列,消息,事件或任何其它并发特定的语法或工具。
只是一个Java接口接受到POJO以后在其背后实现这个通信,这个主意很类似Akka的Actors,但是它也许是有点矫枉过正,特别是对于单个多核计算机上线程间的通信优化必须是轻量的。
Akka的伟大之处是跨进程通信,特别是Actor是能够跨越不同JVM节点实现分布式通信。
无论如何,你可能觉得使用Akka在一个小型项目上有些过度,因为你只需要线程之间的通信,但是你还是想使用类似Actor这种做法模式。
该文章作者使用了动态代理 堵塞队列和一个缓存的线程池创建了这个解决方案,如图:
SPSC队列是一个Single Producer/Single Consumer 队列(单生产者/单消费者),而MPSC是一个Multi Producer/Single Consumer队列。
Dispatcher线程从Actor线程接受到消息,然后发送到相应的SPSC中。
Actor线程从接受的消息中使用数据,调用相应的actor类的方法,Actor实例都是发送消息给MPSC队列,然后再从Actor线程那里得到消息。
下面是ping-pong案例:
public interface PlayerA (
void pong(long ball); //send and forget method call
}
public interface PlayerB {
void ping(PlayerA playerA, long ball); //send and forget method call
}
public class PlayerAImpl implements PlayerA {
@Override
@ublic void pong(long ball) {
}
}
public class PlayerBImpl implements PlayerB {
@Override
public void ping(PlayerA playerA, long ball) {
playerA.pong(ball);
}
}
public class PingPongExample {
public void testPingPong() {
// this manager hides the complexity of inter-thread communications
// and it takes control over actor proxies, actor implementations and threads
ActorManager manager = new ActorManager();
// registers actor implementations inside the manager
manager.registerImpl(PlayerAImpl.class);
manager.registerImpl(PlayerBImpl.class);
//Create actor proxies. Proxies convert method calls into internal messages
//which would be sent between threads to a specific actor instance.
PlayerA playerA = manager.createActor(PlayerA.class);
PlayerB playerB = manager.createActor(PlayerB.class);
for(int i = 0; i < 1000000; i++) {
playerB.ping(playerA, i);
}
}
这两个play能够每秒打500,000个乒乓。但是如果和单个线程执行速度相比,还是很差的,同样代码在单个线程可以到达每秒两百万个。
作者开始研究缓慢的原因,在一些校验和测试以后,他认为是Actors之间发送消息影响了整体性能:
作者找到一个SPSC单生产者和单消费者的无锁队列,http://www.infoq.com/presentations/Lock-Free-Algorithms
无锁队列提供比锁队列更好的性能。锁队列中在当一个线程获得锁,其他线程将被阻塞,直到该锁被释放的。在无锁算法的情况下,生产者线程可以产生消息,但不阻止其他生产者线程,以及其他消费者,而从队列中读取的消费者不会被阻塞。
这个无锁队列据测试结果是超过每秒100M ops,是JDK的并发队列实现的10倍。
但是作者使用这个无锁队列提到SPSC 以后,并没有产生明显性能提升,他立即意识到这个框架的性能瓶颈不是在SPSC,而是在多个生产者/单个消费者(MPSC)那里。
多个生产者如果使用SPSC会覆盖彼此的值,因为SPSC并没有一个对生产者的控制机制,即使最快的SPSC也不适合。
对于MPSC作者找到了LMAX的disruptor,一个通过Ringbuffer实现的高性能线程间通信库包。
使用Disruptor很容易实现非常低延迟,高吞吐量的线程间消息通信。它还提供了用例对生产者和消费者的不同组合。多个线程可以从环形缓冲区中读取而不会阻塞对方:
多生产者和多消费者:
三个生产者/一个消费者测试结果显示,Disruptor都是两倍于LinkedBlockingQueue 。
但是使用Disruptor后的这个框架性能还是没有达到预期,作者从上下班的地铁中得到灵感,在某个站点同一车厢出来的人是生产者,进去的是消费者。
建立一个Railway类,使用AtomicLong来跟踪地铁在站与站之间的传递,下面是一个single-train railway:
public class RailWay {
private final Train train = new Train();
//站台号码stationNo 跟踪火车,定义哪个站点接受火车
private final AtomicInteger stationIndex = new AtomicInteger();
//多线程访问这个方法,也就是在特定站点等待火车
public Train waitTrainOnStation(final int stationNo) {
while (stationIndex.get() % stationCount != stationNo) {
Thread.yield(); // this is necessary to keep a high throughput of message passing.
//But it eats CPU cycles while waiting for a train
}
// the busy loop returns only when the station number will match
// stationIndex.get() % stationCount condition
return train;
}
//这个方法通过增加火车站台号将火车移到下一个站点。 public void sendTrain() {
stationIndex.getAndIncrement();
}
}
参考Disruptor,创建线程间传递long值:
public class Train {
//
public static int CAPACITY = 2*1024;
private final long[] goodsArray; // array to transfer freight goods
private int index;
public Train() {
goodsArray = new long[CAPACITY];
}
public int goodsCount() { // returns the count of goods
return index;
}
public void addGoods(long i) { // adds item to the train
goodsArray[index++] = i;
}
public long getGoods(int i) { //removes the item from the train
index--;
return goodsArray[i];
}
}
如下图两个线程传递long:
使用一列火车实现单个生产者单个消费者:
public void testRailWay() {
final Railway railway = new Railway();
final long n = 20000000000l;
//starting a consumer thread
new Thread() {
long lastValue = 0;
@Override
public void run() {
while (lastValue < n) {
Train train = railway.waitTrainOnStation(1); //waits for the train at the station #1
int count = train.goodsCount();
for (int i = 0; i < count; i++) {
lastValue = train.getGoods(i); // unload goods
}
railway.sendTrain(); //sends the current train to the first station.
}
}
}.start();
final long start = System.nanoTime();
long i = 0;
while (i < n) {
Train train = railway.waitTrainOnStation(0); // waits for the train on the station #0
int capacity = train.getCapacity();
for (int j = 0; j < capacity; j++) {
train.addGoods((int)i++); // adds goods to the train
}
railway.sendTrain();
if (i % 100000000 == 0) { //measures the performance per each 100M items
final long duration = System.nanoTime() - start;|
final long ops = (i * 1000L * 1000L * 1000L) / duration;
System.out.format("ops/sec = %,d\n", ops);
System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);
System.out.format("latency nanos = %.3f%n\n",
duration / (float)(i) * (float) Train.CAPACITY);
}
}
}