主要是总结线程间的通信方式有哪些.
通信方式
public class MyObject { synchronized public void methodA() { //do something.... } synchronized public void methodB() { //do some other thing } } public class ThreadA extends Thread { private MyObject object; //省略构造方法 @Override public void run() { super.run(); object.methodA(); } } public class ThreadB extends Thread { private MyObject object; //省略构造方法 @Override public void run() { super.run(); object.methodB(); } } public class Run { public static void main(String[] args) { MyObject object = new MyObject(); //线程A与线程B 持有的是同一个对象:object ThreadA a = new ThreadA(object); ThreadB b = new ThreadB(object); a.start(); b.start(); } }
在上例中, 因为ThreadA
和ThreadB
持有有共同的变量object
, 因此通过synchronized
的方式可以实现两个线程的串行执行. 以为最终被锁住的应该是object
对象。 同时根据happend-before
原则, 释放锁在获取所之前进行的, 因此线程对同一个对象的修改可以同步给另一个线程。
这种方式是通过共享内存
的方式实现两个线程的通信。
@Test public void testWhile() throws InterruptedException { ObjectValue objectValue = new ObjectValue(); Thread thread = new Thread(new Runnable() { @Override public void run() { while(true) { for (int i = 0, len = 10; i < len; i++) { objectValue.add(); System.out.println("添加了" + (i + 1) + "个元素!"); // 该处会有一个sleep的操作,为了让工作内存有时机能够把数据同步到主内存之中 // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } } } } }); Thread threadB = new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println("ThreadB 获取线程的size: " + objectValue.size()); // 该条件会被击穿, 因为数据延迟写入主内存之中 if (objectValue.size() == 5) { System.out.println("==5条件成立, 线程B退出"); break; } } } }); thread.start(); threadB.start(); thread.join(); threadB.join(); } private class ObjectValue { private List<String> list = new ArrayList<>(); public void add() { list.add("elements"); } public int size() { return list.size(); } }
以上程序存在以下问题:
-
线程B是一个空循环, 没有做其他的事情, 比较耗费CPU的资源
-
ObjectValue
中的list
中的值会使得工作内存到主内存的数据同步延迟 -
(可见性): 线程B中的条件
==5
的条件可能会被击穿, 因为线程B始终都是读取的本地内存的缓存, 不能保证数据同时是最新的数据, 因此可能会导致死循环 -
该处
ObjectValue
中的list
使用volatile
也不能解决问题, 因为volatile
只是保证了线程每次都能够读到主内存中最新的数据, 但是主内存的数据可能在线程B处理过程中, 被修改了很多次.
import java.util.ArrayList; import java.util.List; public class MyList { private static List<String> list = new ArrayList<String>(); public static void add() { list.add("anyString"); } public static int size() { return list.size(); } } public class ThreadA extends Thread { private Object lock; public ThreadA(Object lock) { super(); this.lock = lock; } @Override public void run() { try { synchronized (lock) { if (MyList.size() != 5) { System.out.println("wait begin " + System.currentTimeMillis()); lock.wait(); System.out.println("wait end " + System.currentTimeMillis()); } } } catch (InterruptedException e) { e.printStackTrace(); } } } public class ThreadB extends Thread { private Object lock; public ThreadB(Object lock) { super(); this.lock = lock; } @Override public void run() { try { synchronized (lock) { for (int i = 0; i < 10; i++) { MyList.add(); if (MyList.size() == 5) { lock.notify(); System.out.println("已经发出了通知"); } System.out.println("添加了" + (i + 1) + "个元素!"); Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } } } public class Run { public static void main(String[] args) { try { Object lock = new Object(); ThreadA a = new ThreadA(lock); a.start(); Thread.sleep(50); ThreadB b = new ThreadB(lock); b.start(); } catch (InterruptedException e) { e.printStackTrace(); } } }
以上方式是通过两个线程共享一个lock
的对象, 以此作为每个线程的出发点, 线程B在list
的size到了5的时候, 就通知在lock
阻塞的线程。
该方式的缺陷:
-
wait
的线程必须执行在notify
之前, 否则可能会导致线程无知被执行的情况。
public class ThreadPipeExchangeDataTest { /** * 测试通过管道进行线程间的通信 */ @Test public void testPiped() throws IOException, InterruptedException { PipedOutputStream pipedOutputStream = new PipedOutputStream(); PipedInputStream pipedInputStream = new PipedInputStream(); pipedInputStream.connect(pipedOutputStream); ThreadRead threadRead = new ThreadRead(pipedInputStream); threadRead.start(); Thread.sleep(200); ThreadWrite threadWrite = new ThreadWrite(pipedOutputStream); threadWrite.start(); } /** * 该线程用于读取数据 */ private class ThreadRead extends Thread { private PipedInputStream pipedInputStream; public ThreadRead(PipedInputStream inputStream) { this.pipedInputStream = inputStream; } @Override public void run() { readData(); } private void readData() { System.out.println("开始读取数据"); byte[] bytes = new byte[20]; try { int readLength = this.pipedInputStream.read(bytes); while (readLength != -1) { String str = new String(bytes, 0, readLength); System.out.println("获取到数据:" + str); readLength = this.pipedInputStream.read(bytes); } System.out.println(); this.pipedInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } private class ThreadWrite extends Thread { private PipedOutputStream pipedOutputStream; public ThreadWrite(PipedOutputStream pipedOutputStream) { this.pipedOutputStream = pipedOutputStream; } @Override public void run() { write(); } private void write() { System.out.print("write: "); try { for (int i = 0, len = 10; i < len; i++) { String outData = " " + i; this.pipedOutputStream.write(outData.getBytes()); System.out.print(outData); } System.out.println(); this.pipedOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } }
文章评论