Sunday, April 29, 2018

Kafka - Single Disk Failure Causes Broker to Crash

Summary: A failure of a single disk in the log.dirs list, whether from a hardware fault or from running out of free space, will cause a Kafka broker to crash. This article describes the symptoms and outlines manual steps for moving data between disks on a single Kafka broker to work around the issue.


Symptoms:
A Kafka broker in a cluster may crash because of a single disk failure. A disk failure here means either I/O errors caused by failing hardware or a disk that has filled up and has no space left for new log data.
In this scenario the broker may crash with the following error:
YYYY-MM-DD HH:MM:SS,MS FATAL kafka.server.ReplicaFetcherThread: [ReplicaFetcherThread-W-XYZ], Disk error while replicating data for [TOPIC,PARTITION]
kafka.common.KafkaStorageException: I/O exception in append to log 'TOPIC-PARTITION'
 at kafka.log.Log.append(Log.scala:320)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:114)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:139)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:123)
 at scala.Option.foreach(Option.scala:257)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:123)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:121)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:121)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:121)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:121)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:255)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:119)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:94)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.io.IOException: No space left on device
 at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
 at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
 at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
 at sun.nio.ch.IOUtil.write(IOUtil.java:65)
 at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
 at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:160)
 at kafka.log.FileMessageSet.append(FileMessageSet.scala:231)
 at kafka.log.LogSegment.append(LogSegment.scala:86)
 at kafka.log.Log.append(Log.scala:363)
 ... 19 more
The broker may also run into the following issue on startup:
YYYY-MM-DD HH:MM:SS,MS ERROR kafka.log.LogManager: There was an error in one of the threads during logs loading: java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code
YYYY-MM-DD HH:MM:SS,MS FATAL kafka.server.KafkaServer: Fatal error during KafkaServer startup. Prepare to shutdown
java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code
 at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
 at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:209)
 at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:186)
 at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
 at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
 at kafka.log.LogSegment.recover(LogSegment.scala:179)
 at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:190)
 at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:162)
 at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
 at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
 at kafka.log.Log.loadSegments(Log.scala:162)
 at kafka.log.Log.<init>(Log.scala:92)
 at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
 at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)



Fix:

If one of the disks listed in log.dirs is running out of space, you can manually move replica data to another disk within the same Kafka broker to free up space or to balance partition data across disks.
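
To confirm which log directory is actually out of space before moving anything, it helps to check free space on each configured directory first. Below is a minimal sketch in Python, assuming hypothetical mount paths for the log.dirs entries; substitute the paths from your own server.properties.

# Minimal sketch: report free space for each configured Kafka log directory.
# The paths below are hypothetical examples, not values from this article.
import shutil

log_dirs = ["/data/disk1/kafka-logs", "/data/disk2/kafka-logs"]

for d in log_dirs:
    usage = shutil.disk_usage(d)
    pct_used = 100.0 * usage.used / usage.total
    print(f"{d}: {usage.free / 1024**3:.1f} GiB free ({pct_used:.1f}% used)")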

Assume the following scenario where you have disks Disk1 and Disk2 for broker 1. 

On Disk1 you have the following topic and partitions:

TopicA partitions 1 and 2

On Disk2 you have the following topic and partitions:

TopicB partitions 1 and 2, and TopicC partition 1

Therefore, you will have the following directories and files under Disk1:

/TopicA-1
/TopicA-2
replication-offset-checkpoint
recovery-point-offset-checkpoint

On Disk2, you'll have the following directories and files:

/TopicB-1
/TopicB-2
/TopicC-1
replication-offset-checkpoint
recovery-point-offset-checkpoint

Both the replication-offset-checkpoint and recovery-point-offset-checkpoint files on Disk1 will have the following content:

0
2
TopicA 1 X
TopicA 2 Y

The first line of each file is the checkpoint format version, which is always 0, and the second line is the number of partition entries that follow. Each subsequent line contains the topic name, the partition ID, and an integer offset. The offsets are denoted here by X and Y as placeholders; the actual files contain integer values.
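
As a small illustration of this layout, here is a minimal sketch in Python that parses one of these checkpoint files into a dictionary keyed by topic and partition. The path in the example comment is hypothetical.

# Minimal sketch: parse a replication-offset-checkpoint or
# recovery-point-offset-checkpoint file into {(topic, partition): offset}.
def read_checkpoint(path):
    with open(path) as f:
        lines = [line.strip() for line in f if line.strip()]
    version = int(lines[0])   # first line: format version, always 0 here
    count = int(lines[1])     # second line: number of partition entries
    entries = {}
    for line in lines[2:2 + count]:
        topic, partition, offset = line.rsplit(" ", 2)
        entries[(topic, int(partition))] = int(offset)
    return entries

# Example (hypothetical path):
# read_checkpoint("/data/disk1/kafka-logs/replication-offset-checkpoint")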

The same files on Disk2 will have the following contents:

0
3
TopicB 1 Z
TopicB 2 M
TopicC 1 N

Let's say you now want to move both partitions 1 and 2 of TopicB from Disk2 to Disk1. After confirming that broker 1 is stopped, you will need to do the following (a scripted sketch of these steps is shown after the examples below):

1.) Move the data directories /TopicB-1 and /TopicB-2 from Disk2 to Disk1.
2.) Make a backup of the replication-offset-checkpoint and recovery-point-offset-checkpoint files on both Disk1 (target log directory) and Disk2 (source log directory).
3.) Modify both the replication-offset-checkpoint and recovery-point-offset-checkpoint files on Disk1 (target log directory) so that they look like the following:

0
4
TopicA 1 X
TopicA 2 Y
TopicB 1 Z
TopicB 2 M

You will also need to edit the replication-offset-checkpoint and recovery-point-offset-checkpoint files in the source log directory (Disk2) so that they list only the remaining partition:

0
1
TopicC 1 N
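
For reference, the same move can be expressed as a short script. The sketch below is a minimal example only: it assumes hypothetical mount points for Disk1 and Disk2, reuses the checkpoint format described above, and must only be run while broker 1 is stopped and after the checkpoint files have been backed up (step 2).

# Minimal sketch: move TopicB partitions 1 and 2 from Disk2 to Disk1 and
# rewrite the checkpoint files on both disks. Paths are hypothetical;
# run only while the broker is stopped and after backing up the files.
import shutil
from pathlib import Path

SRC = Path("/data/disk2/kafka-logs")   # source log directory (Disk2)
DST = Path("/data/disk1/kafka-logs")   # target log directory (Disk1)
PARTITIONS_TO_MOVE = [("TopicB", 1), ("TopicB", 2)]
CHECKPOINT_FILES = ["replication-offset-checkpoint",
                    "recovery-point-offset-checkpoint"]

def read_checkpoint(path):
    lines = [l.strip() for l in path.read_text().splitlines() if l.strip()]
    count = int(lines[1])                      # second line: entry count
    return {(t, int(p)): int(o)
            for t, p, o in (line.rsplit(" ", 2) for line in lines[2:2 + count])}

def write_checkpoint(path, entries):
    body = "\n".join(f"{t} {p} {o}" for (t, p), o in entries.items())
    path.write_text(f"0\n{len(entries)}\n{body}\n")   # version, count, entries

# Step 1: move the partition data directories.
for topic, partition in PARTITIONS_TO_MOVE:
    shutil.move(str(SRC / f"{topic}-{partition}"), str(DST / f"{topic}-{partition}"))

# Step 3: move the matching checkpoint entries from source to target
# (step 2, the backups, is assumed to have been done already).
for name in CHECKPOINT_FILES:
    src_entries = read_checkpoint(SRC / name)
    dst_entries = read_checkpoint(DST / name)
    for key in PARTITIONS_TO_MOVE:
        dst_entries[key] = src_entries.pop(key)
    write_checkpoint(SRC / name, src_entries)
    write_checkpoint(DST / name, dst_entries)

Once the directories and both sets of checkpoint files are consistent, broker 1 can be started again and should load the moved partitions from Disk1.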
