在线时间 514 小时 最后登录 2023-12-1 注册时间 2018-7-17 听众数 15 收听数 0 能力 0 分 体力 40015 点 威望 0 点 阅读权限 255 积分 12716 相册 0 日志 0 记录 0 帖子 1419 主题 1178 精华 0 分享 0 好友 15
TA的每日心情 开心 2023-7-31 10:17
签到天数: 198 天
[LV.7]常住居民III
自我介绍 数学中国浅夏
数据结构+spark
赫夫曼树
赫夫曼编码-数据压缩
压缩
解码
数据的加载和保存
加载数据
保存数据
MySql
Hive
SparkStreaming
背压机制
wordcount
RDD队列
自定义数据采集器
赫夫曼树
1)给定n个权值作为n个叶子结点,构造一棵二叉树,若该树的带权路径长度(wpl)达到最小,称这样的二叉树为最优二叉树,也称为哈夫曼树(Huffman Tree), 还有的书翻译为霍夫曼树。
2)赫夫曼树是带权路径长度最短的树,权值较大的结点离根较近。
构成赫夫曼树的步骤:
1)从小到大进行排序, 将每一个数据,每个数据都是一个节点 , 每个节点可以看成是一颗最简单的二叉树
2)取出根节点权值最小的两颗二叉树
3)组成一颗新的二叉树, 该新的二叉树的根节点的权值是前面两颗二叉树根节点权值的和
4)再将这颗新的二叉树,以根节点的权值大小 再次排序, 不断重复 1-2-3-4 的步骤,直到数列中,所有的数据都被处理,就得到一颗赫夫曼树
package huffmantree;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
public class HuffmanTree {
public static void main(String[] args) {
int []arr={13,7,8,3,29,6,1};
Node root = createHuffmanTree(arr);
root.preOrder();
}
//创建赫夫曼树的方法
public static Node createHuffmanTree(int[]arr ) {
//1、遍历arr数组
//2、将arr的每个元素构成一个node
//3、将Node放入到ArrayList中
List<Node> nodes = new ArrayList<>();
for (int value : arr) {
nodes.add(new Node(value));
}
while (nodes.size() > 1) {
//排序 从小到大
Collections.sort(nodes);
//取出根节点权值最小的两颗二叉树
//(1)取出权值最小的节点(二叉树)
Node leftNode = nodes.get(0);
//(2)取出权值第二小的节点(二叉树)
Node rightNode = nodes.get(1);
//(3)构建一颗新的二叉树
Node parent = new Node(leftNode.value + rightNode.value);
parent.left = leftNode;
parent.right = rightNode;
//(4)从ArrayList中删除处理过的二叉树
nodes.remove(leftNode);
nodes.remove(rightNode);
//(5)将parent加入到nodes
nodes.add(parent);
}
return nodes.get(0);
}
}
//创建节点类
//为了让Node 对象支持排序Collections集合排序
//让Node 实现Comparable接口
class Node implements Comparable<Node>{
int value;//节点权值
Node left;//指向左子节点
Node right;//指向右子节点
public Node(int value){
this.value=value;
}
//前序遍历
public void preOrder(){
System.out.println(this.value);
if(this.left!=null)this.left.preOrder();
if(this.right!=null)this.right.preOrder();
}
@Override
public String toString() {
return "Node{" +
"value=" + value +
'}';
}
@Override
public int compareTo(Node o) {
//从小到大排序
return this.value-o.value;
}
}
赫夫曼编码-数据压缩
1)赫夫曼编码也翻译为哈夫曼编码(Huffman Coding),又称霍夫曼编码,是一种编码方式, 属于一种程序算法
2)赫夫曼编码是赫哈夫曼树在电讯通信中的经典的应用之一。
3)赫夫曼编码广泛地用于数据文件压缩。其压缩率通常在20%~90%之间
4)赫夫曼码是可变字长编码(VLC)的一种。Huffman于1952年提出一种编码方法,称之为最佳编码
压缩
package huffmantree.huffmancode;
import java.util.*;
public class HuffmanCode {
public static void main(String[] args) {
String content="i like like like java do you like a java";
byte[]contentBytes=content.getBytes();
byte[] huffmanCodesBytes = huffmanZip(contentBytes);
System.out.println("压缩后的结果是:"+Arrays.toString(huffmanCodesBytes));
}
//使用一个方法,将前面的方法封装起来,便于调用
private static byte[] huffmanZip(byte[]bytes){
//
List<Node>node=getNodes(bytes);
//更具node创建赫夫曼树
Node huffmanTreeRoot=createHuffmanTree(node);
Map<Byte, String> huffmanCodes = getCodes(huffmanTreeRoot);
//根据生成的赫夫曼编码,压缩得到压缩后的赫夫曼编码字节数组
return zip(bytes,huffmanCodes);
}
/**
*
* @param bytes 这是原始的字符串对应的byte[]
* @param huffmanCodes 生成的赫夫曼编码map
* @return 返回赫夫曼编码处理后的byte []
*/
private static byte[] zip(byte[]bytes,Map<Byte,String> huffmanCodes){
//1、利用huffmanCodes 将 bytes 转成 赫夫曼编码对应的字符串
StringBuilder stringBuilder = new StringBuilder();
for(byte b:bytes){
stringBuilder.append(huffmanCodes.get(b));
}
//将stringBuilder转成 byte[]
//统计返回 byte[] huffmanCode 长度
int len;
if(stringBuilder.length()%8==0){
len=stringBuilder.length()/8;
}else{
len =stringBuilder.length()/8+1;
}
//创建 存储压缩后的byte数组
byte[]huffmanCodeBytes=new byte[len];
int index=0;
byte[]by=new byte[len];
for (int i = 0; i < stringBuilder.length(); i+=8) {//每八位对应一个byte,所以步长+8
String strByte;
if(i+8>stringBuilder.length()){//不够八位
strByte=stringBuilder.substring(i);
}else {
strByte = stringBuilder.substring(i, i + 8);
}
//将strByte 转换成一个 byte ,放入到 huffmanCodeBytes
huffmanCodeBytes[index]=(byte) Integer.parseInt(strByte,2);
index++;
}
return huffmanCodeBytes;
}
private static List<Node>getNodes(byte[]bytes){
ArrayList <Node>nodes=new ArrayList<>();
//遍历bytes,统计每一个byte出现次数->map[key,value]
Map<Byte,Integer> counts=new HashMap<>();
for(byte b:bytes){
Integer count=counts.getOrDefault(b,0);
counts.put(b,count+1);
}
//把每一个键值对转成一个Node对象,并加入到nodes集合
for(Map.Entry<Byte,Integer>entry:counts.entrySet()){
nodes.add(new Node(entry.getKey(),entry.getValue()));
}
return nodes;
}
//创建对应的赫夫曼树
private static Node createHuffmanTree(List<Node>nodes){
while(nodes.size()>1){
//从小到大
Collections.sort(nodes);
Node left=nodes.get(0);
Node right=nodes.get(1);
Node parent=new Node(null,left.weight+right.weight);
parent.left=left;
parent.right=right;
nodes.remove(left);
nodes.remove(right);
nodes.add(parent);
}
return nodes.get(0);
}
//生成赫夫曼树对应的赫夫曼编码
static Map<Byte,String> huffmanCodes=new HashMap<>();
//static StringBuilder stringBuilder=new StringBuilder();
private static Map<Byte,String>getCodes(Node root){
if(root==null){
return null;
}
//处理root的左子树
getCodes(root.left,"0",new StringBuilder());
//处理root的右子树
getCodes(root.right,"1",new StringBuilder());
return huffmanCodes;
}
/**
* 功能:将传入的node结点的赫夫曼编码得到,并放入到huffmanCodes集合
* @param node 传入结点
* @param code 路径:左子结点是0,右子结点是1
* @param stringBuilder 用于拼接路径
*/
private static void getCodes(Node node,String code,StringBuilder stringBuilder){
StringBuilder stringBuilder2 = new StringBuilder(stringBuilder);
//将code加到stringBuilder2
stringBuilder2.append(code);
if(node!=null){//如果node==null 不处理
//判断当前node 是叶子结点还是非叶子结点
if(node.data==null){//非叶子结点
//递归处理
//向左
getCodes(node.left,"0",stringBuilder2);
//向右递归
getCodes(node.right,"1",stringBuilder2);
}else{//说明是一个叶子结点
//表示找到某个叶子结点的最后
huffmanCodes.put(node.data,stringBuilder2.toString());
}
}
}
}
class Node implements Comparable<Node>{
Byte data;//存放数据
int weight;//权值,表示字符出现的次数
Node left;
Node right;
public Node(Byte data, int weight) {
this.data = data;
this.weight = weight;
}
@Override
public int compareTo(Node o) {
//从小到大排序
return this.weight-o.weight;
}
@Override
public String toString() {
return "Node{" +
"data=" + data +
", weight=" + weight +
'}';
}
//前序遍历
public void preOrder(){
System.out.println(this);
if(this.left!=null)this.left.preOrder();
if(this.right!=null)this.right.preOrder();
}
}
解码
public class HuffmanCode {
public static void main(String[] args) {
String content = "i like like like java do you like a java";
byte[] contentBytes = content.getBytes();
byte[] huffmanCodesBytes = huffmanZip(contentBytes);
System.out.println("压缩后的结果是:" + Arrays.toString(huffmanCodesBytes));
byte[] sourceBytes = decode(huffmanCodes, huffmanCodesBytes);
System.out.println("原来的字符串:"+new String(sourceBytes));
}
//编写一个方法,完成对压缩数据的解码
/**
*
* @param huffmanCodes 赫夫曼编码 map
* @param huffmanBytes 赫夫曼编码得到字节数组
* @return 就是原来的字符串对应的数组
*/
private static byte[]decode(Map<Byte,String>huffmanCodes,byte[]huffmanBytes){
//先得到huffmanBytes对应的二进制的字符串,
StringBuilder stringBuilder = new StringBuilder();
//将byte数组转成二进制的字符串
for (int i = 0; i < huffmanBytes.length; i++) {
byte b=huffmanBytes;
//判断是不是最后一个字节
boolean flag=(i==huffmanBytes.length-1);
stringBuilder.append(byteToBitString(!flag,b));
}
System.out.println(stringBuilder);
//把字符按串按照指定的赫夫曼编码进行解码
//把赫夫曼编码表进行调换
Map<String,Byte>map=new HashMap<>();
for(Map.Entry <Byte,String>entry:huffmanCodes.entrySet()){
map.put(entry.getValue(),entry.getKey());
}
//创建一个集合,存放byte
List<Byte>list=new ArrayList<>();
//i 可以理解成一个索引 扫描 stringBuilder
for(int i=0;i<stringBuilder.length();){
int count=1;//
boolean flag=true;
Byte b=null;
while(flag){
//取出一个字符
String key=stringBuilder.substring(i,i+count);//i不懂,让count移动,知道匹配到一个字符
b=map.get(key);
if(b==null){//说明没有匹配到
count++;
}else{
//匹配到
flag=false;
}
}
list.add(b);
i+=count;//i 直接移动到 count
}
//当for循环结束后 list中就存放了所有的字符
//把list中的数据放入到byte[]并返回
byte b[]=new byte[list.size()];
for (int i = 0; i <list.size() ; i++) {
b=list.get(i);
}
return b;
}
//完成数据的解码
//重写转成 赫夫曼编码对应的二进制字符串
// 赫夫曼编码对应的二进制的字符串 =》 对照 赫夫曼编码
/**
* 将一个byte 转成一个二进制的字符串
* @param flag 标识是否需要补高位 如果是true 表示需要补高位,如果是false表示不补
* @param b 传入的byte
* @return 是该b 对应的二进制的字符串,(注意是按补码返回)
*/
private static String byteToBitString(boolean flag,byte b) {
//使用变量保存 b
int temp = b;//将b 转成int
//如果是正数还需要补高位
if(flag)
temp |= 256;//按位与256 1 0000 0000 | 0000 0001 =》1 0000 0001
String str = Integer.toBinaryString(temp);//返回的二进制的补码
if(flag){
return str.substring(str.length()-8);
}else{
return str;
}
}
数据的加载和保存
SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的 API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式 为 parquet
加载数据
spark.read.load 是加载数据的通用方法
csv format jdbc json load option options orc parquet schema
table text textFile
如果读取不同格式的数据,可以对不同的数据格式进行设定
scala> spark.read.format("…")[.option("…")].load("…")
➢ format("…"):指定加载的数据类型,包括"csv"、“jdbc”、“json”、“orc”、"parquet"和 “textFile”。
➢ load("…"):在"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile"格式下需要传入加载 数据的路径。 ➢ option(”…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 我们前面都是使用 read API 先把文件加载到 DataFrame 然后再查询,其实,我们也可以直 接在文件上进行查询: 文件格式.文件路径
scala>spark.sql(“select * from json./opt/module/data/user.json”).show
保存数据
df.write.save 是保存数据的通用方法
csv jdbc json orc parquet textFile… …
如果保存不同格式的数据,可以对不同的数据格式进行设定
scala>df.write.format("…")[.option("…")].save("…")
➢ format("…"):指定保存的数据类型,包括"csv"、“jdbc”、“json”、“orc”、"parquet"和 “textFile”。
➢ save ("…"):在"csv"、“orc”、"parquet"和"textFile"格式下需要传入保存数据的路径。
➢ option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。 有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。
SaveMode 是一个枚举类,其中的常量包括:
Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) “error”(default) 如果文件已经存在则抛出异常
SaveMode.Append “append” 如果文件已经存在则追加
SaveMode.Overwrite “overwrite” 如果文件已经存在则覆盖
SaveMode.Ignore “ignore” 如果文件已经存在则忽略
df.write.mode(“append”).json("/opt/module/data/output")
MySql
package sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object SparkSqlJDBC {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("sparkSQL")
val spark: SparkSession =SparkSession.builder().config(sparkConf).getOrCreate()
//读取mysql数据
val df: DataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/jw?user=root&password=1234")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("password", "1234")
.option("dbtable", "course")
.load()
df.show()
//保存数据
df.write
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/test?user=root&password=1234")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("password", "1234")
.option("dbtable", "course")
.mode(SaveMode.Append)
.save()
}
}
Hive
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
将 hive-site.xml 文件拷贝到项目的 resources 目录中,代码实现
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
#显示表头
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop103:3306/metastore?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
#密码
<value>123456</value>
<description>password to use against metastore database</description>
</property>
</configuration>
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local ").setAppName("sparkSQL")
val spark: SparkSession =SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
spark.sql("show tables").show()
}
SparkStreaming
Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语 如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
SparkStreaming 准实时(秒,分钟),微批次(时间) 的数据处理框架
DStream 就是对 RDD 在实时数据处理场景的一种封装。
背压机制
为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制 数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值 false,即不启用。
wordcount
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并 统计不同单词出现的次数
package sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
object WordCount {
def main(args: Array[String]): Unit = {
//创建环境对象
val sparkConf = new SparkConf().setMaster("local ").setAppName("wordCont")
//第一个参数表示环境配置
//第二个参数表示批量处理的周期(采集周期)
val ssc: StreamingContext =new StreamingContext(sparkConf,Seconds(3))
//获取端口数据
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordToOne: DStream[(String, Int)] = words.map((_, 1))
val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)
wordToCount.print()
wordToCount.saveAsTextFiles("datas/result")
//由于SparkStreaming采集器是长期执行的任务 不能直接关闭
//如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕
//1、启动采集器
ssc.start()
//2、等待采集器的关闭
ssc.awaitTermination()
}
}
RDD队列
测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到 这个队列中的 RDD,都会作为一个 DStream 处理。
➢ 需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount
package streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object Queue {
def main(args: Array[String]): Unit = {
//1.初始化 Spark 配置信息
val conf: SparkConf = new SparkConf().setMaster("local ").setAppName("RDDStream")
//2.初始化 SparkStreamingContext
val ssc: StreamingContext = new StreamingContext(conf, Seconds(4))
//3.创建 RDD 队列
val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
//4.创建 QueueInputDStream
val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue,oneAtATime = false)
//5.处理队列中的 RDD 数据
val mappedStream: DStream[(Int, Int)] = inputStream.map((_,1))
val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey(_ + _)
//6.打印结果
reducedStream.print()
//7.启动任务
ssc.start()
//8.循环创建并向 RDD 队列中放入 RDD
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
ssc.awaitTermination()
}
}
自定义数据采集器
package streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.Random
object DIY {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local ").setAppName("RDDStream")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(4))
val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
messageDS.print()
ssc.start()
ssc.awaitTermination()
}
}
/**
* 自定义数据采集器
*/
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
private var flag=true;
override def onStart(): Unit = {
new Thread(new Runnable {
override def run(): Unit = {
while(flag){
val message: String ="采集的数据为"+new Random().nextInt(10).toString
store(message)
Thread.sleep(500)
}
}
}).start()
}
override def onStop(): Unit = {
flag=false
}
}
zan