mapreduce处理结果向输出至mysql(直接插入/更新/追加式更新)
Mapreduce处理结果向输出至mysql
?
1.写入mysql
<1>job中输出的配置:
DBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,
MySQLConstant.MYSQL_FIX_OPEN_FIRST_FIELDS);
(DBOutputFormat为hadoop自带API,将输入插入数据库)
public?final?static?String
MYSQL_FIX_USER?=
"ipj_fix_user";
public?final?static?String[]
MYSQL_FIX_OPEN_FIRST_FIELDS?= {"app_id","version","imei","first_open","date"};
?
<2>reduce中写入:
private?FixOpenAppFirstRecord?record?=?new?FixOpenAppFirstRecord();
?
record.setApp_id(Integer.parseInt(app_id));
record.setImei(imei);
record.setVersion(version);
record.setFirst_open(exactDate);
record.setDate(date);
context.write(record,?NULL);
?
<3>FixOpenAppFirstRecord中字段的顺序配置(只列出一条):
@Override
public?void?write(PreparedStatement statement)?throws?SQLException {
statement.setInt(1,?app_id);
statement.setString(2,?version);
statement.setString(3,?imei);
statement.setString(4,?first_open);
statement.setString(5,?date);
}
<4>注意:
1.reduce中record的set值的顺序无所谓,可以任意
2.job的mysql字段MYSQL_FIX_OPEN_FIRST_FIELDS的顺序一定要和类FixOpenAppFirstRecord中字段的配置顺序一致
2.更新mysql(改变值)
<1>job中输出的配置:
FixDBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,
MySQLConstant.MYSQL_FIX_IS_TAOBAO_FIELDS);
(FixDBOutputFormat为自定义Format类,用于更新mysql)
public?final?static?String
MYSQL_FIX_USER?=
"ipj_fix_user";
public?final?static?String[]
MYSQL_FIX_IS_WEIBO_FIELDS?= {"is_weibo","app_id","version","imei"};
<2>reduce中写入:
private?FixIsMallUserRecord?record?=?new?FixIsMallUserRecord();
record.setApp_id(Integer.parseInt(app_id));
record.setVersion(version);
record.setImei(imei);
record.setIs_taobao(is_taobao);
context.write(record,?NULL);
<3>FixIsMallUserRecord?中字段的顺序配置(只列出一条):
@Override
public?void?readFields(ResultSet resultSet)?throws?SQLException {
this.is_taobao?= resultSet.getInt(1);
this.app_id?= resultSet.getInt(2);
this.version?= resultSet.getString(3);
this.imei?= resultSet.getString(4);
}
<4>FixDBOutputFormat中关键的拼接sql代码:
public?String constructQuery(String table, String[] fieldNames) {
if?(fieldNames ==?null) {
throw?new?IllegalArgumentException(
"Field names may not be null");
}
?
StringBuilder query =?new?StringBuilder();
query.append("UPDATE ").append(table);
if?(fieldNames.length?> 0 && fieldNames[0] !=?null
&& fieldNames[1] !=?null&& fieldNames[2] !=?null
&& fieldNames[3] !=?null) {
query.append(" SET ");
query.append(fieldNames[0] +?" =?");
query.append(" WHERE ");
query.append(fieldNames[1] +?" =?");
query.append(" AND ");
query.append(fieldNames[2] +?" =?");
query.append(" AND ");
query.append(fieldNames[3] +?" =?");
return?query.toString();
}
return?null;
}
<5>注意:
1.reduce中record的set值的顺序无所谓,可以任意
2.job的mysql字段MYSQL_FIX_IS_WEIBO_FIELDS的顺序一定要和类
3.2中的顺序也一定要和FixDBOutputFormat类中的更新顺序一致(第一个参数为要更新的值,第二三四个参数分别为条件参数)
3.更新mysql(值的追加)
<1>job中输出的配置:
FixAppendDBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,
MySQLConstant.MYSQL_FIX_MALL_LOGIN_FIELDS);
(FixAppendDBOutputFormat自定义Format,用户更新mysql[追加])
public?final?static?String
MYSQL_FIX_USER?=
"ipj_fix_user";
public?final?static?String[]
MYSQL_FIX_MALL_LOGIN_FIELDS?= {"login_taobao_count","app_id","version","imei"};
<2>reduce中写入:
private?FixMallTotalLoginRecord?record?=?new?FixMallTotalLoginRecord();
record.setApp_id(Integer.parseInt(app_id));
record.setVersion(version);
record.setImei(imei);
record.setLogin_taobao_count(num);
context.write(record,?NULL);
<3>FixIsMallUserRecord?中字段的顺序配置(只列出一条):
@Override
public?void?write(PreparedStatement statement)?throws?SQLException {
statement.setInt(1,?login_taobao_count);
statement.setInt(2,?app_id);
statement.setString(3,?version);
statement.setString(4,?imei);
}
<4>FixAppendDBOutputFormat中关键的拼接sql代码
public?String constructQuery(String table, String[] fieldNames) {
if?(fieldNames ==?null) {
throw?new?IllegalArgumentException
("Field names may not be null");
}
StringBuilder query =?new?StringBuilder();
query.append("UPDATE ").append(table);
if?( fieldNames.length?> 0 &&
fieldNames[0] !=?null?&&
fieldNames[1] !=?null?&&
fieldNames[2] !=?null?&&
fieldNames[3] !=?null) {
?
query.append(" SET ");
query.append(fieldNames[0] +
" = "+fieldNames[0]+"+?");
query.append(" WHERE ");
query.append(fieldNames[1] +?" =?");
query.append(" AND ");
query.append(fieldNames[2] +?" =?");
query.append(" AND ");
query.append(fieldNames[3] +?" =?");
return?query.toString();
}
return?null;
}
<5>注意:
1.reduce中record的set值的顺序无所谓,可以任意
2.job的mysql字段MYSQL_FIX_MALL_LOGIN_FIELDS的顺序一定要和类
3.2中的顺序也一定要和FixAppendDBOutputFormat类中的更新顺序一致(第一个参数为要更新的值[在原有基础上增加],第二三四个参数分别为条件参数)