首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

Spark透过CQL读取写入Cassandra数据

2013-11-27 
Spark通过CQL读取写入Cassandra数据val sc new SparkContext(local[4], whitebox_test)val job ne

Spark通过CQL读取写入Cassandra数据
val sc = new SparkContext("local[4]", "whitebox_test")val job = new Job()job.setInputFormatClass(classOf[CqlPagingInputFormat])ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost")ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160")ConfigHelper.setInputColumnFamily(job.getConfiguration(), "whitebox_test", "words")ConfigHelper.setInputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner")CqlConfigHelper.setInputWhereClauses(job.getConfiguration(),"paragraph=english")

上面这段代码跟之前的不同之处在于多了最后一行的CqlConfigHelper,CQL其实形式上面有点类似于SQL的,所以最后添加的一句就类似于SQL中的"where paragraph=english"。

?

?

然后我们需要获取从Cassandra中读入数据的RDD,这步也和之前说道的一样,只是outputFormat不同而已,如下:

?

val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),    classOf[CqlPagingInputFormat],    classOf[Map[String, ByteBuffer]],//key    classOf[Map[String, ByteBuffer]])//value

?相关的信息可以在这里找到。这样这里casRdd的格式应该是RDD[(Map[String, ByteBuffer],Map[String, ByteBuffer])]。第一个map是key的对应关系,这里的key包括了partition key和cluster columns。Cassandra和关系型数据库一样在使用CQL的时候可以指定一个组为key。在关系型数据库中,可以指定如"CONSTRAINT pk_personid primary key (id, lastname)"。Cassandra中也可以做出类似的指定如"primary key(a, b, c, d)",这其中a就是partition key,bcd就是cluster columns。第二个map是非key部分的对应关系。然后就可以使用这个RDD了,如下

?

val paraRdd = casRdd flatMap {    case (key, value) => {        value.filter(v => {            (v._1).compareTo("content") == 0        }).map(v => ByteBufferUtil.string(v._2))    }}.map(content => (content, 1)).reduceByKey(_+_)

?以上的代码完成了最基本的字数统计的功能,和之前一样,就是计算了每个单词出现的次数。

?

接着如果需要写入数据到Cassandra,还得需要设定Output的config,跟之前的类似唯一不同的是需要指定输入语句。具体如下:

?

job.setOutputFormatClass(classOf[CqlOutputFormat])ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160")ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost")ConfigHelper.setOutputColumnFamily(job.getConfiguration, KEYSPACE, "stats")ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner")val ps = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + " SET num = ?, largerfive=?, content = ?, ip=?"CqlConfigHelper.setOutputCql(job.getConfiguration(), ps)

?这里比较有意思的是,无法使用insert语句,只能使用update语句,而且也不需要指定key之类的信息,key的信息是在RDD中进行指定的。这里的?其实就是preparedStatement中的?一样的,只是占位符。在后面的RDD中才去指定值。如下

?

counts.map{    case (word, count) => {        val partitionMap = new util.LinkedHashMap[String, ByteBuffer]{}        if(word!="") {            partitionMap.put("word", ByteBufferUtil.bytes(word))        } else {            partitionMap.put("word", ByteBufferUtil.bytes("empty"))        }        val columnList = new ArrayList[ByteBuffer]        columnList.add(ByteBufferUtil.bytes(count))        if(count>5){            columnList.add(ByteBuffer.wrap(TRUE))        } else {            columnList.add(ByteBuffer.wrap(FALSE))        }        columnList.add(ByteBufferUtil.bytes("Statistics for "+word))        val address = InetAddress.getByAddress(ip);        columnList.add(ByteBufferUtil.bytes(address))        (partitionMap, columnList)    }}.saveAsNewAPIHadoopFile(KEYSPACE,classOf[Map[String, ByteBuffer]],    classOf[List[ByteBuffer]],classOf[CqlOutputFormat], job.getConfiguration()

这里也是需要注意的,partitionMap就是key的对应,这个还是比较好理解的。但是非key部分这里使用的是List[ByteBuffer]。这样子的话,这个list中的顺序就必须和之前那个update语句中声明的顺序一致了。否则将会抛出错误。还有值得说的就是,ByteBufferUtil里面有个function是给IP地址使用的,但是我们需要先将ip String转换成InetAddress的形式才能够使用。

?

到了这里,就基本到spark和cassandra通讯的部分基本说完了。还有一些复杂的设定,可以根据自己需求来设置。

周六晚上,enjoySpark透过CQL读取写入Cassandra数据?

?

?

?

?

热点排行