线程间的通信四种方式

主要是总结线程间的通信方式有哪些.

通信方式

  1. 通过synchronized进行通信

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
public class MyObject {
synchronized public void methodA() {
//do something....
}
synchronized public void methodB() {
//do some other thing
}
}
public class ThreadA extends Thread {
private MyObject object;
//省略构造方法
@Override
public void run() {
super.run();
object.methodA();
}
}
public class ThreadB extends Thread {
private MyObject object;
//省略构造方法
@Override
public void run() {
super.run();
object.methodB();
}
}
public class Run {
public static void main(String[] args) {
MyObject object = new MyObject();
//线程A与线程B 持有的是同一个对象:object
ThreadA a = new ThreadA(object);
ThreadB b = new ThreadB(object);
a.start();
b.start();
}
}
public class MyObject { synchronized public void methodA() { //do something.... } synchronized public void methodB() { //do some other thing } } ​ public class ThreadA extends Thread { private MyObject object; //省略构造方法 @Override public void run() { super.run(); object.methodA(); } } ​ public class ThreadB extends Thread { private MyObject object; //省略构造方法 @Override public void run() { super.run(); object.methodB(); } } ​ public class Run { public static void main(String[] args) { MyObject object = new MyObject(); //线程A与线程B 持有的是同一个对象:object ThreadA a = new ThreadA(object); ThreadB b = new ThreadB(object); a.start(); b.start(); } }
/**
 * Shared object used by the example threads.
 *
 * <p>Both methods are {@code synchronized} instance methods, so they lock on
 * the same monitor (this instance) and cannot run concurrently on one object.
 */
public class MyObject {

    /** Placeholder for work done while holding this object's monitor. */
    public synchronized void methodA() {
        // do something....
    }

    /** Placeholder for other work done while holding the same monitor. */
    public synchronized void methodB() {
        // do some other thing
    }
}
​
public class ThreadA extends Thread {
   private MyObject object;
   //省略构造方法
   @Override
   public void run() {
       super.run();
       object.methodA();
  }
}
​
public class ThreadB extends Thread {
   private MyObject object;
//省略构造方法
   @Override
   public void run() {
       super.run();
       object.methodB();
  }
}
​
public class Run {
   public static void main(String[] args) {
       MyObject object = new MyObject();
       //线程A与线程B 持有的是同一个对象:object
       ThreadA a = new ThreadA(object);
       ThreadB b = new ThreadB(object);
       a.start();
       b.start();
  }
}

 

在上例中, 因为ThreadA和ThreadB持有共同的变量object, 因此通过synchronized的方式可以实现两个线程的串行执行, 因为最终被锁住的是同一个object对象。 同时根据happens-before原则, 对一个锁的释放先行发生于后续对同一个锁的获取, 因此一个线程对该对象的修改对随后获得锁的另一个线程是可见的。

这种方式是通过共享内存的方式实现两个线程的通信。

  2. 通过while循环的方式

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
@Test
public void testWhile() throws InterruptedException {
ObjectValue objectValue = new ObjectValue();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while(true) {
for (int i = 0, len = 10; i < len; i++) {
objectValue.add();
System.out.println("添加了" + (i + 1) + "个元素!");
// 该处会有一个sleep的操作,为了让工作内存有时机能够把数据同步到主内存之中
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}
}
}
});
Thread threadB = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println("ThreadB 获取线程的size: " + objectValue.size());
// 该条件会被击穿, 因为数据延迟写入主内存之中
if (objectValue.size() == 5) {
System.out.println("==5条件成立, 线程B退出");
break;
}
}
}
});
thread.start();
threadB.start();
thread.join();
threadB.join();
}
private class ObjectValue {
private List<String> list = new ArrayList<>();
public void add() {
list.add("elements");
}
public int size() {
return list.size();
}
}
@Test public void testWhile() throws InterruptedException { ​ ObjectValue objectValue = new ObjectValue(); ​ Thread thread = new Thread(new Runnable() { @Override public void run() { while(true) { for (int i = 0, len = 10; i < len; i++) { objectValue.add(); ​ System.out.println("添加了" + (i + 1) + "个元素!"); // 该处会有一个sleep的操作,为了让工作内存有时机能够把数据同步到主内存之中 // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } } } } }); ​ Thread threadB = new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println("ThreadB 获取线程的size: " + objectValue.size()); // 该条件会被击穿, 因为数据延迟写入主内存之中 if (objectValue.size() == 5) { System.out.println("==5条件成立, 线程B退出"); break; } } } }); ​ thread.start(); threadB.start(); ​ thread.join(); threadB.join(); } ​ private class ObjectValue { private List<String> list = new ArrayList<>(); ​ public void add() { list.add("elements"); } ​ public int size() { return list.size(); } }
@Test
public void testWhile() throws InterruptedException {
​
 ObjectValue objectValue = new ObjectValue();
​
 Thread thread = new Thread(new Runnable() {
   @Override
   public void run() {
    while(true) {
      for (int i = 0, len = 10; i < len; i++) {
         objectValue.add();
​
        System.out.println("添加了" + (i + 1) + "个元素!");
        // 该处会有一个sleep的操作,为了让工作内存有时机能够把数据同步到主内存之中
//           try {
//             Thread.sleep(1000);
//           } catch (InterruptedException e) {
//             e.printStackTrace();
//           }
      }
    }
  }
});
​
 Thread threadB = new Thread(new Runnable() {
   @Override
   public void run() {
     while (true) {
       System.out.println("ThreadB 获取线程的size: " + objectValue.size());
       // 该条件会被击穿, 因为数据延迟写入主内存之中
       if (objectValue.size() == 5) {
         System.out.println("==5条件成立, 线程B退出");
         break;
      }
    }
  }
});
​
 thread.start();
 threadB.start();
​
 thread.join();
 threadB.join();
}
​
private class ObjectValue {
   private List<String> list = new ArrayList<>();
​
   public void add() {
     list.add("elements");
  }
​
   public int size() {
     return list.size();
  }
}

 

以上程序存在以下问题:

  • 线程B是一个空循环, 没有做其他的事情, 比较耗费CPU的资源

  • 线程A对ObjectValue中list的修改, 从工作内存同步到主内存可能存在延迟

  • (可见性): 线程B中==5的条件可能会被击穿, 因为线程B读取的可能一直是本地内存中的缓存, 不能保证读到的是最新的数据, 因此可能会导致死循环

  • 该处ObjectValue中的list使用volatile也不能解决问题, 因为volatile只是保证了线程每次都能够读到主内存中最新的数据, 但是主内存的数据可能在线程B两次读取之间被修改了很多次, 从而错过size恰好等于5的时刻.

  3. wait()和notify()的方式

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import java.util.ArrayList;
import java.util.List;
public class MyList {
private static List<String> list = new ArrayList<String>();
public static void add() {
list.add("anyString");
}
public static int size() {
return list.size();
}
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
if (MyList.size() != 5) {
System.out.println("wait begin "
+ System.currentTimeMillis());
lock.wait();
System.out.println("wait end "
+ System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
private Object lock;
public ThreadB(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
for (int i = 0; i < 10; i++) {
MyList.add();
if (MyList.size() == 5) {
lock.notify();
System.out.println("已经发出了通知");
}
System.out.println("添加了" + (i + 1) + "个元素!");
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) {
try {
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
Thread.sleep(50);
ThreadB b = new ThreadB(lock);
b.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import java.util.ArrayList; import java.util.List; ​ public class MyList { private static List<String> list = new ArrayList<String>(); public static void add() { list.add("anyString"); } public static int size() { return list.size(); } } ​ public class ThreadA extends Thread { ​ private Object lock; public ThreadA(Object lock) { super(); this.lock = lock; } @Override public void run() { try { synchronized (lock) { if (MyList.size() != 5) { System.out.println("wait begin " + System.currentTimeMillis()); lock.wait(); System.out.println("wait end " + System.currentTimeMillis()); } } } catch (InterruptedException e) { e.printStackTrace(); } } } ​ ​ public class ThreadB extends Thread { private Object lock; ​ public ThreadB(Object lock) { super(); this.lock = lock; } ​ @Override public void run() { try { synchronized (lock) { for (int i = 0; i < 10; i++) { MyList.add(); if (MyList.size() == 5) { lock.notify(); System.out.println("已经发出了通知"); } System.out.println("添加了" + (i + 1) + "个元素!"); Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } } } ​ public class Run { public static void main(String[] args) { try { Object lock = new Object(); ThreadA a = new ThreadA(lock); a.start(); Thread.sleep(50); ThreadB b = new ThreadB(lock); b.start(); } catch (InterruptedException e) { e.printStackTrace(); } } }
import java.util.ArrayList;
 import java.util.List;
​
 public class MyList {
     private static List<String> list = new ArrayList<String>();
     public static void add() {
         list.add("anyString");
    }
    public static int size() {
        return list.size();
    }
}
​
public class ThreadA extends Thread {
​
    private Object lock;
    public ThreadA(Object lock) {
        super();
        this.lock = lock;
    }
    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() != 5) {
                    System.out.println("wait begin "
                            + System.currentTimeMillis());
                    lock.wait();
                    System.out.println("wait end "
                            + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
​
​
public class ThreadB extends Thread {
    private Object lock;
​
    public ThreadB(Object lock) {
        super();
        this.lock = lock;
    }
​
    @Override
   public void run() {
       try {
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                  MyList.add();
                    if (MyList.size() == 5) {
                        lock.notify();
                        System.out.println("已经发出了通知");
                    }
                    System.out.println("添加了" + (i + 1) + "个元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
​
public class Run {
    public static void main(String[] args) {
        try {
            Object lock = new Object();
            ThreadA a = new ThreadA(lock);
            a.start();
            Thread.sleep(50);
            ThreadB b = new ThreadB(lock);
            b.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

以上方式是让两个线程共享同一个lock对象, 并以它作为同步和通信的媒介: 线程B在list的size到了5的时候, 就通知在lock上等待的线程。

该方式的缺陷:

  • wait的线程必须在notify之前执行, 否则通知会丢失, 可能会导致等待的线程一直无法被唤醒执行。

  4. 管道通信 使用java.io.PipedInputStream 和 java.io.PipedOutputStream进行通信, 这两个需要成对地使用

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
public class ThreadPipeExchangeDataTest {
/**
* 测试通过管道进行线程间的通信
*/
@Test
public void testPiped() throws IOException, InterruptedException {
PipedOutputStream pipedOutputStream = new PipedOutputStream();
PipedInputStream pipedInputStream = new PipedInputStream();
pipedInputStream.connect(pipedOutputStream);
ThreadRead threadRead = new ThreadRead(pipedInputStream);
threadRead.start();
Thread.sleep(200);
ThreadWrite threadWrite = new ThreadWrite(pipedOutputStream);
threadWrite.start();
}
/**
* 该线程用于读取数据
*/
private class ThreadRead extends Thread {
private PipedInputStream pipedInputStream;
public ThreadRead(PipedInputStream inputStream) {
this.pipedInputStream = inputStream;
}
@Override
public void run() {
readData();
}
private void readData() {
System.out.println("开始读取数据");
byte[] bytes = new byte[20];
try {
int readLength = this.pipedInputStream.read(bytes);
while (readLength != -1) {
String str = new String(bytes, 0, readLength);
System.out.println("获取到数据:" + str);
readLength = this.pipedInputStream.read(bytes);
}
System.out.println();
this.pipedInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private class ThreadWrite extends Thread {
private PipedOutputStream pipedOutputStream;
public ThreadWrite(PipedOutputStream pipedOutputStream) {
this.pipedOutputStream = pipedOutputStream;
}
@Override
public void run() {
write();
}
private void write() {
System.out.print("write: ");
try {
for (int i = 0, len = 10; i < len; i++) {
String outData = " " + i;
this.pipedOutputStream.write(outData.getBytes());
System.out.print(outData);
}
System.out.println();
this.pipedOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public class ThreadPipeExchangeDataTest { ​ /** * 测试通过管道进行线程间的通信 */ @Test public void testPiped() throws IOException, InterruptedException { PipedOutputStream pipedOutputStream = new PipedOutputStream(); PipedInputStream pipedInputStream = new PipedInputStream(); pipedInputStream.connect(pipedOutputStream); ​ ThreadRead threadRead = new ThreadRead(pipedInputStream); threadRead.start(); ​ Thread.sleep(200); ​ ThreadWrite threadWrite = new ThreadWrite(pipedOutputStream); threadWrite.start(); } ​ /** * 该线程用于读取数据 */ private class ThreadRead extends Thread { ​ private PipedInputStream pipedInputStream; ​ public ThreadRead(PipedInputStream inputStream) { this.pipedInputStream = inputStream; } ​ @Override public void run() { readData(); } ​ private void readData() { System.out.println("开始读取数据"); byte[] bytes = new byte[20]; try { int readLength = this.pipedInputStream.read(bytes); while (readLength != -1) { String str = new String(bytes, 0, readLength); System.out.println("获取到数据:" + str); readLength = this.pipedInputStream.read(bytes); } ​ System.out.println(); this.pipedInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } ​ private class ThreadWrite extends Thread { ​ private PipedOutputStream pipedOutputStream; ​ public ThreadWrite(PipedOutputStream pipedOutputStream) { this.pipedOutputStream = pipedOutputStream; } ​ @Override public void run() { write(); } ​ private void write() { System.out.print("write: "); try { for (int i = 0, len = 10; i < len; i++) { String outData = " " + i; ​ this.pipedOutputStream.write(outData.getBytes()); System.out.print(outData); } ​ System.out.println(); this.pipedOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } }
public class ThreadPipeExchangeDataTest {
​
 /**
  * 测试通过管道进行线程间的通信
  */
 @Test
 public void testPiped() throws IOException, InterruptedException {
   PipedOutputStream pipedOutputStream = new PipedOutputStream();
   PipedInputStream pipedInputStream = new PipedInputStream();
   pipedInputStream.connect(pipedOutputStream);
​
   ThreadRead threadRead = new ThreadRead(pipedInputStream);
   threadRead.start();
​
   Thread.sleep(200);
​
   ThreadWrite threadWrite = new ThreadWrite(pipedOutputStream);
   threadWrite.start();
}
​
 /**
  * 该线程用于读取数据
  */
 private class ThreadRead extends Thread {
​
   private PipedInputStream pipedInputStream;
​
   public ThreadRead(PipedInputStream inputStream) {
     this.pipedInputStream = inputStream;
  }
​
   @Override
   public void run() {
     readData();
  }
​
   private void readData() {
     System.out.println("开始读取数据");
     byte[] bytes = new byte[20];
     try {
       int readLength = this.pipedInputStream.read(bytes);
       while (readLength != -1) {
         String str = new String(bytes, 0, readLength);
         System.out.println("获取到数据:" + str);
         readLength = this.pipedInputStream.read(bytes);
      }
​
       System.out.println();
       this.pipedInputStream.close();
    } catch (IOException e) {
       e.printStackTrace();
    }
  }
}
​
 private class ThreadWrite extends Thread {
​
   private PipedOutputStream pipedOutputStream;
​
   public ThreadWrite(PipedOutputStream pipedOutputStream) {
     this.pipedOutputStream = pipedOutputStream;
  }
​
   @Override
   public void run() {
     write();
  }
​
   private void write() {
     System.out.print("write: ");
     try {
       for (int i = 0, len = 10; i < len; i++) {
         String outData = " " + i;
​
         this.pipedOutputStream.write(outData.getBytes());
         System.out.print(outData);
      }
​
       System.out.println();
       this.pipedOutputStream.close();
    } catch (IOException e) {
       e.printStackTrace();
    }
  }
}
}
Leave a Comment

Comments

No comments yet. Why don’t you start the discussion?

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注