首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

getting start with storm 通译 第八章 part-3

2013-10-10 
getting start with storm 翻译 第八章 part-3转载请注明出处:http://blog.csdn.net/lonelytrooper/articl

getting start with storm 翻译 第八章 part-3

转载请注明出处:http://blog.csdn.net/lonelytrooper/article/details/12435753分区的事务性Spouts

对于spout来说,从一个分区的集合中读取批量的元组是很常见的。继续这个例子,你可以有几个Redis数据库并且tweets分散在这些Redis数据库中。通过实现IPartitionedTransactionalSpout,Storm提供了一些工具来管理每个分区的状态并确保重放的能力。

我们看一下怎样修改前边的TweetsTransactionalSpout以使得它可以处理分区。

首先,继承BasePartitionedTransactionalSpout,它实现了IPartitionedTransactionalSpout接口。

public class TweetsPartitionedTransactionalSpoutextends

BasePartitionedTransactionalSpout<TransactionMetadata> {

...

}

告诉Storm,哪个是你的协调器。

public static class TweetsPartitionedTransactionalCoordinatorimplementsCoordinator{

@Override

public intnumPartitions() {

return 4;

}

@Override

public booleanisReady() {

return true;

}

@Override

public voidclose() {

}

}

在这个例子中,协调器非常简单。在numPartitions方法中,告诉Storm你有多少个分区。同时也要注意到你没有返回任何元数据。在一个IPartitionedTransactionalSpout中,元数据被发射器直接管理。

我们看一下发射器的实现。

public static class TweetsPartitionedTransactionalEmitter

implements Emitter<TransactionMetadata> {

PartitionedRQ rq =newPartitionedRQ();

@Override

public TransactionMetadataemitPartitionBatchNew(TransactionAttempt tx,

BatchOutputCollector collector,intpartition,

TransactionMetadata lastPartitionMeta) {

long nextRead;

if(lastPartitionMeta==null)

nextRead =rq.getNextRead(partition);

else {

nextRead =lastPartitionMeta.from+lastPartitionMeta.quantity;

rq.setNextRead(partition,nextRead);// Move the cursor

}

long quantity=rq.getAvailableToRead(partition,nextRead);

quantity =quantity>MAX_TRANSACTION_SIZE?MAX_TRANSACTION_SIZE:quantity;

TransactionMetadata metadata =newTransactionMetadata(nextRead, (int)quantity);

emitPartitionBatch(tx,collector,partition,metadata);

return metadata;

}

@Override

public voidemitPartitionBatch(TransactionAttempttx,BatchOutputCollector

collector,intpartition,TransactionMetadatapartitionMeta) {

if(partitionMeta.quantity<=0)

return ;

List<String>messages=rq.getMessages(partition,partitionMeta.from,

partitionMeta.quantity);

long tweetId=partitionMeta.from;

for (String msg:messages) {

collector.emit(newValues(tx,""+tweetId,msg));

tweetId ++;

}

}

@Override

public voidclose() {

}

}

这里有两个重要的方法,emitPartitionBatchNew 和emitPartitionBatch。在emitPartitionBatch中,你从Storm接收partition参数,它告诉你从哪个分区检索批量的元组。在这个方法中,决定检索哪些tweets,并产生相应的元数据,调用emitPartitionBatch,返回元数据,它会被立即存储在zookeeper中。

因为事务遍布所有的分区,所以Storm会为每个分区发送相同的事务ID。在emitPartitionBatch中,从分区中读取tweets并且发送该批次的元组到topology。如果该批次失败了,Storm会用存储的元数据来调用emitPartitionBatch以实现该批次的重放。

你可以在ch08-transactionaltopologies on GitHub检查代码。

不透明事务Topologies

目前为止,你可能会假定对于相同的事务ID,重放一个批次的元组总是可能的。但是在一些场景下,它可能是不可行的。到底发生了什么?

原来,你仍然可以完成明确的一次语义,但是假如事务被Storm重放的话,你需要更多的开发工作来保存之前的状态。因为你可以获得相同事务ID的不同元组,当在不同时刻发射时,你需要重置到之前的状态并从那里开始。

例如,你要对收到的tweets的总数计数,你现在已经数了五个,在最后一个事务ID为321的事务中,你数了八个。你可以保存这三个值--previousCount=5, currentCount=13和lastTransactionId=321。假如事务ID为321的事务被再次发送并且因为你得到了不同的元组,你数了四个而不是八个,提交者会检测到这是同一个事务ID,它会将previousCount重置为5,加上新得到的4并将currentCount更新到9。

另外,如果前一个事务被取消,那么每个被并行处理的当前事务都会被取消掉。这是为了确保你不会在中间过程丢掉任何东西。

你的spout应该实现IOpaquePartitionedTransactionalSpout接口并且正如你看到的,协调器和发射器非常简单。

public static class TweetsOpaquePartitionedTransactionalSpoutCoordinatorimplements

IOpaquePartitionedTransactionalSpout.Coordinator{

@Override

public booleanisReady() {

return true;

}

}

public static class TweetsOpaquePartitionedTransactionalSpoutEmitterimplements

IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> {

PartitionedRQ rq =newPartitionedRQ();

@Override

public TransactionMetadataemitPartitionBatch(TransactionAttempt tx,

BatchOutputCollector collector,intpartition,

TransactionMetadata lastPartitionMeta) {

long nextRead;

if(lastPartitionMeta==null)

nextRead =rq.getNextRead(partition);

else {

nextRead =lastPartitionMeta.from+lastPartitionMeta.quantity;

rq.setNextRead(partition,nextRead);// Move the cursor

}

long quantity=rq.getAvailableToRead(partition,nextRead);

quantity =quantity>MAX_TRANSACTION_SIZE?MAX_TRANSACTION_SIZE:quantity;

TransactionMetadata metadata =newTransactionMetadata(nextRead, (int)quantity);

emitMessages(tx,collector,partition,metadata);

return metadata;

}

private voidemitMessages(TransactionAttempttx,BatchOutputCollectorcollector,

int partition,TransactionMetadatapartitionMeta) {

if(partitionMeta.quantity<=0)

return ;

List<String>messages=

rq.getMessages(partition,partitionMeta.from,partitionMeta.quantity);

long tweetId=partitionMeta.from;

for (String msg:messages) {

collector.emit(newValues(tx,""+tweetId,msg));

tweetId ++;

}

}

@Override

public intnumPartitions() {

return 4;

}

@Override

public voidclose() {

}

}

最有趣的方法是emitPartitionBatch,它接收前边提交的元数据。你应该使用该元数据信息来产生一个批次的元组。该批次不必完全相同,正如前边所述,你可能无法重新制造相同的批次。剩余的工作由提交者bolts处理,它使用之前的状态。

热点排行