RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
flinksql怎么将数据写入到文件中

本篇内容主要讲解“flinksql怎么将数据写入到文件中”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flinksql怎么将数据写入到文件中”吧!

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名与空间、虚拟主机、营销软件、网站建设、长岭网站维护、网站推广。

package com.jd.dataoutput;

import com.jd.data.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;

public class FlinkSqlOutputFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource stream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");
//        DataStreamSource stream = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator map = stream.map(new MapFunction() {

            public SensorReading map(String s) throws Exception {
                String[] split = s.split(",");
                return new SensorReading(split[0], split[1], split[2]);
            }
        });



        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//        使用 table api
        Table table = tableEnv.fromDataStream(map);
//        table.printSchema();
        Table select = table.select("a,b");
//        select.printSchema();

//        使用 sql api
//        tableEnv.createTemporaryView("test", map);
//        Table select = tableEnv.sqlQuery(" select a, b from test");
//        select.printSchema();


//        DataStream sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class);
//        sensorReading2DataStream.map(new MapFunction() {
//            @Override
//            public Object map(SensorReading2 value) throws Exception {
//                System.out.println(value.a+"   "+ value.b);
//                return null;
//            }
//        });

//        tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt"))
//                .withFormat(new Csv())
//                .withSchema(
//                        new Schema()
//                                .field("a", DataTypes.STRING())
//                                .field("b", DataTypes.STRING()))
//                .inAppendMode()
//                .createTemporaryTable("outputTable");
//        select.insertInto("outputTable");


        tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema()
                                .field("a", DataTypes.STRING())
                ).inAppendMode()
                .createTemporaryTable("outputTable");
        select.insertInto("outputTable");

        env.execute();




    }
}

到此,相信大家对“flinksql怎么将数据写入到文件中”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


当前文章:flinksql怎么将数据写入到文件中
地址分享:http://lswzjz.com/article/jggheo.html