一段分析数据的总结
业务背景:? ? 有一个表中存在1亿的数据,存着个人信息(城市、单位、学校等内容),而这些内容在字段中存储的是json,需要将这些数据分析后,算出这些内容的离散模型,取出TOP100。? ??? ? 最先的实现方案是批量查出来1W数据,分析后一条条更新,发现更新太慢。随后做了如下改进,也是总结:?1、先查询后更新的操作,用MySQL考虑 ON DUPLICATE KEY UPDATE? ?但是表中必须有主键或者唯一索引?2、大量操作要批量更新,eg:????INSERT?INTO?sina_user_count(name,type,count)?VALUES?(?,?,?),(?,?,?),(?,?,?)...? ? ? ? ? ? ? ? ? ? ? ??ON DUPLICATE KEY UPDATE?count=count+VALUES(count)?3、不要拼接字符串,要用占位符“?”,因为存储内容中有特殊字符,如果拼接SQL会遇到异常?4、循环中注意清除缓存的数据,这是写了一个Bug代码片段如下:
public void analysis() { final long countId = 1; Map<String, Integer> comps = new HashMap<String, Integer>(1000); Map<String, Object> row = this.findForMap("select * from table_count where id=?", countId); Long maxId = (Long) row.get("count"); List<Map<String, Object>> list = this.findForList("select * from table where id> ? limit ?", maxId, 10000); while (list != null && !list.isEmpty()) { for (Map<String, Object> map : list) { maxId = (Long) map.get("id"); //获取json信息 String careers = (String) map.get("careers"); if (careers != null)) { try { JSONArray array = JSONArray.fromObject(careers); for (int i = 0; i < array.size(); i++) { JSONObject obj = array.getJSONObject(i); count(comps, obj.getString("company")); } } catch (Exception e) { logger.error(e.getMessage()); } } //更新 logger.info(">>>>>>>>>>>>>>>>> update start"); updateCount(comps, 1); logger.info(">>>>>>>>>>>>>>>>>update end"); //更新最大ID和查询 this.update("update_sina_user_count_max", maxId, countId); logger.info(">>>>>>>>>>>>>>>>>>>=" + maxId); list = this.findForList("query_sina_user_1", maxId, 10000); logger.info(">>>>>>>>>>>>>>>>>list.size=" + list.size()); } } private void count(Map<String, Integer> map, String key) { if (DataUtil.isEmpty(key) || key.length() < 2) return; if (map.containsKey(key)) { Integer val = map.get(key); map.put(key, ++val); } else { map.put(key, 1); } } private void updateCount(Map<String, Integer> map, int type) { StringBuilder content = new StringBuilder(); List<Object> params = Lists.newArrayList(); int i = 0; for (String name : map.keySet()) { Integer count = map.get(name); content.append("(?,?,?)"); params.add(name); params.add(type); params.add(count); if (++i < 10000) { content.append(","); } else { String sql = "INSERT into table_count(name,type,count) VALUES " + content.toString() + " ON DUPLICATE KEY UPDATE count=count+VALUES(count)"; logger.info(">>>>>>>>>>>>>>>>>inner:" + i + ",type:" + type); this.getJdbc(0).update(sql, params.toArray()); content = new StringBuilder(); i = 0; params.clear(); } } if (!params.isEmpty()) { String sql = "INSERT into table_count(name,type,count) VALUES " + content.toString() + "('最大ID',0,0) ON DUPLICATE KEY UPDATE count=count+VALUES(count)"; logger.info(">>>>>>>>>>>>>>>>>outer:" + i + ",type:" + type); this.update(sql, params.toArray()); } //第一版程序没有此代码,导致map越来越大,低级错误啊 map.clear(); }?