SparkR from R interactive shell
SparkR is an R package that provides a lightweight front end for using MapR Spark from R. In MapR Spark 2.1.0, SparkR provides a distributed data frame implementation that supports operations such as selection, filtering, and aggregation. The entry point into SparkR is the SparkSession, which connects your R program to a MapR Spark cluster. You create a SparkSession with sparkR.session, passing options such as the application name and any Spark packages the program depends on.
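As a minimal sketch of the session lifecycle described above, the following starts a session with an explicit application name and stops it afterwards. The application name "sparkr-shell-demo" is a hypothetical value chosen for illustration, and the SPARK_HOME path assumes the MapR Spark 2.1.0 install location used throughout this post.

```r
# Assumption: SPARK_HOME matches the MapR Spark 2.1.0 install path used in this post.
Sys.setenv(SPARK_HOME = "/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))

# "sparkr-shell-demo" is a hypothetical application name used only for illustration.
sparkR.session(
  master = "local[*]",
  appName = "sparkr-shell-demo",
  sparkConfig = list(spark.driver.memory = "2g")
)

# ... work with SparkDataFrames here ...

# Stop the session when finished so the backing JVM is shut down.
sparkR.session.stop()
```

Naming the application makes it easier to find in the Spark UI or the YARN application list when several sessions are running.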
Here are a few examples that connect an R program to a MapR cluster from the R shell.
In each one, set SPARK_HOME, load the SparkR package, create a SparkR session with the required arguments, and then run the program.
Usecase 1) Local Data Frames - Convert a local R data frame into a SparkDataFrame. The following creates a SparkDataFrame from R's built-in faithful dataset.
Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
# Convert the local data frame; df is a SparkDataFrame
df <- as.DataFrame(faithful)
head(df)
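The selection, filtering, and aggregation operations mentioned in the introduction can be tried on the same faithful data. This is only a sketch: the threshold of 50 minutes and the variable names short_waits and waiting_counts are arbitrary choices for illustration.

```r
# Assumption: same MapR Spark 2.1.0 setup as in Usecase 1.
Sys.setenv(SPARK_HOME = "/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

df <- as.DataFrame(faithful)

# Selection: keep only the eruptions column.
head(select(df, df$eruptions))

# Filtering: keep rows with a waiting time under 50 minutes (arbitrary threshold).
short_waits <- filter(df, df$waiting < 50)
head(short_waits)

# Aggregation: count how many observations share each waiting time,
# then show the most frequent waiting times first.
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
```

Each call returns a new SparkDataFrame, so nothing is pulled back into the R process until head is called.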
Usecase 2) From Data Sources - The general method for creating SparkDataFrames from data sources is read.df. It takes the path of the file to load and the type of data source, and it uses the currently active SparkSession automatically. SparkR natively supports reading JSON, CSV, and Parquet files; other formats, such as Avro, can be added through Spark packages.
Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
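# Pass sparkPackages when the session is first created; a later sparkR.session call reuses the running session.
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"), sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")
# Load the JSON file; people is a SparkDataFrame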
people <- read.df("file:///opt/mapr/spark/spark-2.1.0/examples/src/main/resources/people.json", "json")
head(people)
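The same read.df call works for the other built-in formats. The sketch below writes the faithful data out as Parquet and CSV under R's temporary directory and reads both back; the directory names and the file:// output location are assumptions made for illustration, not paths from the original setup.

```r
# Assumption: same MapR Spark 2.1.0 setup as in the use cases above.
Sys.setenv(SPARK_HOME = "/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

df <- as.DataFrame(faithful)

# Hypothetical output locations on the local file system.
parquet_dir <- paste0("file://", file.path(tempdir(), "faithful_parquet"))
csv_dir <- paste0("file://", file.path(tempdir(), "faithful_csv"))

# Write the SparkDataFrame in each format, replacing earlier output if present.
write.df(df, path = parquet_dir, source = "parquet", mode = "overwrite")
write.df(df, path = csv_dir, source = "csv", mode = "overwrite", header = "true")

# Parquet files carry their own schema.
faithful_parquet <- read.df(parquet_dir, source = "parquet")
printSchema(faithful_parquet)

# CSV needs options to read the header row and infer column types.
faithful_csv <- read.df(csv_dir, source = "csv", header = "true", inferSchema = "true")
printSchema(faithful_csv)
```

Without inferSchema, every CSV column is read as a string, which is why the two printSchema outputs are worth comparing.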
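Usecase 3) From Data Sources in yarn mode - The same program as Usecase 2, with master set to "yarn" so that it runs on the cluster. Because the path uses file://, the file must be present at that path on every node that runs an executor.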
Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
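# Pass sparkPackages when the session is first created; a later sparkR.session call reuses the running session.
sparkR.session(master = "yarn", sparkConfig = list(spark.driver.memory = "2g"), sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")
# Load the JSON file; people is a SparkDataFrame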
people <- read.df("file:///opt/mapr/spark/spark-2.1.0/examples/src/main/resources/people.json", "json")
head(people)
Usecase 4) Create SparkDataFrames from Hive tables - By default, sparkR.session attempts to create a SparkSession with Hive support enabled (enableHiveSupport = TRUE), so Hive tables can be queried directly.
Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session()
# Queries can be expressed in HiveQL.
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH '/opt/mapr/spark/spark-2.1.0/examples/src/main/resources/kv1.txt' INTO TABLE src")
recordCount <- sql("SELECT count(*) from src")
# recordCount is a SparkDataFrame
head(recordCount)
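Query results stay distributed until they are collected. As a sketch of bringing a small result back into the R process, the following filters the src table and converts the result to a local R data frame with collect. The filter key < 10 and the variable names are arbitrary choices for illustration, and the example assumes the Spark installation has Hive support available.

```r
# Assumption: same MapR Spark 2.1.0 setup as in Usecase 4.
Sys.setenv(SPARK_HOME = "/opt/mapr/spark/spark-2.1.0")
library(SparkR, lib.loc = file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))
sparkR.session()  # enableHiveSupport = TRUE by default

# Make sure the table exists; it may be empty if kv1.txt has not been loaded.
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

# small_keys is a SparkDataFrame; the query runs on Spark.
small_keys <- sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# collect() copies the rows to the driver as an ordinary R data.frame,
# so use it only on results that are known to be small.
local_rows <- collect(small_keys)
str(local_rows)
```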
Prerequisites:
1) yum install R
2) yum install mapr-spark-2.1.0.201703271134-1.noarch
3) yum install mapr-spark-master-2.1.0.201703271134-1.noarch
Sample execution of Usecase 1 from the R shell:
[mapr@sn2 bin]$ R
R version 3.3.3 (2017-03-06) -- "Another Canoe"
Copyright (C) 2017 The R Foundation for Statistical Computing
Platform: x86_64-redhat-linux-gnu (64-bit)
R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.
Natural language support but running in an English locale
R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.
Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.
> Sys.setenv(SPARK_HOME="/opt/mapr/spark/spark-2.1.0")
> library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
Attaching package: ‘SparkR’
The following objects are masked from ‘package:stats’:
cov, filter, lag, na.omit, predict, sd, var, window
The following objects are masked from ‘package:base’:
as.data.frame, colnames, colnames<-, drop, endsWith, intersect,
rank, rbind, sample, startsWith, subset, summary, transform, union
> sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
Spark package found in SPARK_HOME: /opt/mapr/spark/spark-2.1.0
Launching java with spark-submit command /opt/mapr/spark/spark-2.1.0/bin/spark-submit --driver-memory "2g" sparkr-shell /tmp/RtmpzKbIS6/backend_port310323a10d39
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Java ref type org.apache.spark.sql.SparkSession id 1
> df <- as.DataFrame(faithful)
> head(df)
  eruptions waiting
1     3.600      79
2     1.800      54
3     3.333      74
4     2.283      62
5     4.533      85
6     2.883      55