這篇文章主要講解了“Hadoop序列化怎么實現”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Hadoop序列化怎么實現”吧!
Hdfs: % hadoop fs -cat hdfs://namenode/data/a.txt
LocalFS: % hadoop fs -cat file:///tmp/a.txt
generate crc check sum file
%hadoop fs -copyToLocal -crc /data/a.txt file:///data/a.txt
check sum file: .a.txt.crc is a hidden file.
Ref: CRC-32,循環冗余校驗算法,error-detecting.
io.bytes.per.checksum is deprecated, it's dfs.bytes-per-checksum, default is 512, Must not be larger than dfs.stream-buffer-size,which is the size of buffer to stream files. The size of this buffer should probably be a multiple of hardware page size (4096 on Intel x86), and it determines how much data is buffered during read and write operations.
常用算法
讀書時,hadoop支持四種壓縮算法,如果調解空間和效率的話,-1 ~ -9,代表從最優速度到最優空間. 壓縮算法支持在org.apache.hadoop.io.compress.*.
deflate (.deflate), 就是常用的gzip, package ..DefaultCodec
Gzip (.gz),在deflate格式加了文件頭和尾. 壓縮速度(適中),解壓速度(適中),壓縮效率(適中),package ..GzipCodec, both of java and native
bzip2 (.bz2), 壓縮速度(最差),< 解壓速度(最差),壓縮效率 (最好),特點是支持可切分(splitable),對map-red非常友好。,package ..BZip2Codec,java only
LZO (.lzo), 壓縮速度(最快),解壓速度(最快),壓縮效率(最差),,package com.hadoop.compressiojn.lzo.lzopCodec, native only
如果禁用原生庫,使用hadoop.native.lib.
如果使用原生庫,可能對象創建的成本較高,所以可以使用CodecPool,重復使用這些對象。
對于一個非常大的數據文件,存儲如下方案:
使用支持切分的bzip2
手動切分,并使壓縮后的part接近于block size.
使用Sequence File, 它支持壓縮和切分
使用Avro數據文件,它也支持壓縮和切分,而且增加了很多編程語言的可讀寫性。
如果Map-Red的output自動壓縮:
conf.setBoolean ("mared.output.compress",true);
conf.setClass("mapred.output.compression.codec",GzipCodec.class,CompressionCodec.class);如果Map-Red的中間結果的自動壓縮:
//or conf.setCompressMapOutput(true);
conf.setBoolean ("mared.compress.map.output",true);
//or conf.setMapOutputComressorClass(GzipCodec.class)
conf.setClass("mapred.map.output.compression.codec",GzipCodec.class,CompressionCodec.class);// core class for hadoop
public interface Writable{
void write(DataOutput out) throw IOException;
void readFields(DataInput in) throw IOException;
}
public interface Comparable<T>{
int compareTo(T o);
}
//core class for map-reduce shuffle
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
// Sample
public class MyWritableComparable implements WritableComparable {
// Some data
private int counter;
private long timestamp;
public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}
public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp = in.readLong();
}
public int compareTo(MyWritableComparable o) {
int thisValue = this.value;
int thatValue = o.value;
return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + counter;
result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
return result
}
}
//optimize for stream comparasion
public interface RawComparator<T> extends Comparator<T>{
// s1 start position, l1, length of bytes
public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2);
}
public class WritableComparator implements RawComparator{
}WritableComparator 提供了原始compator的compare反序列化對象的實現,性能較差。不過它作為RawComparator實例的工廠:
RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
// 注冊一個經過優化的比較算子。Register an optimized comparator for a WritableComparable implementation.
static void define(Class c, WritableComparator comparator);
// 獲得一個WritableComparable的比較算子. Get a comparator for a WritableComparable implementation.
static WritableComparator get(Class<? extends WritableComparable> c);
public MyWritableComparator extends WritableComparator{
static{
define(MyWritableComparable.class, new MyWritableComparator());
}
public MyWritableComparator {
super(MyWritableComparable.class);
}
@Override
public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2){
}
}
注: 要使static initializer被調用,除非有該類的實例被創建,或某靜態方法或成員被訪問?;蛘咧苯訌娭?,代碼如:
Class.forName("package.yourclass"); 它會強制初始化靜態initializer.
BooleanWritable, 1
ByteWritable, 1,
BytesWritable,
IntWritable,4
VIntWritable,1~5
FloatWritable,4,
LongWritable,8,
VLongWritable,1~9
DoubleWritable,8
NullWritable,Immutable singletone.
Text,4~
MD5Hash,
ObjectWritable,
GenericWritable
ArrayWritable
TwoDArrayWritable
AbstractMapWritable
MapWritable
SortedMapWritable
值得一提的是Text的序列化方式是Zero-compressed encoding,這個看過一些資料,其實是一種編碼方式,意圖是省略掉高位0所占用的空間,對于小數,它能節省空間,對于大數會額外占用空間。相比壓縮,它能比較快速。其實類似于VIntWritable, VLongWritable的編碼方式。
- 如何選擇變長和定長數值呢?
1. 定長適合分布非常均勻的數值(如hash),變長適合分布非常不均勻的數值。
2. 變長可以節省空間,而且可以在VIntWritable 和VLongWritable之間轉換。
- Text和String的區別
1。String是char序列,Text是UTF-8的byte序列.
UTF-8類不能對字符串大于32767的進行utf-8編碼。
(Indexing)索引:對于ASCII來說, Text和String是一樣的, 對于Unicode就不同了。String類的長度是其所含char編碼單元的長度,然而Text是UTF-8的字節碼的長度。CodePointAt表示一個真正的Unicode字符,它可以是2char,4bytes的unicode。
Iteration(迭代): 將Text轉換ByteBuffer,然后反復調用bytesToCodePoint()靜態方法,可以取到整型的Unicode.
Mutable(易變性): 可以set,類似writable 和StringBuffer,getLength()返回有效字串長度,getbytes().length,返回空間大小。
這是二進制數組的封裝,類似于windows下的BSTR,都是前面一個整型表示字節長度,后面是字節的二進制流。
它也是mutable,getLength() != getBytes().length
NullWritable是Writable的一個特殊類型。它的序列化長度為0,其實只是一個占位符,既不讀入,也不寫出。只是存在于程序體中。
Immutable,是一個singleton。
ObjectWritable是Java的Array, String, 以及Primitive類型的通用封裝 (注:不包含Integer)。它的序列化則使用java的類型序列化,寫入類型信息等,比較占用空間。
通過兩個特殊的構造:
public ObjectWritable(Object instance);
public ObjectWritable(Class declaredClass,Object instance);
舉例子:
ObjectWritable objectw = new ObjectWritable(int.class,5);
首先這是一個抽象類,需要被具象化才能使用。
觀察下面這個實列,它以一種Union方式,顯示的代理一個Writable實例,解決了Reduce函數的參數聲明問題。
public class MyGenericWritable extends GenericWritable {
private static Class<? extends Writable>[] CLASSES = null;
static {
CLASSES = (Class<? extends Writable>[]) new Class[] {
IntWritable.class,
Text.class
//add as many different Writable class as you want
};
}
@Override
protected Class<? extends Writable>[] getTypes() {
return CLASSES;
}
@Override
public String toString() {
return "MyGenericWritable [getTypes()=" + Arrays.toString(getTypes()) + "]";
}
// override hashcode();
}
public class Reduce extends Reducer<Text, MyGenericWritable, Text, Text> {
public void reduce(Text key, Iterable<MyGenericWritable> values, Context context) throws IOException, InterruptedException {
}ArrayWritable aw = new ArrayWriable(Text.class);
實現了java.util.Map<Writable,Writable> 和SortedMap...
它的serialize, 使用先寫map<classname,id>,然后后邊每個類的類型,以id來替代,節省空間。這些都在父類AbstractMapWritable中實現。
集合小結:
1. 如果是單類型的列表,使用ArrayWritable就足夠了
2。如果是把不同類型的Writable存儲在一個列表中:
-- 可以使用GenerickWritable,把元素封裝在一個ArrayWritable,這個貌似只能同一類型。
public class MyGenericWritable extends GenericWritable {
private static Class<? extends Writable>[] CLASSES = null;
static {
CLASSES = (Class<? extends Writable>[]) new Class[] {
ArrayWritable.class,
//add as many different Writable class as you want
};
}
@Override
protected Class<? extends Writable>[] getTypes() {
return CLASSES;
}-- 可以使用寫一個仿照MapWritable的ListWritable
//注意實現hashcode,equals,toString, comparTo (if possible)
//hashcode尤其重要,HashPartitioner通常用hashcode來選擇reduce分區,所以為你的類寫一個比較好的hashcode非常必要。
public class ListWritable extends ArrayList<Writable> implements Writable {
}
/**
* @author cloudera
*
*/
public class ListWritable extends ArrayList<Writable> implements Writable {
private List<Writable> list = new ArrayList<Writable>();
public void set(Writable writable){
list.add(writable);
}
@Override
public void readFields(DataInput in) throws IOException {
int nsize = in.readInt();
Configuration conf = new Configuration();
Text className = new Text();
while(nsize-->0){
Class theClass = null;
try {
className.readFields(in);
theClass = Class.forName(className.toString());
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Writable w = (Writable)ReflectionUtils.newInstance(theClass,conf);
w.readFields(in);
add(w);
}
}
@Override
public void write(DataOutput out) throws IOException {
Writable w = null;
out.writeInt(size());
for(int i = 0;i<size();i++){
w = get(i);
new Text(w.getClass().getName()).write(out);
w.write(out);
}
}
}感謝各位的閱讀,以上就是“Hadoop序列化怎么實現”的內容了,經過本文的學習后,相信大家對Hadoop序列化怎么實現這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。