数学建模社区-数学中国

标题: 数据结构+spark [打印本页]

作者: 1047521767    时间: 2021-11-22 20:13
标题: 数据结构+spark
                              数据结构+spark
赫夫曼树
赫夫曼编码-数据压缩
压缩
解码
数据的加载和保存
加载数据
保存数据
MySql
Hive
SparkStreaming
背压机制
wordcount
RDD队列
自定义数据采集器
赫夫曼树
1)给定n个权值作为n个叶子结点,构造一棵二叉树,若该树的带权路径长度(wpl)达到最小,称这样的二叉树为最优二叉树,也称为哈夫曼树(Huffman Tree), 还有的书翻译为霍夫曼树。

2)赫夫曼树是带权路径长度最短的树,权值较大的结点离根较近。

构成赫夫曼树的步骤:

1)从小到大进行排序, 将每一个数据,每个数据都是一个节点 , 每个节点可以看成是一颗最简单的二叉树

2)取出根节点权值最小的两颗二叉树

3)组成一颗新的二叉树, 该新的二叉树的根节点的权值是前面两颗二叉树根节点权值的和

4)再将这颗新的二叉树,以根节点的权值大小 再次排序, 不断重复 1-2-3-4 的步骤,直到数列中,所有的数据都被处理,就得到一颗赫夫曼树
package huffmantree;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class HuffmanTree {

    public static void main(String[] args) {
        int []arr={13,7,8,3,29,6,1};

        Node root = createHuffmanTree(arr);
        root.preOrder();
    }


    //创建赫夫曼树的方法

    public static Node createHuffmanTree(int[]arr ) {
        //1、遍历arr数组
        //2、将arr的每个元素构成一个node
        //3、将Node放入到ArrayList中
        List<Node> nodes = new ArrayList<>();
        for (int value : arr) {
            nodes.add(new Node(value));
        }

        while (nodes.size() > 1) {
            //排序 从小到大
            Collections.sort(nodes);

            //取出根节点权值最小的两颗二叉树
            //(1)取出权值最小的节点(二叉树)
            Node leftNode = nodes.get(0);
            //(2)取出权值第二小的节点(二叉树)

            Node rightNode = nodes.get(1);

            //(3)构建一颗新的二叉树
            Node parent = new Node(leftNode.value + rightNode.value);
            parent.left = leftNode;
            parent.right = rightNode;

            //(4)从ArrayList中删除处理过的二叉树
            nodes.remove(leftNode);
            nodes.remove(rightNode);

            //(5)将parent加入到nodes
            nodes.add(parent);
        }

        return nodes.get(0);
    }
}

//创建节点类
//为了让Node 对象支持排序Collections集合排序
//让Node 实现Comparable接口

class Node implements Comparable<Node>{
    int value;//节点权值
    Node left;//指向左子节点
    Node right;//指向右子节点
    public Node(int value){
        this.value=value;
    }

    //前序遍历
    public void preOrder(){
        System.out.println(this.value);
        if(this.left!=null)this.left.preOrder();
        if(this.right!=null)this.right.preOrder();

    }

  @Override
    public String toString() {
        return "Node{" +
                "value=" + value +
                '}';
    }

    @Override
    public int compareTo(Node o) {
        //从小到大排序
        return this.value-o.value;
    }
}
赫夫曼编码-数据压缩
1)赫夫曼编码也翻译为哈夫曼编码(Huffman Coding),又称霍夫曼编码,是一种编码方式, 属于一种程序算法

2)赫夫曼编码是赫哈夫曼树在电讯通信中的经典的应用之一。

3)赫夫曼编码广泛地用于数据文件压缩。其压缩率通常在20%~90%之间

4)赫夫曼码是可变字长编码(VLC)的一种。Huffman于1952年提出一种编码方法,称之为最佳编码


压缩
package huffmantree.huffmancode;

import java.util.*;
public class HuffmanCode {
    public static void main(String[] args) {

        String content="i like like like java do you like a java";
        byte[]contentBytes=content.getBytes();

        byte[] huffmanCodesBytes = huffmanZip(contentBytes);
        System.out.println("压缩后的结果是:"+Arrays.toString(huffmanCodesBytes));

    }

    //使用一个方法,将前面的方法封装起来,便于调用
    private static byte[] huffmanZip(byte[]bytes){
        //
        List<Node>node=getNodes(bytes);
        //更具node创建赫夫曼树
        Node huffmanTreeRoot=createHuffmanTree(node);

        Map<Byte, String> huffmanCodes = getCodes(huffmanTreeRoot);
        //根据生成的赫夫曼编码,压缩得到压缩后的赫夫曼编码字节数组
        return zip(bytes,huffmanCodes);

    }

    /**
     *
     * @param bytes 这是原始的字符串对应的byte[]
     * @param huffmanCodes 生成的赫夫曼编码map
     * @return 返回赫夫曼编码处理后的byte []
     */
    private static byte[] zip(byte[]bytes,Map<Byte,String> huffmanCodes){
        //1、利用huffmanCodes 将 bytes 转成 赫夫曼编码对应的字符串
        StringBuilder stringBuilder = new StringBuilder();
        for(byte b:bytes){
            stringBuilder.append(huffmanCodes.get(b));
        }

        //将stringBuilder转成 byte[]
        //统计返回 byte[] huffmanCode 长度
        int len;
        if(stringBuilder.length()%8==0){
            len=stringBuilder.length()/8;
        }else{
            len =stringBuilder.length()/8+1;
        }

        //创建 存储压缩后的byte数组
        byte[]huffmanCodeBytes=new byte[len];
        int index=0;

        byte[]by=new byte[len];
        for (int i = 0; i < stringBuilder.length(); i+=8) {//每八位对应一个byte,所以步长+8
            String strByte;
            if(i+8>stringBuilder.length()){//不够八位
                strByte=stringBuilder.substring(i);
            }else {
                strByte = stringBuilder.substring(i, i + 8);
            }
            //将strByte 转换成一个 byte ,放入到 huffmanCodeBytes
            huffmanCodeBytes[index]=(byte) Integer.parseInt(strByte,2);
            index++;

        }
        return huffmanCodeBytes;

    }

    private static List<Node>getNodes(byte[]bytes){
        ArrayList <Node>nodes=new ArrayList<>();

        //遍历bytes,统计每一个byte出现次数->map[key,value]
        Map<Byte,Integer> counts=new HashMap<>();
        for(byte b:bytes){
            Integer count=counts.getOrDefault(b,0);
            counts.put(b,count+1);
        }

        //把每一个键值对转成一个Node对象,并加入到nodes集合
        for(Map.Entry<Byte,Integer>entry:counts.entrySet()){
            nodes.add(new Node(entry.getKey(),entry.getValue()));
        }


        return nodes;
    }

    //创建对应的赫夫曼树
    private static Node createHuffmanTree(List<Node>nodes){
        while(nodes.size()>1){
            //从小到大
            Collections.sort(nodes);

            Node left=nodes.get(0);
            Node right=nodes.get(1);

            Node parent=new Node(null,left.weight+right.weight);
            parent.left=left;
            parent.right=right;

            nodes.remove(left);
            nodes.remove(right);

            nodes.add(parent);
        }
        return nodes.get(0);
    }

    //生成赫夫曼树对应的赫夫曼编码
    static Map<Byte,String> huffmanCodes=new HashMap<>();
    //static StringBuilder stringBuilder=new StringBuilder();

    private static Map<Byte,String>getCodes(Node root){
        if(root==null){
            return null;
        }
        //处理root的左子树
        getCodes(root.left,"0",new StringBuilder());
        //处理root的右子树
        getCodes(root.right,"1",new StringBuilder());
        return huffmanCodes;
    }

    /**
     * 功能:将传入的node结点的赫夫曼编码得到,并放入到huffmanCodes集合
     * @param node 传入结点
     * @param code 路径:左子结点是0,右子结点是1
     * @param stringBuilder 用于拼接路径
     */
    private static void getCodes(Node node,String code,StringBuilder stringBuilder){
        StringBuilder stringBuilder2 = new StringBuilder(stringBuilder);
        //将code加到stringBuilder2
        stringBuilder2.append(code);
        if(node!=null){//如果node==null 不处理
            //判断当前node 是叶子结点还是非叶子结点
            if(node.data==null){//非叶子结点
                //递归处理
                //向左
                getCodes(node.left,"0",stringBuilder2);
                //向右递归
                getCodes(node.right,"1",stringBuilder2);
            }else{//说明是一个叶子结点
                //表示找到某个叶子结点的最后
                huffmanCodes.put(node.data,stringBuilder2.toString());

            }
        }

    }




}
class Node implements Comparable<Node>{
    Byte data;//存放数据
    int weight;//权值,表示字符出现的次数

    Node left;
    Node right;

    public Node(Byte data, int weight) {
        this.data = data;
        this.weight = weight;
    }


    @Override
    public int compareTo(Node o) {
        //从小到大排序
        return this.weight-o.weight;
    }

    @Override
    public String toString() {
        return "Node{" +
                "data=" + data +
                ", weight=" + weight +
                '}';
    }

    //前序遍历
    public void preOrder(){
        System.out.println(this);
        if(this.left!=null)this.left.preOrder();
        if(this.right!=null)this.right.preOrder();
    }

}




解码
public class HuffmanCode {
    public static void main(String[] args) {

        String content = "i like like like java do you like a java";
        byte[] contentBytes = content.getBytes();

        byte[] huffmanCodesBytes = huffmanZip(contentBytes);
        System.out.println("压缩后的结果是:" + Arrays.toString(huffmanCodesBytes));

        byte[] sourceBytes = decode(huffmanCodes, huffmanCodesBytes);
        System.out.println("原来的字符串:"+new String(sourceBytes));


    }

    //编写一个方法,完成对压缩数据的解码

    /**
     *
     * @param huffmanCodes 赫夫曼编码 map
     * @param huffmanBytes 赫夫曼编码得到字节数组
     * @return 就是原来的字符串对应的数组
     */
    private static byte[]decode(Map<Byte,String>huffmanCodes,byte[]huffmanBytes){

        //先得到huffmanBytes对应的二进制的字符串,

        StringBuilder stringBuilder = new StringBuilder();
        //将byte数组转成二进制的字符串
        for (int i = 0; i < huffmanBytes.length; i++) {
            byte b=huffmanBytes;
            //判断是不是最后一个字节
            boolean flag=(i==huffmanBytes.length-1);
            stringBuilder.append(byteToBitString(!flag,b));
        }
        System.out.println(stringBuilder);


        //把字符按串按照指定的赫夫曼编码进行解码
        //把赫夫曼编码表进行调换
        Map<String,Byte>map=new HashMap<>();

        for(Map.Entry <Byte,String>entry:huffmanCodes.entrySet()){
            map.put(entry.getValue(),entry.getKey());
        }

        //创建一个集合,存放byte
        List<Byte>list=new ArrayList<>();
        //i 可以理解成一个索引 扫描 stringBuilder
        for(int i=0;i<stringBuilder.length();){
            int count=1;//
            boolean flag=true;
            Byte b=null;
            while(flag){
                //取出一个字符
                String key=stringBuilder.substring(i,i+count);//i不懂,让count移动,知道匹配到一个字符
                b=map.get(key);
                if(b==null){//说明没有匹配到
                    count++;
                }else{
                    //匹配到
                    flag=false;
                }
            }
            list.add(b);
            i+=count;//i 直接移动到 count

        }
        //当for循环结束后 list中就存放了所有的字符
        //把list中的数据放入到byte[]并返回
        byte b[]=new byte[list.size()];
        for (int i = 0; i <list.size() ; i++) {
            b=list.get(i);
        }
        return b;
    }


    //完成数据的解码
    //重写转成 赫夫曼编码对应的二进制字符串
    // 赫夫曼编码对应的二进制的字符串 =》 对照 赫夫曼编码

    /**
     * 将一个byte 转成一个二进制的字符串
     * @param flag 标识是否需要补高位 如果是true 表示需要补高位,如果是false表示不补
     * @param b 传入的byte
     * @return 是该b 对应的二进制的字符串,(注意是按补码返回)
     */
    private static String byteToBitString(boolean flag,byte b) {
        //使用变量保存 b
        int temp = b;//将b 转成int

        //如果是正数还需要补高位
        if(flag)
        temp |= 256;//按位与256 1 0000 0000 | 0000 0001 =》1 0000 0001

        String str = Integer.toBinaryString(temp);//返回的二进制的补码

        if(flag){
            return str.substring(str.length()-8);
        }else{
            return str;
        }

    }




数据的加载和保存
SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的 API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式 为 parquet

加载数据
spark.read.load 是加载数据的通用方法
csv format jdbc json load option options orc parquet schema
table text textFile

如果读取不同格式的数据,可以对不同的数据格式进行设定

scala> spark.read.format("…")[.option("…")].load("…")
➢ format("…"):指定加载的数据类型,包括"csv"、“jdbc”、“json”、“orc”、"parquet"和 “textFile”。

➢ load("…"):在"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile"格式下需要传入加载 数据的路径。 ➢ option(”…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 我们前面都是使用 read API 先把文件加载到 DataFrame 然后再查询,其实,我们也可以直 接在文件上进行查询: 文件格式.文件路径

scala>spark.sql(“select * from json./opt/module/data/user.json”).show

保存数据
df.write.save 是保存数据的通用方法
csv jdbc json orc parquet textFile… …


如果保存不同格式的数据,可以对不同的数据格式进行设定

scala>df.write.format("…")[.option("…")].save("…")
➢ format("…"):指定保存的数据类型,包括"csv"、“jdbc”、“json”、“orc”、"parquet"和 “textFile”。

➢ save ("…"):在"csv"、“orc”、"parquet"和"textFile"格式下需要传入保存数据的路径。

➢ option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。 有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。

SaveMode 是一个枚举类,其中的常量包括:

Scala/Java Any Language Meaning

SaveMode.ErrorIfExists(default) “error”(default) 如果文件已经存在则抛出异常

SaveMode.Append “append” 如果文件已经存在则追加

SaveMode.Overwrite “overwrite” 如果文件已经存在则覆盖

SaveMode.Ignore “ignore” 如果文件已经存在则忽略

df.write.mode(“append”).json("/opt/module/data/output")

MySql
package sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SparkSqlJDBC {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local
  • ").setAppName("sparkSQL")
        val spark: SparkSession =SparkSession.builder().config(sparkConf).getOrCreate()

        //读取mysql数据
        val df: DataFrame = spark.read
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/jw?user=root&password=1234")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("password", "1234")
          .option("dbtable", "course")
          .load()

        df.show()

        //保存数据
        df.write
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/test?user=root&password=1234")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("password", "1234")
          .option("dbtable", "course")
          .mode(SaveMode.Append)
          .save()


      }

    }


    Hive
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.0.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
    </dependency>
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
    </dependency>

    将 hive-site.xml 文件拷贝到项目的 resources 目录中,代码实现

    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        #显示表头
        <property>
            <name>hive.cli.print.header</name>
            <value>true</value>
        </property>

        <property>
            <name>hive.cli.print.current.db</name>
            <value>true</value>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:mysql://hadoop103:3306/metastore?createDatabaseIfNotExist=true</value>
            <description>JDBC connect string for a JDBC metastore</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>com.mysql.jdbc.Driver</value>
            <description>Driver class name for a JDBC metastore</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>root</value>
            <description>username to use against metastore database</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            #密码
            <value>123456</value>
            <description>password to use against metastore database</description>
        </property>
    </configuration>
    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local
  • ").setAppName("sparkSQL")
        val spark: SparkSession =SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

        spark.sql("show tables").show()

      }

    SparkStreaming
    Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语 如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

    SparkStreaming 准实时(秒,分钟),微批次(时间) 的数据处理框架

    DStream 就是对 RDD 在实时数据处理场景的一种封装。

    背压机制
    ​ 为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制 数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。

    ​ 通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值 false,即不启用。

    wordcount
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
    </dependency>


    需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并 统计不同单词出现的次数

    package sql

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

    object WordCount {
      def main(args: Array[String]): Unit = {
        //创建环境对象
        val sparkConf = new SparkConf().setMaster("local
  • ").setAppName("wordCont")
        //第一个参数表示环境配置
        //第二个参数表示批量处理的周期(采集周期)
        val ssc: StreamingContext =new StreamingContext(sparkConf,Seconds(3))

        //获取端口数据
        val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

        val words: DStream[String] = lines.flatMap(_.split(" "))

        val wordToOne: DStream[(String, Int)] = words.map((_, 1))

        val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)

        wordToCount.print()

        wordToCount.saveAsTextFiles("datas/result")

        //由于SparkStreaming采集器是长期执行的任务 不能直接关闭
        //如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕

        //1、启动采集器
        ssc.start()
        //2、等待采集器的关闭
        ssc.awaitTermination()



      }


    }
    RDD队列
    测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到 这个队列中的 RDD,都会作为一个 DStream 处理。

    ➢ 需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount
    package streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.collection.mutable


    object Queue {
      def main(args: Array[String]): Unit = {
        //1.初始化 Spark 配置信息
        val conf: SparkConf = new SparkConf().setMaster("local
  • ").setAppName("RDDStream")
        //2.初始化 SparkStreamingContext
        val ssc: StreamingContext = new StreamingContext(conf, Seconds(4))
        //3.创建 RDD 队列
        val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
        //4.创建 QueueInputDStream
        val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue,oneAtATime = false)
        //5.处理队列中的 RDD 数据
        val mappedStream: DStream[(Int, Int)] = inputStream.map((_,1))
        val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey(_ + _)
        //6.打印结果
        reducedStream.print()
        //7.启动任务
        ssc.start()
        //8.循环创建并向 RDD 队列中放入 RDD
        for (i <- 1 to 5) {
          rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
          Thread.sleep(2000)
        }
        ssc.awaitTermination()
      }

    }
    自定义数据采集器
    package streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.receiver.Receiver
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.util.Random

    object DIY {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local
  • ").setAppName("RDDStream")
        val ssc: StreamingContext = new StreamingContext(conf, Seconds(4))

        val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
        messageDS.print()



        ssc.start()
        ssc.awaitTermination()
      }

    }

    /**
    * 自定义数据采集器
    */
    class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){

      private var flag=true;

      override def onStart(): Unit = {
        new Thread(new Runnable {
          override def run(): Unit = {
            while(flag){
              val message: String ="采集的数据为"+new Random().nextInt(10).toString
              store(message)
              Thread.sleep(500)
            }
          }
        }).start()

      }

      override def onStop(): Unit = {
        flag=false

      }
    }











    欢迎光临 数学建模社区-数学中国 (http://www.madio.net/) Powered by Discuz! X2.5