主要是总结线程间的通信方式有哪些.
通信方式
-
通过
synchronized
进行通信
/**
 * Shared object whose two instance methods are both declared {@code synchronized}, so callers
 * of either method contend for the same instance monitor.
 */
public class MyObject {

    /** Placeholder for work performed while holding this instance's monitor. */
    public synchronized void methodA() {
        // do something....
    }

    /** Placeholder for other work performed while holding this instance's monitor. */
    public synchronized void methodB() {
        // do some other thing
    }
}
/**
 * Thread that calls {@code methodA()} on the {@code MyObject} instance it was constructed with.
 */
public class ThreadA extends Thread {
    private final MyObject object;

    /**
     * Creates the thread around the object it will call into.
     *
     * @param object the instance whose {@code methodA()} is invoked from {@link #run()}
     */
    public ThreadA(MyObject object) {
        // Without this assignment the field stays null and run() fails with a NullPointerException.
        this.object = object;
    }

    /** Invokes {@code methodA()} on the object supplied at construction. */
    @Override
    public void run() {
        object.methodA();
    }
}
/**
 * Thread that calls {@code methodB()} on the {@code MyObject} instance it was constructed with.
 */
public class ThreadB extends Thread {
    private final MyObject object;

    /**
     * Creates the thread around the object it will call into.
     *
     * @param object the instance whose {@code methodB()} is invoked from {@link #run()}
     */
    public ThreadB(MyObject object) {
        // Without this assignment the field stays null and run() fails with a NullPointerException.
        this.object = object;
    }

    /** Invokes {@code methodB()} on the object supplied at construction. */
    @Override
    public void run() {
        object.methodB();
    }
}
/** Entry point: starts a {@code ThreadA} and a {@code ThreadB} around one {@code MyObject}. */
public class Run {

    /**
     * Creates one {@code MyObject}, hands it to both threads and starts them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Thread A and thread B are given the very same instance.
        MyObject shared = new MyObject();
        ThreadA first = new ThreadA(shared);
        ThreadB second = new ThreadB(shared);
        first.start();
        second.start();
    }
}
/**
 * Shared object whose two instance methods are both declared {@code synchronized}, so callers
 * of either method contend for the same instance monitor.
 */
public class MyObject {

    /** Placeholder for work performed while holding this instance's monitor. */
    public synchronized void methodA() {
        // do something....
    }

    /** Placeholder for other work performed while holding this instance's monitor. */
    public synchronized void methodB() {
        // do some other thing
    }
}
/**
 * Thread that calls {@code methodA()} on the {@code MyObject} instance it was constructed with.
 */
public class ThreadA extends Thread {
    private final MyObject object;

    /**
     * Creates the thread around the object it will call into.
     *
     * @param object the instance whose {@code methodA()} is invoked from {@link #run()}
     */
    public ThreadA(MyObject object) {
        // Without this assignment the field stays null and run() fails with a NullPointerException.
        this.object = object;
    }

    /** Invokes {@code methodA()} on the object supplied at construction. */
    @Override
    public void run() {
        object.methodA();
    }
}
/**
 * Thread that calls {@code methodB()} on the {@code MyObject} instance it was constructed with.
 */
public class ThreadB extends Thread {
    private final MyObject object;

    /**
     * Creates the thread around the object it will call into.
     *
     * @param object the instance whose {@code methodB()} is invoked from {@link #run()}
     */
    public ThreadB(MyObject object) {
        // Without this assignment the field stays null and run() fails with a NullPointerException.
        this.object = object;
    }

    /** Invokes {@code methodB()} on the object supplied at construction. */
    @Override
    public void run() {
        object.methodB();
    }
}
/** Entry point: starts a {@code ThreadA} and a {@code ThreadB} around one {@code MyObject}. */
public class Run {

    /**
     * Creates one {@code MyObject}, hands it to both threads and starts them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Thread A and thread B are given the very same instance.
        MyObject shared = new MyObject();
        ThreadA first = new ThreadA(shared);
        ThreadB second = new ThreadB(shared);
        first.start();
        second.start();
    }
}
public class MyObject { synchronized public void methodA() { //do something.... } synchronized public void methodB() { //do some other thing } } public class ThreadA extends Thread { private MyObject object; //省略构造方法 @Override public void run() { super.run(); object.methodA(); } } public class ThreadB extends Thread { private MyObject object; //省略构造方法 @Override public void run() { super.run(); object.methodB(); } } public class Run { public static void main(String[] args) { MyObject object = new MyObject(); //线程A与线程B 持有的是同一个对象:object ThreadA a = new ThreadA(object); ThreadB b = new ThreadB(object); a.start(); b.start(); } }
在上例中, 因为ThreadA
和ThreadB
持有共同的变量object
, 因此通过synchronized
的方式可以实现两个线程的串行执行. 因为最终被锁住的是object
对象。 同时根据happens-before
原则, 对同一把锁的释放先行发生于后续对该锁的获取, 因此一个线程在释放锁之前对共享对象所做的修改, 对随后获取该锁的另一个线程是可见的。
这种方式是通过共享内存
的方式实现两个线程的通信。
-
通过
while
循环的方式
/**
 * Demonstrates two threads "communicating" by polling shared state in a busy loop.
 *
 * <p>The writer thread appends to a shared {@code ObjectValue} forever, ten elements per pass;
 * the reader thread spins, printing the size, until it sees a size of exactly 5. Nothing here
 * synchronizes the two threads, and the writer loop has no exit, so the {@code join()} on the
 * writer is not expected to return.
 *
 * @throws InterruptedException if the calling thread is interrupted while joining
 */
@Test
public void testWhile() throws InterruptedException {
    ObjectValue shared = new ObjectValue();

    Thread writer = new Thread(() -> {
        while (true) {
            for (int added = 1; added <= 10; added++) {
                shared.add();
                System.out.println("添加了" + added + "个元素!");
                // A sleep was meant to go here, to give the writer's working memory a chance
                // to be synchronized back to main memory:
                // try {
                //     Thread.sleep(1000);
                // } catch (InterruptedException e) {
                //     e.printStackTrace();
                // }
            }
        }
    });

    Thread reader = new Thread(() -> {
        boolean sawFive = false;
        while (!sawFive) {
            System.out.println("ThreadB 获取线程的size: " + shared.size());
            // This check can be missed: the reader may observe the writer's updates late,
            // after the size has already moved past 5.
            if (shared.size() == 5) {
                System.out.println("==5条件成立, 线程B退出");
                sawFive = true;
            }
        }
    });

    writer.start();
    reader.start();
    writer.join();
    reader.join();
}
/** Minimal holder around a list; none of its methods performs any synchronization. */
private class ObjectValue {
    /** Backing storage for the added entries. */
    private List<String> elements = new ArrayList<>();

    /** Appends one fixed placeholder string. */
    public void add() {
        elements.add("elements");
    }

    /**
     * @return the number of entries currently reported by the backing list
     */
    public int size() {
        return elements.size();
    }
}
/**
 * Demonstrates two threads "communicating" by polling shared state in a busy loop.
 *
 * <p>The writer thread appends to a shared {@code ObjectValue} forever, ten elements per pass;
 * the reader thread spins, printing the size, until it sees a size of exactly 5. Nothing here
 * synchronizes the two threads, and the writer loop has no exit, so the {@code join()} on the
 * writer is not expected to return.
 *
 * @throws InterruptedException if the calling thread is interrupted while joining
 */
@Test
public void testWhile() throws InterruptedException {
    ObjectValue shared = new ObjectValue();

    Thread writer = new Thread(() -> {
        while (true) {
            for (int added = 1; added <= 10; added++) {
                shared.add();
                System.out.println("添加了" + added + "个元素!");
                // A sleep was meant to go here, to give the writer's working memory a chance
                // to be synchronized back to main memory:
                // try {
                //     Thread.sleep(1000);
                // } catch (InterruptedException e) {
                //     e.printStackTrace();
                // }
            }
        }
    });

    Thread reader = new Thread(() -> {
        boolean sawFive = false;
        while (!sawFive) {
            System.out.println("ThreadB 获取线程的size: " + shared.size());
            // This check can be missed: the reader may observe the writer's updates late,
            // after the size has already moved past 5.
            if (shared.size() == 5) {
                System.out.println("==5条件成立, 线程B退出");
                sawFive = true;
            }
        }
    });

    writer.start();
    reader.start();
    writer.join();
    reader.join();
}
/** Minimal holder around a list; none of its methods performs any synchronization. */
private class ObjectValue {
    /** Backing storage for the added entries. */
    private List<String> elements = new ArrayList<>();

    /** Appends one fixed placeholder string. */
    public void add() {
        elements.add("elements");
    }

    /**
     * @return the number of entries currently reported by the backing list
     */
    public int size() {
        return elements.size();
    }
}
@Test public void testWhile() throws InterruptedException { ObjectValue objectValue = new ObjectValue(); Thread thread = new Thread(new Runnable() { @Override public void run() { while(true) { for (int i = 0, len = 10; i < len; i++) { objectValue.add(); System.out.println("添加了" + (i + 1) + "个元素!"); // 该处会有一个sleep的操作,为了让工作内存有时机能够把数据同步到主内存之中 // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } } } } }); Thread threadB = new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println("ThreadB 获取线程的size: " + objectValue.size()); // 该条件会被击穿, 因为数据延迟写入主内存之中 if (objectValue.size() == 5) { System.out.println("==5条件成立, 线程B退出"); break; } } } }); thread.start(); threadB.start(); thread.join(); threadB.join(); } private class ObjectValue { private List<String> list = new ArrayList<>(); public void add() { list.add("elements"); } public int size() { return list.size(); } }
以上程序存在以下问题:
-
线程B是一个空循环, 没有做其他的事情, 比较耗费CPU的资源
-
ObjectValue
中的list
中的值会使得工作内存到主内存的数据同步延迟 -
(可见性): 线程B中的条件
==5
的条件可能会被击穿, 因为线程B始终都是读取的本地内存的缓存, 不能保证数据同时是最新的数据, 因此可能会导致死循环 -
该处
ObjectValue
中的list
使用volatile
也不能解决问题, 因为volatile
只是保证了线程每次都能够读到主内存中最新的数据, 但是主内存的数据可能在线程B处理过程中, 被修改了很多次.
-
wait()和notify()的方式
import java.util.ArrayList;
import java.util.List;
/** Process-wide list of strings exposed through static {@code add}/{@code size} helpers. */
public class MyList {
    /** Single backing list shared by every caller. */
    private static List<String> items = new ArrayList<String>();

    /** Appends one fixed placeholder string to the shared list. */
    public static void add() {
        items.add("anyString");
    }

    /**
     * @return the current number of entries in the shared list
     */
    public static int size() {
        return items.size();
    }
}
/**
 * Waiting side of the wait/notify demo: unless {@code MyList} already holds exactly five
 * entries, it waits once on the lock object it was given.
 */
public class ThreadA extends Thread {
    private Object lock;

    /**
     * @param lock the object used both as monitor and as wait/notify target
     */
    public ThreadA(Object lock) {
        this.lock = lock;
    }

    /**
     * Prints a timestamp, waits on the lock, and prints a second timestamp once woken.
     * The size condition is evaluated a single time and is not re-checked after waking.
     */
    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() == 5) {
                    return;
                }
                System.out.println("wait begin " + System.currentTimeMillis());
                lock.wait();
                System.out.println("wait end " + System.currentTimeMillis());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Notifying side of the wait/notify demo: adds ten entries to {@code MyList}, one per second,
 * and calls {@code notify()} on the lock when the size is exactly five.
 */
public class ThreadB extends Thread {
    private Object lock;

    /**
     * @param lock the object used both as monitor and as wait/notify target
     */
    public ThreadB(Object lock) {
        this.lock = lock;
    }

    /**
     * Runs the whole ten-step loop, including the one-second sleeps, inside a single
     * {@code synchronized (lock)} block.
     */
    @Override
    public void run() {
        try {
            synchronized (lock) {
                for (int added = 1; added <= 10; added++) {
                    MyList.add();
                    if (MyList.size() == 5) {
                        lock.notify();
                        System.out.println("已经发出了通知");
                    }
                    System.out.println("添加了" + added + "个元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/** Entry point of the wait/notify demo. */
public class Run {

    /**
     * Starts the waiting thread, pauses 50 ms, then starts the notifying thread on the same
     * lock object. If the pause is interrupted the second thread is never started.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Object lock = new Object();
        ThreadA waiter = new ThreadA(lock);
        waiter.start();
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        ThreadB notifier = new ThreadB(lock);
        notifier.start();
    }
}
import java.util.ArrayList;
import java.util.List;
/** Process-wide list of strings exposed through static {@code add}/{@code size} helpers. */
public class MyList {
    /** Single backing list shared by every caller. */
    private static List<String> items = new ArrayList<String>();

    /** Appends one fixed placeholder string to the shared list. */
    public static void add() {
        items.add("anyString");
    }

    /**
     * @return the current number of entries in the shared list
     */
    public static int size() {
        return items.size();
    }
}
/**
 * Waiting side of the wait/notify demo: unless {@code MyList} already holds exactly five
 * entries, it waits once on the lock object it was given.
 */
public class ThreadA extends Thread {
    private Object lock;

    /**
     * @param lock the object used both as monitor and as wait/notify target
     */
    public ThreadA(Object lock) {
        this.lock = lock;
    }

    /**
     * Prints a timestamp, waits on the lock, and prints a second timestamp once woken.
     * The size condition is evaluated a single time and is not re-checked after waking.
     */
    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() == 5) {
                    return;
                }
                System.out.println("wait begin " + System.currentTimeMillis());
                lock.wait();
                System.out.println("wait end " + System.currentTimeMillis());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Notifying side of the wait/notify demo: adds ten entries to {@code MyList}, one per second,
 * and calls {@code notify()} on the lock when the size is exactly five.
 */
public class ThreadB extends Thread {
    private Object lock;

    /**
     * @param lock the object used both as monitor and as wait/notify target
     */
    public ThreadB(Object lock) {
        this.lock = lock;
    }

    /**
     * Runs the whole ten-step loop, including the one-second sleeps, inside a single
     * {@code synchronized (lock)} block.
     */
    @Override
    public void run() {
        try {
            synchronized (lock) {
                for (int added = 1; added <= 10; added++) {
                    MyList.add();
                    if (MyList.size() == 5) {
                        lock.notify();
                        System.out.println("已经发出了通知");
                    }
                    System.out.println("添加了" + added + "个元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/** Entry point of the wait/notify demo. */
public class Run {

    /**
     * Starts the waiting thread, pauses 50 ms, then starts the notifying thread on the same
     * lock object. If the pause is interrupted the second thread is never started.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Object lock = new Object();
        ThreadA waiter = new ThreadA(lock);
        waiter.start();
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        ThreadB notifier = new ThreadB(lock);
        notifier.start();
    }
}
import java.util.ArrayList;
import java.util.List;

/** Process-wide list of strings exposed through static {@code add}/{@code size} helpers. */
public class MyList {
    private static List<String> items = new ArrayList<String>();

    /** Appends one fixed placeholder string to the shared list. */
    public static void add() {
        items.add("anyString");
    }

    /**
     * @return the current number of entries in the shared list
     */
    public static int size() {
        return items.size();
    }
}

/**
 * Waiting side of the demo: unless {@code MyList} already holds exactly five entries, it waits
 * once on the lock object it was given.
 */
public class ThreadA extends Thread {
    private Object lock;

    /**
     * @param lock the object used both as monitor and as wait/notify target
     */
    public ThreadA(Object lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() == 5) {
                    return;
                }
                System.out.println("wait begin " + System.currentTimeMillis());
                lock.wait();
                System.out.println("wait end " + System.currentTimeMillis());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/**
 * Notifying side of the demo: adds ten entries to {@code MyList}, one per second, and calls
 * {@code notify()} on the lock when the size is exactly five.
 */
public class ThreadB extends Thread {
    private Object lock;

    /**
     * @param lock the object used both as monitor and as wait/notify target
     */
    public ThreadB(Object lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                for (int added = 1; added <= 10; added++) {
                    MyList.add();
                    if (MyList.size() == 5) {
                        lock.notify();
                        System.out.println("已经发出了通知");
                    }
                    System.out.println("添加了" + added + "个元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/** Entry point of the wait/notify demo. */
public class Run {

    /**
     * Starts the waiting thread, pauses 50 ms, then starts the notifying thread on the same
     * lock object. If the pause is interrupted the second thread is never started.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Object lock = new Object();
        ThreadA waiter = new ThreadA(lock);
        waiter.start();
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        ThreadB notifier = new ThreadB(lock);
        notifier.start();
    }
}
以上方式是通过两个线程共享一个lock
的对象, 以此作为每个线程的出发点, 线程B在list
的size到了5的时候, 就通知在lock
阻塞的线程。
该方式的缺陷:
-
wait
的线程必须执行在notify
之前, 否则可能会导致等待线程一直无法被唤醒的情况。
-
管道通信 使用java.io.PipedInputStream 和 java.io.PipedOutputStream进行通信, 这两个流需要成对使用
/**
 * Demonstrates thread-to-thread communication over a connected
 * {@link PipedInputStream} / {@link PipedOutputStream} pair.
 */
public class ThreadPipeExchangeDataTest {

    /**
     * Connects the two pipe ends, starts the reader, starts the writer 200 ms later, and waits
     * for both threads to finish.
     *
     * @throws IOException if the pipe ends cannot be connected
     * @throws InterruptedException if the test thread is interrupted while sleeping or joining
     */
    @Test
    public void testPiped() throws IOException, InterruptedException {
        PipedOutputStream pipedOutputStream = new PipedOutputStream();
        PipedInputStream pipedInputStream = new PipedInputStream();
        pipedInputStream.connect(pipedOutputStream);

        ThreadRead threadRead = new ThreadRead(pipedInputStream);
        threadRead.start();
        // Give the reader a head start before the writer begins.
        Thread.sleep(200);
        ThreadWrite threadWrite = new ThreadWrite(pipedOutputStream);
        threadWrite.start();

        // Without these joins the test method returns while the workers may still be running.
        threadWrite.join();
        threadRead.join();
    }

    /** Reads from the pipe until end of stream, printing every chunk it receives. */
    private static class ThreadRead extends Thread {
        private final PipedInputStream pipedInputStream;

        /**
         * @param inputStream the already-connected read end of the pipe
         */
        public ThreadRead(PipedInputStream inputStream) {
            this.pipedInputStream = inputStream;
        }

        @Override
        public void run() {
            readData();
        }

        /** Drains the pipe in chunks of up to 20 bytes; the stream is closed on every exit path. */
        private void readData() {
            System.out.println("开始读取数据");
            byte[] bytes = new byte[20];
            try (PipedInputStream in = this.pipedInputStream) {
                int readLength = in.read(bytes);
                while (readLength != -1) {
                    // Decode with an explicit charset rather than the platform default.
                    String str = new String(bytes, 0, readLength,
                            java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("获取到数据:" + str);
                    readLength = in.read(bytes);
                }
                System.out.println();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Writes the numbers 0 to 9, each prefixed with a space, into the pipe and then closes it. */
    private static class ThreadWrite extends Thread {
        private final PipedOutputStream pipedOutputStream;

        /**
         * @param pipedOutputStream the already-connected write end of the pipe
         */
        public ThreadWrite(PipedOutputStream pipedOutputStream) {
            this.pipedOutputStream = pipedOutputStream;
        }

        @Override
        public void run() {
            write();
        }

        /** Sends the ten values; the stream is closed on every exit path. */
        private void write() {
            System.out.print("write: ");
            try (PipedOutputStream out = this.pipedOutputStream) {
                for (int i = 0, len = 10; i < len; i++) {
                    String outData = " " + i;
                    // Encode with an explicit charset rather than the platform default.
                    out.write(outData.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    System.out.print(outData);
                }
                System.out.println();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Demonstrates thread-to-thread communication over a connected
 * {@link PipedInputStream} / {@link PipedOutputStream} pair.
 */
public class ThreadPipeExchangeDataTest {

    /**
     * Connects the two pipe ends, starts the reader, starts the writer 200 ms later, and waits
     * for both threads to finish.
     *
     * @throws IOException if the pipe ends cannot be connected
     * @throws InterruptedException if the test thread is interrupted while sleeping or joining
     */
    @Test
    public void testPiped() throws IOException, InterruptedException {
        PipedOutputStream pipedOutputStream = new PipedOutputStream();
        PipedInputStream pipedInputStream = new PipedInputStream();
        pipedInputStream.connect(pipedOutputStream);

        ThreadRead threadRead = new ThreadRead(pipedInputStream);
        threadRead.start();
        // Give the reader a head start before the writer begins.
        Thread.sleep(200);
        ThreadWrite threadWrite = new ThreadWrite(pipedOutputStream);
        threadWrite.start();

        // Without these joins the test method returns while the workers may still be running.
        threadWrite.join();
        threadRead.join();
    }

    /** Reads from the pipe until end of stream, printing every chunk it receives. */
    private static class ThreadRead extends Thread {
        private final PipedInputStream pipedInputStream;

        /**
         * @param inputStream the already-connected read end of the pipe
         */
        public ThreadRead(PipedInputStream inputStream) {
            this.pipedInputStream = inputStream;
        }

        @Override
        public void run() {
            readData();
        }

        /** Drains the pipe in chunks of up to 20 bytes; the stream is closed on every exit path. */
        private void readData() {
            System.out.println("开始读取数据");
            byte[] bytes = new byte[20];
            try (PipedInputStream in = this.pipedInputStream) {
                int readLength = in.read(bytes);
                while (readLength != -1) {
                    // Decode with an explicit charset rather than the platform default.
                    String str = new String(bytes, 0, readLength,
                            java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("获取到数据:" + str);
                    readLength = in.read(bytes);
                }
                System.out.println();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Writes the numbers 0 to 9, each prefixed with a space, into the pipe and then closes it. */
    private static class ThreadWrite extends Thread {
        private final PipedOutputStream pipedOutputStream;

        /**
         * @param pipedOutputStream the already-connected write end of the pipe
         */
        public ThreadWrite(PipedOutputStream pipedOutputStream) {
            this.pipedOutputStream = pipedOutputStream;
        }

        @Override
        public void run() {
            write();
        }

        /** Sends the ten values; the stream is closed on every exit path. */
        private void write() {
            System.out.print("write: ");
            try (PipedOutputStream out = this.pipedOutputStream) {
                for (int i = 0, len = 10; i < len; i++) {
                    String outData = " " + i;
                    // Encode with an explicit charset rather than the platform default.
                    out.write(outData.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    System.out.print(outData);
                }
                System.out.println();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Demonstrates thread-to-thread communication over a connected
 * {@link PipedInputStream} / {@link PipedOutputStream} pair.
 */
public class ThreadPipeExchangeDataTest {

    /**
     * Connects the two pipe ends, starts the reader, starts the writer 200 ms later, and waits
     * for both threads to finish.
     *
     * @throws IOException if the pipe ends cannot be connected
     * @throws InterruptedException if the test thread is interrupted while sleeping or joining
     */
    @Test
    public void testPiped() throws IOException, InterruptedException {
        PipedOutputStream pipedOutputStream = new PipedOutputStream();
        PipedInputStream pipedInputStream = new PipedInputStream();
        pipedInputStream.connect(pipedOutputStream);

        ThreadRead threadRead = new ThreadRead(pipedInputStream);
        threadRead.start();
        // Give the reader a head start before the writer begins.
        Thread.sleep(200);
        ThreadWrite threadWrite = new ThreadWrite(pipedOutputStream);
        threadWrite.start();

        // Without these joins the test method returns while the workers may still be running.
        threadWrite.join();
        threadRead.join();
    }

    /** Reads from the pipe until end of stream, printing every chunk it receives. */
    private static class ThreadRead extends Thread {
        private final PipedInputStream pipedInputStream;

        /**
         * @param inputStream the already-connected read end of the pipe
         */
        public ThreadRead(PipedInputStream inputStream) {
            this.pipedInputStream = inputStream;
        }

        @Override
        public void run() {
            readData();
        }

        /** Drains the pipe in chunks of up to 20 bytes; the stream is closed on every exit path. */
        private void readData() {
            System.out.println("开始读取数据");
            byte[] bytes = new byte[20];
            try (PipedInputStream in = this.pipedInputStream) {
                int readLength = in.read(bytes);
                while (readLength != -1) {
                    // Decode with an explicit charset rather than the platform default.
                    String str = new String(bytes, 0, readLength,
                            java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("获取到数据:" + str);
                    readLength = in.read(bytes);
                }
                System.out.println();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Writes the numbers 0 to 9, each prefixed with a space, into the pipe and then closes it. */
    private static class ThreadWrite extends Thread {
        private final PipedOutputStream pipedOutputStream;

        /**
         * @param pipedOutputStream the already-connected write end of the pipe
         */
        public ThreadWrite(PipedOutputStream pipedOutputStream) {
            this.pipedOutputStream = pipedOutputStream;
        }

        @Override
        public void run() {
            write();
        }

        /** Sends the ten values; the stream is closed on every exit path. */
        private void write() {
            System.out.print("write: ");
            try (PipedOutputStream out = this.pipedOutputStream) {
                for (int i = 0, len = 10; i < len; i++) {
                    String outData = " " + i;
                    // Encode with an explicit charset rather than the platform default.
                    out.write(outData.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    System.out.print(outData);
                }
                System.out.println();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}