多线程读写资料利器-ReentrantReadWriteLock
多线程读写文件利器-ReentrantReadWriteLock
理解线程,首先要明白线程的几种状态,以及状态之间的转换,具体参考下图:
其次,必须理解线程中"锁"的作用,以下引用自sun公司文档Threads and Locks一章中关于Locks的描述:
引用
The Java programming language provides multiple mechanisms for communicating between threads. The most basic of these methods is synchronization, which is implemented using monitors. Each object in Java is associated with a monitor, which a thread can lock or unlock. Only one thread at a time may hold a lock on a monitor. Any other threads attempting to lock that monitor are blocked until they can obtain a lock on that monitor. A thread t may lock a particular monitor multiple times; each unlock reverses the effect of one lock operation.
以上说明,对于多线程来说,正是依靠锁定Object的Monitor来保证同一时间只有一个线程持有某个特定的锁.
Java API中定义Object的wait(),notify()方法就是基于其Monitor来实现:
引用
notify(): Wakes up a single thread that is waiting on this object's monitor.
wait(): Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object.
notifyAll(),wait(long timeout),wait(long timeout, int nanos)...
对于Object Monitor来说,可参考Analyzing Stack Traces文档关于Monitor的部分说明及种类介绍:
引用
A monitor can be thought of as a lock on an object, and every object has a monitor.
...
The following table describes the common registered monitors:
MonitorDescription
utf8 hash tableLocks the hashtable of defined i18N Strings that were loaded from the class constant pool.
JNI pinning lockProtects block copies of arrays to native method code.
JNI global reference lockLocks the global reference table which holds values that need to be explicitly freed, and will outlive the lifetime of the native method call.
BinClass lockLocks access to the loaded and resolved classes list. The global table list of classes
Class linking lockProtects a classes data when loading native libraries to resolve symbolic references
System class loader lockEnsures that only one thread is loading a system class at a time.
Code rewrite lockProtects code when an optimization is attempted.
Heap lockProtects the Java heap during heap memory management
Monitor cache lockOnly one thread can have access to the monitor cache at a time this lock ensures the integrity of the monitor cache
Dynamic loading lockProtects Unix green threads JVMs from loading the shared library stub libdl.so more than once at a time.
Monitor IO lockProtects physical I/O for example, open and read.
User signal monitorControls access to the signal handler if a user signal USRSIG in green threads JVMs.
Child death monitorControls access to the process wait information when using the runtime system calls to run locals commands in a green threads JVM.
I/O MonitorControls access to the threads file descriptors for poll/select events
Alarm MonitorControls access to a clock handler used in green threads JVMs to handle timeouts
Thread queue lockProtects the queue of active threads
Monitor registryOnly one thread can have access to the monitor registry at a time this lock ensures the integrity of that registry
Has finalization queue lock *Protects the list of queue lock objects that have been garbage-collected, and deemed to need finalization. They are copied to the Finalize me queue
Finalize me queue lock *Protects a list of objects that can be finalized at leisure
Name and type hash table lock *Protects the JVM hash tables of constants and their types
String intern lock *Locks the hashtable of defined Strings that were loaded from the class constant pool
Class loading lock *Ensures only one thread loads a class at a time
Java stack lock *Protects the free stack segments list
多线程下的数据操作需要保证其数据的可靠一致性,为此必须实现线程的同步.以下引用自Intrinsic Locks and Synchronization:
引用
Synchronization is built around an internal entity known as the intrinsic lock or monitor lock. (The API specification often refers to this entity simply as a "monitor.") Intrinsic locks play a role in both aspects of synchronization: enforcing exclusive access to an object's state and establishing happens-before relationships that are essential to visibility.
Every object has an intrinsic lock associated with it. By convention, a thread that needs exclusive and consistent access to an object's fields has to acquire the object's intrinsic lock before accessing them, and then release the intrinsic lock when it's done with them. A thread is said to own the intrinsic lock between the time it has acquired the lock and released the lock. As long as a thread owns an intrinsic lock, no other thread can acquire the same lock. The other thread will block when it attempts to acquire the lock.
When a thread releases an intrinsic lock, a happens-before relationship is established between that action and any subsequent acquistion of the same lock.
Locks In Synchronized Methods
...
Synchronized Statements
...
Reentrant Synchronization
...
对于以上部分,需要注意以下几点:
1. 线程同步的部分为整个Synchronized同步方法,整个Synchronized同步块,或是Reentrant同步中上锁(lock())与开锁(unlock())之间的部分.
2. 对于多线程synchronized同步块的锁定, 应该限制为锁定同一个Object对象:
synchronized(Object obj){} 注: 如果每个线程锁定的对象不同,则线程之间互不影响,无法达到同步效果.
synchronized同步块还可以直接锁定某个类的所有对象:
synchronized(XXX.class){} 注意没有static synchronized(XXX){}的用法.
锁定对象的线程与锁定类的线程之间互不影响.
3. 对于synchronized同步方法:
①public synchronized void method(){...} 锁定当前对象
②public static synchronized void method(){...} 锁定当前类的所有对象
3.① 等价于public void method(){ synchronized(this){} }
3.② 等价于public void method(){ synchronized(this.class){} }
4. Reentrant同步锁:
线程锁定靠的是Reentrant锁, 依靠调用其lock(),unlock()方法来决定何时何地执行同步.
(1) 与synchronized相比, 使用ReentrantLock(public class ReentrantLock extends Object implements Lock, Serializable),可更加精准的控制线程.
①可以判断线程持有(isHeldByCurrentThread(),isLocked())
②可以用于中断线程(lockInterruptibly())
③设置线程排队的公平性(构造方法)
(2) 专门读写文件, 可使用ReentrantReadWriteLock(public class ReentrantReadWriteLock extends Object implements ReadWriteLock, Serializable)
①该锁专门为多用户读取文件设计,因此封装了读和写之间的关系:允许多用户读(读-读)以及写文件时读(写-读),禁止读文件时写(读-写)以及写文件时再次写(写-写).
②包含两个属性readLock和writeLock, 这两个属性封装好了上述功能, 调用时直接lock(),unlock()即可.
③readLock和writeLock是通过升级(readLock -> writeLock)以及降级(writeLock -> readLock)来完成上述功能, 以下为Java API中的例子:
class CachedData {
Object data;
volatile boolean cacheValid;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
// Recheck state because another thread might have acquired
// write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
rwl.writeLock().unlock(); // Unlock write, still hold read
}
use(data);
rwl.readLock().unlock();
}
}
以下代码为自己练习操作ReentrantReadWriteLock的代码:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 对原始数据进行操作的类
*
* @author Vincent.zheng
*/
class Oper {
private Map<String, String> sourceMap = new HashMap<String, String>();
private ReadWriteLock lock = new ReentrantReadWriteLock(true);
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
protected Oper() {
setSourceMap();
}
// 读取单个值
public String read(String key) {
try {
readLock.lock();
launchPeriod();
return key + ":" + sourceMap.get(key);
} finally {
readLock.unlock();
}
}
//
public String read() {
try {
readLock.lock();
launchPeriod();
StringBuffer results = new StringBuffer();
String[] str = new String[sourceMap.size()];
String[] keys = sourceMap.keySet().toArray(str);
for (String key : keys) {
results.append(key + ":" + sourceMap.get(key) + "; ");
}
return new String(results);
} finally {
readLock.unlock();
}
}
public String write(String key, String value) {
try {
writeLock.lock();
launchPeriod();
sourceMap.put(key, value);
return key + ":" + value;
} finally {
writeLock.unlock();
}
}
public String write(Map<String, String> map) {
try {
writeLock.lock();
launchPeriod();
sourceMap.putAll(map);
StringBuffer results = new StringBuffer();
String[] str = new String[map.size()];
String[] keys = map.keySet().toArray(str);
for (String key : keys) {
results.append(key + ":" + map.get(key) + "; ");
}
return new String(results);
} finally {
writeLock.unlock();
}
}
// 保证每个线程运行时间在5秒以上
private void launchPeriod() {
long currentTime = System.currentTimeMillis();
for (;;) {
if (System.currentTimeMillis() - currentTime > 5000) {
break;
}
}
}
// 原始数据
private void setSourceMap() {
for (int i = 0; i < 1000; i++) {
sourceMap.put("SourceKey" + i, "SourceValue" + i);
}
}
}
class Reader extends Thread {
public Oper oper;
public Reader(String name, Oper oper) {
super(name);
this.oper = oper;
}
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + " Start Reading");
// 读全部数据
// String results = oper.read();
// System.out.println(name + " Read=======" + results);
// 读单个随机值
String result = oper.read("SourceKey" + new Random().nextInt(1000));
System.out.println(name + " Read=======" + result);
}
}
class Writer extends Thread {
public Oper oper;
public Writer(String str, Oper oper) {
super(str);
this.oper = oper;
}
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + " Start Writing");
// 写全部数据
// String results = oper.write(getWriteData());
// System.out.println(name + " Write=======" + results);
// 写单个值
String result = oper.write("WriteSoloKeyIn" + name, "WriteSoloValueIn"
+ name);
System.out.println(name + " Write=======" + result);
System.out.println(name + " Read=======" + oper.read());
}
// 写入数据
private Map<String, String> getWriteData() {
Map<String, String> writeMap = new HashMap<String, String>();
for (int i = 0; i < 10; i++) {
writeMap.put("WriteKey" + (i + 1), "WriteValue" + (i + 1));
}
return writeMap;
}
}
public class Test {
/**
* @param args
*/
public static void main(String[] args) {
final Oper oper = new Oper();
ArrayList<Thread> list = new ArrayList<Thread>();
for (int i = 0; i < 100; i++) {
Reader reader = new Reader("Reader" + i, oper);
list.add(reader);
}
for (int i = 0; i < 10; i++) {
Writer writer = new Writer("Writer" + i, oper);
list.add(writer);
}
ArrayList<Integer> data = new ArrayList<Integer>();
for (int i = 0; i < list.size(); i++) {
data.add(i);
}
for (int i = 0; i < list.size(); i++) {
Integer random = new Random(i).nextInt(list.size());
if (data.contains(random)) {
data.remove(random);
list.get(random).start();
}
}
}
}