
Kafka(2) Install on Ubuntu and Try More Java Clients

2012-09-09 

1. Try to set it up on Windows
Download and install sbt from this file:
http://scalasbt.artifactoryonline.com/scalasbt/sbt-native-packages/org/scala-sbt/sbt-launcher/0.11.3/sbt.msi

Unzip Kafka into the working directory and build it:
D:\tool\kafka-0.7.0
>sbt update
>sbt package

sbt installs fine on Windows, but Kafka itself is still hard to install there.

2. Try to set it up on Ubuntu 12.04
>wget http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
>tar zxvf kafka-0.7.0-incubating-src.tar.gz
>mv kafka-0.7.0-incubating-src /opt/tools/kafka-0.7.0
>cd /opt/tools/kafka-0.7.0
>./sbt update
>./sbt package

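Start the servers, ZooKeeper first and then the Kafka broker. Both scripts run in the foreground, so use a separate terminal for each: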
>bin/zookeeper-server-start.sh config/zookeeper.properties
>bin/kafka-server-start.sh config/server.properties
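
Before running any client, it can help to confirm from the client machine that both servers accept TCP connections. The following is only a minimal sketch using plain JDK sockets; the class name PortCheck is hypothetical, and the ports assume the values used in this post (2181 for ZooKeeper, 9092 for the broker).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: pass the server's address as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("zookeeper 2181 open: " + isOpen(host, 2181, 2000));
        System.out.println("kafka 9092 open: " + isOpen(host, 9092, 2000));
    }
}
```

A successful connection only shows that the port is reachable; it does not prove that the broker advertises the right address to clients.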


3. Fix the Java client problem
Error message in the ZooKeeper server log when a Java client connects from another machine:
[2012-06-11 17:55:00,109] WARN Exception causing close of session 0x137daf68ab70001 due to java.io.IOException: Connection reset by peer (org.apache.zookeeper.server.NIOServerCnxn)
[2012-06-11 17:55:00,110] INFO Closed socket connection for client /192.168.56.1:62003 which had sessionid 0x137daf68ab70001 (org.apache.zookeeper.server.NIOServerCnxn)

Solution:
In server.properties:
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
# may not be what you want.
hostname=x.x.x.x

#zk.connect=localhost:2181
zk.connect=x.x.x.x:2181

# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.sessiontimeout.ms=60000

In zookeeper.properties:
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=8000

We need to use the server's real IP address in this configuration, not localhost.
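
As a rough illustration of that rule, the sketch below inspects a loaded server.properties and reports entries that still point at a local-only address. The class BrokerConfigCheck and its methods are hypothetical helpers, not part of Kafka; only the keys hostname and zk.connect come from the configuration above.

```java
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class BrokerConfigCheck {

    /** Host part of the first "host:port" entry in a comma-separated list. */
    static String hostOf(String zkConnect) {
        String first = zkConnect.split(",")[0].trim();
        int colon = first.lastIndexOf(':');
        return colon < 0 ? first : first.substring(0, colon);
    }

    /** True for addresses that remote clients cannot use to reach this machine. */
    static boolean isLocalOnly(String host) {
        String h = host.trim().toLowerCase();
        return h.isEmpty() || h.equals("localhost")
                || h.startsWith("127.") || h.equals("0.0.0.0");
    }

    /** Collects human-readable warnings; an empty list means nothing suspicious. */
    static List<String> problems(Properties props) {
        List<String> out = new ArrayList<>();
        String hostname = props.getProperty("hostname");
        if (hostname == null || isLocalOnly(hostname)) {
            out.add("hostname is unset or local-only; set it to the real IP");
        }
        String zk = props.getProperty("zk.connect");
        if (zk == null || isLocalOnly(hostOf(zk))) {
            out.add("zk.connect is unset or local-only; set it to the real IP");
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the path to server.properties is the first argument.
        Properties props = new Properties();
        try (Reader in = new FileReader(args[0])) {
            props.load(in);
        }
        for (String problem : problems(props)) {
            System.out.println("WARN " + problem);
        }
    }
}
```

Run it with the path to config/server.properties as the only argument; it prints nothing when both keys hold a routable address.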

The Java client sample code is under this directory: D:\book\distributed\kafka-0.7.0-incubating-src\examples\src\main\java\kafka\examples

My classes, based on those samples, are as follows:
package com.sillycat.magicneptune.example;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class TestProducerMain {

    public static void main(String[] args) {
        Properties props2 = new Properties();
        // ZooKeeper address: the server's real IP, not localhost
        props2.put("zk.connect", "192.168.56.101:2181");
        // Messages are plain strings
        props2.put("serializer.class", "kafka.serializer.StringEncoder");
        // Added by me to raise the ZooKeeper connection timeout above the default of 6000 ms.
        props2.put("zk.connectiontimeout.ms", "15000");
        ProducerConfig config = new ProducerConfig(props2);
        Producer<String, String> producer = new Producer<String, String>(config);

        // The message is sent to a randomly selected partition registered in ZK
        ProducerData<String, String> data = new ProducerData<String, String>(
                "test", "test-message,it is ok now.adsfasdf1111222");
        producer.send(data);
        producer.close();
    }
}

package com.sillycat.magicneptune.example;

import java.net.InetAddress;
import java.net.UnknownHostException;

import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

public class TestConsumerMain {

    public static void main(String[] args) {
        // Print the address this client machine resolves for itself
        try {
            System.out.println(InetAddress.getLocalHost().getHostAddress());
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        // Broker host, broker port, socket timeout in ms, buffer size in bytes
        SimpleConsumer consumer = new SimpleConsumer("192.168.56.101", 9092, 10000,
                1024000);

        long offset = 0;
        while (true) {
            // create a fetch request for topic "test", partition 0, current
            // offset, and fetch size of 1MB
            FetchRequest fetchRequest = new FetchRequest("test", 0, offset,
                    1000000);

            // get the message set from the consumer and print them out
            ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
            for (MessageAndOffset msg : messages) {
                System.out.println(ExampleUtils.getMessage(msg.message()) + " offset=" + offset);
                // advance the offset after consuming each message
                offset = msg.offset();
            }
        }
        // consumer.close(); is left out: the loop above never exits
    }

}


package com.sillycat.magicneptune.example;

import java.nio.ByteBuffer;

import kafka.message.Message;

public class ExampleUtils
{
  public static String getMessage(Message message)
  {
    ByteBuffer buffer = message.payload();
    byte [] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes);
  }
}
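
ExampleUtils.getMessage decodes with the platform default charset and advances the buffer's position as it reads. A possible variant is sketched below, assuming the producer wrote UTF-8 text; the class name PayloadDecoder is hypothetical. It takes the charset explicitly and reads through a duplicate so the caller's buffer is left untouched.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class PayloadDecoder {

    /** Decodes the remaining bytes of payload without moving its position. */
    static String decode(ByteBuffer payload, Charset charset) {
        // duplicate() shares the content but has its own position and limit
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, charset);
    }

    public static void main(String[] args) {
        // Stand-in for message.payload(); assumes UTF-8 on the producer side
        ByteBuffer payload = ByteBuffer.wrap(
                "test-message".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(payload, StandardCharsets.UTF_8));
    }
}
```

In the consumer above, the call would become PayloadDecoder.decode(msg.message().payload(), StandardCharsets.UTF_8), provided the charset matches what the producer's encoder used.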


references:
http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/
https://github.com/harrah/xsbt/wiki/Getting-Started-Setup
http://incubator.apache.org/kafka/faq.html
http://incubator.apache.org/kafka/quickstart.html
http://blog.sina.com.cn/s/blog_3fe961ae01011o4z.html
