Wednesday, November 9, 2016

Kafka Custom Partition



1. Create partitioner class

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }

}
============================
2.TestProducer.java for testing
package com.alu.ipprd.spm.kafkapartition;
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {

public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092,localhost:9093");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer producer = new Producer(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
               long runtime = new Date().getTime();
               String ip = "192.168.2." + rnd.nextInt(255);
               String msg = runtime + ",www.example.com," + ip;
               KeyedMessage data = new KeyedMessage("page_visits", ip, msg);
               producer.send(data);
        }
        producer.close();
    }
}
===========================
3.Maven dependencies

  org.apache.kafka
  kafka_2.9.2
  0.8.1.1
  compile
 
   
      jmxri
      com.sun.jmx
   
   
      jms
      javax.jms
   
   
      jmxtools
      com.sun.jdmk
   
 


No comments: