Kafka(2)Install ubuntu and Try more JAVA client
Kafka(2)Install ubuntu and Try more JAVA client
1. Try to setup this on windows.
download and install this file
http://scalasbt.artifactoryonline.com/scalasbt/sbt-native-packages/org/scala-sbt/sbt-launcher/0.11.3/sbt.msi
Unzip the kafka to working directory:
D:\tool\kafka-0.7.0
>sbt update
>sbt package
sbt is installed on windows, but still, it is hard to install kafka on windows
2. Try to setup on ubuntu12.04
>wget http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
>tar zxvf kafka-0.7.0-incubating-src.tar.gz
>mv kafka-0.7.0-incubating-src /opt/tools/kafka-0.7.0
>cd /opt/tools/kafka-0.7.0
>./sbt update
>./sbt package
start the server
>bin/zookeeper-server-start.sh config/zookeeper.properties
>bin/kafka-server-start.sh config/server.properties
3. Fix the Java Client Problem
Error Message:
[2012-06-11 17:55:00,109] WARN Exception causing close of session 0x137daf68ab70001 due to java.io.IOException: Connection reset by peer (org.apache.zookeeper.server.NIOServerCnxn)
[2012-06-11 17:55:00,110] INFO Closed socket connection for client /192.168.56.1:62003 which had sessionid 0x137daf68ab70001 (org.apache.zookeeper.server.NIOServerCnxn)
Solution:
server.properties
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
# may not be what you want.
hostname=x.x.x.x
#zk.connect=localhost:2181
zk.connect=x.x.x.x:2181
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.sessiontimeout.ms=60000
zookeeper.properties
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=8000
We need to use real ip address here in configuration.
The Java Client sample codes are under this directory: D:\book\distributed\kafka-0.7.0-incubating-src\examples\src\main\java\kafka\examples
The class will be as follow:
package com.sillycat.magicneptune.example;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;
public class TestProducerMain {
public static void main(String[] args) {
Properties props2 = new Properties();
props2.put("zk.connect", "192.168.56.101:2181");
props2.put("serializer.class", "kafka.serializer.StringEncoder");
// This is added by myself for changing the default timeout 6000.
props2.put("zk.connectiontimeout.ms", "15000");
ProducerConfig config = new ProducerConfig(props2);
Producer<String, String> producer = new Producer<String, String>(config);
// The message is sent to a randomly selected partition registered in ZK
ProducerData<String, String> data = new ProducerData<String, String>(
"test", "test-message,it is ok now.adsfasdf1111222");
producer.send(data);
producer.close();
}
}
package com.sillycat.magicneptune.example;
import java.net.InetAddress;
import java.net.UnknownHostException;
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
public class TestConsumerMain {
public static void main(String[] args) {
try {
System.out.println(InetAddress.getLocalHost().getHostAddress());
} catch (UnknownHostException e) {
e.printStackTrace();
}
SimpleConsumer consumer = new SimpleConsumer("192.168.56.101", 9092, 10000,
1024000);
long offset = 0;
while (true) {
// create a fetch request for topic test , partition 0, current
// offset, and fetch size of 1MB
FetchRequest fetchRequest = new FetchRequest("test", 0, offset,
1000000);
// get the message set from the consumer and print them out
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
for (MessageAndOffset msg : messages) {
System.out.println(ExampleUtils.getMessage(msg.message()) + "offset=" + offset);
// advance the offset after consuming each message
offset = msg.offset();
}
}
//consumer.close();
}
}
package com.sillycat.magicneptune.example;
import java.nio.ByteBuffer;
import kafka.message.Message;
public class ExampleUtils
{
public static String getMessage(Message message)
{
ByteBuffer buffer = message.payload();
byte [] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new String(bytes);
}
}
references:
http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/
https://github.com/harrah/xsbt/wiki/Getting-Started-Setup
http://incubator.apache.org/kafka/faq.html
http://incubator.apache.org/kafka/quickstart.html
http://blog.sina.com.cn/s/blog_3fe961ae01011o4z.html
http://incubator.apache.org/kafka/faq.html