首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

[通译][Trident] Storm Trident state 原理

2014-01-25 
[翻译][Trident] Storm Trident state 原理(当然,在Kafka开启replication功能时,transactional spout也是

[翻译][Trident] Storm Trident state 原理

(当然,在Kafka开启replication功能时,transactional spout也是可以做到容错的)

在讨论"opaque transactional" spout之前,我们先来看看你应该怎样设计一个State来实现transactional spout的有且只有一次执行的语义。这个State的类型是"transactional state" 并且它利用了任何一个txid总是对应同样的tuple序列这个语义。

假如说你有一个用来计算单词出现次数的topology,你想要将单词的出现次数以key/value对的形式存储到数据库中。key就是单词,value就是这个这个单词出现的次数。你已经看到只是存储一个数量是不足以知道你是否已经处理过一个batch的。你可以通过将value和txid一起存储到数据库中。这样的话,当更新这个count之前,你可以先去比较数据库中存储的txid和现在要存储的txid。如果一样,就跳过什么都不做,因为这个value之前已经被处理过了。如果不一样,就执行存储。这个逻辑可以工作的前提就是txid永不改变,并且Trident保证状态的更新是在batch之间严格顺序进行的。

考虑下面这个例子的运行逻辑, 假定你在处理一个txid为3的包含下面tuple的batch:

?

Opaque transactional state有着最为强大的容错性。但是这是以存储更多的信息作为代价的。Transactional states 需要存储较少的状态信息,但是仅能和 transactional spouts协同工作. Finally, non-transactional state所需要存储的信息最少,但是却不能实现有且只有一次被成功处理的语义。

State和Spout类型的选择其实是一种在容错性和存储消耗之间的权衡,你的应用的需要会决定那种组合更适合你。

State APIs

你已经看到一些错综复杂的方法来实现有且只有一次被执行的语义。Trident这样做的好处把所有容错相关的逻辑都放在了State里面。 作为一个用户,你并不需要自己去处理复杂的txid,存储多余的信息到数据库中,或者是任何其他类似的事情。你只需要写如下这样简单的code:

TridentTopology topology = new TridentTopology();TridentState wordCounts =      topology.newStream("spout1", spout)        .each(new Fields("sentence"), new Split(), new Fields("word"))        .groupBy(new Fields("word"))        .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))        .parallelismHint(6);

所有管理opaque transactional state所需的逻辑都在MemcachedState.opaque方法的调用中被涵盖了,除此之外,数据库的更新会自动以batch的形式来进行以避免多次访问数据库。

?

State的基本接口只包含下面两个方法:

public interface State {    void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream    void commit(Long txid);}

?当一个State更新开始时,以及当一个State更新结束时你都会被告知,并且会告诉你该次的txid。Trident并没有对你的state的工作方式有任何的假定。

?

?

假定你自己搭了一套数据库来存储用户位置信息,并且你想要在Trident中去访问这个数据。你的state的实现应该有用户信息的set、get方法

public class LocationDB implements State {      public void beginCommit(Long txid) {          }            public void commit(Long txid) {          }            public void setLocation(long userId, String location) {        // code to access database and set location      }            public String getLocation(long userId) {        // code to get location from database      }  }

?然后你还需要提供给Trident一个StateFactory来在Trident的task中创建你的State对象。LocationDB 的?StateFactory可能会如下所示:

public class LocationDBFactory implements StateFactory {     public State makeState(Map conf, int partitionIndex, int numPartitions) {        return new LocationDB();     }   }  

?Trident提供了一个QueryFunction接口用来实现Trident中在一个source state上查询的功能。同时还提供了一个StateUpdater来实现Trident中更新source state的功能。比如说,让我们写一个查询地址的操作,这个操作会查询LocationDB来找到用户的地址。让我们以怎样在topology中实现该功能开始,假定这个topology会接受一个用户id作为输入数据流。

TridentTopology topology = new TridentTopology();  TridentState locations = topology.newStaticState(new LocationDBFactory());  topology.newStream("myspout", spout)          .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))  

?接下来让我们一起来看看QueryLocation 的实现应该是什么样的:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {      public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {          List<String> ret = new ArrayList();          for(TridentTuple input: inputs) {              ret.add(state.getLocation(input.getLong(0)));          }          return ret;      }        public void execute(TridentTuple tuple, String location, TridentCollector collector) {          collector.emit(new Values(location));      }      }  

?QueryFunction的执行分为两部分。首先Trident收集了一个batch的read操作并把他们统一交给batchRetrieve。在这个例子中,batchRetrieve会接受到多个用户id。batchRetrieve应该返还一个和输入tuple数量相同的result序列。result序列中的第一个元素对应着第一个输入tuple的结果,result序列中的第二个元素对应着第二个输入tuple的结果,以此类推。

你可以看到,这段代码并没有像Trident那样很好的利用batch的优势,而是为每个输入tuple去查询了一次LocationDB。所以一种更好的操作LocationDB方式应该是这样的:

public class LocationDB implements State {      public void beginCommit(Long txid) {          }            public void commit(Long txid) {          }            public void setLocationsBulk(List<Long> userIds, List<String> locations) {        // set locations in bulk      }            public List<String> bulkGetLocations(List<Long> userIds) {        // get locations in bulk      }  }  

?接下来,你可以这样改写上面的QueryLocation:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {      public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {          List<Long> userIds = new ArrayList<Long>();          for(TridentTuple input: inputs) {              userIds.add(input.getLong(0));          }          return state.bulkGetLocations(userIds);      }        public void execute(TridentTuple tuple, String location, TridentCollector collector) {          collector.emit(new Values(location));      }      }  

?通过有效减少访问数据库的次数,这段代码比上一个实现会高效的多。

?

如何你要更新State,你需要使用StateUpdater接口。下面是一个StateUpdater的例子用来将新的地址信息更新到LocationDB当中。

public class LocationUpdater extends BaseStateUpdater<LocationDB> {      public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {          List<Long> ids = new ArrayList<Long>();          List<String> locations = new ArrayList<String>();          for(TridentTuple t: tuples) {              ids.add(t.getLong(0));              locations.add(t.getString(1));          }          state.setLocationsBulk(ids, locations);      }  }  

?下面列出了你应该如何在Trident topology中使用上面声明的LocationUpdater:

TridentTopology topology = new TridentTopology();  TridentState locations =       topology.newStream("locations", locationsSpout)          .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())  

?partitionPersist 操作会更新一个State。其内部是将?State和一批更新的tuple交给StateUpdater,由StateUpdater完成相应的更新操作。

在这段代码中,只是简单的从输入的tuple中提取处userid和对应的location,并一起更新到State中。

partitionPersist 会返回一个TridentState对象来表示被这个Trident topoloy更新过的location db。 然后你就可以使用这个state在topology的任何地方进行查询操作了。

同时,你也可以看到我们传了一个TridentCollector给StateUpdaters。 emit到这个collector的tuple就会去往一个新的stream。在这个例子中,我们并没有去往一个新的stream的需要,但是如果你在做一些事情,比如说更新数据库中的某个count,你可以emit更新的count到这个新的stream。然后你可以通过调用TridentState#newValuesStream方法来访问这个新的stream来进行其他的处理。

persistentAggregate

Trident有另外一种更新State的方法叫做persistentAggregate。 你在之前的word count例子中应该已经见过了,如下:

TridentTopology topology = new TridentTopology();          TridentState wordCounts =        topology.newStream("spout1", spout)          .each(new Fields("sentence"), new Split(), new Fields("word"))          .groupBy(new Fields("word"))          .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))  

?persistentAggregate是在partitionPersist之上的另外一层抽象。它知道怎么去使用一个Trident 聚合器来更新State。在这个例子当中,因为这是一个group好的stream,Trident会期待你提供的state是实现了MapState接口的。用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中。MapState接口看上去如下所示:

public interface MapState<T> extends State {      List<T> multiGet(List<List<Object>> keys);      List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);      void multiPut(List<List<Object>> keys, List<T> vals);  }  

?当你在一个未经过group的stream上面进行聚合的话,Trident会期待你的state实现Snapshottable接口:

public interface Snapshottable<T> extends State {      T get();      T update(ValueUpdater updater);      void set(T o);  }  

?MemoryMapState?和?MemcachedState?都实现了上面的2个接口。

Implementing Map States

在Trident中实现MapState是非常简单的,它几乎帮你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 类实现了所有相关的逻辑,包括容错的逻辑。你只需要将一个IBackingMap 的实现提供给这些类就可以了。IBackingMap接口看上去如下所示:

public interface IBackingMap<T> {      List<T> multiGet(List<List<Object>> keys);       void multiPut(List<List<Object>> keys, List<T> vals);   }  

?OpaqueMap's会用OpaqueValue的value来调用multiPut方法,TransactionalMap's会提供TransactionalValue中的value,而NonTransactionalMaps只是简单的把从Topology获取的object传递给multiPut。

Trident还提供了一种CachedMap类来进行自动的LRU cache。

另外,Trident 提供了?SnapshottableMap?类将一个MapState 转换成一个 Snapshottable 对象.

大家可以看看?MemcachedState的实现,从而学习一下怎样将这些工具组合在一起形成一个高性能的MapState实现。MemcachedState是允许大家选择使用opaque transactional, transactional, 还是 non-transactional 语义的。

?

作者:derekjiang

转载:[翻译][Trident] Storm Trident state 原理

?

热点排行