這篇文章主要講解了“Hadoop序列化怎么實現”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Hadoop序列化怎么實現”吧!
Hdfs: % hadoop fs -cat hdfs://namenode/data/a.txt
LocalFS: % hadoop fs -cat file:///tmp/a.txt
generate crc check sum file
%hadoop fs -copyToLocal -crc /data/a.txt file:///data/a.txt
check sum file: .a.txt.crc is a hidden file.
Ref: CRC-32,循環冗余校驗算法,error-detecting.
io.bytes.per.checksum is deprecated, it's dfs.bytes-per-checksum, default is 512, Must not be larger than dfs.stream-buffer-size,which is the size of buffer to stream files. The size of this buffer should probably be a multiple of hardware page size (4096 on Intel x86), and it determines how much data is buffered during read and write operations.
常用算法
讀書時,hadoop支持四種壓縮算法,如果調解空間和效率的話,-1 ~ -9,代表從最優速度到最優空間. 壓縮算法支持在org.apache.hadoop.io.compress.*.
deflate (.deflate), 就是常用的gzip, package ..DefaultCodec
Gzip (.gz),在deflate格式加了文件頭和尾. 壓縮速度(適中),解壓速度(適中),壓縮效率(適中),package ..GzipCodec, both of java and native
bzip2 (.bz2), 壓縮速度(最差),< 解壓速度(最差),壓縮效率 (最好),特點是支持可切分(splitable),對map-red非常友好。,package ..BZip2Codec,java only
LZO (.lzo), 壓縮速度(最快),解壓速度(最快),壓縮效率(最差),,package com.hadoop.compressiojn.lzo.lzopCodec, native only
如果禁用原生庫,使用hadoop.native.lib.
如果使用原生庫,可能對象創建的成本較高,所以可以使用CodecPool,重復使用這些對象。
對于一個非常大的數據文件,存儲如下方案:
使用支持切分的bzip2
手動切分,并使壓縮后的part接近于block size.
使用Sequence File, 它支持壓縮和切分
使用Avro數據文件,它也支持壓縮和切分,而且增加了很多編程語言的可讀寫性。
如果Map-Red的output自動壓縮:
conf.setBoolean ("mared.output.compress",true); conf.setClass("mapred.output.compression.codec",GzipCodec.class,CompressionCodec.class);
如果Map-Red的中間結果的自動壓縮:
//or conf.setCompressMapOutput(true); conf.setBoolean ("mared.compress.map.output",true); //or conf.setMapOutputComressorClass(GzipCodec.class) conf.setClass("mapred.map.output.compression.codec",GzipCodec.class,CompressionCodec.class);
// core class for hadoop public interface Writable{ void write(DataOutput out) throw IOException; void readFields(DataInput in) throw IOException; } public interface Comparable<T>{ int compareTo(T o); } //core class for map-reduce shuffle public interface WritableComparable<T> extends Writable, Comparable<T> { } // Sample public class MyWritableComparable implements WritableComparable { // Some data private int counter; private long timestamp; public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public int compareTo(MyWritableComparable o) { int thisValue = this.value; int thatValue = o.value; return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1)); } public int hashCode() { final int prime = 31; int result = 1; result = prime * result + counter; result = prime * result + (int) (timestamp ^ (timestamp >>> 32)); return result } } //optimize for stream comparasion public interface RawComparator<T> extends Comparator<T>{ // s1 start position, l1, length of bytes public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2); } public class WritableComparator implements RawComparator{ }
WritableComparator 提供了原始compator的compare反序列化對象的實現,性能較差。不過它作為RawComparator實例的工廠:
RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
// 注冊一個經過優化的比較算子。Register an optimized comparator for a WritableComparable implementation.
static void define(Class c, WritableComparator comparator);
// 獲得一個WritableComparable的比較算子. Get a comparator for a WritableComparable implementation.
static WritableComparator get(Class<? extends WritableComparable> c);
public MyWritableComparator extends WritableComparator{ static{ define(MyWritableComparable.class, new MyWritableComparator()); } public MyWritableComparator { super(MyWritableComparable.class); } @Override public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2){ } }
注: 要使static initializer被調用,除非有該類的實例被創建,或某靜態方法或成員被訪問?;蛘咧苯訌娭?,代碼如:
Class.forName("package.yourclass"); 它會強制初始化靜態initializer.
BooleanWritable, 1
ByteWritable, 1,
BytesWritable,
IntWritable,4
VIntWritable,1~5
FloatWritable,4,
LongWritable,8,
VLongWritable,1~9
DoubleWritable,8
NullWritable,Immutable singletone.
Text,4~
MD5Hash,
ObjectWritable,
GenericWritable
ArrayWritable
TwoDArrayWritable
AbstractMapWritable
MapWritable
SortedMapWritable
值得一提的是Text的序列化方式是Zero-compressed encoding,這個看過一些資料,其實是一種編碼方式,意圖是省略掉高位0所占用的空間,對于小數,它能節省空間,對于大數會額外占用空間。相比壓縮,它能比較快速。其實類似于VIntWritable, VLongWritable的編碼方式。
- 如何選擇變長和定長數值呢?
1. 定長適合分布非常均勻的數值(如hash),變長適合分布非常不均勻的數值。
2. 變長可以節省空間,而且可以在VIntWritable 和VLongWritable之間轉換。
- Text和String的區別
1。String是char序列,Text是UTF-8的byte序列.
UTF-8類不能對字符串大于32767的進行utf-8編碼。
(Indexing)索引:對于ASCII來說, Text和String是一樣的, 對于Unicode就不同了。String類的長度是其所含char編碼單元的長度,然而Text是UTF-8的字節碼的長度。CodePointAt表示一個真正的Unicode字符,它可以是2char,4bytes的unicode。
Iteration(迭代): 將Text轉換ByteBuffer,然后反復調用bytesToCodePoint()靜態方法,可以取到整型的Unicode.
Mutable(易變性): 可以set,類似writable 和StringBuffer,getLength()返回有效字串長度,getbytes().length,返回空間大小。
這是二進制數組的封裝,類似于windows下的BSTR,都是前面一個整型表示字節長度,后面是字節的二進制流。
它也是mutable,getLength() != getBytes().length
NullWritable是Writable的一個特殊類型。它的序列化長度為0,其實只是一個占位符,既不讀入,也不寫出。只是存在于程序體中。
Immutable,是一個singleton。
ObjectWritable是Java的Array, String, 以及Primitive類型的通用封裝 (注:不包含Integer)。它的序列化則使用java的類型序列化,寫入類型信息等,比較占用空間。
通過兩個特殊的構造:
public ObjectWritable(Object instance);
public ObjectWritable(Class declaredClass,Object instance);
舉例子:
ObjectWritable objectw = new ObjectWritable(int.class,5);
首先這是一個抽象類,需要被具象化才能使用。
觀察下面這個實列,它以一種Union方式,顯示的代理一個Writable實例,解決了Reduce函數的參數聲明問題。
public class MyGenericWritable extends GenericWritable { private static Class<? extends Writable>[] CLASSES = null; static { CLASSES = (Class<? extends Writable>[]) new Class[] { IntWritable.class, Text.class //add as many different Writable class as you want }; } @Override protected Class<? extends Writable>[] getTypes() { return CLASSES; } @Override public String toString() { return "MyGenericWritable [getTypes()=" + Arrays.toString(getTypes()) + "]"; } // override hashcode(); } public class Reduce extends Reducer<Text, MyGenericWritable, Text, Text> { public void reduce(Text key, Iterable<MyGenericWritable> values, Context context) throws IOException, InterruptedException { }
ArrayWritable aw = new ArrayWriable(Text.class);
實現了java.util.Map<Writable,Writable> 和SortedMap...
它的serialize, 使用先寫map<classname,id>,然后后邊每個類的類型,以id來替代,節省空間。這些都在父類AbstractMapWritable中實現。
集合小結:
1. 如果是單類型的列表,使用ArrayWritable就足夠了
2。如果是把不同類型的Writable存儲在一個列表中:
-- 可以使用GenerickWritable,把元素封裝在一個ArrayWritable,這個貌似只能同一類型。
public class MyGenericWritable extends GenericWritable { private static Class<? extends Writable>[] CLASSES = null; static { CLASSES = (Class<? extends Writable>[]) new Class[] { ArrayWritable.class, //add as many different Writable class as you want }; } @Override protected Class<? extends Writable>[] getTypes() { return CLASSES; }
-- 可以使用寫一個仿照MapWritable的ListWritable
//注意實現hashcode,equals,toString, comparTo (if possible)
//hashcode尤其重要,HashPartitioner通常用hashcode來選擇reduce分區,所以為你的類寫一個比較好的hashcode非常必要。
public class ListWritable extends ArrayList<Writable> implements Writable {
}
/** * @author cloudera * */ public class ListWritable extends ArrayList<Writable> implements Writable { private List<Writable> list = new ArrayList<Writable>(); public void set(Writable writable){ list.add(writable); } @Override public void readFields(DataInput in) throws IOException { int nsize = in.readInt(); Configuration conf = new Configuration(); Text className = new Text(); while(nsize-->0){ Class theClass = null; try { className.readFields(in); theClass = Class.forName(className.toString()); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } Writable w = (Writable)ReflectionUtils.newInstance(theClass,conf); w.readFields(in); add(w); } } @Override public void write(DataOutput out) throws IOException { Writable w = null; out.writeInt(size()); for(int i = 0;i<size();i++){ w = get(i); new Text(w.getClass().getName()).write(out); w.write(out); } } }
感謝各位的閱讀,以上就是“Hadoop序列化怎么實現”的內容了,經過本文的學習后,相信大家對Hadoop序列化怎么實現這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。