Kafka Producer在发送消息时必须配置的参数为:bootstrap.servers、key.serializer、value.serializer。序列化操作是在拦截器(Interceptor)执行之后并且在分配分区(partitions)之前执行的。
首先我们通过一段示例代码来看下普通情况下Kafka Producer如何编写:
public class ProducerJavaDemo { public static final String brokerList = "192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092"; public static final String topic = "hidden-topic"; public static void main(String[] args) { Properties properties = new Properties(); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("client.id", "hidden-producer-client-id-1"); properties.put("bootstrap.servers", brokerList); Producer<String,String> producer = new KafkaProducer<String,String>(properties); while (true) { String message = "kafka_message-" + new Date().getTime() + "-edited by hidden.zhu"; ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message); try { Future<RecordMetadata> future = producer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.print(metadata.offset()+" "); System.out.print(metadata.topic()+" "); System.out.println(metadata.partition()); } }); } catch (Exception e) { e.printStackTrace(); } try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } }
这里采用的客户端不是0.8.x.x时代的Scala版本,而是Java编写的新Kafka Producer, 相应的Maven依赖如下:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency>
上面的程序中使用的是Kafka客户端自带的org.apache.kafka.common.serialization.StringSerializer,除了用于String类型的序列化器之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口,此接口有三种方法:
public void configure(Map<String, ?> configs, boolean isKey):用来配置当前类。
public byte[] serialize(String topic, T data):用来执行序列化。
public void close():用来关闭当前序列化器。一般情况下这个方法都是个空方法,如果实现了此方法,必须确保此方法的幂等性,因为这个方法很可能会被KafkaProducer调用多次。
下面我们来看看Kafka中org.apache.kafka.common.serialization.StringSerializer的具体实现,源码如下:
public class StringSerializer implements Serializer<String> { private String encoding = "UTF8"; @Override public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) encodingValue = configs.get("serializer.encoding"); if (encodingValue != null && encodingValue instanceof String) encoding = (String) encodingValue; } @Override public byte[] serialize(String topic, String data) { try { if (data == null) return null; else return data.getBytes(encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); } } @Override public void close() { // nothing to do } }
首先看下StringSerializer中的configure(Map<String, ?> configs, boolean isKey)方法,这个方法的执行是在创建KafkaProducer实例的时候调用的,即执行代码Producer<String,String> producer = new KafkaProducer<String,String>(properties)时调用,主要用来确定编码类型,不过一般key.serializer.encoding或serializer.encoding都不会配置,更确切的来说在Kafka Producer Configs列表里都没有此项,所以一般情况下encoding的值就是UTF-8。serialize(String topic, String data)方法非常的直观,就是将String类型的data转为byte[]类型即可。
如果Kafka自身提供的诸如String、ByteArray、ByteBuffer、Bytes、Double、Integer、Long这些类型的Serializer都不能满足需求,读者可以选择使用如Avro、JSON、Thrift、ProtoBuf或者Protostuff等通用的序列化工具来实现,亦或者是使用自定义类型的Serializer来实现。下面就以一个简单的例子来介绍下如何自定义类型的使用方法。
假设我们要发送的消息都是Company对象,这个Company的定义很简单,只有名称name和地址address,具体如下:
public class Company { private String name; private String address; //省略Getter, Setter, Constructor & toString方法 }
接下去我们来实现Company类型的Serializer,即下面代码示例中的DemoSerializer。
package com.hidden.client; public class DemoSerializer implements Serializer<Company> { public void configure(Map<String, ?> configs, boolean isKey) {} public byte[] serialize(String topic, Company data) { if (data == null) { return null; } byte[] name, address; try { if (data.getName() != null) { name = data.getName().getBytes("UTF-8"); } else { name = new byte[0]; } if (data.getAddress() != null) { address = data.getAddress().getBytes("UTF-8"); } else { address = new byte[0]; } ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length); buffer.putInt(name.length); buffer.put(name); buffer.putInt(address.length); buffer.put(address); return buffer.array(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return new byte[0]; } public void close() {} }
使用时只需要在Kafka Producer的config中修改value.serializer属性即可,示例如下:
properties.put("value.serializer", "com.hidden.client.DemoSerializer"); //记得也要将相应的String类型改为Company类型,如: //Producer<String,Company> producer = new KafkaProducer<String,Company>(properties); //Company company = new Company(); //company.setName("hidden.cooperation-" + new Date().getTime()); //company.setAddress("Shanghai, China"); //ProducerRecord<String, Company> producerRecord = new ProducerRecord<String, Company>(topic,company);
示例中只修改了value.serializer,而key.serializer和value.serializer没有什么区别,如果有真实需要,修改以下也未尝不可。
接下一篇:Kafka消息序列化和反序列化(下)
欢迎支持笔者的作品《深入理解Kafka: 核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客(ID: hiddenkafka)。