avro序列化+反序列化

链接

Apache Avro: a New Format for Data Interchange
Apache Avro™ 1.8.2 Getting Started (Java)
Apache Avro™ 1.8.2 Specification

用于交换大数据的序列化项目

  • Protocol Buffers 基于定义好的数据结构生成代码后组装数据,同时使用注解解决定义与数据不一致的情况,导致数据量变大,解析困难;
  • Thrift FIXME
  • avro 支持生成代码的方式,也提供一种通用的方式处理数据,使用模式进行匹配,数据缺失使用默认值,数据不匹配则忽略。

avro特性

  • 生成代码方式或通用方式
  • 兼容性(schema 上定义规则)

    When an application expects fields that are not present, Avro provides a default value, specified in the schema. Avro ignores unexpected values that are present in data

  • schema定义数据格式,json方式
  • avro为了便于MapReduce的处理定义了一种容器文件格式(Container File Format)
  • avro可应用在RPC框架上
  • avro可排序 FIXME
  • avro在Hadoop生态系统中应用,扩展整个生态系统,使得支持版本兼容,编程语言兼容;

schema类型

simple_type
complex_type

根据工具+schema生成代码实体

java -jar avro-tools-1.8.2.jar compile schema schema.avsc java .
schema.avsc为schema文件,最后生成的代码在当前目录
avro-toos-1.8.2.jar下载地址

java实现

maven坐标

1
2
3
4
5
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>

序列化(生成实例类方式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 使用泛型方式,T类型为生成的实体
public static <T> byte[] serialize(T record, Class<T> tClass) {
byte[] content = null;
try {
DatumWriter<T> recordDatumWriter = new SpecificDatumWriter<>(tClass);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
recordDatumWriter.write(record, binaryEncoder);
content = outputStream.toByteArray();
} catch (IOException e) {
logger.error(e.getMessage());
}
return content;
}

反序列化(生成实例类方式)

1
2
3
4
5
6
7
8
9
10
11
public static <T> T deserialize(byte[] data, T initInstance, Class<T> tClass) {
T record = null;
try {
DatumReader<T> recordDatumReader = new SpecificDatumReader<>(tClass);
BinaryDecoder binaryEncoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(data), null);
record = recordDatumReader.read(initInstance, binaryEncoder);
} catch (IOException e) {
logger.error(e.getMessage());
}
return record;
}

序列化(通用方式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static byte[] genericSerialize(String schemaStr, GenericRecord record) {
byte[] content = null;
try {
Schema schema = new Schema.Parser().parse(schemaStr);
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
datumWriter.write(record, binaryEncoder);
content = outputStream.toByteArray();
} catch (IOException e) {
logger.error(e.getMessage());
}
return content;
}

反序列化(通用方式:获取数据后需要自己解析数据内容)

1
2
3
4
5
6
7
8
9
10
11
12
public static GenericRecord genericDeserialize(byte[] data, String schemaStr) {
GenericRecord record = null;
try {
Schema schema = new Schema.Parser().parse(schemaStr);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
BinaryDecoder binaryEncoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(data), null);
record = datumReader.read(record, binaryEncoder);
} catch (IOException e) {
logger.error(e.getMessage());
}
return record;
}

avro的编码结构 FIXME