Monday, April 30, 2018

HDFS Balancer and DataNode Space Usage Considerations

Symptoms:
  • It may take days or weeks to re-distribute data among DataNodes.
  • Newly added nodes use less space compared to existing DataNodes.
  • Some DataNodes use more space than other DataNodes in the cluster.

Cause:

  • Uneven distribution of space usage, depending on how data is written to the cluster and whether new DataNodes have been added to the cluster.
  • DataNodes limit the number of threads used for balancing so that balancing does not consume all of the resources of the cluster/DataNode. By default the number of threads is 5. This setting was not configurable prior to Apache Hadoop 2.5.0 (prior to CDH 5.2.0).  HDFS-6595 makes the number of balancing threads configurable in CDH 5.2.0 and later releases.
  • By default, dfs.datanode.balance.bandwidthPerSec is set to 10 MiB/sec, which limits how much network bandwidth is dedicated to the balancer.
  • The HDFS Balancer exits iterations prematurely due to a known bug (HDFS-6621) in CDH 5.1.3 and earlier.


1. How do I run the HDFS Balancer?
Only one HDFS Balancer is needed, and it can be run from any node in the cluster.  Commonly it is configured to run on a NameNode host.
The balancer runs until it is manually stopped or until it considers the cluster balanced.
To run from Cloudera Manager, navigate to Cloudera Manager > HDFS > Instances > Balancer and select Actions > Rebalance.
To run from the command line on any HDFS node: sudo -u hdfs hdfs balancer -threshold 10


Note: It is NOT recommended to run the HDFS Balancer frequently on a cluster with HBase.  Frequently running the HDFS Balancer will cause HBase to lose block locality and impact performance.

2. How do I schedule the HDFS Balancer to run periodically?
The balancer cannot be scheduled with Cloudera Manager.  To run the balancer automatically, run it from the command line using an OS scheduling tool such as cron.
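For example, a minimal cron entry in the hdfs user's crontab that starts the balancer every Sunday at 01:00 (the schedule, threshold, and log path are illustrative and should be adjusted for your environment):

0 1 * * 0 hdfs balancer -threshold 10 >> /var/log/hadoop-hdfs/balancer-cron.log 2>&1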



3. Why is one DataNode using more HDFS space than others?
There are multiple ways to bring files from outside of HDFS and distribute them across multiple nodes. Following are two examples and their impact:

A) Copying to HDFS from a DataNode host


Files are copied to a local filesystem or are located on an NFS mount on a DataNode host.  From the DataNode host the files are copied to HDFS with the hdfs dfs -copyFromLocal command.  Example:

hdfs dfs -copyFromLocal /mnt/myfile1 /files

When writing to HDFS the first replica of each block will be written to the local DataNode.  The subsequent replicas of each block will write to other randomly selected DataNodes. As a result, much of the data will end up on the same node that is performing the operations.  This is helpful for services that need data locality but may unbalance a DataNode during direct loading of data to HDFS.

B) Copying from a Node outside the HDFS Cluster (Edge Node / Gateway Node)


When writing to HDFS from an edge node (or any node with the HDFS client installed but not running a DataNode role), the NameNode directs the client to write each block replica to randomly selected DataNodes, providing a more even distribution of the blocks across the cluster.
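For example, the equivalent of the copy shown in example A, run from an edge node (the paths are illustrative):

hdfs dfs -put /mnt/myfile1 /files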



An edge node must have network access to all of the DataNodes.  If a firewall is blocking access to some of the DataNodes, writes will time out, affecting performance and data distribution.



Note: If you have inputs arriving as individual events or small batches of events, you can also look at Apache Flume to collect them directly and reliably into your cluster.

4. Why are newly added nodes using less space than existing DataNodes?


When new DataNodes are added to a cluster, the existing data is not automatically re-distributed between the existing and newly added DataNodes. The new nodes only receive writes of new block replicas. To re-distribute data between existing and new DataNodes, the HDFS Balancer must be run.


5. After replacing failed disks, why does HDFS not balance data among all the disks within a DataNode?​
The HDFS Balancer does not re-balance data among disks within a DataNode; it only re-balances data among DataNodes.  After running the HDFS Balancer, the percentage of total disk space used by each DataNode will be balanced across the cluster, but within a DataNode there may still be several disks where data is not evenly distributed.



By default, DataNodes are configured to write to their disks in a round-robin fashion. If disks of different capacities are used, or if disks fail and are replaced, the round-robin algorithm keeps writing to each disk in turn, resulting in a different percentage of capacity used on each disk.



To distribute writes evenly as a percentage of each drive's capacity, change the volume choosing policy (dfs.datanode.fsdataset.volume.choosing.policy) to Available Space.  If using Cloudera Manager:

Navigate to HDFS > Configuration > DataNode
Change DataNode Volume Choosing Policy from Round Robin to Available Space
Click Save Changes
Restart the DataNodes
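For clusters not managed by Cloudera Manager, the same change is made in the DataNodes' hdfs-site.xml. A minimal sketch, assuming the stock Hadoop implementation of the Available Space policy:

<property>
  <name>dfs.datanode.fsdataset.volume.choosing.policy</name>
  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy</value>
</property>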
The disks will not immediately balance after switching to the Available Space choosing policy, but writes will be distributed more evenly over time.  Work to re-balance disks within a DataNode is being done in HDFS-1312.  At the time this article was written that work is not complete and there is no estimate of when it will be available.



6. How do I determine the HDFS Balancer's re-balancing speed?
Log in to the host running the balancer and find the balancer process directory with the highest number (#####): /var/run/cloudera-scm-agent/process/#####-hdfs-BALANCER/logs
The stdout.log file displays the balancer's progress.
Run the following command to extract the balancer's statistics from the stdout.log file:
 awk '/^[A-Z][a-zA-Z]/' stdout.log > balancer_progress.txt

This table is a sample representation of the output:
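(The rows below are an illustrative reconstruction of that output; actual timestamps and values will differ.)

Time Stamp                Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
Apr 20, 2018 10:05:12 AM           0                  0 B             9.71 TB             60 GB
Apr 20, 2018 11:05:40 AM           1             58.4 GB             9.65 TB             60 GB
Apr 20, 2018 12:06:02 PM           2            116.9 GB             9.59 TB             60 GB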
On each iteration (2nd column) the balancer reports how much data it plans to move in that iteration (5th column) and decrements the amount of data left to move (4th column), as shown above.  In this case it took roughly 84 hours to transfer 9.71 TB of data, which is roughly 115.6 GB/hr (about 32 MB/sec).
Depending on your cluster load, performance can be tuned as shown below.


7. How to tune HDFS Balancer performance?


Key Points To Remember:
The HDFS Balancer role must be present (to verify, navigate to Cloudera Manager > HDFS > Instances and look for "Balancer") and must be run manually to re-distribute data among DataNodes.
HDFS Balancer does NOT re-distribute data between disks within a DataNode.
Default threshold is 10%, which means that it will ONLY re-distribute data among DataNodes when the storage usage on each DataNode differs from the overall usage of the cluster (mean usage) by more than 10%.
While the HDFS Balancer is running you can still use the cluster; the balancer does not have a major impact on cluster performance.


Navigate to Cloudera Manager > HDFS > Configuration and search for dfs.datanode.balance.bandwidthPerSec. This determines the maximum balancing bandwidth.  The default is 10 MiB/sec. Cloudera recommends setting it to 100 MiB/sec and increasing it as needed based on your network infrastructure.  If set too high, the balancer may saturate your network.
Navigate to Cloudera Manager > HDFS > Configuration, search for DataNode Advanced Configuration Snippet (Safety Valve) for hdfs-site.xml and add the following property:

<property>
  <name>dfs.datanode.balance.max.concurrent.moves</name>
  <value>200</value>
</property>

Also search for Balancer Advanced Configuration Snippet (Safety Valve) for hdfs-site.xml and add the same property there.
This property controls the maximum number of concurrent block moves performed by a DataNode.

By default, dfs.datanode.balance.max.concurrent.moves is set to 5. DataNodes limit the number of threads used for balancing so that balancing does not consume all of the DataNode's resources.
Temporarily increase the dfs.datanode.balance.max.concurrent.moves value to a higher number, such as 200.  Once the cluster is balanced, reduce this value back to the default.
Note: For the changes to take effect, perform a Rolling Restart of HDFS and Deploy Client Configuration.  Once HDFS restarts, stop the balancer (if it was run from the CLI) and re-run it.

How to move the Job History Server (JHS) from one host to another.

Moving the Job History Server to another node:

The following steps are the same for Cloudera Manager and non-Cloudera Manager managed clusters; only the tools used to perform the steps differ.
  1. Shut down and remove the Job History Server from the old node
  2. Add a new Job History Server to the new node
  3. Start the Job History Server on the new node
  4. Restart the ResourceManager and NodeManagers. In a Cloudera Manager managed cluster this will deploy and activate the updated configuration. Otherwise the updated mapred-site.xml must be distributed to all nodes before restarting.
The mapred-site.xml is also part of the client configuration and is thus present on all nodes in the cluster. The file should be updated with the new Job History Server details on all nodes.
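The relevant mapred-site.xml entries are the Job History Server addresses. A minimal sketch, assuming the new host is jhs-new.example.com (a hypothetical hostname) and the default ports are used:

<property>
  <name>mapreduce.jobhistory.address</name>
  <value>jhs-new.example.com:10020</value>
</property>
<property>
  <name>mapreduce.jobhistory.webapp.address</name>
  <value>jhs-new.example.com:19888</value>
</property>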

Note: No data is stored locally on the JHS node
Note: The finished applications in the Resource Manager web UI will still reference the old JHS node; these links will not be updated.

Sunday, April 29, 2018

HBase cluster sizing issues.

  • Is it OK for the HBase Master and Hadoop NameNode (+JobTracker) to run on the same server?
    The NameNode needs memory. The HBase Master is normally not very busy. It just needs to be available when region servers check in, and for maintaining timely ZooKeeper heartbeats. As long as there is sufficient RAM on the combined NameNode + Master (+ JobTracker) such that the system never swaps, running both on the same server is OK.
    You can consider running multiple HBase Masters to remove one single point of failure from the deployment.  For a non-high-availability deployment it makes sense to run everything on one server.  We recommend running the HBase Masters alongside the NameNode and Secondary/Standby NameNode; this gives you the necessary redundancy.
  • Is it OK for HBase RegionServer and Hadoop DataNode (+ TaskTracker) to run on the same server?
    Yes, this is advised to ensure data locality. Eventually, the data in HDFS that backs the region stores is brought local through background compaction. MapReduce jobs that run against HBase after this happens access data locally, since each split corresponds to a region and the task is scheduled on the corresponding RegionServer.
  • Is the HBase RegionServer a memory-hungry process?
    Yes. The more RAM you can give to the region servers, the better for performance:
    • Read caching (block cache) to avoid needing to hit the file system to serve frequently accessed data
    • Write caching (MemStore) to ride over flushes and compactions without blocking clients
  • Do I need dedicated boxes for each ZooKeeper?
    It is advised to run ZooKeeper on dedicated hardware. If that is not an option, you can run ZooKeeper with the NameNode, JobTracker, and Standby node (Secondary NameNode). In a pinch you can co-locate ZooKeeper on DataNode/TaskTracker/RegionServer boxes, but it is not recommended. ZooKeeper does not take up a lot of resources on its own, but when starved for resources it can cause RegionServer timeouts.

    ZooKeeper is a 2N+1 fault tolerant system, so deploy 3 servers if you can stand to lose only one, or 5 if you want to be able to lose up to 2, and so on. There are diminishing returns after 7 or 9. Though this may seem like a lot of overhead just to run HBase, ZooKeeper provides value such as for providing synchronization primitives for your service or application, hosting dynamic configurations (and using watchers to get notice of changes), and managing presence and group membership.
  • What's the minimum cluster size?
For a non-high-availability system with local disk, we recommend three RegionServer-TaskTracker-DataNodes with additional servers for each HBase Master-NameNode-JobTracker and ZooKeeper for something minimally useful. Also remember to tune HDFS for such a small cluster: set the minimum replication to 1 or 2.
For a high-availability system, we recommend the same three RegionServer-TaskTracker-DataNodes with two additional servers for each HBase Master-NameNode-JobTracker and ZooKeeper.
  • What are good starting points for HBase Master and RegionServer JVM Heap sizes?
Small development clusters can start with heap sizes of 1GB for the Master and 4GB for RegionServers.

Production clusters should plan on increasing the minimum RegionServer heap size to 16GB as a starting point.  The HBase master process is not as memory intensive and will normally require less memory than RegionServers.
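A minimal hbase-env.sh sketch of those starting points for a cluster not managed by Cloudera Manager (CM-managed clusters set heap sizes in the HBase configuration pages instead; the RegionServer value follows the 16GB starting point above, while the Master value is an assumption since the article only notes that the Master needs less memory than the RegionServers):

export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -Xms4g -Xmx4g"
export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -Xms16g -Xmx16g"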

Accessing HBase using Spark Application

Accessing HBase using Spark Application:

The exact errors displayed can differ depending on the CDH version. General examples include:
  • ZooKeeper issues:
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
14/12/01 23:54:51 INFO ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
14/12/01 23:54:51 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused

  • HTrace class compatibility issue (CDH 5.4 and later only):

ERROR yarn.ApplicationMaster: User class threw exception: java.lang.reflect.InvocationTargetException
java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240)
...
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.Trace

  • Kerberos related issue in Spark cluster mode:
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:126) 
... 4 more 
Caused by: java.lang.RuntimeException: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'. 
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$1.run(RpcClientImpl.java:686) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:415) 
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796) 
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.handleSaslConnectionFailure(RpcClientImpl.java:644) 
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:752) 
... 17 more 
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException

Cause
  • The client configuration that is stored in /etc/hbase/conf/ is not included on the class path
  • Jars that have the dependent code are not on the class path
Fix:
These errors are related because the default Spark setup does not include any of the HBase classes or configuration. Spark does not have a dependency on HBase; any HBase classes or configuration pulled into the execution environment are the result of a secondary dependency. For instance Hive, which is a Spark dependency, pulls in some of the HBase classes because it is capable of querying HBase data. However, a Spark application needs to provide all of its dependencies and must not rely on them already being available.


  1. Add the /etc/hbase/conf/ directory and the dependencies (jars or classes) to the class path when the application is submitted. As described above, the htrace dependency is required for CDH 5.4 and later. Other dependencies might be required; the exact additions to the class path should be tested in your environment when submitting an application:
  • Spark driver class path change using the command line:
--driver-class-path /etc/hbase/conf/:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
  • Spark executor class path change using the command line:
--conf "spark.executor.extraClassPath=/etc/hbase/conf/:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar"
  • or by setting the executor class path in the spark-defaults.conf file (see below)
  • Use the --files /etc/hbase/conf/ option to pass the configuration files to the cluster nodes.
  2. Add the HBase jars and configuration to the executor class path using the Spark defaults configuration file:
  • From Cloudera Manager, navigate to Spark on YARN > Configuration
  • Type defaults in the search box
  • Select Gateway in the scope (this opens Spark Client Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-defaults.conf)
  • Add the entry:
spark.executor.extraClassPath=/etc/hbase/conf/:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
  • Save the change and an icon appears to deploy client configuration (can take 30 seconds to show)
  3. If you are running a Spark application in cluster mode, you must also set the HADOOP_CONF_DIR variable. Setting HADOOP_CONF_DIR adds the HBase configuration to the Spark launcher classpath (which is not the same as the driver classpath in cluster mode). This is needed for Spark to obtain HBase delegation tokens. Add the HADOOP_CONF_DIR environment variable using the Spark client spark-env.sh configuration file:
    • From Cloudera Manager, navigate to Spark on YARN > Configuration
    • Type spark-env.sh in the search box
    • Select Gateway in the scope (this opens Spark Client Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh )
    • Add the entry:
    export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf:/etc/hbase/conf
    
    • Save the change and an icon appears to deploy client configuration (can take 30 seconds to show)
  4. Deploy the client configuration as directed by the GUI
  5. Run the test by executing the following:
spark-submit --master yarn-cluster --driver-class-path /etc/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar \
--class org.apache.spark.examples.HBaseTest /opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples.jar test
NOTE 1: The example table name stored in HBase is called test.

NOTE 2: If you see this error message in the YARN container logs: 
Caused by: org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.security.AccessDeniedException): org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permissions for user 'username' (table=test, action=READ)
then you need to configure the proper HBase permissions for your user. For example:
# sudo -u hbase kinit -kt hbase.service.keytab hbase/hbase_fqdn_host@HADOOP.COM
# sudo -u hbase hbase shell
hbase(main):002:0> grant  'username', 'RWCA'
0 row(s) in 3.5870 seconds 
hbase(main):003:0> quit



Kafka - Single Disk Failure Causes Broker to Crash

Summary: A failure of a single disk in the log.dirs list, whether due to hardware problems or lack of free space, will cause a Kafka broker to crash. This article describes the symptoms and outlines manual steps that can be used to move data between disks on a single Kafka broker to work around the issue.


Symptoms:
A Kafka broker in a cluster may crash due to a single disk failure. A disk failure includes I/O errors due to hardware failure or high disk space usage leaving no additional space for storage. 
In this scenario a Kafka broker may crash with the following error:
YYYY-MM-DD HH:MM:SS,MS FATAL kafka.server.ReplicaFetcherThread: [ReplicaFetcherThread-W-XYZ], Disk error while replicating data for [TOPIC,PARTITION]
kafka.common.KafkaStorageException: I/O exception in append to log 'TOPIC-PARTITION'
 at kafka.log.Log.append(Log.scala:320)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:114)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:139)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:123)
 at scala.Option.foreach(Option.scala:257)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:123)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:121)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:121)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:121)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:121)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:255)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:119)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:94)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.io.IOException: No space left on device
 at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
 at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
 at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
 at sun.nio.ch.IOUtil.write(IOUtil.java:65)
 at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
 at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:160)
 at kafka.log.FileMessageSet.append(FileMessageSet.scala:231)
 at kafka.log.LogSegment.append(LogSegment.scala:86)
 at kafka.log.Log.append(Log.scala:363)
 ... 19 more
The broker may also run into the following issue on startup:
YYYY-MM-DD HH:MM:SS,MS ERROR kafka.log.LogManager: There was an error in one of the threads during logs loading: java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code
YYYY-MM-DD HH:MM:SS,MS FATAL kafka.server.KafkaServer: Fatal error during KafkaServer startup. Prepare to shutdown
java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code
 at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
 at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:209)
 at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:186)
 at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
 at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
 at kafka.log.LogSegment.recover(LogSegment.scala:179)
 at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:190)
 at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:162)
 at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
 at kafka.log.Log.loadSegments(Log.scala:162)
 at kafka.log.Log.(Log.scala:92)
 at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
 at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)



Fix:

If one of the disks listed in log.dirs is running out of space, you can manually move replica data to another disk on the same Kafka broker to free up space or to balance partition data across disks.

Assume the following scenario where you have disks Disk1 and Disk2 for broker 1. 

On Disk1 you have the following topic and partitions:

TopicA partitions 1 and 2

On Disk2 you have the following topic and partitions:

TopicB partitions 1 and 2, and TopicC partition 1

Therefore, you will have the following directories and files under Disk1:

/TopicA-1
/TopicA-2
replication-offset-checkpoint
recovery-point-offset-checkpoint

On Disk2, you'll have the following directories and files:

/TopicB-1
/TopicB-2
/TopicC-1
replication-offset-checkpoint
recovery-point-offset-checkpoint

Both the replication-offset-checkpoint and recovery-point-offset-checkpoint files on Disk1 will have the following content:

0
2
TopicA 1 X
TopicA 2 Y

The files always contain a value of 0 on the first line, followed by an integer that represents the number of partitions. Each subsequent line contains the topic name and partition ID, followed by an integer offset value. Here the offsets are denoted with X and Y as examples; the actual files contain integer values.

The same files on Disk2 will have the following contents:

0
3
TopicB 1 Z
TopicB 2 M
TopicC 1 N

Suppose you now want to move both partitions 1 and 2 of TopicB from Disk2 to Disk1. After confirming that broker 1 is in a stopped state, do the following:

1.) Move the data directories /TopicB-1 and /TopicB-2 from Disk2 to Disk1
2.) Make a backup of the replication-offset-checkpoint and recovery-point-offset-checkpoint files on Disk1 (target log directory) and Disk2 (source log directory).
3.) Modify both the replication-offset-checkpoint and recovery-point-offset-checkpoint files on Disk1 (target log directory) so that they look like the following:

0
4
TopicA 1 X
TopicA 2 Y
TopicB 1 Z
TopicB 2 M

You will also need to edit the replication-offset-checkpoint and recovery-point-offset-checkpoint files on the source log directory (Disk2) as follows:

0
1
TopicC 1 N
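For reference, steps 1 and 2 above might look like the following shell commands, assuming Disk1 and Disk2 are mounted at /data/disk1/kafka and /data/disk2/kafka (the paths are illustrative; run them only while the broker is stopped):

# step 1: move the TopicB partition directories from Disk2 to Disk1
mv /data/disk2/kafka/TopicB-1 /data/disk1/kafka/
mv /data/disk2/kafka/TopicB-2 /data/disk1/kafka/

# step 2: back up the checkpoint files in both log directories before editing them
cp /data/disk1/kafka/replication-offset-checkpoint /data/disk1/kafka/replication-offset-checkpoint.bak
cp /data/disk1/kafka/recovery-point-offset-checkpoint /data/disk1/kafka/recovery-point-offset-checkpoint.bak
cp /data/disk2/kafka/replication-offset-checkpoint /data/disk2/kafka/replication-offset-checkpoint.bak
cp /data/disk2/kafka/recovery-point-offset-checkpoint /data/disk2/kafka/recovery-point-offset-checkpoint.bak

The checkpoint files themselves are then edited with a text editor as described in step 3.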

Friday, April 27, 2018

Decommissioning a Datanode is Taking a Long Time or is Hanging | Replication



Cause
DataNode decommissioning is the act of preventing new data from being written to a DataNode while safely moving its current data to other DataNodes within the cluster.

This article contains solutions for common problems with DataNode decommissioning:

  Low bandwidth
  Long delays with decommissioning open files
  Hangs when decommissioning files that have a replication factor higher than the number of remaining nodes in the cluster.

Below is a list of common decommissioning problems and their solutions.

Decommissioning takes a long time, while minimal bandwidth is being used.

  •   The bandwidth is limited by default in order to prevent decommissioning from overloading the DataNodes.  
  •   Cloudera recommends changing the defaults to allow for a faster decommission.
  Note:  This will increase the memory consumption of the DataNodes.

Solution: Increase default bandwidth for decommissioning

  • This requires higher memory usage at the DataNodes due to tracking more parallel block copies.
  • Increasing the decommissioning bandwidth also requires changing the configuration and restarting HDFS.

If using Cloudera Manager:

  1. Navigate to: HDFS > Configuration (View/Edit).
  2. Using the left-sidebar, navigate to: NameNode > Advanced > "NameNode Configuration Safety Valve for hdfs-site.xml" and paste in both XML properties (listed below).
  3. Using the left-sidebar, navigate to: DataNode > Advanced > "Java Heap Size of DataNode in Bytes" and make sure the value is set to at least 4 GB.
  4. Save the changes.
  5. Restart HDFS (NameNodes and DataNodes) to set the change into effect (A rolling restart may also be performed to reduce downtime).

If not using Cloudera Manager:

  1. Apply the XML properties listed below to the NameNodes' hdfs-site.xml and save the changes.
  2. In the DataNode's hadoop-env.sh file, ensure the HADOOP_DATANODE_OPTS or the HADOOP_HEAPSIZE env-var properties carry a minimum of 4 GB heap. 
    (If not, add "-Xmx4g" to the HADOOP_DATANODE_OPTS property (at the end) and save the change. This should be ensured for all DataNodes in the cluster.)
  3. Restart the HDFS cluster to set the change into effect (A rolling restart can be performed to reduce downtime).  
      <property>
        <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
        <value>10</value>
      </property>
      <property>
        <name>dfs.namenode.replication.max-streams</name>
        <value>100</value>
      </property>

Open Files 

 Because the DataNode writes do not involve the NameNode, if there are blocks associated with open files located on a DataNode, they are not relocated until the file is closed.
 This commonly occurs with:
  • Clusters with HBase
  • Open Flume files
  • Long running tasks

 Solution: Locate and close any open files.

  1. Find the NameNode log directory
    cd /var/log/hadoop-hdfs/
  2. Verify that the logs provide the needed information.
    grep "Is current datanode" *NAME* | head
  3. The sixth column shows the block id, and the message should be relevant to DataNode decommissioning.
    grep "Is current datanode" *NAME* | awk '{print $6}' | sort -u > blocks.open
  4. Provide a list of open files, their blocks, and the locations of those blocks.
    hadoop fsck / -files -blocks -locations -openforwrite 2>&1 > openfiles.out
  5.  Check openfiles.out for the blocks listed in blocks.open, also verifying that the DataNode IP address is correct.
  6.  Now that the name of the open file(s) is known, perform the appropriate action to restart the process holding them open so the files are closed.
      For example, a major compaction will close all files in a region for HBase.  

A block cannot be relocated because there are not enough nodes to satisfy the block placement policy

  For example, in a 10 node cluster, if mapred.submit.replication is set to the default of 10 while attempting to decommission one
  DataNode, there will be difficulties relocating blocks that are associated with MapReduce jobs.
  
  This condition leads to errors similar to the following in the NameNode logs: 
 
  org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault: Not able to place enough replicas, still in need of 3 to reach 3   

Solution: Find the files that have blocks with replications higher than the number of DataNodes remaining in the cluster.    

  Use the following steps to find the number of files with replication equal to or above your current cluster size:
  1. Provide a listing of open files, their blocks, the locations of those blocks.
  2. You can reuse the output from a previous fsck for this solution.
    hadoop fsck / -files -blocks -locations -openforwrite 2>&1 > openfiles.out
  3. The following command will give a list of how many files have a given replication factor:
    grep repl= openfiles.out | awk '{print $NF}' | sort | uniq -c
  4. Using the example of 10 nodes, and decommissioning one:
    egrep -B4 "repl=10" openfiles.out | grep -v '' | awk '/^\//{print $1}' 
  5. Examine the paths, and decide whether to reduce the replication factor of the files, or remove them from the cluster.

Shutting down the DataNode before decommissioning finishes 

  •  HDFS is designed to be resilient to node/device failures. If data replication recommendations are followed, shutting down one DataNode will not affect the data within the cluster.    
  •  Voluntarily removing a DataNode does provide a window of time when replication is lower than recommended, increasing the potential risk of data loss. 

Solution: Determine whether any files are at risk of data loss before abruptly shutting down the DataNode.

  1. List the open files.
    hadoop fsck / -files -blocks -locations -openforwrite 2>&1 > openfiles.out
  2. Examine the paths and decide whether to remove them from the cluster.
    egrep -B4 "repl=1 " openfiles.out | grep -v '' | awk '/^\//{print $1}'
  3. Look at the listing of files to see if they have blocks located on the DataNodes being decommissioned.
  4. Make a decision whether those files are required.

Hive | Impala | External Table | Parquet Storage Format | Renaming Columns | All Rows show NULL




For an existing Hive table with data stored in Parquet format, if the columns in the table do not match the columns stored in the Parquet schema (for example, because one or more of the table's columns were renamed), the parquet.column.index.access property must be set to true on the table.


After renaming the table columns, the values in the renamed columns are returned as NULL for all rows.


No exceptions are produced.

Set the property parquet.column.index.access=true on the table.

Example:

ALTER TABLE parquet_columnar_renamed SET TBLPROPERTIES ('parquet.column.index.access'='true');

Or alternatively in the Hive session:

set parquet.column.index.access=true; 

How to control the number of Reducers for a particular Hive query to optimize query performance

Hive employs a simple algorithm for determining the number of Reducers to use based on the total size of the input data set.  The number of Reducers can also be set explicitly by the developer.

-- In order to change the average load for a reducer (in bytes)
-- CDH uses a default of 67108864 (64MB)
-- That is, if the total size of the data set is 1 GB then 16 reducers will be used
set hive.exec.reducers.bytes.per.reducer=<number_of_bytes>;

-- In order to set a constant number of reducers
-- Typically set to a prime close to the number of available hosts
-- Default value is -1
-- By setting this property to -1, Hive will automatically figure out what should be the number of reducers
set mapreduce.job.reduces=<number>;

-- Hive will use this as the maximum number of reducers when automatically determining the number of reducers
-- Default value is 1099
set hive.exec.reducers.max=<number>;

How to control the number of Mappers for a particular Hive query to optimize query performance



Mappers

Setting both mapreduce.input.fileinputformat.split.maxsize and mapreduce.input.fileinputformat.split.minsize to the same value in most cases controls the number of Mappers used when Hive is running a particular query.
Example:
For a text file with a file size of 200000 bytes:
  • The following configuration triggers two Mappers for the MapReduce job:
set mapreduce.input.fileinputformat.split.maxsize=100000;
set mapreduce.input.fileinputformat.split.minsize=100000;

  • The following configuration triggers four Mappers for the MapReduce job:
set mapreduce.input.fileinputformat.split.maxsize=50000;
set mapreduce.input.fileinputformat.split.minsize=50000;
 
By default, Hive assigns several small files, whose file sizes are smaller than mapreduce.input.fileinputformat.split.minsize, to a single Mapper to limit the number of Mappers initialized. Hive also considers the data locality of each file's HDFS blocks.  If there are a lot of small files stored across different HDFS DataNodes, Hive will not combine the files into a single Mapper because they are not stored on the same machine.
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; (Default)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; (Does not combine files)

Reducers

Determining the number of Reducers often requires more input from the developer than does determining the number of Mappers.  Efficient allocation of Reducers will greatly depend on the Hive query and the input data set.  The size of the result set of a query against a large input data set will depend greatly on the predicates in the query.  If a query heavily filters the input data set with predicates, few Reducers may be required for processing.  Alternatively, if a different query against the same large input data set has fewer filtering predicates, many Reducers may be required for processing.

Hive employs a simple algorithm for determining the number of Reducers to use based on the total size of the input data set.  The number of Reducers can also be set explicitly by the developer.
 
-- In order to change the average load for a reducer (in bytes)
-- CDH uses a default of 67108864 (64MB)
-- That is, if the total size of the data set is 1 GB then 16 reducers will be used
set hive.exec.reducers.bytes.per.reducer=<number_of_bytes>;

-- In order to set a constant number of reducers
-- Typically set to a prime close to the number of available hosts
-- Default value is -1
-- By setting this property to -1, Hive will automatically figure out what should be the number of reducers
set mapreduce.job.reduces=<number>;

-- Hive will use this as the maximum number of reducers when automatically determining the number of reducers
-- Default value is 1099
set hive.exec.reducers.max=<number>;


Special Cases

It is sometimes the case that certain queries do not use Reducers.  INSERT statements, for example, may generate Mapper-only MapReduce jobs.  If Reducers are required for performance reasons, or the developer would like to control the number of output files generated, there are workarounds available.
 
-- Add an artificial LIMIT clause to force a Reducer phase
INSERT INTO <target_table> SELECT * FROM <source_table> LIMIT 99999999;

-- Use very large split sizes to force a single Mapper (entire data set is < 2GB)
set mapreduce.input.fileinputformat.split.maxsize=2147483648;
set mapreduce.input.fileinputformat.split.minsize=2147483648;
INSERT INTO <target_table> SELECT * FROM <source_table>;

-- When enabled, dynamic partitioning column will be globally sorted
-- This way we can keep only one record writer open for each partition value in the Reducer thereby reducing the memory pressure on Reducers
-- Introduces Reducer phase into an otherwise Mapper-only MapReduce job
set hive.optimize.sort.dynamic.partition=true;
INSERT INTO <target_table> ... SELECT * FROM <source_table>;




Friday, April 6, 2018

Data Structures - Sorts with Java code


Selection Sort

Selection sort goes through the items in the list repeatedly. In every pass, the smallest value in the remaining unsorted portion is selected and placed at the end of the sorted portion. Like bubble sort, selection sort has O(n^2) time complexity.
Here’s the sample implementation in Java.
void selectionSort(int[] arr){

  int length = arr.length;
  for (int k = 0; k < length; k++){

    // get the current smallest
    // element
    int min = arr[k];
    int minIndex = k;
    for (int j = k + 1; j < length; j++){
      if (arr[j] < min) {
        min = arr[j];
        minIndex = j;
      }
    }
    // put the smallest element to the sorted list
    arr[minIndex] = arr[k];
    arr[k] = min;
  }
}
Here’s a step-by-step example of Selection Sort.

Bubble Sort
Bubble sort repeatedly steps through the list to be sorted, compares each pair of adjacent items and swaps them if they are in the wrong order. The pass through the list is repeated until no swaps are needed, which indicates that the list is sorted.
Although the algorithm is simple, it is too slow (O(n^2)) and impractical for most problems, even compared to insertion sort. It can be practical if the input is usually in sorted order but occasionally has a few out-of-order elements that are nearly in position.
Here’s the sample implementation in Java.
void bubbleSort(int[] arr) {

  int n = arr.length;

  for (int i = 0; i < n; i++){
    for (int j = 1; j < (n - i); j++){
      if (arr[j - 1] > arr[j]){
        // swap the elements
        int temp = arr[j - 1];
        arr[j - 1] = arr[j];
        arr[j] = temp;
      }
    }
  }
}
Here’s a step-by-step example of bubble sort. Basically, each pass “bubbles” the largest value from the unsorted area to the sorted area:

Insertion Sort

Insertion sort picks elements from the unsorted list one by one. Each element is inserted into its proper position in the sorted list. Like bubble sort, insertion sort has O(n^2) time complexity.
Here’s the sample implementation in Java.
void insertionSort(int[] arr){
  // the number of items
  // sorted so far
  int j;
  int length = arr.length;
  for (j = 1; j < length; j++){

    // the item to be inserted
    int key = arr[j];

    int i = j - 1;
    while (i >= 0 && arr[i] > key){

      // move one value over
      // one place to the right
      arr[i + 1] = arr[i];
      i--;
    }
    // put the key in its proper
    // location
    arr[i + 1] = key;
  }
}
Here’s a step-by-step example of insertion sort.

Merge Sort

Merge sort divides the array into two halves, sorts each of those halves and then merges them back together. For each half, it uses the same algorithm to divide and merge smaller halves back. The smallest halves will just have one element each which is already sorted.
Note that the merge step needs an auxiliary array to avoid overwriting the values in the original array. The sorted values are then copied back from the auxiliary array to the original array.
Merge sort is a very typical divide-and-conquer algorithm. The idea is to divide a bigger problem (sorting a bigger array) to a smaller problem (sorting a smaller array) and then combine the results to generate the final result (by merging sorted arrays back).
The average time complexity of merge sort is O(nlog(n)).
Here’s the sample implementation of merge sort in Java.
void mergeSort(int[] arr){
  int[] helper = new int[arr.length];
  mergeSort(arr, helper, 0, arr.length - 1);
}

void mergeSort(int[] arr, int[] helper, int low, int high){
  if (low < high){

    /* divide the array into two
       halves and sort them */
    int middle = (low + high) / 2;
    mergeSort(arr, helper, low, middle);
    mergeSort(arr, helper, middle + 1, high);
    // merge the sorted halves
    merge(arr, helper, low, middle, high);
  }
}

void merge(int[] arr, int[] helper, int low, int middle, int high){
  /* copy both halves into
     a helper array */
  for (int i = low; i <= high; i++){
    helper[i] = arr[i];
  }
  // the start element indexes of
  // the two halves to be merged
  int left = low;
  int right = middle + 1;
  int current = low;
  /* Iterate through the helper array.
     Compare the elements in the left
     and right halves and copy the smaller
     element from the two halves to
     the original array. */
  while (left <= middle && right <= high){

    if (helper[left] <= helper[right]){
      arr[current] = helper[left];
      left++;
    }
    else {
      arr[current] = helper[right];
      right++;
    }
    current++;
  }
  /* copy the rest of the left half to the target array;
     the rest of the right half is already in place */
  int remaining = middle - left;
  for (int i = 0; i <= remaining; i++){
    arr[current++] = helper[left + i];
  }
}
Here’s a step-by-step example of merge sort.

Quick Sort

Quick Sort is also a divide-and-conquer algorithm like Merge Sort. In Quick Sort, we pick an element (called the pivot) to partition the unsorted array into two parts: the elements less than the pivot and the elements greater than the pivot. If we keep partitioning the sub-arrays with the same method recursively, eventually we will get a sorted array.
The best case is when every time we pick a pivot that is the median of the list; the time complexity is then O(nlog(n)). The worst case happens when every time we pick a pivot that is the largest or smallest element in the array. For example, if the list is sorted in reverse order and our pivot-picking strategy is to pick the first element in the array, then after every partitioning we will get one partition with 1 element and another partition with n-1 elements. In that case, the time complexity degrades to O(n^2).
Here’s the sample implementation in Java.
void quickSort(int[] arr, int left, int right){
  int index = partition(arr, left, right);
  if (left < index - 1){
    quickSort(arr, left, index - 1);
  }
  if (index < right){
    quickSort(arr, index, right);
  }
}

int partition(int[] arr, int left, int right) {
  // pick a pivot in the middle of the array
  int pivot = arr[(left + right) / 2];
  while (left <= right){
    // find the element on the left that should be on the right
    while (arr[left] < pivot){
      left++;
    }
    // find the element on the right that should be on the left
    while (arr[right] > pivot){
      right--;
    }
    if (left <= right){
      // swap elements
      swap(arr, left, right);
      left++;
      right--;
    }
  }
  return left;
}

void swap(int[] arr, int i, int j){
  int temp = arr[i];
  arr[i] = arr[j];
  arr[j] = temp;
}
28.}