Thursday, January 14, 2021

GDB cheatsheet


# strings <core_file> | grep -i hs_err_pid    /* collect the hs_err_pid file named here along with the Java core file */

# strings <core_file> | grep "java.command"

# su - <userid_which_generated_core_file>

#jstack <java_binary_path> <core_file_path> 


gdb -c <core_file_path> <java_binary_path> 

(gdb) set logging on 

(gdb) set pagination off

(gdb) bt 

(gdb) thread apply all bt

(gdb) quit
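
The interactive session above can also be run unattended. The following is a minimal sketch, assuming a gdb build that supports the -batch and -ex options. The function name collect_backtraces, the GDB override variable and the default output file name are illustrative and not part of the original steps.

```shell
#!/usr/bin/env bash
# collect_backtraces JAVA_BINARY CORE_FILE [OUTPUT_FILE]
# Hypothetical helper: writes the backtrace of the current thread and of
# all threads to OUTPUT_FILE without opening an interactive gdb prompt.
collect_backtraces() {
  local java_bin="$1" core_file="$2" out_file="${3:-gdb_backtraces.txt}"
  # ${GDB:-gdb} is an assumption of this sketch: it lets a different gdb
  # binary (or a stub for a dry run) be substituted.
  "${GDB:-gdb}" -batch \
    -ex "set pagination off" \
    -ex "bt" \
    -ex "thread apply all bt" \
    "$java_bin" "$core_file" > "$out_file" 2>&1
}
```

Usage: collect_backtraces <java_binary_path> <core_file_path> backtraces.txt. Redirecting the output to a file takes the place of "set logging on" in the interactive session.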

How to commit changes to a MapR PACC Docker image

Step 1) docker pull maprtech/pacc:6.2.0_7.0.0_centos8

Step 2) List the local images and note the IMAGE ID of the PACC image

[root@m2-maprts-vm72-173 ~]# docker images
REPOSITORY                TAG                   IMAGE ID            CREATED             SIZE
clientinstall             latest                80e6aa1631ca        4 hours ago         2.35 GB
docker.io/maprtech/pacc   6.2.0_7.0.0_centos8   b8c66e6edce7        5 weeks ago         1.29 GB
docker.io/centos          centos7               7e6257c9f8d8        2 months ago        203 MB


Step 3) docker run -it -e MAPR_CLUSTER=SixOne  -e MAPR_CLDB_HOSTS=pn1 -e MAPR_CONTAINER_USER=root b8c66e6edce7

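Step 4) Copy the MapR packages to the Docker container. docker cp accepts a single source path, so copy the files one at a time.

for f in mapr-*; do docker cp "$f" 03d523786b1b:/root/; done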

Step 5) Install the copied packages


Step 6) Save the changes made in step 5.

docker commit <containerId> <NewImageName>

e.g) docker commit 03d523786b1b clientinstall


Step 7) Start a container from the new image created in step 6

docker run -it -e MAPR_CLUSTER=SixOne  -e MAPR_CLDB_HOSTS=pn1 -e MAPR_CONTAINER_USER=root clientinstall
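
Steps 4 and 6 use a container ID (03d523786b1b) without showing where it comes from. The sketch below looks the ID up with docker ps and the ancestor filter and then commits it. The function names and the DOCKER override variable are hypothetical, and it assumes the first line printed by docker ps is the most recently created container.

```shell
#!/usr/bin/env bash
# Image names taken from the steps above; replace them with your own.
BASE_IMAGE="maprtech/pacc:6.2.0_7.0.0_centos8"
NEW_IMAGE="clientinstall"

# ${DOCKER:-docker} is an assumption of this sketch so that a stub can be
# substituted for a dry run.
docker_cmd() { "${DOCKER:-docker}" "$@"; }

# Print the ID of one container (running or stopped) created from image $1.
latest_container() {
  docker_cmd ps -a -q --filter "ancestor=$1" | head -n 1
}

# Commit that container as NEW_IMAGE (step 6).
commit_changes() {
  local cid
  cid="$(latest_container "$BASE_IMAGE")"
  if [ -z "$cid" ]; then
    echo "no container found for $BASE_IMAGE" >&2
    return 1
  fi
  docker_cmd commit "$cid" "$NEW_IMAGE"
}
```

After commit_changes returns, docker images should list the new image, as clientinstall appears in the output of step 2.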

Tuesday, August 11, 2020

How to produce and consume messages using Kafka Rest Proxy

The Kafka REST Proxy provides a RESTful interface to Apache Kafka clusters for producing and consuming messages and for performing administrative operations. It lets you consume messages from topics or from specific topic partitions, and produce messages to topics or partitions.


1) Produce a message using JSON with the value '{ "foo": "bar" }' to the topic test1

Command: curl -u mapr:mapr -k -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"foo":"bar"}}]}' "https://localhost:8082/topics/test1"

Output :
  {
   "offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null
  }


2) Create a consumer instance for JSON data that starts at the beginning of the topic's log. The base URI returned in the response is the prefix for the subscribe, consume and delete requests in the following steps.

Command: curl -u mapr:mapr -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' https://localhost:8082/consumers/my_json_consumer

Output :
 {
  "instance_id":"my_consumer_instance",
  "base_uri":"http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance"
 }
 
 
3) Subscribe to topic  

Command: curl -u mapr:mapr -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["test1"]}' https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription

Output : No content in response


4) List the topics subscribed

Command: curl -u mapr:mapr -k -X GET -H "Accept: application/vnd.kafka.v2+json" https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription

Output : {"topics":["/tmp/kafka:test"]}


5) Consume the records for the topic

Command: curl -u mapr:mapr -k -X GET -H "Accept: application/vnd.kafka.json.v2+json" https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

Output :
  [
   {"key":null,"value":{"foo":"bar"},"partition":0,"offset":0,"topic":"test1"}
  ]


6) Finally, close the consumer with a DELETE to make it leave the group and clean up its resources.

Command: curl -u mapr:mapr -k -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance

Output : No content in response
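
The requests above share the same credentials, base URL and headers, so they can be wrapped in small shell functions. This is only a sketch: the function names and the CURL and BASE_URL override variables are hypothetical, while the endpoints, headers and payloads are the ones from the commands above.

```shell
#!/usr/bin/env bash
# Values taken from the commands above; adjust them for your cluster.
BASE_URL="${BASE_URL:-https://localhost:8082}"
GROUP="my_json_consumer"
INSTANCE="my_consumer_instance"
V2="application/vnd.kafka.v2+json"
V2_JSON="application/vnd.kafka.json.v2+json"

# rest METHOD PATH [extra curl args...]
# ${CURL:-curl} is an assumption of this sketch to allow a dry-run stub.
rest() {
  local method="$1" path="$2"
  shift 2
  "${CURL:-curl}" -sS -u mapr:mapr -k -X "$method" "$@" "${BASE_URL}${path}"
}

# produce TOPIC JSON_BODY
produce() {
  rest POST "/topics/$1" -H "Content-Type: $V2_JSON" -H "Accept: $V2" --data "$2"
}

create_consumer() {
  rest POST "/consumers/$GROUP" -H "Content-Type: $V2" \
    --data "{\"name\": \"$INSTANCE\", \"format\": \"json\", \"auto.offset.reset\": \"earliest\"}"
}

# subscribe TOPIC
subscribe() {
  rest POST "/consumers/$GROUP/instances/$INSTANCE/subscription" \
    -H "Content-Type: $V2" --data "{\"topics\":[\"$1\"]}"
}

consume() {
  rest GET "/consumers/$GROUP/instances/$INSTANCE/records" -H "Accept: $V2_JSON"
}

close_consumer() {
  rest DELETE "/consumers/$GROUP/instances/$INSTANCE" -H "Content-Type: $V2"
}
```

A full round trip would then be: produce test1 '{"records":[{"value":{"foo":"bar"}}]}', create_consumer, subscribe test1, consume, close_consumer.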


Additional Configurations :

1) consumer.instance.timeout.ms is the amount of idle time (in milliseconds) before a consumer instance is automatically destroyed. Type: int. Default: 300000 (5 minutes)

Monday, February 25, 2019

File encoding option for spark YARN container

1) Configuration to pass the file encoding to the YARN containers:

--conf 'spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8'

2) File encoding option when reading a file in Spark local mode:

spark.read.option("encoding", "UTF-8").csv(inputPath)

Thursday, December 27, 2018

Spark Structured Streaming Examples


 Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

Read Stream :
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("group.id", "mapr").option("subscribe", "/tmp/kafka:test")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", false)
.option("maxOffsetsPerTrigger", 1000).load()

Write Stream :
// Console sink: prints each micro-batch to stdout.
val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint")
  .outputMode("append")
  .start()

Note: It is not required to provide real values for kafka.bootstrap.servers.


Example to read and write to MapR Streams:

val topicsSrc = "/tmp/kafka:test"
val topicsDst = "/tmp/kafka1:test"
val checkPointLocation = "/tmp/checkpoint"

// Read from stream
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", topicsSrc)
  .option("startingOffsets", "earliest")
  .load()

// Write to stream
val query1 = stream
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", topicsDst)
  .option("checkpointLocation", checkPointLocation)
  .start()



Tuesday, December 4, 2018

Driver jars to be loaded first

Step 1)   Add the following lines to spark-submit to check which jars are being loaded.
 --conf "spark.driver.extraJavaOptions=-verbose:class"

Step 2) Then pass the path to the jars that should be picked up first by the driver. This can be validated with the output from step 1.
  --conf spark.driver.extraClassPath=<Pass the jar to be picked up by driver first>
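
To check the result of step 1, the class-loading output can be filtered for a single class. The sketch below assumes the JDK 8 line format "[Loaded <class> from <source>]" and that the driver's standard output has been saved to a file. The function name which_jar and the file name driver.log are hypothetical.

```shell
#!/usr/bin/env bash
# which_jar CLASS_NAME LOG_FILE
# Prints the source (jar or directory) each matching class was loaded from,
# assuming JDK 8 style -verbose:class lines:
#   [Loaded <class> from <source>]
which_jar() {
  local class_name="$1" log_file="$2"
  awk -v cls="$class_name" '
    $1 == "[Loaded" && $2 == cls && $3 == "from" {
      sub(/\]$/, "", $4)   # drop the closing bracket
      print $4
    }' "$log_file"
}
```

For example, which_jar <fully.qualified.ClassName> driver.log shows whether the class came from the jar passed in spark.driver.extraClassPath. Newer JDKs print class loading in a different format, so the pattern would need adjusting there.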

Tuesday, November 13, 2018

How to find memory leak in Spark driver process (MapR Only)

1) Add this property to the spark job
--conf "spark.yarn.appMasterEnv.LD_PRELOAD=/opt/mapr/lib/libmemtracker.so /usr/lib64/libstdc++.so.6"

2) Make sure /opt/mapr/lib/libmemtracker.so and /usr/lib64/libstdc++.so.6 are available on all the nodes

3) Verify that the libraries are being loaded

[mapr@pn1 jitendra]$ jps -ml | grep ApplicationMaster
29887 org.apache.spark.deploy.yarn.ApplicationMaster --class streaming.SubscribePattern --jar file:/home/mapr/YuCodes3X/target/YuCodes3X-0.0.1-SNAPSHOT.jar --arg /tmp/kafka2 --arg mapr2 --arg earliest --properties-file /tmp/hadoop-mapr/nm-local-dir/usercache/mapr/appcache/application_1541789620039_0057/container_e26_1541789620039_0057_01_000001/__spark_conf__/__spark_conf__.properties

pmap -x 29887 | grep -i libmemtracker

4) Collect a gcore when the memory usage of the Spark driver (AM) is high
   e.g. collect at 6G, 8G and 9G if the driver memory is 10G.

   gcore 29887
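
Collecting cores at several memory levels can be scripted. The following is a rough sketch, assuming that the resident set size (RSS) of the process is an acceptable measure of "memory usage" here. The function names, the GCORE and POLL_SECONDS variables and the core.<N>g output prefix are hypothetical.

```shell
#!/usr/bin/env bash
# rss_kb PID: resident set size of PID in KiB (empty if the process is gone).
rss_kb() {
  if [ -r "/proc/$1/status" ]; then
    awk '/^VmRSS:/ { print $2 }' "/proc/$1/status"
  else
    ps -o rss= -p "$1" | tr -d ' '
  fi
}

# dump_at_thresholds PID THRESHOLD_GB...
# Waits until RSS reaches each threshold in turn and takes one gcore per
# threshold. gcore -o PREFIX writes the core to PREFIX.<pid>.
dump_at_thresholds() {
  local pid="$1" gb rss
  shift
  for gb in "$@"; do
    while rss="$(rss_kb "$pid")" && [ -n "$rss" ] \
        && [ "$rss" -lt $((gb * 1024 * 1024)) ]; do
      sleep "${POLL_SECONDS:-30}"
    done
    [ -n "$rss" ] || return 1   # the process has exited
    # ${GCORE:-gcore} is an assumption of this sketch to allow a dry run.
    "${GCORE:-gcore}" -o "core.${gb}g" "$pid"
  done
}
```

For the example above this would be run as dump_at_thresholds 29887 6 8 9. Thresholds should be given in ascending order, and each core file needs roughly as much disk space as the process occupies in memory.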


[mapr@pn1 ~]$ file core.29887
core.29887: ELF 64-bit LSB core file x86-64, version 1 (SYSV), SVR4-style, from '/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-3.b13.el7_5.x86_64/jre/bin/java'
[mapr@pn1 ~]$ gdb /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-3.b13.el7_5.x86_64/jre/bin/java core.29887
GNU gdb (GDB) 7.8.2
Copyright (C) 2014 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-unknown-linux-gnu".
Type "show configuration" for configuration details.
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>.
Find the GDB manual and other documentation resources online at:
<http://www.gnu.org/software/gdb/documentation/>.
For help, type "help".
Type "apropos word" to search for commands related to "word"...
Reading symbols from /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-3.b13.el7_5.x86_64/jre/bin/java...
warning: Cannot parse .gnu_debugdata section; LZMA support was disabled at compile time
(no debugging symbols found)...done.
[New LWP 4583]
[New LWP 29887]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".

warning: Cannot parse .gnu_debugdata section; LZMA support was disabled at compile time

warning: Cannot parse .gnu_debugdata section; LZMA support was disabled at compile time
Core was generated by `/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-3.b13.el7_5.x86_64/jre/bin/java'.
#0  0x00007f7d6d487995 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
(gdb) info shared lib
From                To                  Syms Read   Shared Object Library
0x00007f7d6d9a0fe0  0x00007f7d6d9a17ae  Yes         /opt/mapr/lib/libmemtracker.so
0x00007f7d6d6f3510  0x00007f7d6d75a5ba  Yes (*)     /usr/lib64/libstdc++.so.6
0x00007f7d6d481900  0x00007f7d6d48ce51  Yes (*)     /lib64/libpthread.so.0

Ref : using-ld-preload-with-apache-spark-or-yarn