RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
ORC文件读写工具类和Flink输出ORC格式文件的方法

本篇内容主要讲解“ORC文件读写工具类和Flink输出ORC格式文件的方法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“ORC文件读写工具类和Flink输出ORC格式文件的方法”吧!

10年积累的成都做网站、成都网站建设经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站设计后付款的网站建设流程,更有黄陂免费网站建设让你可以放心的选择与我们合作。

一.ORC文件:

压缩

压缩比例在1:7到1:10之间,3份副本的话会节省接近10倍空间

调查数据周末要给出

数据压缩后要注意负载均衡问题,可以尝试reblance

导出

hive的orc文件使用sqoop导出到MySQL使用hcatalog直接增加一些配置参数即可

查看

以json方式查看orc文件

hive --orcfiledump -j -p /user/hive/warehouse/dim.db/dim_province/000000_0

下载

以KV形式查看orc文件

hive --orcfiledump -d /user/hive/warehouse/dim.db/dim_province/000000_0 > myfile.txt

orc读取会查找字段在min和max中的值,不包含则跳过,所以速度会快

二,orc读写工具类

注意事项: 在windows读写时,请务必保证classpath ,path中不要有hadoop的环境变量! 如果有,请先删除,并且重启IDE 

2.1 读:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

import java.io.IOException;

public class CoreReader {
  public static void main(Configuration conf, String[] args) throws IOException {
    // Get the information from the file footer
    Reader reader = OrcFile.createReader(new Path("my-file.orc"),
                                         OrcFile.readerOptions(conf));
    System.out.println("File schema: " + reader.getSchema());
    System.out.println("Row count: " + reader.getNumberOfRows());

    // Pick the schema we want to read using schema evolution
    TypeDescription readSchema =
        TypeDescription.fromString("struct");
    // Read the row data
    VectorizedRowBatch batch = readSchema.createRowBatch();
    RecordReader rowIterator = reader.rows(reader.options()
                                             .schema(readSchema));
    LongColumnVector z = (LongColumnVector) batch.cols[0];
    BytesColumnVector y = (BytesColumnVector) batch.cols[1];
    LongColumnVector x = (LongColumnVector) batch.cols[2];
    while (rowIterator.nextBatch(batch)) {
      for(int row=0; row < batch.size; ++row) {
        int zRow = z.isRepeating ? 0: row;
        int xRow = x.isRepeating ? 0: row;
        System.out.println("z: " +
            (z.noNulls || !z.isNull[zRow] ? z.vector[zRow] : null));
        System.out.println("y: " + y.toString(row));
        System.out.println("x: " +
            (x.noNulls || !x.isNull[xRow] ? x.vector[xRow] : null));
      }
    }
    rowIterator.close();
  }

  public static void main(String[] args) throws IOException {
    main(new Configuration(), args);
  }
}

 2.2,写:

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.orc.OrcFile;import org.apache.orc.TypeDescription;import org.apache.orc.Writer;import java.io.IOException;import java.nio.charset.StandardCharsets;public class CoreWriter {  public static void main(Configuration conf, String[] args) throws IOException {
    TypeDescription schema =
      TypeDescription.fromString("struct");
    Writer writer = OrcFile.createWriter(new Path("my-file.orc"),
                                         OrcFile.writerOptions(conf)
                                          .setSchema(schema));
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    BytesColumnVector y = (BytesColumnVector) batch.cols[1];for(int r=0; r < 10000; ++r) {      int row = batch.size++;
      x.vector[row] = r;      byte[] buffer = ("Last-" + (r * 3)).getBytes(StandardCharsets.UTF_8);
      y.setRef(row, buffer, 0, buffer.length);      // If the batch is full, write it out and start over.      if (batch.size == batch.getMaxSize()) {
        writer.addRowBatch(batch);
        batch.reset();
      }
    }if (batch.size != 0) {
      writer.addRowBatch(batch);
    }
    writer.close();
  }  public static void main(String[] args) throws IOException {main(new Configuration(), args);
  }
}

2.3 Flink Sink ORC文件示例:(基于flink1.12.3版本)

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import org.apache.hadoop.conf.Configuration;
import org.apache.orc.TypeDescription;

import java.util.Properties;

public class StreamingWriteFileOrc {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        env.setParallelism(1);
        DataStream dataStream = env.addSource(
                new MySource());

        //写入orc格式的属性
        final Properties writerProps = new Properties();
        writerProps.setProperty("orc.compress", "LZ4");

        //定义类型和字段名
        LogicalType[] orcTypes = new LogicalType[]{
                new IntType(), new DoubleType(), new VarCharType()};
        String[] fields = new String[]{"a", "b", "c"};
        TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(RowType.of(
                orcTypes,
                fields));

        //构造工厂类OrcBulkWriterFactory
        final OrcBulkWriterFactory factory = new OrcBulkWriterFactory<>(
                new RowDataVectorizer(typeDescription.toString(), orcTypes),
                writerProps,
                new Configuration());

        StreamingFileSink orcSink = StreamingFileSink
                .forBulkFormat(new Path("file:///tmp/aaaa"), factory)
                .build();

        dataStream.addSink(orcSink);

        env.execute();
    }

    public static class MySource implements SourceFunction{
        @Override
        public void run(SourceContext sourceContext) throws Exception{
            while (true){
                GenericRowData rowData = new GenericRowData(3);
                rowData.setField(0, (int) (Math.random() * 100));
                rowData.setField(1, Math.random() * 100);
                rowData.setField(2, org.apache.flink.table.data.StringData.fromString(String.valueOf(Math.random() * 100)));
                sourceContext.collect(rowData);
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel(){

        }
    }

}

2.4 POM依赖

    
        UTF-8
        1.8
        UTF-8
        UTF-8
        1.8
        UTF-8
        1.8
        2.11
        2.11
        1.12.3
        1.2.0
        1.7.21
        1.3.1
        compile
    

    
     
        
            commons-cli
            commons-cli
            1.4
        
        
            commons-codec
            commons-codec
            1.15
        

        
        
            junit
            junit
            4.11
            test
        

        
            org.apache.hbase
            hbase-client
            ${hbase.version}
            
                
                    org.apache.hadoop
                    hadoop-yarn-common
                
                
                    org.apache.hadoop
                    hadoop-yarn-api
                
                
                    hadoop-mapreduce-client-core
                    org.apache.hadoop
                
                
                    hadoop-auth
                    org.apache.hadoop
                
                
                    hadoop-common
                    org.apache.hadoop
                
            
        


        
            commons-lang
            commons-lang
            2.6
        
        
            org.apache.commons
            commons-lang3
            3.3.2
        


        
            mysql
            mysql-connector-java
            5.1.47
        

        
            com.alibaba
            fastjson
            1.2.28
        


        
            org.apache.flink
            flink-java
            ${flink.cluster.version}
            ${scope.value}
        

        
            org.apache.flink
            flink-table
            ${flink.cluster.version}
            pom
            ${scope.value}
        


        
            org.apache.flink
            flink-table-api-scala-bridge_2.11
            ${flink.cluster.version}
            ${scope.value}
        


        
            org.apache.flink
            flink-table-api-java-bridge_2.11
            ${flink.cluster.version}
            ${scope.value}
        


        
            org.apache.flink
            flink-connector-filesystem_2.11
            1.11.3
        

        
            org.apache.flink
            flink-connector-filesystem_${scala.version}
            1.11.3
        

        
        
            org.apache.flink
            flink-orc_2.11
            1.12.3
            ${scope.value}
        


        
            org.apache.flink
            flink-ml_${scala.version}
            1.8.1
            ${scope.value}
        


        
        
            org.apache.flink
            flink-table-planner-blink_2.11
            ${flink.cluster.version}
            ${scope.value}
        

        
        
            org.apache.flink
            flink-table-common
            ${flink.cluster.version}
            ${scope.value}
        


        
        
            org.apache.flink
            flink-streaming-java_${scala.version}
            1.12.3
            ${scope.value}
        


        
            org.apache.flink
            flink-streaming-scala_${scala.version}
            ${flink.cluster.version}
            
                
                    commons-lang3
                    org.apache.commons
                
                
                    commons-cli
                    commons-cli
                
            
            ${scope.value}
        

        
            org.apache.flink
            flink-connector-kafka_${scala.version}
            ${flink.cluster.version}
            
                
                    log4j
                    log4j
                
                
                    org.slf4j
                    slf4j-log4j12
                
            
        

        
            org.apache.hadoop
            hadoop-common
            2.7.3
            ${scope.value}
        
        
            org.apache.hadoop
            hadoop-hdfs
            2.7.3
            ${scope.value}
            
                
                    xml-apis
                    xml-apis
                
            
        

        
            org.apache.flink
            flink-parquet_${scala.version}
            ${flink.cluster.version}
        

        
            org.apache.flink
            flink-avro
            ${flink.cluster.version}
        

        
        
            org.slf4j
            slf4j-api
            ${slf4j.version}
        
        
            ch.qos.logback
            logback-core
            ${logback.version}
        
        
            ch.qos.logback
            logback-classic
            ${logback.version}
        

        
        
            redis.clients
            jedis
            3.0.0
        
        
            org.apache.commons
            commons-pool2
            2.5.0
        

        
        
            com.alibaba
            druid
            1.0.11
        

        
            org.apache.flink
            flink-clients_2.11
            ${flink.cluster.version}
        


        
            org.apache.hive
            hive-jdbc
            1.2.1
        

        
        
            org.apache.hadoop
            hadoop-client
            2.7.3
        

    

到此,相信大家对“ORC文件读写工具类和Flink输出ORC格式文件的方法”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


名称栏目:ORC文件读写工具类和Flink输出ORC格式文件的方法
当前链接:http://lswzjz.com/article/jedepo.html