QQ登录

只需要一步,快速开始

 注册地址  找回密码
查看: 2668|回复: 0
打印 上一主题 下一主题

[代码资源] 数据结构+spark

[复制链接]
字体大小: 正常 放大

1178

主题

15

听众

1万

积分

  • TA的每日心情
    开心
    2023-7-31 10:17
  • 签到天数: 198 天

    [LV.7]常住居民III

    自我介绍
    数学中国浅夏
    跳转到指定楼层
    1#
    发表于 2021-11-22 20:13 |只看该作者 |倒序浏览
    |招呼Ta 关注Ta
                                  数据结构+spark
    赫夫曼树
    赫夫曼编码-数据压缩
    压缩
    解码
    数据的加载和保存
    加载数据
    保存数据
    MySql
    Hive
    SparkStreaming
    背压机制
    wordcount
    RDD队列
    自定义数据采集器
    赫夫曼树
    1)给定n个权值作为n个叶子结点,构造一棵二叉树,若该树的带权路径长度(wpl)达到最小,称这样的二叉树为最优二叉树,也称为哈夫曼树(Huffman Tree), 还有的书翻译为霍夫曼树。

    2)赫夫曼树是带权路径长度最短的树,权值较大的结点离根较近。

    构成赫夫曼树的步骤:

    1)从小到大进行排序, 将每一个数据,每个数据都是一个节点 , 每个节点可以看成是一颗最简单的二叉树

    2)取出根节点权值最小的两颗二叉树

    3)组成一颗新的二叉树, 该新的二叉树的根节点的权值是前面两颗二叉树根节点权值的和

    4)再将这颗新的二叉树,以根节点的权值大小 再次排序, 不断重复 1-2-3-4 的步骤,直到数列中,所有的数据都被处理,就得到一颗赫夫曼树
    package huffmantree;

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;

    public class HuffmanTree {

        public static void main(String[] args) {
            int []arr={13,7,8,3,29,6,1};

            Node root = createHuffmanTree(arr);
            root.preOrder();
        }


        //创建赫夫曼树的方法

        public static Node createHuffmanTree(int[]arr ) {
            //1、遍历arr数组
            //2、将arr的每个元素构成一个node
            //3、将Node放入到ArrayList中
            List<Node> nodes = new ArrayList<>();
            for (int value : arr) {
                nodes.add(new Node(value));
            }

            while (nodes.size() > 1) {
                //排序 从小到大
                Collections.sort(nodes);

                //取出根节点权值最小的两颗二叉树
                //(1)取出权值最小的节点(二叉树)
                Node leftNode = nodes.get(0);
                //(2)取出权值第二小的节点(二叉树)

                Node rightNode = nodes.get(1);

                //(3)构建一颗新的二叉树
                Node parent = new Node(leftNode.value + rightNode.value);
                parent.left = leftNode;
                parent.right = rightNode;

                //(4)从ArrayList中删除处理过的二叉树
                nodes.remove(leftNode);
                nodes.remove(rightNode);

                //(5)将parent加入到nodes
                nodes.add(parent);
            }

            return nodes.get(0);
        }
    }

    //创建节点类
    //为了让Node 对象支持排序Collections集合排序
    //让Node 实现Comparable接口

    class Node implements Comparable<Node>{
        int value;//节点权值
        Node left;//指向左子节点
        Node right;//指向右子节点
        public Node(int value){
            this.value=value;
        }

        //前序遍历
        public void preOrder(){
            System.out.println(this.value);
            if(this.left!=null)this.left.preOrder();
            if(this.right!=null)this.right.preOrder();

        }

      @Override
        public String toString() {
            return "Node{" +
                    "value=" + value +
                    '}';
        }

        @Override
        public int compareTo(Node o) {
            //从小到大排序
            return this.value-o.value;
        }
    }
    赫夫曼编码-数据压缩
    1)赫夫曼编码也翻译为哈夫曼编码(Huffman Coding),又称霍夫曼编码,是一种编码方式, 属于一种程序算法

    2)赫夫曼编码是赫哈夫曼树在电讯通信中的经典的应用之一。

    3)赫夫曼编码广泛地用于数据文件压缩。其压缩率通常在20%~90%之间

    4)赫夫曼码是可变字长编码(VLC)的一种。Huffman于1952年提出一种编码方法,称之为最佳编码


    压缩
    package huffmantree.huffmancode;

    import java.util.*;
    public class HuffmanCode {
        public static void main(String[] args) {

            String content="i like like like java do you like a java";
            byte[]contentBytes=content.getBytes();

            byte[] huffmanCodesBytes = huffmanZip(contentBytes);
            System.out.println("压缩后的结果是:"+Arrays.toString(huffmanCodesBytes));

        }

        //使用一个方法,将前面的方法封装起来,便于调用
        private static byte[] huffmanZip(byte[]bytes){
            //
            List<Node>node=getNodes(bytes);
            //更具node创建赫夫曼树
            Node huffmanTreeRoot=createHuffmanTree(node);

            Map<Byte, String> huffmanCodes = getCodes(huffmanTreeRoot);
            //根据生成的赫夫曼编码,压缩得到压缩后的赫夫曼编码字节数组
            return zip(bytes,huffmanCodes);

        }

        /**
         *
         * @param bytes 这是原始的字符串对应的byte[]
         * @param huffmanCodes 生成的赫夫曼编码map
         * @return 返回赫夫曼编码处理后的byte []
         */
        private static byte[] zip(byte[]bytes,Map<Byte,String> huffmanCodes){
            //1、利用huffmanCodes 将 bytes 转成 赫夫曼编码对应的字符串
            StringBuilder stringBuilder = new StringBuilder();
            for(byte b:bytes){
                stringBuilder.append(huffmanCodes.get(b));
            }

            //将stringBuilder转成 byte[]
            //统计返回 byte[] huffmanCode 长度
            int len;
            if(stringBuilder.length()%8==0){
                len=stringBuilder.length()/8;
            }else{
                len =stringBuilder.length()/8+1;
            }

            //创建 存储压缩后的byte数组
            byte[]huffmanCodeBytes=new byte[len];
            int index=0;

            byte[]by=new byte[len];
            for (int i = 0; i < stringBuilder.length(); i+=8) {//每八位对应一个byte,所以步长+8
                String strByte;
                if(i+8>stringBuilder.length()){//不够八位
                    strByte=stringBuilder.substring(i);
                }else {
                    strByte = stringBuilder.substring(i, i + 8);
                }
                //将strByte 转换成一个 byte ,放入到 huffmanCodeBytes
                huffmanCodeBytes[index]=(byte) Integer.parseInt(strByte,2);
                index++;

            }
            return huffmanCodeBytes;

        }

        private static List<Node>getNodes(byte[]bytes){
            ArrayList <Node>nodes=new ArrayList<>();

            //遍历bytes,统计每一个byte出现次数->map[key,value]
            Map<Byte,Integer> counts=new HashMap<>();
            for(byte b:bytes){
                Integer count=counts.getOrDefault(b,0);
                counts.put(b,count+1);
            }

            //把每一个键值对转成一个Node对象,并加入到nodes集合
            for(Map.Entry<Byte,Integer>entry:counts.entrySet()){
                nodes.add(new Node(entry.getKey(),entry.getValue()));
            }


            return nodes;
        }

        //创建对应的赫夫曼树
        private static Node createHuffmanTree(List<Node>nodes){
            while(nodes.size()>1){
                //从小到大
                Collections.sort(nodes);

                Node left=nodes.get(0);
                Node right=nodes.get(1);

                Node parent=new Node(null,left.weight+right.weight);
                parent.left=left;
                parent.right=right;

                nodes.remove(left);
                nodes.remove(right);

                nodes.add(parent);
            }
            return nodes.get(0);
        }

        //生成赫夫曼树对应的赫夫曼编码
        static Map<Byte,String> huffmanCodes=new HashMap<>();
        //static StringBuilder stringBuilder=new StringBuilder();

        private static Map<Byte,String>getCodes(Node root){
            if(root==null){
                return null;
            }
            //处理root的左子树
            getCodes(root.left,"0",new StringBuilder());
            //处理root的右子树
            getCodes(root.right,"1",new StringBuilder());
            return huffmanCodes;
        }

        /**
         * 功能:将传入的node结点的赫夫曼编码得到,并放入到huffmanCodes集合
         * @param node 传入结点
         * @param code 路径:左子结点是0,右子结点是1
         * @param stringBuilder 用于拼接路径
         */
        private static void getCodes(Node node,String code,StringBuilder stringBuilder){
            StringBuilder stringBuilder2 = new StringBuilder(stringBuilder);
            //将code加到stringBuilder2
            stringBuilder2.append(code);
            if(node!=null){//如果node==null 不处理
                //判断当前node 是叶子结点还是非叶子结点
                if(node.data==null){//非叶子结点
                    //递归处理
                    //向左
                    getCodes(node.left,"0",stringBuilder2);
                    //向右递归
                    getCodes(node.right,"1",stringBuilder2);
                }else{//说明是一个叶子结点
                    //表示找到某个叶子结点的最后
                    huffmanCodes.put(node.data,stringBuilder2.toString());

                }
            }

        }




    }
    class Node implements Comparable<Node>{
        Byte data;//存放数据
        int weight;//权值,表示字符出现的次数

        Node left;
        Node right;

        public Node(Byte data, int weight) {
            this.data = data;
            this.weight = weight;
        }


        @Override
        public int compareTo(Node o) {
            //从小到大排序
            return this.weight-o.weight;
        }

        @Override
        public String toString() {
            return "Node{" +
                    "data=" + data +
                    ", weight=" + weight +
                    '}';
        }

        //前序遍历
        public void preOrder(){
            System.out.println(this);
            if(this.left!=null)this.left.preOrder();
            if(this.right!=null)this.right.preOrder();
        }

    }




    解码
    public class HuffmanCode {
        public static void main(String[] args) {

            String content = "i like like like java do you like a java";
            byte[] contentBytes = content.getBytes();

            byte[] huffmanCodesBytes = huffmanZip(contentBytes);
            System.out.println("压缩后的结果是:" + Arrays.toString(huffmanCodesBytes));

            byte[] sourceBytes = decode(huffmanCodes, huffmanCodesBytes);
            System.out.println("原来的字符串:"+new String(sourceBytes));


        }

        //编写一个方法,完成对压缩数据的解码

        /**
         *
         * @param huffmanCodes 赫夫曼编码 map
         * @param huffmanBytes 赫夫曼编码得到字节数组
         * @return 就是原来的字符串对应的数组
         */
        private static byte[]decode(Map<Byte,String>huffmanCodes,byte[]huffmanBytes){

            //先得到huffmanBytes对应的二进制的字符串,

            StringBuilder stringBuilder = new StringBuilder();
            //将byte数组转成二进制的字符串
            for (int i = 0; i < huffmanBytes.length; i++) {
                byte b=huffmanBytes;
                //判断是不是最后一个字节
                boolean flag=(i==huffmanBytes.length-1);
                stringBuilder.append(byteToBitString(!flag,b));
            }
            System.out.println(stringBuilder);


            //把字符按串按照指定的赫夫曼编码进行解码
            //把赫夫曼编码表进行调换
            Map<String,Byte>map=new HashMap<>();

            for(Map.Entry <Byte,String>entry:huffmanCodes.entrySet()){
                map.put(entry.getValue(),entry.getKey());
            }

            //创建一个集合,存放byte
            List<Byte>list=new ArrayList<>();
            //i 可以理解成一个索引 扫描 stringBuilder
            for(int i=0;i<stringBuilder.length();){
                int count=1;//
                boolean flag=true;
                Byte b=null;
                while(flag){
                    //取出一个字符
                    String key=stringBuilder.substring(i,i+count);//i不懂,让count移动,知道匹配到一个字符
                    b=map.get(key);
                    if(b==null){//说明没有匹配到
                        count++;
                    }else{
                        //匹配到
                        flag=false;
                    }
                }
                list.add(b);
                i+=count;//i 直接移动到 count

            }
            //当for循环结束后 list中就存放了所有的字符
            //把list中的数据放入到byte[]并返回
            byte b[]=new byte[list.size()];
            for (int i = 0; i <list.size() ; i++) {
                b=list.get(i);
            }
            return b;
        }


        //完成数据的解码
        //重写转成 赫夫曼编码对应的二进制字符串
        // 赫夫曼编码对应的二进制的字符串 =》 对照 赫夫曼编码

        /**
         * 将一个byte 转成一个二进制的字符串
         * @param flag 标识是否需要补高位 如果是true 表示需要补高位,如果是false表示不补
         * @param b 传入的byte
         * @return 是该b 对应的二进制的字符串,(注意是按补码返回)
         */
        private static String byteToBitString(boolean flag,byte b) {
            //使用变量保存 b
            int temp = b;//将b 转成int

            //如果是正数还需要补高位
            if(flag)
            temp |= 256;//按位与256 1 0000 0000 | 0000 0001 =》1 0000 0001

            String str = Integer.toBinaryString(temp);//返回的二进制的补码

            if(flag){
                return str.substring(str.length()-8);
            }else{
                return str;
            }

        }




    数据的加载和保存
    SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的 API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式 为 parquet

    加载数据
    spark.read.load 是加载数据的通用方法
    csv format jdbc json load option options orc parquet schema
    table text textFile

    如果读取不同格式的数据,可以对不同的数据格式进行设定

    scala> spark.read.format("…")[.option("…")].load("…")
    ➢ format("…"):指定加载的数据类型,包括"csv"、“jdbc”、“json”、“orc”、"parquet"和 “textFile”。

    ➢ load("…"):在"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile"格式下需要传入加载 数据的路径。 ➢ option(”…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 我们前面都是使用 read API 先把文件加载到 DataFrame 然后再查询,其实,我们也可以直 接在文件上进行查询: 文件格式.文件路径

    scala>spark.sql(“select * from json./opt/module/data/user.json”).show

    保存数据
    df.write.save 是保存数据的通用方法
    csv jdbc json orc parquet textFile… …


    如果保存不同格式的数据,可以对不同的数据格式进行设定

    scala>df.write.format("…")[.option("…")].save("…")
    ➢ format("…"):指定保存的数据类型,包括"csv"、“jdbc”、“json”、“orc”、"parquet"和 “textFile”。

    ➢ save ("…"):在"csv"、“orc”、"parquet"和"textFile"格式下需要传入保存数据的路径。

    ➢ option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。 有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。

    SaveMode 是一个枚举类,其中的常量包括:

    Scala/Java Any Language Meaning

    SaveMode.ErrorIfExists(default) “error”(default) 如果文件已经存在则抛出异常

    SaveMode.Append “append” 如果文件已经存在则追加

    SaveMode.Overwrite “overwrite” 如果文件已经存在则覆盖

    SaveMode.Ignore “ignore” 如果文件已经存在则忽略

    df.write.mode(“append”).json("/opt/module/data/output")

    MySql
    package sql

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

    object SparkSqlJDBC {

      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local
  • ").setAppName("sparkSQL")
        val spark: SparkSession =SparkSession.builder().config(sparkConf).getOrCreate()

        //读取mysql数据
        val df: DataFrame = spark.read
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/jw?user=root&password=1234")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("password", "1234")
          .option("dbtable", "course")
          .load()

        df.show()

        //保存数据
        df.write
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/test?user=root&password=1234")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("password", "1234")
          .option("dbtable", "course")
          .mode(SaveMode.Append)
          .save()


      }

    }


    Hive
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.0.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
    </dependency>
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
    </dependency>

    将 hive-site.xml 文件拷贝到项目的 resources 目录中,代码实现

    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        #显示表头
        <property>
            <name>hive.cli.print.header</name>
            <value>true</value>
        </property>

        <property>
            <name>hive.cli.print.current.db</name>
            <value>true</value>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:mysql://hadoop103:3306/metastore?createDatabaseIfNotExist=true</value>
            <description>JDBC connect string for a JDBC metastore</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>com.mysql.jdbc.Driver</value>
            <description>Driver class name for a JDBC metastore</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>root</value>
            <description>username to use against metastore database</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            #密码
            <value>123456</value>
            <description>password to use against metastore database</description>
        </property>
    </configuration>
    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local
  • ").setAppName("sparkSQL")
        val spark: SparkSession =SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

        spark.sql("show tables").show()

      }

    SparkStreaming
    Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语 如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

    SparkStreaming 准实时(秒,分钟),微批次(时间) 的数据处理框架

    DStream 就是对 RDD 在实时数据处理场景的一种封装。

    背压机制
    ​ 为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制 数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。

    ​ 通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值 false,即不启用。

    wordcount
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
    </dependency>


    需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并 统计不同单词出现的次数

    package sql

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

    object WordCount {
      def main(args: Array[String]): Unit = {
        //创建环境对象
        val sparkConf = new SparkConf().setMaster("local
  • ").setAppName("wordCont")
        //第一个参数表示环境配置
        //第二个参数表示批量处理的周期(采集周期)
        val ssc: StreamingContext =new StreamingContext(sparkConf,Seconds(3))

        //获取端口数据
        val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

        val words: DStream[String] = lines.flatMap(_.split(" "))

        val wordToOne: DStream[(String, Int)] = words.map((_, 1))

        val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)

        wordToCount.print()

        wordToCount.saveAsTextFiles("datas/result")

        //由于SparkStreaming采集器是长期执行的任务 不能直接关闭
        //如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕

        //1、启动采集器
        ssc.start()
        //2、等待采集器的关闭
        ssc.awaitTermination()



      }


    }
    RDD队列
    测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到 这个队列中的 RDD,都会作为一个 DStream 处理。

    ➢ 需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount
    package streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.collection.mutable


    object Queue {
      def main(args: Array[String]): Unit = {
        //1.初始化 Spark 配置信息
        val conf: SparkConf = new SparkConf().setMaster("local
  • ").setAppName("RDDStream")
        //2.初始化 SparkStreamingContext
        val ssc: StreamingContext = new StreamingContext(conf, Seconds(4))
        //3.创建 RDD 队列
        val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
        //4.创建 QueueInputDStream
        val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue,oneAtATime = false)
        //5.处理队列中的 RDD 数据
        val mappedStream: DStream[(Int, Int)] = inputStream.map((_,1))
        val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey(_ + _)
        //6.打印结果
        reducedStream.print()
        //7.启动任务
        ssc.start()
        //8.循环创建并向 RDD 队列中放入 RDD
        for (i <- 1 to 5) {
          rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
          Thread.sleep(2000)
        }
        ssc.awaitTermination()
      }

    }
    自定义数据采集器
    package streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.receiver.Receiver
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.util.Random

    object DIY {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local
  • ").setAppName("RDDStream")
        val ssc: StreamingContext = new StreamingContext(conf, Seconds(4))

        val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
        messageDS.print()



        ssc.start()
        ssc.awaitTermination()
      }

    }

    /**
    * 自定义数据采集器
    */
    class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){

      private var flag=true;

      override def onStart(): Unit = {
        new Thread(new Runnable {
          override def run(): Unit = {
            while(flag){
              val message: String ="采集的数据为"+new Random().nextInt(10).toString
              store(message)
              Thread.sleep(500)
            }
          }
        }).start()

      }

      override def onStop(): Unit = {
        flag=false

      }
    }






  • zan
    转播转播0 分享淘帖0 分享分享0 收藏收藏0 支持支持0 反对反对0 微信微信
    您需要登录后才可以回帖 登录 | 注册地址

    qq
    收缩
    • 电话咨询

    • 04714969085
    fastpost

    关于我们| 联系我们| 诚征英才| 对外合作| 产品服务| QQ

    手机版|Archiver| |繁體中文 手机客户端  

    蒙公网安备 15010502000194号

    Powered by Discuz! X2.5   © 2001-2013 数学建模网-数学中国 ( 蒙ICP备14002410号-3 蒙BBS备-0002号 )     论坛法律顾问:王兆丰

    GMT+8, 2025-6-27 14:10 , Processed in 0.406545 second(s), 50 queries .

    回顶部