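Problem
Create a GeekFormat table with create table ... stored as geek.
The table's files are text, not binary. Like Base64TextInputFormat and Base64TextOutputFormat, GeekFormat is used for encryption.
- Decryption rule: any word such as geek, geeeek or geeeeeeeeeeek that appears in the file is filtered out, that is, deleted. gek must be kept. A run of consecutive "e" characters is at most 256 long.
- Encryption rule: when the file is written, a gee…k is inserted after every 2 to 256 words (the gap is chosen at random). The number of e characters equals the number of non-gee…k words that precede it.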
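The two rules can be made concrete with a small sketch in plain Java. The class and method names (`GeekMarker`, `marker`, `isMarker`) are hypothetical and only illustrate the rules; the sketch also assumes that the word count restarts after each inserted marker, which the problem statement does not spell out.

```java
final class GeekMarker {
    // Hypothetical helper: builds "g" + n times "e" + "k".
    static String marker(int n) {
        StringBuilder sb = new StringBuilder("g");
        for (int i = 0; i < n; i++) {
            sb.append('e');
        }
        return sb.append('k').toString();
    }

    // True for words the decryption rule removes: "g", then 2..256 "e", then "k".
    static boolean isMarker(String word) {
        return word.matches("ge{2,256}k");
    }

    public static void main(String[] args) {
        System.out.println(marker(3));        // geeek
        System.out.println(isMarker("geek")); // true
        System.out.println(isMarker("gek"));  // false
    }
}
```

A marker written after three ordinary words is therefore `geeek`, and `gek` (a single e) is never treated as a marker.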
Approach
When creating a table, Hive accepts a format keyword after STORED AS, for example:
    CREATE TABLE tb_test ( id int ) STORED AS PARQUET;
Other keywords are accepted as well, for example TEXTFILE, SEQUENCEFILE, RCFILE, ORC and AVRO.

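STORED AS PARQUET is shorthand for the following three clauses:
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
ROW FORMAT SERDE names the serializer/deserializer, which maps between row objects and their serialized form; the default can be kept here. STORED AS INPUTFORMAT ... OUTPUTFORMAT controls how files are read and written, and that is where the encryption and decryption are implemented.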
Base64TextInputFormat.java is a useful reference for the implementation.
Implementation
1. GeekTextInputFormat.java
The InputFormat defines how file contents are read into the Hive table.
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobConfigurable;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class GeekTextInputFormat implements InputFormat<LongWritable, BytesWritable>, JobConfigurable {

        public static class GeekLineRecordReader implements RecordReader<LongWritable, BytesWritable>, JobConfigurable {
            LineRecordReader reader;
            Text text;

            public GeekLineRecordReader(LineRecordReader reader) {
                this.reader = reader;
                text = reader.createValue();
            }

            @Override
            public void close() throws IOException {
                reader.close();
            }

            @Override
            public LongWritable createKey() {
                return reader.createKey();
            }

            @Override
            public BytesWritable createValue() {
                return new BytesWritable();
            }

            @Override
            public long getPos() throws IOException {
                return reader.getPos();
            }

            @Override
            public float getProgress() throws IOException {
                return reader.getProgress();
            }

            @Override
            public boolean next(LongWritable key, BytesWritable value) throws IOException {
                if (reader.next(key, text)) {
                    // Decrypt the line and hand exactly the decoded bytes to Hive.
                    // (Copying to the original line length would pad the value with NUL bytes.)
                    byte[] textBytes = decode().getBytes(StandardCharsets.UTF_8);
                    value.set(textBytes, 0, textBytes.length);
                    return true;
                }
                return false; // no more data
            }

            private String decode() {
                // Remove whole words "g" + 2..256 "e" + "k" together with the space
                // in front of them; "gek" does not match and is kept.
                return text.toString().replaceAll(" ?\\bge{2,256}k\\b", "");
            }

            @Override
            public void configure(JobConf job) {
            }
        }

        TextInputFormat format;
        JobConf job;

        public GeekTextInputFormat() {
            format = new TextInputFormat();
        }

        @Override
        public void configure(JobConf job) {
            this.job = job;
            format.configure(job);
        }

        @Override
        public RecordReader<LongWritable, BytesWritable> getRecordReader(
                InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
            reporter.setStatus(genericSplit.toString());
            GeekLineRecordReader reader = new GeekLineRecordReader(
                    new LineRecordReader(job, (FileSplit) genericSplit));
            reader.configure(job);
            return reader;
        }

        @Override
        public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
            return format.getSplits(job, numSplits);
        }
    }
The key part is next() in GeekLineRecordReader, which applies the decryption rule.
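The decryption step can be tried without Hadoop. The sketch below is a standalone illustration, not the class used by Hive: `GeekDecodeDemo` is a hypothetical name, and the pattern is an assumption that limits matching to whole words, caps the run of e at 256, and also removes the single space in front of a marker.

```java
final class GeekDecodeDemo {
    // Assumed pattern: an optional preceding space, then a whole word
    // made of "g", 2..256 "e", and "k".
    static String decode(String line) {
        return line.replaceAll(" ?\\bge{2,256}k\\b", "");
    }

    public static void main(String[] args) {
        // "gek" has a single "e" and is kept; "geek" and "geeeek" are removed.
        System.out.println(decode("install gek on geek all geeeek nodes"));
        // prints: install gek on all nodes
    }
}
```

The word boundaries matter: a word such as geeky only starts with a marker and is left untouched.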
2. GeekTextOutputFormat.java
The OutputFormat defines how rows are written out to the file.
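    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import java.util.Random;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
    import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobConfigurable;
    import org.apache.hadoop.util.Progressable;

    public class GeekTextOutputFormat<K extends WritableComparable, V extends Writable>
            extends HiveIgnoreKeyTextOutputFormat<K, V> {

        public static class GeekRecordWriter implements RecordWriter, JobConfigurable {
            RecordWriter writer;
            BytesWritable bytesWritable;
            Random random = new Random();

            public GeekRecordWriter(RecordWriter writer) {
                this.writer = writer;
                bytesWritable = new BytesWritable();
            }

            @Override
            public void write(Writable w) throws IOException {
                // Get the row as a string
                String line;
                if (w instanceof Text) {
                    line = w.toString();
                } else {
                    assert (w instanceof BytesWritable);
                    BytesWritable b = (BytesWritable) w;
                    // getBytes() returns the backing array; only getLength() bytes are valid
                    line = new String(b.getBytes(), 0, b.getLength(), StandardCharsets.UTF_8);
                }
                // Encode
                byte[] output = encode(line);
                bytesWritable.set(output, 0, output.length);
                writer.write(bytesWritable);
            }

            private byte[] encode(String content) {
                String[] words = content.split(" ", -1); // -1 keeps trailing empty fields
                StringBuilder sb = new StringBuilder();
                int gap = random.nextInt(255) + 2; // 2..256 words until the next marker
                int count = 0; // words written since the last marker
                for (int i = 0; i < words.length; i++) {
                    if (i > 0) {
                        sb.append(' ');
                    }
                    sb.append(words[i]);
                    if (++count == gap) {
                        // Insert "g" + count times "e" + "k"
                        sb.append(" g");
                        for (int e = 0; e < count; e++) {
                            sb.append('e');
                        }
                        sb.append('k');
                        count = 0;
                        gap = random.nextInt(255) + 2;
                    }
                }
                return sb.toString().getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public void close(boolean abort) throws IOException {
                writer.close(abort);
            }

            @Override
            public void configure(JobConf job) {
            }
        }

        @Override
        public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
                Class<? extends Writable> valueClass, boolean isCompressed,
                Properties tableProperties, Progressable progress) throws IOException {
            GeekRecordWriter writer = new GeekRecordWriter(super.getHiveRecordWriter(
                    jc, finalOutPath, BytesWritable.class, isCompressed, tableProperties, progress));
            writer.configure(jc);
            return writer;
        }
    }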
The key part is write() in GeekRecordWriter, which applies the encryption rule.
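The counting in the encryption rule is easy to get off by one, so it helps to check it outside Hive. The following sketch is a standalone illustration under stated assumptions: `GeekEncodeDemo`, `sample` and `markersMatchCounts` are hypothetical names, the `Random` is passed in so that a run can be repeated with a fixed seed, and the word count is assumed to restart after each marker.

```java
import java.util.Random;

final class GeekEncodeDemo {
    // Inserts "g" + n times "e" + "k" after every n words, with n drawn from 2..256.
    static String encode(String content, Random random) {
        String[] words = content.split(" ");
        StringBuilder sb = new StringBuilder();
        int gap = random.nextInt(255) + 2; // 2..256
        int count = 0;                     // words since the last marker
        for (int i = 0; i < words.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(words[i]);
            if (++count == gap) {
                sb.append(" g");
                for (int e = 0; e < count; e++) {
                    sb.append('e');
                }
                sb.append('k');
                count = 0;
                gap = random.nextInt(255) + 2;
            }
        }
        return sb.toString();
    }

    // Removes markers together with the space in front of them.
    static String decode(String line) {
        return line.replaceAll(" ?\\bge{2,256}k\\b", "");
    }

    // Checks that each marker carries as many "e" as words precede it since the last marker.
    static boolean markersMatchCounts(String encoded) {
        int count = 0;
        for (String word : encoded.split(" ")) {
            if (word.matches("ge{2,256}k")) {
                if (word.length() - 2 != count) {
                    return false;
                }
                count = 0;
            } else {
                count++;
            }
        }
        return true;
    }

    // Builds "w0 w1 ... w(n-1)" as test input.
    static String sample(int n) {
        StringBuilder sb = new StringBuilder("w0");
        for (int i = 1; i < n; i++) {
            sb.append(" w").append(i);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String text = sample(600);
        String encoded = encode(text, new Random(42));
        System.out.println(markersMatchCounts(encoded));
        System.out.println(decode(encoded).equals(text));
    }
}
```

With 600 input words at least two markers are inserted regardless of the seed, since the gap never exceeds 256, and decoding the encoded text gives back the input.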
Packaging and running
```sql
-- Build the jar with Maven and upload it to the server
-- Then, in the Hive CLI:
add jar /home/student/hadoop/learn_hive-1.0-SNAPSHOT.jar;

-- Create the table
create table tb_test_format(str STRING)
stored as
  inputformat 'com.reiser.fileformat.GeekTextInputFormat'
  outputformat 'com.reiser.fileformat.GeekTextOutputFormat';

-- Insert data
INSERT INTO TABLE tb_test_format values('This notebook can be used to install gek on all worker nodes, run data generation, and create the TPCDS database.');
```
[Figure: result in Hive]

[Figure: result in HDFS]

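Addendum
The results above are correct, but stored as geek itself is not implemented yet. The custom format still has to be registered:
- Create a storage format descriptor class that extends AbstractStorageFormatDescriptor. It returns the stored as keyword and the class names of the InputFormat, OutputFormat and SerDe.
- Add the descriptor class name to the registration file org.apache.hadoop.hive.ql.io.StorageFormatDescriptor (a Java service provider file, placed under META-INF/services in the jar).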
    import java.util.Set;

    import com.google.common.collect.ImmutableSet;
    import org.apache.hadoop.hive.ql.io.AbstractStorageFormatDescriptor;

    public class GeekStorageFormatDescriptor extends AbstractStorageFormatDescriptor {
        @Override
        public Set<String> getNames() {
            // Keyword accepted after STORED AS
            return ImmutableSet.of("geek");
        }

        @Override
        public String getInputFormat() {
            return GeekTextInputFormat.class.getName();
        }

        @Override
        public String getOutputFormat() {
            return GeekTextOutputFormat.class.getName();
        }
    }
References
https://issues.apache.org/jira/browse/HIVE-5976
https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide#DeveloperGuide-HiveSerDe
