问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

spark打包翻译的优点

发布网友 发布时间:2022-04-08 18:34

我来回答

2个回答

懂视网 时间:2022-04-08 22:56

  • Java
  • JavaSparkContext sc = ...; // An existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    除了基本的SQLContext,也可以创建HiveContext。SQLContext和HiveContext区别与联系为:

  • SQLContext现在只支持SQL语法解析器(SQL-92语法)
  • HiveContext现在支持SQL语法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器,来运行HiveSQL不支持的语法。
  • 使用HiveContext可以使用Hive的UDF,读写Hive表数据等Hive操作。SQLContext不可以对Hive进行操作。
  • Spark SQL未来的版本会不断丰富SQLContext的功能,做到SQLContext和HiveContext的功能容和,最终可能两者会统一成一个Context
  • HiveContext包装了Hive的依赖包,把HiveContext单独拿出来,可以在部署基本的Spark的时候就不需要Hive的依赖包,需要使用HiveContext时再把Hive的各种依赖包加进来。

    SQL的解析器可以通过配置spark.sql.dialect参数进行配置。在SQLContext中只能使用Spark SQL提供的”sql“解析器。在HiveContext中默认解析器为”hiveql“,也支持”sql“解析器。

     

    2.2 创建DataFrames(Creating DataFrames)

    使用SQLContext,spark应用程序(Application)可以通过RDD、Hive表、JSON格式数据等数据源创建DataFrames。下面是基于JSON文件创建DataFrame的示例:

  • Scala
  • val sc: SparkContext // An existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    val df = sqlContext.read.json("examples/src/main/resources/people.json")
    
    // Displays the content of the DataFrame to stdout
    df.show()
  • Java
  • JavaSparkContext sc = ...; // An existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    
    DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
    
    // Displays the content of the DataFrame to stdout
    df.show();

     

    2.3 DataFrame操作(DataFrame Operations)

    DataFrames支持Scala、Java和Python的操作接口。下面是Scala和Java的几个操作示例:

  • Scala
  • val sc: SparkContext // An existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    // Create the DataFrame
    val df = sqlContext.read.json("examples/src/main/resources/people.json")
    
    // Show the content of the DataFrame
    df.show()
    // age name
    // null Michael
    // 30 Andy
    // 19 Justin
    
    // Print the schema in a tree format
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)
    
    // Select only the "name" column
    df.select("name").show()
    // name
    // Michael
    // Andy
    // Justin
    
    // Select everybody, but increment the age by 1
    df.select(df("name"), df("age") + 1).show()
    // name (age + 1)
    // Michael null
    // Andy 31
    // Justin 20
    
    // Select people older than 21
    df.filter(df("age") > 21).show()
    // age name
    // 30 Andy
    
    // Count people by age
    df.groupBy("age").count().show()
    // age count
    // null 1
    // 19 1
    // 30 1
  • Java
  • JavaSparkContext sc // An existing SparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    // Create the DataFrame
    DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
    
    // Show the content of the DataFrame
    df.show();
    // age name
    // null Michael
    // 30 Andy
    // 19 Justin
    
    // Print the schema in a tree format
    df.printSchema();
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)
    
    // Select only the "name" column
    df.select("name").show();
    // name
    // Michael
    // Andy
    // Justin
    
    // Select everybody, but increment the age by 1
    df.select(df.col("name"), df.col("age").plus(1)).show();
    // name (age + 1)
    // Michael null
    // Andy 31
    // Justin 20
    
    // Select people older than 21
    df.filter(df.col("age").gt(21)).show();
    // age name
    // 30 Andy
    
    // Count people by age
    df.groupBy("age").count().show();
    // age count
    // null 1
    // 19 1
    // 30 1

    详细的DataFrame API请参考 API Documentation。

    除了简单列引用和表达式,DataFrames还有丰富的library,功能包括string操作、date操作、常见数学操作等。详细内容请参考 DataFrame Function Reference。

     

    2.4 运行SQL查询程序(Running SQL Queries Programmatically)

    Spark Application可以使用SQLContext的sql()方法执行SQL查询操作,sql()方法返回的查询结果为DataFrame格式。代码如下:

  • Scala
  • val sqlContext = ... // An existing SQLContext
    val df = sqlContext.sql("SELECT * FROM table")
  • Java
  • SQLContext sqlContext = ... // An existing SQLContext
    DataFrame df = sqlContext.sql("SELECT * FROM table")

     

    2.5 DataFrames与RDDs的相互转换(Interoperating with RDDs)

    Spark SQL支持两种RDDs转换为DataFrames的方式:

  • 使用反射获取RDD内的Schema
  • 当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。
  • 通过编程接口指定Schema
  • 通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。
  • 这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema
  •  

    2.5.1 使用反射获取Schema(Inferring the Schema Using Reflection)

    Spark SQL支持将JavaBean的RDD自动转换成DataFrame。通过反射获取Bean的基本信息,依据Bean的信息定义Schema。当前Spark SQL版本(Spark 1.5.2)不支持嵌套的JavaBeans和复杂数据类型(如:List、Array)。创建一个实现Serializable接口包含所有属性getters和setters的类来创建一个JavaBean。通过调用createDataFrame并提供JavaBean的Class object,指定一个Schema给一个RDD。示例如下:

    public static class Person implements Serializable {
     private String name;
     private int age;
    
     public String getName() {
     return name;
     }
    
     public void setName(String name) {
     this.name = name;
     }
    
     public int getAge() {
     return age;
     }
    
     public void setAge(int age) {
     this.age = age;
     }
    }
    // sc is an existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    
    // Load a text file and convert each line to a JavaBean.
    JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
     new Function<String, Person>() {
     public Person call(String line) throws Exception {
     String[] parts = line.split(",");
    
     Person person = new Person();
     person.setName(parts[0]);
     person.setAge(Integer.parseInt(parts[1].trim()));
    
     return person;
     }
     });
    
    // Apply a schema to an RDD of JavaBeans and register it as a table.
    DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
    schemaPeople.registerTempTable("people");
    
    // SQL can be run over RDDs that have been registered as tables.
    DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
     public String call(Row row) {
     return "Name: " + row.getString(0);
     }
    }).collect();

     

    2.5.2 通过编程接口指定Schema(Programmatically Specifying the Schema)

    当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:

  • 从原来的RDD创建一个Row格式的RDD
  • 创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema
  • 通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema
  • 示例如下:

    import org.apache.spark.api.java.function.Function;
    // Import factory methods provided by DataTypes.
    import org.apache.spark.sql.types.DataTypes;
    // Import StructType and StructField
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.types.StructField;
    // Import Row.
    import org.apache.spark.sql.Row;
    // Import RowFactory.
    import org.apache.spark.sql.RowFactory;
    
    // sc is an existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    
    // Load a text file and convert each line to a JavaBean.
    JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");
    
    // The schema is encoded in a string
    String schemaString = "name age";
    
    // Generate the schema based on the string of schema
    List<StructField> fields = new ArrayList<StructField>();
    for (String fieldName: schemaString.split(" ")) {
     fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
    }
    StructType schema = DataTypes.createStructType(fields);
    
    // Convert records of the RDD (people) to Rows.
    JavaRDD<Row> rowRDD = people.map(
     new Function<String, Row>() {
     public Row call(String record) throws Exception {
     String[] fields = record.split(",");
     return RowFactory.create(fields[0], fields[1].trim());
     }
     });
    
    // Apply the schema to the RDD.
    DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
    
    // Register the DataFrame as a table.
    peopleDataFrame.registerTempTable("people");
    
    // SQL can be run over RDDs that have been registered as tables.
    DataFrame results = sqlContext.sql("SELECT name FROM people");
    
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    List<String> names = results.javaRDD().map(new Function<Row, String>() {
     public String call(Row row) {
     return "Name: " + row.getString(0);
     }
    }).collect();

     

    3 数据源(Data Source)

    Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。Data Sources这部分首先描述了对Spark的数据源执行加载和保存的常用方法,然后对内置数据源进行深入介绍。

     

    3.1 一般Load/Save方法

    Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。修改配置项spark.sql.sources.default,可修改默认数据源格式。读取Parquet文件示例如下:

  • Scala
  • val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
    df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
  • Java
  • DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet");
    df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

     

    3.1.1 手动指定选项(Manually Specifying Options)

    当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称(json,parquet,jdbc)。通过指定的数据源格式名,可以对DataFrames进行类型转换操作。示例如下:

  • Scala
  • val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
    df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
  • Java
  • DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json");
    df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

     

    3.1.2 存储模式(Save Modes)

    可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下表:

    技术分享

     

    3.1.3 持久化到表(Saving to Persistent Tables)

    当使用HiveContext时,可以通过saveAsTable方法将DataFrames存储到表中。与registerTempTable方法不同的是,saveAsTable将DataFrame中的内容持久化到表中,并在HiveMetastore中存储元数据。存储一个DataFrame,可以使用SQLContext的table方法。table先创建一个表,方法参数为要创建的表的表名,然后将DataFrame持久化到这个表中。

    默认的saveAsTable方法将创建一个“managed table”,表示数据的位置可以通过metastore获得。当存储数据的表被删除时,managed table也将自动删除。

     

    3.2 Parquet文件

    Parquet是一种支持多种数据处理系统的柱状的数据格式,Parquet文件中保留了原始数据的模式。Spark SQL提供了Parquet文件的读写功能。

     

    3.2.1 读取Parquet文件(Loading Data Programmatically)

    读取Parquet文件示例如下:

  • Scala
  • // sqlContext from the previous example is used in this example.
    // This is used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._
    
    val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
    
    // The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
    people.write.parquet("people.parquet")
    
    // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
    // The result of loading a Parquet file is also a DataFrame.
    val parquetFile = sqlContext.read.parquet("people.parquet")
    
    //Parquet files can also be registered as tables and then used in SQL statements.
    parquetFile.registerTempTable("parquetFile")
    val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
  • Java
  • // sqlContext from the previous example is used in this example.
    
    DataFrame schemaPeople = ... // The DataFrame from the previous example.
    
    // DataFrames can be saved as Parquet files, maintaining the schema information.
    schemaPeople.write().parquet("people.parquet");
    
    // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
    // The result of loading a parquet file is also a DataFrame.
    DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
    
    // Parquet files can also be registered as tables and then used in SQL statements.
    parquetFile.registerTempTable("parquetFile");
    DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
    List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
     public String call(Row row) {
     return "Name: " + row.getString(0);
     }
    }).collect();

     

    3.2.2 解析分区信息(Partition Discovery)

    对表进行分区是对数据进行优化的方式之一。在分区的表内,数据通过分区列将数据存储在不同的目录下。Parquet数据源现在能够自动发现并解析分区信息。例如,对人口数据进行分区存储,分区列为gender和country,使用下面的目录结构:

    path
    └── to
     └── table
     ├── gender=male
     │ ├── ...
     │ │
     │ ├── country=US
     │ │ └── data.parquet
     │ ├── country=CN
     │ │ └── data.parquet
     │ └── ...
     └── gender=female
      ├── ...
      │
      ├── country=US
      │ └── data.parquet
      ├── country=CN
      │ └── data.parquet
      └── ...

    通过传递path/to/table给 SQLContext.read.parquet或SQLContext.read.load,Spark SQL将自动解析分区信息。返回的DataFrame的Schema如下:

    root
    |-- name: string (nullable = true)
    |-- age: long (nullable = true)
    |-- gender: string (nullable = true)
    |-- country: string (nullable = true)

    需要注意的是,数据的分区列的数据类型是自动解析的。当前,支持数值类型和字符串类型。自动解析分区类型的参数为:spark.sql.sources.partitionColumnTypeInference.enabled,默认值为true。如果想关闭该功能,直接将该参数设置为disabled。此时,分区列数据格式将被默认设置为string类型,不再进行类型解析。

     

    3.2.3 Schema合并(Schema Merging)

    像ProtocolBuffer、Avro和Thrift那样,Parquet也支持Schema evolution(Schema演变)。用户可以先定义一个简单的Schema,然后逐渐的向Schema中增加列描述。通过这种方式,用户可以获取多个有不同Schema但相互兼容的Parquet文件。现在Parquet数据源能自动检测这种情况,并合并这些文件的schemas。

    因为Schema合并是一个高消耗的操作,在大多数情况下并不需要,所以Spark SQL从1.5.0开始默认关闭了该功能。可以通过下面两种方式开启该功能:

  • 当数据源为Parquet文件时,将数据源选项mergeSchema设置为true
  • 设置全局SQL选项spark.sql.parquet.mergeSchema为true
  • 示例如下:

  • Scala
  • // sqlContext from the previous example is used in this example.
    // This is used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._
    
    // Create a simple DataFrame, stored into a partition directory
    val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
    df1.write.parquet("data/test_table/key=1")
    
    // Create another DataFrame in a new partition directory,
    // adding a new column and dropping an existing column
    val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
    df2.write.parquet("data/test_table/key=2")
    
    // Read the partitioned table
    val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
    df3.printSchema()
    
    // The final schema consists of all 3 columns in the Parquet files together
    // with the partitioning column appeared in the partition directory paths.
    // root
    // |-- single: int (nullable = true)
    // |-- double: int (nullable = true)
    // |-- triple: int (nullable = true)
    // |-- key : int (nullable = true)

     

    3.2.4 Hive metastore Parquet表转换(Hive metastore Parquet table conversion)

    当向Hive metastore中读写Parquet表时,Spark SQL将使用Spark SQL自带的Parquet SerDe(SerDe:Serialize/Deserilize的简称,目的是用于序列化和反序列化),而不是用Hive的SerDe,Spark SQL自带的SerDe拥有更好的性能。这个优化的配置参数为spark.sql.hive.convertMetastoreParquet,默认值为开启。

     

    3.2.4.1 Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation)

    从表Schema处理的角度对比Hive和Parquet,有两个区别:

  • Hive区分大小写,Parquet不区分大小写
  • hive允许所有的列为空,而Parquet不允许所有的列全为空
  • 由于这两个区别,当将Hive metastore Parquet表转换为Spark SQL Parquet表时,需要将Hive metastore schema和Parquet schema进行一致化。一致化规则如下:

  • 这两个schema中的同名字段必须具有相同的数据类型。一致化后的字段必须为Parquet的字段类型。这个规则同时也解决了空值的问题。
  • 一致化后的schema只包含Hive metastore中出现的字段。
  • 忽略只出现在Parquet schema中的字段
  • 只在Hive metastore schema中出现的字段设为nullable字段,并加到一致化后的schema中
  •  

    3.2.4.2 元数据刷新(Metadata Refreshing)

    Spark SQL缓存了Parquet元数据以达到良好的性能。当Hive metastore Parquet表转换为enabled时,表修改后缓存的元数据并不能刷新。所以,当表被Hive或其它工具修改时,则必须手动刷新元数据,以保证元数据的一致性。示例如下:

  • Scala
  • // sqlContext is an existing HiveContext
    sqlContext.refreshTable("my_table")
  • Java
  • // sqlContext is an existing HiveContext
    sqlContext.refreshTable("my_table")

     

    3.2.5 配置(Configuration)

    配置Parquet可以使用SQLContext的setConf方法或使用SQL执行SET key=value命令。详细参数说明如下:

    技术分享

     

    3.3 JSON数据集

    Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。

    需要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自满足有效的JSON对象。如果用多行描述一个JSON对象,会导致读取出错。读取JSON数据集示例如下:

  • Scala
  • // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    // A JSON dataset is pointed to by path.
    // The path can be either a single text file or a directory storing text files.
    val path = "examples/src/main/resources/people.json"
    val people = sqlContext.read.json(path)
    
    // The inferred schema can be visualized using the printSchema() method.
    people.printSchema()
    // root
    // |-- age: integer (nullable = true)
    // |-- name: string (nullable = true)
    
    // Register this DataFrame as a table.
    people.registerTempTable("people")
    
    // SQL statements can be run by using the sql methods provided by sqlContext.
    val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    
    // Alternatively, a DataFrame can be created for a JSON dataset represented by
    // an RDD[String] storing one JSON object per string.
    val anotherPeopleRDD = sc.parallelize(
     """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
  • Java
  • // sc is an existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    
    // A JSON dataset is pointed to by path.
    // The path can be either a single text file or a directory storing text files.
    DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json");
    
    // The inferred schema can be visualized using the printSchema() method.
    people.printSchema();
    // root
    // |-- age: integer (nullable = true)
    // |-- name: string (nullable = true)
    
    // Register this DataFrame as a table.
    people.registerTempTable("people");
    
    // SQL statements can be run by using the sql methods provided by sqlContext.
    DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
    
    // Alternatively, a DataFrame can be created for a JSON dataset represented by
    // an RDD[String] storing one JSON object per string.
    List<String> jsonData = Arrays.asList(
     "{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}");
    JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
    DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);

     

    3.4 Hive表

    Spark SQL支持对Hive的读写操作。需要注意的是,Hive所依赖的包,没有包含在Spark assembly包中。增加Hive时,需要在Spark的build中添加 -Phive 和 -Phivethriftserver配置。这两个配置将build一个新的assembly包,这个assembly包含了Hive的依赖包。注意,必须上这个心的assembly包到所有的worker节点上。因为worker节点在访问Hive中数据时,会调用Hive的 serialization and deserialization libraries(SerDes),此时将用到Hive的依赖包。

    Hive的配置文件为conf/目录下的hive-site.xml文件。在YARN上执行查询命令之前,lib_managed/jars目录下的datanucleus包和conf/目录下的hive-site.xml必须可以被driverhe和所有的executors所访问。确保被访问,最方便的方式就是在spark-submit命令中通过--jars选项和--file选项指定。

    操作Hive时,必须创建一个HiveContext对象,HiveContext继承了SQLContext,并增加了对MetaStore和HiveQL的支持。除了sql方法,HiveContext还提供了一个hql方法,hql方法可以执行HiveQL语法的查询语句。示例如下:

  • Scala
  • // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    
    sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    sqlContext.sql("LOAD DATA LOCAL INPATH ‘examples/src/main/resources/kv1.txt‘ INTO TABLE src")
    
    // Queries are expressed in HiveQL
    sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
  • Java
  • // sc is an existing JavaSparkContext.
    HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc);
    
    sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
    sqlContext.sql("LOAD DATA LOCAL INPATH ‘examples/src/main/resources/kv1.txt‘ INTO TABLE src");
    
    // Queries are expressed in HiveQL.
    Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();

     

    3.4.1 访问不同版本的Hive Metastore(Interacting with Different Versions of Hive Metastore)

    Spark SQL经常需要访问Hive metastore,Spark SQL可以通过Hive metastore获取Hive表的元数据。从Spark 1.4.0开始,Spark SQL只需简单的配置,就支持各版本Hive metastore的访问。注意,涉及到metastore时Spar SQL忽略了Hive的版本。Spark SQL内部将Hive反编译至Hive 1.2.1版本,Spark SQL的内部操作(serdes, UDFs, UDAFs, etc)都调用Hive 1.2.1版本的class。版本配置项见下面表格:

    技术分享

     

    3.5 JDBC To Other Databases

    Spark SQL支持使用JDBC访问其他数据库。当时用JDBC访问其它数据库时,最好使用JdbcRDD。使用JdbcRDD时,Spark SQL操作返回的DataFrame会很方便,也会很方便的添加其他数据源数据。JDBC数据源因为不需要用户提供ClassTag,所以很适合使用Java或Python进行操作。
    使用JDBC访问数据源,需要在spark classpath添加JDBC driver配置。例如,从Spark Shell连接postgres的配置为:

    SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

    远程数据库的表,可用DataFrame或Spark SQL临时表的方式调用数据源API。支持的参数有:

    技术分享

    代码示例如下:

  • Scala
  • val jdbcDF = sqlContext.read.format("jdbc").options( 
     Map("url" -> "jdbc:postgresql:dbserver",
     "dbtable" -> "schema.tablename")).load()
  • Java
  • Map<String, String> options = new HashMap<String, String>();
    options.put("url", "jdbc:postgresql:dbserver");
    options.put("dbtable", "schema.tablename");
    
    DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load();

     

    3.6 故障排除(Troubleshooting)

  • 在客户端session和所有的executors上,JDBC driver必须对启动类加载器(primordial class loader)设置为visible。因为当创建一个connection时,Java的DriverManager类会执行安全验证,安全验证将忽略所有对启动类加载器为非visible的driver。一个很方便的解决方法是,修改所有worker节点上的compute_classpath.sh脚本,将driver JARs添加至脚本。
  • 有些数据库(例:H2)将所有的名字转换为大写,所以在这些数据库中,Spark SQL也需要将名字全部大写。
  •  

    4 性能调优

     

    4.1 缓存数据至内存(Caching Data In Memory)

    Spark SQL可以通过调用sqlContext.cacheTable("tableName") 或者dataFrame.cache(),将表用一种柱状格式( an in­memory columnar format)缓存至内存中。然后Spark SQL在执行查询任务时,只需扫描必需的列,从而以减少扫描数据量、提高性能。通过缓存数据,Spark SQL还可以自动调节压缩,从而达到最小化内存使用率和降低GC压力的目的。调用sqlContext.uncacheTable("tableName")可将缓存的数据移出内存。

    可通过两种配置方式开启缓存数据功能:

  • 使用SQLContext的setConf方法
  • 执行SQL命令 SET key=value
  • 技术分享

     

    4.2 调优参数(Other Configuration Options)

    可以通过配置下表中的参数调节Spark SQL的性能。在后续的Spark版本中将逐渐增强自动调优功能,下表中的参数在后续的版本中或许将不再需要配置。

    技术分享

     

    5 分布式SQL引擎

    使用Spark SQL的JDBC/ODBC或者CLI,可以将Spark SQL作为一个分布式查询引擎。终端用户或应用不需要编写额外的代码,可以直接使用Spark SQL执行SQL查询。

     

    5.1 运行Thrift JDBC/ODBC服务

    这里运行的Thrift JDBC/ODBC服务与Hive 1.2.1中的HiveServer2一致。可以在Spark目录下执行如下命令来启动JDBC/ODBC服务:

    ./sbin/start-thriftserver.sh

    这个命令接收所有 bin/spark-submit 命令行参数,添加一个 --hiveconf 参数来指定Hive的属性。详细的参数说明请执行命令 ./sbin/start-thriftserver.sh --help 。
    服务默认监听端口为localhost:10000。有两种方式修改默认监听端口:

  • 修改环境变量:
  • export HIVE_SERVER2_THRIFT_PORT=<listening-port>
    export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
    ./sbin/start-thriftserver.sh --master <master-uri> ...
  • 修改系统属性
  • ./sbin/start-th 
    
    
    
    
                                            

    热心网友 时间:2022-04-08 20:04

    摘要3 Spark的四大优点快:与Hadoop的MapRece相比,Spark基于内存的运算要快100倍以上;而基于磁盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效地处理数据流。容易使用:Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同应用。而且Spark支持交互式的Python和Scala的Shell,这意味着可以非常方便的在这些Shell中使用Spark集群来验证解决问题的方法,而不是像以前一样,需要打包、上传集群、验证等。这对于原型开发非常重要。通用性:Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(通用Spark SQL)、实时流处理(通过Spark Streaming)、机器学习(通过Spark MLlib)和图计算(通过Spark GraphX)。 这些不同类型的处理都可以在同一应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台处理问题,减少开发和维护的人力成本和部署平台的物理成本。当然还有,作为统一的解决方案,Spark并没有以牺牲性能为代价。相反,在性能方面Spark具有巨大优势。可融合性:Spark非常方便的与其他开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassanda等。这对于已部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark强大的处理能力。Spark也可以不依赖第三方的资源管理器和调度器,它实现了Standalone作为其内置资源管理器和调度框架,这样进一步降低了Spark的使用门槛,使得所有人可以非常容易地部署和使用Spark。此外Spark还提供了在EC2上部署Standalone的Spark集群的工具。咨询记录 · 回答于2021-12-18spark打包翻译的优点3 Spark的四大优点快:与Hadoop的MapRece相比,Spark基于内存的运算要快100倍以上;而基于磁盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效地处理数据流。容易使用:Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同应用。而且Spark支持交互式的Python和Scala的Shell,这意味着可以非常方便的在这些Shell中使用Spark集群来验证解决问题的方法,而不是像以前一样,需要打包、上传集群、验证等。这对于原型开发非常重要。通用性:Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(通用Spark SQL)、实时流处理(通过Spark Streaming)、机器学习(通过Spark MLlib)和图计算(通过Spark GraphX)。 这些不同类型的处理都可以在同一应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台处理问题,减少开发和维护的人力成本和部署平台的物理成本。当然还有,作为统一的解决方案,Spark并没有以牺牲性能为代价。相反,在性能方面Spark具有巨大优势。可融合性:Spark非常方便的与其他开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassanda等。这对于已部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark强大的处理能力。Spark也可以不依赖第三方的资源管理器和调度器,它实现了Standalone作为其内置资源管理器和调度框架,这样进一步降低了Spark的使用门槛,使得所有人可以非常容易地部署和使用Spark。此外Spark还提供了在EC2上部署Standalone的Spark集群的工具。
    声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
    长沙到西昌。坐火车先从长沙到成都、成都东,再到西昌,哪个方便一些 S先生与P先生谜题的题面 为什么首都设在襄阳 改姓可以不随父母性吗 韩艺瑟怎么改姓? 纸、墨、笔、砚是中国传统的文房四宝,墨的使用最早在 [ ] A.商代后期... 想问下创维光伏E企赢模式有哪些优势,到底值不值得投资啊?有没有合作... 太平洋太享e保百万医疗值得入手吗?每年花多少钱? 爱e满分适合哪些人买?注意哪些问题? 太平洋太享e保百万医疗适合哪些人买?价格多少? 哪里需要健康管理师? 睡觉前锻炼身体有益吗 电信宽带比移动宽带慢的原因? 睡觉前,锻炼身体,好吗? 最近,家里电信 宽带网速感觉变慢了很多,是因为套餐的的原因吗? 手游直播投屏买啥台式电脑 手游直播为什么要投屏电脑呢 防疫三看两查指什么 新冠疫情三防融合三防是指 疫情防控三验一带是什么 新冠疫情防控三防是指哪三防 新冠防控五同时是指什么? 林俊杰不潮不用花钱的mv那俩女的是谁 疫情防控3+1指的是什么 《不潮不用花钱MV》里的那两个女生是谁啊? 手机是NOKIA N85 想下林俊杰不潮不用花钱的MV 林俊杰新歌《不潮不用花钱》的MV中的两个女生是谁 林俊杰 不潮不用花钱mv下载 哪里能下载中国好声音2018 里面的 陈佳杏 不潮不用花钱 不潮不用花钱全部的舞蹈教学那里有? 哪里有需要健康管理师挂靠的? 线上赚钱网站可靠吗?你们有什么靠谱的推荐吗? 网上赚钱靠谱吗?会不会是骗人的? 现在网上那么多挣钱的方法,为什么大家都以为是骗人的,不敢尝试呢? 现在的健康管理师证需要认证和入库吗? 请问怎么把粘在白墙上的挂钩弄下来? 你们觉得qq音乐dts音效好用吗 QQ音乐的DTS和Super Sound差别大吗?杜比音效还要冲钻有这个必要吗? 怎样更换一加的锁屏壁纸,我是低能儿 一加锁屏壁纸怎么换 this is 是不是有连读,英标能帮我写出来吗 is this怎么连读? this is 怎么读 this is a car 怎么连读?this is a三个单词需要连起来吗? this is的缩写怎么读 怎么教小朋友this is的读法? is this怎么连读 this is, what&#39;s this resist them that&#39;s the怎么连读 this is 连读时哪个被省略掉了??? Thisismysister有几处连读