实现思路:利用单向链表来保存队列的数据,在往队列中添加元素的时候,新建一个节点,加入到队尾,加入到队尾的操作,利用CAS原理加乐观锁,另外,还需要将新加的节点设置为新的队尾,此步操作也需要利用CAS,head与tail变量用volatile修饰,保证多线程环境下的线程可见性。
相关代码如下:
import sun.misc.Unsafe;
import java.lang.reflect.Field;
public class MyLinkQueue {
//头节点
private volatile Node head;
//尾节点
private volatile Node tail;
//引入Unsafe类
private static final sun.misc.Unsafe UNSAFE;
//头节点的内存偏移地址
private static final long headOffset;
//尾节点的内存偏移地址
private static final long tailOffset;
public MyLinkQueue() {
head = tail = new Node(null);
}
static {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
UNSAFE = (Unsafe) f.get(null);
Class k = MyLinkQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
} catch (Exception e) {
throw new Error(e);
}
}
/**
* 内部节点类
*
* @param
*/
private static class Node {
volatile E item;
volatile Node next;
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
//构造方法
public Node(E item) {
this.item = item;
}
static {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
UNSAFE = (Unsafe) f.get(null);
Class k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
public boolean casNext(Node o, Node newNode) {
return UNSAFE.compareAndSwapObject(this, nextOffset, o, newNode);
}
public boolean casHeadItem(Object o, E item) {
return UNSAFE.compareAndSwapObject(this,itemOffset,o,item);
}
}
/**
* 入队方法
*/
public boolean offer(E item) {
//入队元素不能为空
if (null == item) {
throw new NullPointerException();
}
//创建一个新节点
Node newNode = new Node(item);
for (; ; ) {
//头节点存储的内容为空,表示刚创建的队列,利用cas设置头节点保存item
if(null==head.item){
if(head.casHeadItem(null,item)){
return true;
}
}
Node q = tail.next;
//如果尾节点的下一个节点为空,利用cas设置新节点为尾节点的下一个节点
if (null == q) {
if (tail.casNext(null, newNode)) {
return true;
}
}
//尾节点的下一个节点不为空,设置q为新的尾节点
else {
casTail(tail, q);
}
}
}
/**
* 出队方法
*(队列先进先出,取head保存的元素)
* @return
*/
public E poll() {
for(;;){
//取出头节点
Node q = head;
//取出头节点的下一个节点
Node result = q.next;
E obj = q.item;
if(null!=result){
//将reuslt节点利用cas原理设置为新的头节点
if(casHead(q,result)){
return obj;
}
}else{
//头节点的下一个节点为空,此时head=tail,如果head的item为空,直接返回null,否则应该利用cas设置为null
if(null==obj){
return obj;
}
if(head.casHeadItem(obj,null)){
return obj;
}
}
}
}
private boolean casHead(Node q, Node result) {
return UNSAFE.compareAndSwapObject(this,headOffset,q,result);
}
private void casTail(Node p, Node q) {
UNSAFE.compareAndSwapObject(this, tailOffset, p, q);
}
/**
* 统计队列大小的方法
*
* @return
*/
public int size() {
int count = 0;
Node start = head;
if(null==head.item){
return 0;
}
for (; ; ) {
if (null == start) {
return count;
}
++count;
start = start.next;
}
}
}
相关测试代码如下,测试方法是开启100个线程,每个线程内部循环10次往队列中添加元素,经过测试,最后队列的大小为1000,没有出现线程安全问题。然后开启100个线程,每个线程循环10次从队列中取数据,最后队列的大小为0,也是线程安全的
public class TestMyLinkQueue {
public static void main(String[] args) {
final MyLinkQueue myLinkQueue = new MyLinkQueue();
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
public void run() {
for (int i = 0; i < 10; i++) {
myLinkQueue.offer(i);
}
}
}).start();
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("自定义线程安全队列测试结果(1000个并发数据入队),队列大小为:" + myLinkQueue.size());
for (int i = 0; i < 100; i++) {
new Thread(
new Runnable() {
public void run() {
for(int i = 0;i<10;i++){
myLinkQueue.poll();
}
}
}
).start();
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("自定义线程安全队列测试结果(1000个并发数据出队),队列大小为:" + myLinkQueue.size());
System.out.println("自定义线程安全队列测试结果,空队列出队数据为:"+myLinkQueue.poll());
}
}
相关测试结果如下所示:
![在这里插入图片描述](https://img-blog.csdnimg.cn/20200309104743606.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L1dLemhhbmdsaWFuZzEyMw==,size_16,color_FFFFFF,t_70)