首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

Memcached CAS 协议(处理memcached的多线程并发有关问题)

2014-01-26 
Memcached CAS 协议(处理memcached的多线程并发问题)什么是CAS协议?Memcached于1.2.4版本新增CAS(Check an

Memcached CAS 协议(处理memcached的多线程并发问题)

什么是CAS协议?

Memcached于1.2.4版本新增CAS(Check and Set)协议类同于Java并发的CAS(Compare and Swap)原子操作,处理同一item被多个线程更改过程的并发问题。?

在Memcached中,每个key关联有一个64-bit长度的long型惟一数值,表示该key对应value的版本号。这个数值由Memcached server产生,从1开始,且同一Memcached server不会重复。在两种情况下这个版本数值会加1:1、新增一个key-value对;2、对某已有key对应的value值更新成功。删除item版本值不会减小。?

例如?

Java代码??Memcached CAS 协议(处理memcached的多线程并发有关问题)
  1. MemcachedClient?client?=?new?MemcachedClient();??
  2. ???client.set("fKey",?"fValue");???
  3. ??//第一次set,?在Memcached?server中会维护fKey对应的value的版本号,假设是548;??
  4. ??
  5. ???client.set("fKey",?"sValue");???
  6. ??//再次set,则这个fKey对应的value的版本号变为549;??
  7. ??
  8. ???CASValue?casValue?=?client.gets("fKey");??
  9. ??//这样就可以得到对应key的cas版本号和实际value(各个Memcached?client都有类似的对象表示,名字可能不一样,但效果类同),如?casValue.getValue?=?"sValue",casValue.getCas=549;??



CAS协议解决的问题?

模拟多个Memcached client并发set同一个key的场景。如clientA想把当前key的value set为"x",且操作成功;clientB却把当前key的value值由"x"覆盖set为"y",这时clientA再根据key去取value时得到"y"而不是期望的"x",它使用这个值,但不知道这个值已经被其它线程修改过,就可能会出现问题。?

CAS协议解决这种并发修改问题。有线程试图修改当前key-value对的value时,先由gets方法得到item的版本号,操作完成提交数据时,使用cas方法谨慎变更,如果在本地对item操作过程中这个key-value对在Memcached server端被其它线程更改过,就放弃此次修改(乐观锁概念)。?


Java代码??Memcached CAS 协议(处理memcached的多线程并发有关问题)
  1. CASValue?casValue?=?client.gets(key);??
  2. //*****??
  3. //本地的各种处理??
  4. //*****??
  5. CASResponse?response?=?client.cas(key,?newValue,?casValue);??
  6. //在我取数据时item的版本号是casValue.getCas(),所以提交时我期望item的版本号是没有改变过的。如果被修改过,不是我取数据时的版本号,那么Memcached?server对这次提交什么也不做,返回true或false由用户自己来提出解决方案(什么也不做或是重新获取版本号,再次重试提交等)??



并发环境下的正确性验证?

用多个Memcached client并发更改同一个key值,将value递增,如果? 操作次数-CAS失败次数 = value增加的值,表示并发环境下CAS处理没有问题。?

Java代码??Memcached CAS 协议(处理memcached的多线程并发有关问题)
  1. import?java.io.IOException;??
  2. import?java.net.InetSocketAddress;??
  3. ??
  4. import?net.spy.memcached.CASResponse;??
  5. import?net.spy.memcached.CASValue;??
  6. import?net.spy.memcached.MemcachedClient;??
  7. ??
  8. ??
  9. public?class?CASTest?{??
  10. ??????
  11. ????private?static?MemcachedClient?client?=?null;??
  12. ??????
  13. ????static?{??
  14. ????????try?{??
  15. ????????????client?=?new?MemcachedClient(??
  16. ????????????????????????????????new?InetSocketAddress("localhost",?11211));??
  17. ????????}?catch?(IOException?o)?{??
  18. ????????????o.printStackTrace();??
  19. ????????}??
  20. ????}??
  21. ??
  22. ????public?static?void?main(String[]?args)?throws?Exception?{??
  23. ????????//Firstly,?the?key?should?exist.??
  24. ????????//key?is?"number",?value?is?Integer?1,?7845?is?expire?time??
  25. ????????client.set("number",?7845,?1);??
  26. ??????????
  27. ??????????
  28. ????????CASTest?testObj?=?new?CASTest();??
  29. ????????//start?the?multithread?environment??
  30. ????????for?(int?i?=?0;?i?<?10;?i++)?{??
  31. ????????????testObj.new?ThreadTest("Thread-"?+?(i?+?1)).start();??
  32. ????????}??
  33. ????}??
  34. ??????
  35. ????/**?
  36. ?????*?Each?thread?runs?many?times?
  37. ?????*/??
  38. ????private?class?ThreadTest?extends?Thread?{??
  39. ??????????
  40. ????????private??MemcachedClient?client?=?null;??
  41. ????????ThreadTest(String?name)?throws?IOException?{??
  42. ????????????super(name);??
  43. ????????????client?=?new?MemcachedClient(??
  44. ??????????????????????????????????new?InetSocketAddress("localhost",?11211));??
  45. ????????}??
  46. ??????????
  47. ????????public?void?run()?{??
  48. ????????????int?i?=?0;??
  49. ????????????int?success?=?0;??
  50. ????????????while?(i?<?10)?{??
  51. ????????????????i++;??
  52. ????????????????CASValue<Object>?uniqueValue?=client.gets("number");??
  53. ????????????????CASResponse?response?=?client.cas("number",?????
  54. ?????????????????uniqueValue.getCas(),?(Integer)uniqueValue.getValue()?+?1);??
  55. ??
  56. ????????????????if?(response.toString().equals("OK"))?{??
  57. ????????????????????success++;??
  58. ????????????????}??
  59. ????????????????System.out.println(Thread.currentThread().getName()?+?"?"?+??i???
  60. ??????????????????+?"?time?"?+?"?update?oldValue?:?"?+?uniqueValue???
  61. ??????????????????+??"?,?result?:?"?+?response);??
  62. ????????????}??
  63. ??????????????
  64. ????????????if?(success?<?10)?{??
  65. ????????????????System.out.println(Thread.currentThread().getName()??
  66. ??????????????????????+?"?unsuccessful?times?:?"?+?(10?-?success?));??
  67. ????????????}??
  68. ????????}??
  69. ????}??
  70. }??



每次执行的结果都会不一样,如其中某次的执行结果为: 总共操作100次,冲突47次,且最后value由1涨到53,那么表示验证成功。?

热点排行