首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 数据库 > Mysql >

地图reduce处理结果向输出至mysql(直接插入/更新/追加式更新)

2013-09-09 
mapreduce处理结果向输出至mysql(直接插入/更新/追加式更新)Mapreduce处理结果向输出至mysql?1.写入mysql

mapreduce处理结果向输出至mysql(直接插入/更新/追加式更新)

Mapreduce处理结果向输出至mysql

?

1.写入mysql

<1>job中输出的配置:

DBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,

MySQLConstant.MYSQL_FIX_OPEN_FIRST_FIELDS);

(DBOutputFormat为hadoop自带API,将输入插入数据库)

public?final?static?String

MYSQL_FIX_USER?=

"ipj_fix_user";

public?final?static?String[]

MYSQL_FIX_OPEN_FIRST_FIELDS?= {"app_id","version","imei","first_open","date"};

?

<2>reduce中写入:

private?FixOpenAppFirstRecord?record?=?new?FixOpenAppFirstRecord();

?

record.setApp_id(Integer.parseInt(app_id));

record.setImei(imei);

record.setVersion(version);

record.setFirst_open(exactDate);

record.setDate(date);

context.write(record,?NULL);

?

<3>FixOpenAppFirstRecord中字段的顺序配置(只列出一条):

@Override

public?void?write(PreparedStatement statement)?throws?SQLException {

statement.setInt(1,?app_id);

statement.setString(2,?version);

statement.setString(3,?imei);

statement.setString(4,?first_open);

statement.setString(5,?date);

}

<4>注意:

1.reduce中record的set值的顺序无所谓,可以任意

2.job的mysql字段MYSQL_FIX_OPEN_FIRST_FIELDS的顺序一定要和类FixOpenAppFirstRecord中字段的配置顺序一致

2.更新mysql(改变值)

<1>job中输出的配置:

FixDBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,

MySQLConstant.MYSQL_FIX_IS_TAOBAO_FIELDS);

(FixDBOutputFormat为自定义Format类,用于更新mysql)

public?final?static?String

MYSQL_FIX_USER?=

"ipj_fix_user";

public?final?static?String[]

MYSQL_FIX_IS_WEIBO_FIELDS?= {"is_weibo","app_id","version","imei"};

<2>reduce中写入:

private?FixIsMallUserRecord?record?=?new?FixIsMallUserRecord();

record.setApp_id(Integer.parseInt(app_id));

record.setVersion(version);

record.setImei(imei);

record.setIs_taobao(is_taobao);

context.write(record,?NULL);

<3>FixIsMallUserRecord?中字段的顺序配置(只列出一条):

@Override

public?void?readFields(ResultSet resultSet)?throws?SQLException {

this.is_taobao?= resultSet.getInt(1);

this.app_id?= resultSet.getInt(2);

this.version?= resultSet.getString(3);

this.imei?= resultSet.getString(4);

}

<4>FixDBOutputFormat中关键的拼接sql代码:

public?String constructQuery(String table, String[] fieldNames) {

if?(fieldNames ==?null) {

throw?new?IllegalArgumentException(

"Field names may not be null");

}

?

StringBuilder query =?new?StringBuilder();

query.append("UPDATE ").append(table);

if?(fieldNames.length?> 0 && fieldNames[0] !=?null

&& fieldNames[1] !=?null&& fieldNames[2] !=?null

&& fieldNames[3] !=?null) {

query.append(" SET ");

query.append(fieldNames[0] +?" =?");

query.append(" WHERE ");

query.append(fieldNames[1] +?" =?");

query.append(" AND ");

query.append(fieldNames[2] +?" =?");

query.append(" AND ");

query.append(fieldNames[3] +?" =?");

return?query.toString();

}

return?null;

}

<5>注意:

1.reduce中record的set值的顺序无所谓,可以任意

2.job的mysql字段MYSQL_FIX_IS_WEIBO_FIELDS的顺序一定要和类

3.2中的顺序也一定要和FixDBOutputFormat类中的更新顺序一致(第一个参数为要更新的值,第二三四个参数分别为条件参数)

3.更新mysql(值的追加)

<1>job中输出的配置:

FixAppendDBOutputFormat.setOutput(this, MySQLConstant.MYSQL_FIX_USER,

MySQLConstant.MYSQL_FIX_MALL_LOGIN_FIELDS);

(FixAppendDBOutputFormat自定义Format,用户更新mysql[追加])

public?final?static?String

MYSQL_FIX_USER?=

"ipj_fix_user";

public?final?static?String[]

MYSQL_FIX_MALL_LOGIN_FIELDS?= {"login_taobao_count","app_id","version","imei"};

<2>reduce中写入:

private?FixMallTotalLoginRecord?record?=?new?FixMallTotalLoginRecord();

record.setApp_id(Integer.parseInt(app_id));

record.setVersion(version);

record.setImei(imei);

record.setLogin_taobao_count(num);

context.write(record,?NULL);

<3>FixIsMallUserRecord?中字段的顺序配置(只列出一条):

@Override

public?void?write(PreparedStatement statement)?throws?SQLException {

statement.setInt(1,?login_taobao_count);

statement.setInt(2,?app_id);

statement.setString(3,?version);

statement.setString(4,?imei);

}

<4>FixAppendDBOutputFormat中关键的拼接sql代码

public?String constructQuery(String table, String[] fieldNames) {

if?(fieldNames ==?null) {

throw?new?IllegalArgumentException

("Field names may not be null");

}

StringBuilder query =?new?StringBuilder();

query.append("UPDATE ").append(table);

if?( fieldNames.length?> 0 &&

fieldNames[0] !=?null?&&

fieldNames[1] !=?null?&&

fieldNames[2] !=?null?&&

fieldNames[3] !=?null) {

?

query.append(" SET ");

query.append(fieldNames[0] +

" = "+fieldNames[0]+"+?");

query.append(" WHERE ");

query.append(fieldNames[1] +?" =?");

query.append(" AND ");

query.append(fieldNames[2] +?" =?");

query.append(" AND ");

query.append(fieldNames[3] +?" =?");

return?query.toString();

}

return?null;

}

<5>注意:

1.reduce中record的set值的顺序无所谓,可以任意

2.job的mysql字段MYSQL_FIX_MALL_LOGIN_FIELDS的顺序一定要和类

3.2中的顺序也一定要和FixAppendDBOutputFormat类中的更新顺序一致(第一个参数为要更新的值[在原有基础上增加],第二三四个参数分别为条件参数)

热点排行