
A RAM disk is a portion of RAM that is treated as if it were disk storage. It can host a file system through which applications work with files exactly as they would on any other disk, using the same file API, but with much higher performance.
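On Linux, a RAM disk is typically created with a memory-backed file system such as tmpfs. A minimal sketch follows; the mount point and size are illustrative and not taken from this article:

# Mount a 4 GB tmpfs-backed RAM disk (path and size are examples only)
sudo mkdir -p /mnt/ramdisk
sudo mount -t tmpfs -o size=4g tmpfs /mnt/ramdisk

# Files written here live in RAM and disappear on unmount or reboot
echo "hello" > /mnt/ramdisk/test.txt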

 

RAM is the fastest storage medium accessible to applications, far faster than other storage media (especially HDDs and SSDs). However, it is also expensive and scarce, with a relatively limited capacity (measured in gigabytes, while the capacity of other storage media is measured in terabytes). Moreover, it is volatile: it loses its data after a restart, shutdown or any other power loss. It is important to keep these limitations in mind when thinking of ways to leverage the outstanding performance of a RAM disk.

 
Factors
 
 
  • Scarcity limits the amount of data that can be stored, which can rule out a solution idea. However, the limit can be pushed further by paying more for capacity, as servers now ship with hundreds of gigabytes of RAM.

  • The volatility of RAM can make or break a solution idea; RAM is unsuitable for persistent data, but it fits volatile, temporary data well.

  • RAM performance is always a win for the solution; it is the reason we consider RAM disks in the first place.
 

In the big data applications domain, we list two possible generic use cases:

  • Storing temporary and intermediate discardable data

  • Storing persistent data backed by persistent storage medium
 

Storing temporary and intermediate discardable data:

In big data applications, most temporary and intermediate data is volatile in nature: it is nonpermanent (by definition) and discardable, and its existence is tied to the existence of the application. While the application is running, this data is generated and kept because it is needed for the application to finish processing. Once processing is done, there is no need to keep it, and it becomes discardable.

 

Moreover, in the majority of big data applications the discardability of temporary data goes further: it is discardable not only after the application terminates but also during its lifetime. That is, the application can regenerate this data (in whole or in part) if it is lost for any reason. In general, this is a design principle of big data frameworks.

 

In a MapReduce framework, shuffle files are intermediate files that are written to disk and transferred over the network. They are volatile in the sense described above. However, they are comparable in size to the input data, so in almost all situations they are as big as the input unless the framework does something extra, such as writing them compressed or partitioned either across time (processing one partition at a time) or across several machines. Writing these files to a RAM disk would significantly improve shuffle performance, especially when multiple jobs with different workloads are running.
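As an illustration (not part of the original setup described here), if a tmpfs mount such as /mnt/ramdisk exists, Spark's scratch space, which holds its shuffle files, can be pointed at it through the spark.local.dir setting (note that on YARN this setting is ignored in favour of yarn.nodemanager.local-dirs):

mkdir -p /mnt/ramdisk/spark-local
spark-submit \
--conf spark.local.dir=/mnt/ramdisk/spark-local \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10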

 
RAM Disk vs. Application and OS Buffers
 
 

It is worth mentioning that there are at least two levels of buffering when making disk IOs: application-level buffers and OS-level buffers. With the appropriate configuration of both levels, it is possible to approach the performance of a RAM disk. However, the appropriate configuration is not always achievable: application buffer capacity differs from one application and workload to another, while the OS buffers are affected by other IO operations from the same or other applications. A dedicated, adaptable buffer for the temporary volatile data therefore more reliably delivers the required performance.
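To see why OS-level buffers are hard to rely on, one can watch the Linux page cache and then drop it; these are standard Linux commands, shown only to illustrate that the cache is shared and can be evicted at any time:

free -h       # the buffers/cache figure shows memory currently used by the OS page cache
sync          # flush dirty pages to disk
echo 3 | sudo tee /proc/sys/vm/drop_caches   # evict the page cache; previously cached reads hit the disk again
free -h       # the cache figure shrinks after eviction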

 

Storing persistent data backed by a persistent storage medium:

There are situations where computing reusable data is very expensive in terms of computing resources. Such expensive data may also be used frequently afterwards (for example, by other analytical tasks and low-latency jobs). Persisting it therefore becomes mandatory to save computation resources and to ensure real-time performance for its consumers.

 

In order to devise a scheme for persisting such data, a number of factors should be considered. For how long will the data be persisted? How frequently will it be consumed? Will it be shared across different applications? If the data is required for a relatively long period, a persistent storage medium is obligatory. If it is consumed frequently, a reasonably fast storage medium is preferred. In addition, sharing data efficiently across applications is not straightforward. A storage medium that satisfies all three aspects, persistence, usage frequency and sharing, would be ideal for this case.

 
 RAM Disk Backed by Persistent Disk
 
 

All these aspects can be satisfied by combining different storage types in a tiered storage model: the RAM disk sits at the top tier and is backed at lower tiers by persistent storage. The RAM disk provides high performance while the persistent storage provides durability. If data in the RAM disk is lost, a suitable failover plan can restore it from the persistent storage. Sharing data among applications also becomes a matter of reading and writing files, which simplifies the implementation of applications.

 

A simple implementation of the tiered storage model is to save the data in two locations: a copy in the RAM disk and another copy on the persistent disk. A consumer application first tries to read the data from the RAM disk; if it is not found, the application reads it from the persistent disk and, at the same time, writes it back to the RAM disk for subsequent use.
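A hedged sketch of that read path in shell terms, assuming a RAM-disk copy under /mnt/ramdisk and a persistent copy under /data (both paths and the file name are illustrative):

#!/usr/bin/env bash
# Read a file from the RAM disk if present; otherwise fall back to the
# persistent copy and repopulate the RAM disk for subsequent readers.
read_tiered() {
  local name="$1"
  local ram="/mnt/ramdisk/$name"
  local persistent="/data/$name"

  if [ -f "$ram" ]; then
    cat "$ram"
  else
    cat "$persistent"
    cp "$persistent" "$ram" &   # rewrite to the fast tier in the background
  fi
}

read_tiered "expensive-result.bin" > /dev/null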

 

It is worth mentioning that Tachyon is an in-memory distributed file system backed by HDFS. Also, at the time of writing, HDFS is being developed in phases to support heterogeneous storage types (RAM disk, regular disk, SSD, …) and different data storage policies (hot data, warm data, cold data, …). Coupled together, both features allow the implementation of the tiered storage system we seek.

 


Running Spark On YARN


When other distributed applications run alongside Spark, running Spark in standalone mode (with its embedded cluster manager) is not a good choice for cluster resource utilization. It is better, in terms of resource scheduling and utilization, to have a single cluster manager with a global view of what is running, and what wants to run, on the cluster.

 

Without such a single cluster manager, there are two main approaches to resource sharing and allocation:

 
  1. Making all cluster resources available to all types of applications at the same time. However, this leads to heavy, unfairly managed contention for resources.

  2. Dividing the pool of resources into smaller pools, one for each application type. However, this leads to inefficient utilization, as some applications may require more resources than their pool provides while, at the same time, fewer resources would suffice for others. A dynamic way of allocating resources therefore leads to better utilization.

 

Different cluster managers can do the job and overcome the issues highlighted above. Choosing one depends on the types of applications run on the cluster, since they all must speak the language of the manager. One cluster manager that Spark applications can run on is Apache YARN. YARN's design allows different YARN applications to coexist on the same cluster, so a Spark application can run alongside other types of applications (such as Hadoop MapReduce jobs), which brings great benefits for manageability and cluster utilization.

 

In this post we will illustrate the benefits of running Spark on YARN, show how to run Spark on YARN, and mention important notes to be aware of when doing so.

 

First we will try to understand the architecture of both Spark and YARN.

Spark Architecture:

The Spark architecture consists of a driver program, executors and a cluster manager.

 
 

Driver Program: The driver program is responsible for managing the job flow and scheduling the tasks that run on the executors.

Executors: Executors are processes that run computation and store data for a Spark application.

Cluster Manager: The cluster manager is responsible for starting executor processes and deciding where and when they run. Spark supports pluggable cluster managers: YARN, Mesos, and its own "standalone" cluster manager.

 

YARN Architecture:

YARN Architecture consists of Resource Manager, Node Manager, Application Master and Container.

 
 

Resource Manager: manages the use of resources across the cluster.

Node Manager: launches and monitors containers on cluster machines.

Application Master: manages the lifecycle of an application running on the cluster.

Container: represents a collection of physical resources (CPU cores and memory) on a single node in the cluster. These resources are allocated to run an application's processes.

 

Spark on YARN:

When running Spark on YARN, each Spark executor runs as a YARN container. Spark supports two modes for running on YARN: yarn-cluster mode and yarn-client mode.

 

YARN-Client Mode:

  • In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
  • yarn-client mode makes sense for interactive and debugging uses where you want to see your application’s output immediately (on the client process side).
 
 

YARN-Cluster Mode:

  • In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application.
  • yarn-cluster mode makes sense for production jobs.
 
 

Why Run on YARN?

Running Spark on YARN has some benefits:

  • YARN allows the cluster resources to be shared dynamically between the different frameworks that run on it. For example, you can run a MapReduce job and then a Spark job without any change to the YARN configuration.
  • You can use YARN schedulers for categorizing, isolating, and prioritizing workloads.
  • YARN is the only cluster manager for Spark that supports security. With YARN, Spark can use secure authentication between its processes.

How to Run on YARN

We used Cloudera Manager to install Spark and YARN. No special configuration was needed to get Spark running on YARN; we just changed Spark's master address to yarn-client or yarn-cluster.

 

We want to mention some important issues that we encountered while running Spark on YARN:

  • Spark copies the Spark assembly JAR file to HDFS each time you run spark-submit. You can avoid this copy by manually uploading the Spark assembly JAR to HDFS once, then setting the SPARK_JAR environment variable to its HDFS path:
     
    hdfs dfs -mkdir -p /user/spark/share/lib
    hdfs dfs -put $SPARK_HOME/assembly/lib/spark-assembly_*.jar  \     
    /user/spark/share/lib/spark-assembly.jar
    export SPARK_JAR=hdfs://<nn>:<port>/user/spark/share/lib/spark-assembly.jar
    

  • Important configuration options when submitting a job:
    --executor-cores NUM       Number of cores per executor (Default: 1)
    --num-executors NUM        Number of executors to launch (Default: 2)
    --executor-memory NUM      Amount of memory to use per executor process.
    --driver-memory NUM        Amount of memory to use for the driver process.
    

  • We noticed that YARN uses more memory than we set for each executor. After searching, we discovered that YARN allocates:
    • executor memory + spark.yarn.executor.memoryOverhead for each executor.
    • driver memory + spark.yarn.driver.memoryOverhead for the driver.
    • This memory overhead is the amount of off-heap memory (in megabytes) allocated per executor or driver. It accounts for things like VM overheads, interned strings and other native overheads, and it tends to grow with the executor size (typically 6-10%). A worked example follows after this list.

  • The local directories used by Spark executors to save map output files and RDDs that are stored on disk will be the local directories configured for YARN (the Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.

  • Sharing application files (e.g. jar) with executors:
    • In yarn-client mode and Spark standalone mode, a link to the jar on the client machine is created and all executors use this link to download the jar.
    • In yarn-cluster mode, the jar is uploaded to HDFS before running the job and all executors download it from HDFS, so it takes some time at the start to upload the jar.
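As a rough illustration of the memory overhead note above, the submission below sets the overhead explicitly; the 512 MB value is illustrative, not a documented default:

spark-submit \
--master yarn-cluster \
--executor-memory 4g \
--conf spark.yarn.executor.memoryOverhead=512 \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10

With these settings, each executor container requested from YARN is roughly 4096 MB + 512 MB = 4608 MB.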

Comparison Between Spark on YARN and Standalone mode:

|                                                  | Spark on YARN (yarn-cluster) | Spark on YARN (yarn-client)           | Spark Standalone                      |
| ------------------------------------------------ | ---------------------------- | ------------------------------------- | ------------------------------------- |
| Driver runs in                                   | Application Master           | Client                                | Client                                |
| Who requests resources                           | Application Master           | Application Master                    | Client                                |
| Who starts executor processes                    | YARN NodeManager             | YARN NodeManager                      | Spark Worker (slave)                  |
| Support for Spark shell                          | No                           | Yes                                   | Yes                                   |
| Sharing jar with executors                       | Uploads jar to HDFS          | Creates a link to the jar on the client | Creates a link to the jar on the client |
| Shares cluster resources among different frameworks | Yes                       | Yes                                   | No                                    |
 

Example:

Running SparkPi in Standalone Mode

spark-submit \
--master spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10

Running SparkPi in YARN Client Mode

spark-submit \
--master yarn-client \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10

Running SparkPi in YARN Cluster Mode

spark-submit \
--master yarn-cluster \
--class org.apache.spark.examples.SparkPi \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10
 



In this post, we will show the configuration needed by Hadoop to start the namenode successfully. Usually we format the namenode before starting Hadoop, but a common problem is that the namenode metadata files are written (by the default configuration) into a tmp directory that the operating system deletes every time it starts. Below are the steps to change this default behaviour.

Steps 

  1. In hdfs-site.xml, add the following property:
    <property>
        <name>dfs.name.dir</name>
        <value>/<hadoop installation path>/hadoop-1.2.1/name/data</value>
    </property>
    where the value is the path to which the namenode metadata will be written.
  2. Stop all Hadoop daemons:
    ~$ stop-all.sh
  3. Change the directory permissions to give the owning user full access (the first digit, 7, grants the user read, write and execute):
    ~$ sudo chmod 750 /<hadoop installation path>/hadoop-1.2.1/name/data
  4. Format the namenode:
    ~$ hadoop namenode -format
    The output should be:
    15/03/25 12:27:06 INFO namenode.NameNode: STARTUP_MSG: 
    /************************************************************
    STARTUP_MSG: Starting NameNode
    STARTUP_MSG:   host = baddar-pc/127.0.1.1
    STARTUP_MSG:   args = [-format]
    STARTUP_MSG:   version = 1.2.1
    STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2 -r 1503152; compiled by 'mattf' on Mon Jul 22 15:23:09 PDT 2013
    STARTUP_MSG:   java = 1.8.0_40
    ************************************************************/
    Re-format filesystem in /home/baddar/hadoop-1.2.1/name/data ? (Y or N) Y
    15/03/25 12:27:11 INFO util.GSet: Computing capacity for map BlocksMap
    15/03/25 12:27:11 INFO util.GSet: VM type       = 64-bit
    15/03/25 12:27:11 INFO util.GSet: 2.0% max memory = 932184064
    15/03/25 12:27:11 INFO util.GSet: capacity      = 2^21 = 2097152 entries
    15/03/25 12:27:11 INFO util.GSet: recommended=2097152, actual=2097152
    15/03/25 12:27:11 INFO namenode.FSNamesystem: fsOwner=baddar
    15/03/25 12:27:11 INFO namenode.FSNamesystem: supergroup=supergroup
    15/03/25 12:27:11 INFO namenode.FSNamesystem: isPermissionEnabled=true
    15/03/25 12:27:11 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
    15/03/25 12:27:11 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
    15/03/25 12:27:11 INFO namenode.FSEditLog: dfs.namenode.edits.toleration.length = 0
    15/03/25 12:27:11 INFO namenode.NameNode: Caching file names occuring more than 10 times 
    15/03/25 12:27:11 INFO common.Storage: Image file /home/baddar/hadoop-1.2.1/name/data/current/fsimage of size 112 bytes saved in 0 seconds.
    15/03/25 12:27:11 INFO namenode.FSEditLog: closing edit log: position=4, editlog=/home/baddar/hadoop-1.2.1/name/data/current/edits
    15/03/25 12:27:11 INFO namenode.FSEditLog: close success: truncate to 4, editlog=/home/baddar/hadoop-1.2.1/name/data/current/edits
    15/03/25 12:27:11 INFO common.Storage: Storage directory /home/baddar/hadoop-1.2.1/name/data has been successfully formatted.
    15/03/25 12:27:11 INFO namenode.NameNode: SHUTDOWN_MSG: 
    /************************************************************
    SHUTDOWN_MSG: Shutting down NameNode at baddar-pc/127.0.1.1
    ************************************************************/

    Note that the namenode metadata is written to the specified path.
  5. Make sure the namenode metadata was written to the path (list everything recursively):
    $ ls -aR /home/baddar/hadoop-1.2.1/name/data/
    /home/baddar/hadoop-1.2.1/name/data/:
    .  ..  current  image  in_use.lock  previous.checkpoint

    /home/baddar/hadoop-1.2.1/name/data/current:
    .  ..  edits  fsimage  fstime  VERSION

    /home/baddar/hadoop-1.2.1/name/data/image:
    .  ..  fsimage

    /home/baddar/hadoop-1.2.1/name/data/previous.checkpoint:
    .  ..  edits  fsimage  fstime  VERSION
  6. Start all Hadoop daemons:
    ~$ start-all.sh
    The output should be:
    starting namenode, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-namenode-baddar-pc.out
    localhost: starting datanode, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-datanode-baddar-pc.out
    localhost: starting secondarynamenode, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-secondarynamenode-baddar-pc.out
    starting jobtracker, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-jobtracker-baddar-pc.out
    localhost: starting tasktracker, logging to /home/baddar/hadoop-1.2.1/libexec/../logs/hadoop-baddar-tasktracker-baddar-pc.out
  7. Make sure that all daemons are started:
    ~$ jps

    23678 TaskTracker
    23060 NameNode
    23406 SecondaryNameNode
    23978 Jps
    23504 JobTracker

Testing applications on Android is costly because many devices with different specs have to be tested; it is both expensive and time consuming. The obvious solution is to test on the default Android emulator, but the first question that comes to mind is: what do we do about the emulator's poor performance?

Genymotion is here: a relatively fast Android emulator on which any app can be tested against all major devices available on the market. It is easy to install and integrates fully with Android Studio and Eclipse, following the instructions here.

Genymotion features:
  • Easily download and run pre-configured virtual images covering most Android versions and different screen sizes.
  • Networking: Ethernet (emulates WiFi connection).
  • GPS (with configurable coordinates).
  • Battery (with configurable battery levels).
  • Display: OpenGL hardware acceleration, multiscreen, full screen display.
  • Genymotion shell, which allows you to interact with your VM from a command line.
  • ADB support.
  • Eclipse and Android Studio plugins.
  • Supports Linux, Windows and Mac.
  • Drag & drop APK installs.
  • Drag & drop zip support for system updates/patches.
What about Google Play services?
  • Download the following ARM Translation Installer.
  • Download GApps according to your version 4.3, 4.2, 4.1.
  • Next, open your Genymotion VM and drag & drop the Genymotion-ARM-Translation.zip onto it; a message saying "File transfer in progress" should appear.
  • Reboot Genymotion VM.
  • Drag&Drop the GApps-signed.zip onto it.
  • Reboot, then open the Google Play Store.
  • Once in the Store, go to the 'My Apps' menu and let everything update, because this will fix a lot of issues.
  • Search for 'Netflix' and 'Google Drive'; if both apps show up in the results and you are able to download and install them, congratulations: you now have ARM support and Google Play fully set up!

BADR, in partnership with QCRI, has developed and published TweetMogaz, a system that allows Arab users to get the maximum information from the Arabic content on Twitter, on the spot.

Basically, TweetMogaz consumes streams of Arabic tweets from Twitter, classifies them into relevant topics, then presents them to users in a much more intelligent way.

By intelligent we mean that TweetMogaz understands the context of tweet topics, groups tweets based on that context, and presents these groups (topics) to the user for a better user experience.
TweetMogaz is also the only Arabic event detector: it constantly searches Arabic content for hot, trending tweets, gathers related tweets that occur in a certain timeframe, and presents the user with a solid, coherent story.

To achieve that feat, thorough research has been done (and is continuously being improved) to get the best out of the Arabic content on Twitter. The research areas span Information Retrieval, Natural Language Processing, Machine Learning, Distributed Systems and Big Data.

The first publication out of TweetMogaz is a demo paper, "TweetMogaz v2: Identifying News Stories in Social Media", by Eslam Elsawy (BADR), Moamen Mokhtar (BADR) and Walid Magdy (QCRI), published at CIKM 2014.