Spark通过CQL读取写入Cassandra数据
val sc = new SparkContext("local[4]", "whitebox_test")val job = new Job()job.setInputFormatClass(classOf[CqlPagingInputFormat])ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost")ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160")ConfigHelper.setInputColumnFamily(job.getConfiguration(), "whitebox_test", "words")ConfigHelper.setInputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner")CqlConfigHelper.setInputWhereClauses(job.getConfiguration(),"paragraph=english")
上面这段代码跟之前的不同之处在于多了最后一行的CqlConfigHelper,CQL其实形式上面有点类似于SQL的,所以最后添加的一句就类似于SQL中的"where paragraph=english"。
?
?
然后我们需要获取从Cassandra中读入数据的RDD,这步也和之前说道的一样,只是outputFormat不同而已,如下:
?
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[CqlPagingInputFormat], classOf[Map[String, ByteBuffer]],//key classOf[Map[String, ByteBuffer]])//value
?相关的信息可以在这里找到。这样这里casRdd的格式应该是RDD[(Map[String, ByteBuffer],Map[String, ByteBuffer])]。第一个map是key的对应关系,这里的key包括了partition key和cluster columns。Cassandra和关系型数据库一样在使用CQL的时候可以指定一个组为key。在关系型数据库中,可以指定如"CONSTRAINT pk_personid primary key (id, lastname)"。Cassandra中也可以做出类似的指定如"primary key(a, b, c, d)",这其中a就是partition key,bcd就是cluster columns。第二个map是非key部分的对应关系。然后就可以使用这个RDD了,如下
?
val paraRdd = casRdd flatMap { case (key, value) => { value.filter(v => { (v._1).compareTo("content") == 0 }).map(v => ByteBufferUtil.string(v._2)) }}.map(content => (content, 1)).reduceByKey(_+_)
?以上的代码完成了最基本的字数统计的功能,和之前一样,就是计算了每个单词出现的次数。
?
接着如果需要写入数据到Cassandra,还得需要设定Output的config,跟之前的类似唯一不同的是需要指定输入语句。具体如下:
?
job.setOutputFormatClass(classOf[CqlOutputFormat])ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160")ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost")ConfigHelper.setOutputColumnFamily(job.getConfiguration, KEYSPACE, "stats")ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner")val ps = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + " SET num = ?, largerfive=?, content = ?, ip=?"CqlConfigHelper.setOutputCql(job.getConfiguration(), ps)
?这里比较有意思的是,无法使用insert语句,只能使用update语句,而且也不需要指定key之类的信息,key的信息是在RDD中进行指定的。这里的?其实就是preparedStatement中的?一样的,只是占位符。在后面的RDD中才去指定值。如下
?
counts.map{ case (word, count) => { val partitionMap = new util.LinkedHashMap[String, ByteBuffer]{} if(word!="") { partitionMap.put("word", ByteBufferUtil.bytes(word)) } else { partitionMap.put("word", ByteBufferUtil.bytes("empty")) } val columnList = new ArrayList[ByteBuffer] columnList.add(ByteBufferUtil.bytes(count)) if(count>5){ columnList.add(ByteBuffer.wrap(TRUE)) } else { columnList.add(ByteBuffer.wrap(FALSE)) } columnList.add(ByteBufferUtil.bytes("Statistics for "+word)) val address = InetAddress.getByAddress(ip); columnList.add(ByteBufferUtil.bytes(address)) (partitionMap, columnList) }}.saveAsNewAPIHadoopFile(KEYSPACE,classOf[Map[String, ByteBuffer]], classOf[List[ByteBuffer]],classOf[CqlOutputFormat], job.getConfiguration()
这里也是需要注意的,partitionMap就是key的对应,这个还是比较好理解的。但是非key部分这里使用的是List[ByteBuffer]。这样子的话,这个list中的顺序就必须和之前那个update语句中声明的顺序一致了。否则将会抛出错误。还有值得说的就是,ByteBufferUtil里面有个function是给IP地址使用的,但是我们需要先将ip String转换成InetAddress的形式才能够使用。
?
到了这里,就基本到spark和cassandra通讯的部分基本说完了。还有一些复杂的设定,可以根据自己需求来设置。
周六晚上,enjoy?
?
?
?
?