首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

使用Java多线程实现任务散发

2013-12-29 
使用Java多线程实现任务分发使用Java多线程实现任务分发2009-08-13 09:07 佚名 网络转载?字号:T?|?T本文向

使用Java多线程实现任务分发

使用Java多线程实现任务分发2009-08-13 09:07 佚名 网络转载?字号:T?|?T使用Java多线程实现任务散发

本文向你介绍使用Java多线程技术实现任务列表指派到线程的思路和执行过程,作者通过任务分发器、任务执行和工作线程实现了这个功能。

AD:2013云计算架构师峰会课程资料下载

?

多线程下载由来已久,如 FlashGet、NetAnts 等工具,它们都是依懒于 HTTP 协议的支持(Range 字段指定请求内容范围),首先能读取出请求内容 (即欲下载的文件) 的大小,划分出若干区块,把区块分段分发给每个线程去下载,线程从本段起始处下载数据及至段尾,多个线程下载的内容最终会写入到同一个文件中。

只研究有用的,工作中的需求:要把多个任务分派给Java的多个线程去执行,这其中就会有一个任务列表指派到线程的策略思考:已知:1. 一个待执行的任务列表,2. 指定要启动的线程数;问题是:每个线程实际要执行哪些任务。

使用Java多线程实现这种任务分发的策略是:任务列表连续按线程数分段,先保证每线程平均能分配到的任务数,余下的任务从前至后依次附加到线程中--只是数量上,实际每个线程执行的任务都还是连续的。如果出现那种僧多(线程) 粥(任务) 少的情况,实际启动的线程数就等于任务数,一挑一。这里只实现了每个线程各扫自家门前雪,动作快的完成后眼见别的线程再累都是爱莫能助。

实现及演示代码如下:由三个类实现,写在了一个 Java 文件中:TaskDistributor 为任务分发器,Task 为待执行的任务,WorkThread 为自定的工作线程。代码中运用了命令模式,如若能配以监听器,用上观察者模式来控制 UI 显示就更绝妙不过了,就能实现像下载中的区块着色跳跃的动感了,在此定义下一步的着眼点了。

代码中有较为详细的注释,看这些注释和执行结果就很容易理解的。main() 是测试方法

  1. package?com.unmi.common;? ?
  2. import?java.util.ArrayList;? ?
  3. import?java.util.List;? ?
  4. /**? ?
  5. *?指派任务列表给线程的分发器? ?
  6. *?@author?Unmi? ?
  7. *?QQ:?1125535?Email:?fantasia@sina.com? ?
  8. *?MSN:?kypfos@msn.com?2008-03-25? ?
  9. */? ?
  10. public?class?TaskDistributor?{? ?
  11. /**? ?
  12. *?测试方法? ?
  13. *?@param?args? ?
  14. */? ?
  15. public?static?void?main(String[]?args)?{? ?
  16. //初始化要执行的任务列表? ?
  17. List?taskList?=?new?ArrayList();? ?
  18. for?(int?i?=?0;?i?<?108;?i++)?{? ?
  19. taskList.add(new?Task(i));? ?
  20. }? ?
  21. //设定要启动的工作线程数为?5?个? ?
  22. int?threadCount?=?5;? ?
  23. List[]?taskListPerThread?=?distributeTasks(taskList,?threadCount);? ?
  24. System.out.println("实际要启动的工作线程数:"+taskListPerThread.length);? ?
  25. for?(int?i?=?0;?i?<?taskListPerThread.length;?i++)?{? ?
  26. Thread?workThread?=?new?WorkThread(taskListPerThread[i],i);? ?
  27. workThread.start();? ?
  28. }? ?
  29. }? ?
  30. /**? ?
  31. *?把?List?中的任务分配给每个线程,先平均分配,剩于的依次附加给前面的线程? ?
  32. *?返回的数组有多少个元素?(List)?就表明将启动多少个工作线程? ?
  33. *?@param?taskList?待分派的任务列表? ?
  34. *?@param?threadCount?线程数? ?
  35. *?@return?列表的数组,每个元素中存有该线程要执行的任务列表? ?
  36. */? ?
  37. public?static?List[]?distributeTasks(List?taskList,?int?threadCount)?{? ?
  38. //?每个线程至少要执行的任务数,假如不为零则表示每个线程都会分配到任务? ?
  39. int?minTaskCount?=?taskList.size()?/?threadCount;? ?
  40. //?平均分配后还剩下的任务数,不为零则还有任务依个附加到前面的线程中? ?
  41. int?remainTaskCount?=?taskList.size()?%?threadCount;? ?
  42. //?实际要启动的线程数,如果工作线程比任务还多? ?
  43. //?自然只需要启动与任务相同个数的工作线程,一对一的执行? ?
  44. //?毕竟不打算实现了线程池,所以用不着预先初始化好休眠的线程? ?
  45. int?actualThreadCount?=?minTaskCount?>?0???threadCount?:?remainTaskCount;? ?
  46. //?要启动的线程数组,以及每个线程要执行的任务列表? ?
  47. List[]?taskListPerThread?=?new?List[actualThreadCount];? ?
  48. int?taskIndex?=?0;? ?
  49. //平均分配后多余任务,每附加给一个线程后的剩余数,重新声明与?remainTaskCount? ?
  50. //相同的变量,不然会在执行中改变?remainTaskCount?原有值,产生麻烦? ?
  51. int?remainIndces?=?remainTaskCount;? ?
  52. for?(int?i?=?0;?i?<?taskListPerThread.length;?i++)?{? ?
  53. taskListPerThread[i]?=?new?ArrayList();? ?
  54. //?如果大于零,线程要分配到基本的任务? ?
  55. if?(minTaskCount?>?0)?{? ?
  56. for?(int?j?=?taskIndex;?j?<?minTaskCount?+?taskIndex;?j++)?{? ?
  57. taskListPerThread[i].add(taskList.get(j));? ?
  58. }? ?
  59. taskIndex?+=?minTaskCount;? ?
  60. }? ?
  61. //?假如还有剩下的,则补一个到这个线程中? ?
  62. if?(remainIndces?>?0)?{? ?
  63. taskListPerThread[i].add(taskList.get(taskIndex++));? ?
  64. remainIndces--;? ?
  65. }? ?
  66. }? ?
  67. //?打印任务的分配情况? ?
  68. for?(int?i?=?0;?i?<?taskListPerThread.length;?i++)?{? ?
  69. System.out.println("线程?"?+?i?+?"?的任务数:"?+?? ?
  70. ?
  71. taskListPerThread[i].size()?+?"?区间["? ?
  72. +?taskListPerThread[i].get(0).getTaskId()?+?","? ?
  73. +?taskListPerThread[i].get(taskListPerThread[i].size()?-?1).getTaskId()?+?"]");? ?
  74. }? ?
  75. return?taskListPerThread;? ?
  76. }? ?
  77. }? ?
  78. /**? ?
  79. *?要执行的任务,可在执行时改变它的某个状态或调用它的某个操作? ?
  80. *?例如任务有三个状态,就绪,运行,完成,默认为就绪态? ?
  81. *?要进一步完善,可为?Task?加上状态变迁的监听器,因之决定UI的显示? ?
  82. */? ?
  83. class?Task?{? ?
  84. public?static?final?int?READY?=?0;? ?
  85. public?static?final?int?RUNNING?=?1;? ?
  86. public?static?final?int?FINISHED?=?2;? ?
  87. private?int?status;? ?
  88. //声明一个任务的自有业务含义的变量,用于标识任务? ?
  89. private?int?taskId;? ?
  90. //任务的初始化方法? ?
  91. public?Task(int?taskId){? ?
  92. this.status?=?READY;? ?
  93. this.taskId?=?taskId;? ?
  94. }? ?
  95. /**? ?
  96. *?执行任务? ?
  97. */? ?
  98. public?void?execute()?{? ?
  99. //?设置状态为运行中? ?
  100. setStatus(Task.RUNNING);? ?
  101. System.out.println("当前线程?ID?是:"?+?Thread.currentThread().getName()? ?
  102. +"?|?任务?ID?是:"+this.taskId);? ?
  103. //?附加一个延时? ?
  104. try?{? ?
  105. Thread.sleep(1000);? ?
  106. }?catch?(InterruptedException?e)?{? ?
  107. e.printStackTrace();? ?
  108. }? ?
  109. //?执行完成,改状态为完成? ?
  110. setStatus(FINISHED);? ?
  111. }? ?
  112. public?void?setStatus(int?status)?{? ?
  113. this.status?=?status;? ?
  114. }? ?
  115. public?int?getTaskId()?{? ?
  116. return?taskId;? ?
  117. }? ?
  118. }? ?
  119. /**? ?
  120. *?自定义的工作线程,持有分派给它执行的任务列表? ?
  121. */? ?
  122. class?WorkThread?extends?Thread?{? ?
  123. //本线程待执行的任务列表,你也可以指为任务索引的起始值? ?
  124. private?List?taskList?=?null;? ?
  125. private?int?threadId;? ?
  126. /**? ?
  127. *?构造工作线程,为其指派任务列表,及命名线程?ID? ?
  128. *?@param?taskList?欲执行的任务列表? ?
  129. *?@param?threadId?线程?ID? ?
  130. */? ?
  131. public?WorkThread(List?taskList,int?threadId)?{? ?
  132. this.taskList?=?taskList;? ?
  133. this.threadId?=?threadId;? ?
  134. }? ?
  135. /**? ?
  136. *?执行被指派的所有任务? ?
  137. */? ?
  138. public?void?run()?{? ?
  139. for?(Task?task?:?taskList)?{? ?
  140. task.execute();? ?
  141. }? ?
  142. }? ?
  143. }?

执行结果如下,注意观察每个Java多线程分配到的任务数量及区间。直到所有的线程完成了所分配到的任务后程序结束:

  1. 线程?0?的任务数:22?区间[0,21]? ?
  2. 线程?1?的任务数:22?区间[22,43]? ?
  3. 线程?2?的任务数:22?区间[44,65]? ?
  4. 线程?3?的任务数:21?区间[66,86]? ?
  5. 线程?4?的任务数:21?区间[87,107]? ?
  6. 实际要启动的工作线程数:5? ?
  7. 当前线程?ID?是:Thread-0?|?任务?ID?是:0? ?
  8. 当前线程?ID?是:Thread-1?|?任务?ID?是:22? ?
  9. 当前线程?ID?是:Thread-2?|?任务?ID?是:44? ?
  10. 当前线程?ID?是:Thread-3?|?任务?ID?是:66? ?
  11. 当前线程?ID?是:Thread-4?|?任务?ID?是:87? ?
  12. 当前线程?ID?是:Thread-0?|?任务?ID?是:1? ?
  13. 当前线程?ID?是:Thread-1?|?任务?ID?是:23? ?
  14. 当前线程?ID?是:Thread-2?|?任务?ID?是:45?

上面坦白来只算是基本功夫,贴出来还真见笑了。还有更为复杂的功能.

像Java多线程的下载工具的确更充分利用了网络资源,而且像 FlashGet、NetAnts 都实现了:假如某个线程下载完了欲先所分配段的内容之后,会帮其他线程下载未完成数据,直到任务完成;或某一下载线程的未完成段区间已经很小了,用不着别人来帮忙时,这就涉及到任务的进一步分配。再如,以上两个工具都能动态增加、减小或中止线程,越说越复杂了,它们原本比这复杂多了,这些实现可能定义各种队列来实现,如未完成任务队列、下载中任务队列和已完成队列等。

热点排行