1) Install R on the cluster node, following the MapR Spark/R integration guide:
https://maprdocs.mapr.com/52/Spark/IntegrateSpark_R.html
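Once R is installed, you can confirm the build from an R prompt (a minimal check, nothing MapR-specific; the execution snippet later in this post shows R 3.3.3):
R.version.string  # e.g. "R version 3.3.3 (2017-03-06)"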
2) Install RStudio Desktop
https://support.rstudio.com/hc/en-us/articles/206569407-Older-Versions-of-RStudio-Desktop
Note:
a) RStudio Desktop 0.98.1103 is compatible with older Linux systems with glibc < 2.14 (such as RHEL/CentOS 6):
rstudio-0.98.1103-1.x86_64
3) Log in to the node where R (step 1) and RStudio (step 2) are installed. Use ssh with the -X flag so X11 forwarding is enabled (this avoids X authorization errors when launching the GUI), then start RStudio:
ssh -X mapr@<hostname>
/usr/bin/rstudio
4) Install the sparklyr package from CRAN:
install.packages("sparklyr")
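To verify that the package installed cleanly, load it and print its version (packageVersion() is part of base R):
library(sparklyr)           # load sparklyr
packageVersion("sparklyr")  # print the installed version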
5) You should also install a local version of Spark for development purposes:
library(sparklyr)
spark_install(version = "2.1.0")
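If you want to confirm what spark_install() put on disk, sparklyr can list the locally installed builds:
library(sparklyr)
spark_installed_versions()  # shows the Spark/Hadoop versions and install directories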
6) Connect to Spark in YARN mode via the spark_connect() function:
library(sparklyr)
sc <- spark_connect(master = "yarn")
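On a MapR cluster you will usually want sparklyr to use the cluster's own Spark build rather than the local development copy. The execution snippet below does this by setting SPARK_HOME before connecting; the path shown is taken from that session and may differ on your system:
Sys.setenv(SPARK_HOME = "/opt/mapr/spark/spark-2.1.0")  # path from the snippet below
library(sparklyr)
sc <- spark_connect(master = "yarn")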
7) We can now use all of the available dplyr verbs against tables within the cluster. We'll start by copying some datasets from R into the Spark cluster (note that you may need to install the nycflights13 and Lahman packages in order to execute this code):
install.packages(c("nycflights13", "Lahman"))
library(dplyr)
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
src_tbls(sc)
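With the tables registered, dplyr verbs are translated to Spark SQL and run on the cluster. As a quick sanity check (a minimal sketch in the style of the sparklyr getting-started guide), filter a few rows and collect them back into R:
library(dplyr)
flights_tbl %>%
  filter(dep_delay == 2) %>%                        # executed by Spark on the cluster
  select(year, month, day, carrier, dep_delay) %>%
  head(5) %>%
  collect()                                         # pull the small result back into R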
Program Execution Snippet:
R version 3.3.3 (2017-03-06) -- "Another Canoe"
Copyright (C) 2017 The R Foundation for Statistical Computing
Platform: x86_64-redhat-linux-gnu (64-bit)
R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.
Natural language support but running in an English locale
R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.
Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.
[Workspace loaded from ~/.RData]
> Sys.setenv(SPARK_HOME = "/opt/mapr/spark/spark-2.1.0")
Warning message:
R graphics engine version 11 is not supported by this version of RStudio. The Plots tab will be disabled until a newer version of RStudio is installed.
> library(sparklyr)
> sc <- spark_connect(master = "yarn")
> library(dplyr)
Attaching package: ‘dplyr’
The following objects are masked from ‘package:stats’:
filter, lag
The following objects are masked from ‘package:base’:
intersect, setdiff, setequal, union
> iris_tbl <- copy_to(sc, iris)
> flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 9, sn1, executor 1): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of timestamp
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, year), IntegerType) AS year#151
[... the same Catalyst expression tree repeats for the month, day, dep_time, sched_dep_time, dep_delay, and arr_time columns; output truncated in the original session ...]
> batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
> src_tbls(sc)
[1] "batting" "emp" "emp1" "emp2" "flights" "hbase_table_1" "iris"
[8] "k1" "mapr_table_1" "mapr_table_2" "mapr_table_3" "mapr_table_4" "mapr_table_5" "mapr_table_6"
[15] "maprdb" "names" "orders" "pokes3" "sample1" "src" "std"
[22] "t1" "t10" "t2" "web_logs"
> quit()
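Note that the first copy_to() call for the flights data failed above with "java.lang.String is not a valid external type for schema of timestamp", although "flights" does show up in the src_tbls() output (presumably from a retried or earlier copy). If you hit the same error, one workaround worth trying (not from the original session; time_hour is the only POSIXct column in nycflights13::flights) is to drop that timestamp column before copying:
library(dplyr)
flights_no_ts <- select(nycflights13::flights, -time_hour)  # drop the POSIXct column (assumed workaround)
flights_tbl <- copy_to(sc, flights_no_ts, "flights", overwrite = TRUE)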
8) Finally, check the Resource Manager page to verify that the submitted YARN job completed with FinalStatus "SUCCEEDED".