题目是这样子的,有4个线程,线程1只输出1,线程2只输出2,线程3只输出3,线程4只输出4。现在要生成4个文件,文件的内容分别为:
文件1:12341234……
文件2:23412341……
文件3:34123412……
线程4:41234123……
看上去很棘手,那么先从简单的情况入手。假如只要生成文件1,应该如何调度这4个线程呢?很容易想到设置一个临界变量,用这个变量表示当前需要哪一个线程写入,然后4个线程就相互竞争就行了。
private Object _lockObject = new Object();
private volatile int _currentNumber = 0;
每个线程先锁定_lockObject,判断当前要写入的数是不是自己要写的,如果是,那么写入该数值(这里用StringBuffer代替文件),设置下一个_currentNumber:
for (;;) {
if (_printCount > 10)
break;
synchronized (TestJava.this._lockObject) {
if (TestJava.this._currentNumber == this._number) {
_buffer1.append(this._number);
TestJava.this._currentNumber = (TestJava.this._currentNumber + 1) % COUNT + 1;
}
}
}
这样做的坏处是,线程之间的顺序是无序的。比如线程1写完后,又会再次进入临界区尝试锁定_lockObject,这个时候JVM有可能会让线程1继续取得锁;还有可能是,线程3进入临界区被休眠,JVM唤醒线程1,然后又唤醒线程3…… 在某个时刻,线程2总会被唤醒,这样会做不少无用功。所以我们的改进办法是,如果当前写入的数字不是该线程的,就把自己休眠起来,这样就防止JVM再次把自己唤醒了。
具体是用到了Object类的wait()和notifyAll()方法:当不是自己写入时,把自己休眠起来;当写入完毕后,唤醒其他线程。
for (;;) {
if (_printCount > 10)
break;
synchronized (TestJava.this._lockObject) {
if (TestJava.this._currentNumber == this._number) {
_buffer1.append(this._number);
_printCount++;
TestJava.this._currentNumber = (TestJava.this._currentNumber + 1) % COUNT;
TestJava.this._lockObject.notifyAll();
} else
try {
TestJava.this._lockObject.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
推广到4个线程的情形,我们可以为每一个文件设置一把锁,每个线程轮询这4个文件:
for (;;) {
for (int i = 0; i < NUMBER_OF_TEXT; i++) {
synchronized (sbs[i]) {
while ((lastContent[i] + 1) % NUMBER_OF_TEXT + 1 != sequence) {
try {
sbs[i].wait();
} catch (InterruptedException e) {
System.out.println("Thread " + sequence + " stopped.");
}
}
lastContent[i] = sequence;
sbs[i].append("" + sequence);
sbs[i].notifyAll();
}
}
}
可是,这样做的效率也不高。假如现在第二个文件应该写入1,那么线程1在询问第一个文件时就会把自己锁住,这样只好等其他线程notify的时候才有机会被唤醒。实际上,当第一个文件不是写1的时候,另一个文件应该要写入1,这个时候线程1应该去那个文件把1写入。可以把上述代码用 ReentrantLock 的 tryLock 方法改写,使每个线程不会被锁住,而是直接尝试下一个文件,这样效率会提升一些。
实际上,仔细观察会发现,对于每个文件的任意一个字符,都是4个线程各写一个,如果我们能把文件的写入分阶段进行,让4个线程同时写到对应位置上,然后再写入下一个位置,这样的效率会显著提高。
这个时候CyclicBarrier就派上用场了。
_buffer = new StringBuffer[COUNT];
_barrier = new CyclicBarrier(COUNT);
Thread[] threads = new Thread[COUNT];
for (int i = 0; i < COUNT; i++) {
_buffer[i] = new StringBuffer();
threads[i] = new Thread(new NumberWriter(i + 1, i));
threads[i].start();
}
首先生成CyclicBarrier的对象,参数表示在线程调用await()方法时,一共要阻塞多少个线程才能被同时唤醒。这样,我们就可以采取如下方法:
- 初始时为每个线程分配一个正确的文件。
- 线程调用 await() 方法被阻塞。当4个线程同时被阻塞时,继续执行。
- 线程写入对应的文件,并设置下一个要写入的文件。进入下一次循环调用 await() 继续被阻塞。
- 当所有的线程都写完并设置好下一个文件位置的时候,同时被唤醒继续执行。
在这里,我们甚至不需要设置其他的锁,因为采取这种方法,每一个阶段都是每个线程写入一次。如果有的线程执行得很快,在下一次写入操作前,会被 await() 方法阻塞,只有等到所有线程执行到这个阶段时,才能被继续执行。
class NumberWriter implements Runnable {
private int _number;
private int _currentWriterIndex;
private int _writeCount = 0;
@Override
public void run() {
for (;;) {
if (_writeCount > 100)
break;
try {
_barrier.await();
TestJava.this._buffer[_currentWriterIndex].append(_number);
_currentWriterIndex = (_currentWriterIndex - 1 + COUNT) % TestJava.this.COUNT;
_writeCount++;
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public NumberWriter(int number, int currentWriterIndex) {
this._number = number;
this._currentWriterIndex = currentWriterIndex;
}
}
可以看到 CyclicBarrier 思路最清晰,最主要是效率最高,不会产生线程之间的相互竞争问题。
当然,使用 CyclicBarrier 之前一定要考虑清楚是否合适。并且,一定要做好进程的调度,否则很有可能线程会导致线程的无限等待。