How to authenticate to Kafka using Kerberos (SASL) from the terminal, Spark (and Spark Streaming) and Jupyter Notebook
To briefly explain what we are trying to achieve: we want permission to read and write Kafka topics. Our Kafka is protected by Kerberos, which means that before we can access Kafka we need to obtain a ticket from Kerberos. To get the ticket we have to provide a keytab, an authentication file for each user. All these steps have to happen automatically, because the commands that access Kafka give us no opportunity to supply the keytab interactively. To get this done we need to put the right parameters and configuration in the right places.
Here is my environment (your tools and versions may vary, but the approach should still work):
- Cloudera Hadoop cluster v. 5+
- Kafka v. 2+ (a topic with Kerberos authentication already exists)
- Spark v. 2+
- Kerberos v. 5
- Jupyter Notebook with PySpark
To begin, let's access the protected Kafka topic from the terminal. Access to the topic is granted only if we obtain a Kerberos ticket for the right user. For this we need to prepare the following (it is easiest if all the files are in the same directory):
- The user's keytab file (for Kerberos)
- File jaas.conf:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="${PATH_TO_YOUR_KEYTAB}"
  principal="${USER_NAME}@${REALM}";
};
- File kafka_security.properties:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
- File krb5.conf (usually located at /etc/krb5.conf or /etc/kafka/krb5.conf; see the JDK's Kerberos Requirements documentation for details).
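If you prefer to generate jaas.conf and kafka_security.properties instead of typing them, here is a minimal Python sketch. The function names and example values (alice, EXAMPLE.COM) are hypothetical; the file contents mirror the two files listed above, and krb5.conf is left alone because it normally comes from the cluster.

```python
from pathlib import Path

# Hypothetical template for the KafkaClient login section (doubled braces are
# literal braces for str.format). Straight double quotes are required.
JAAS_TEMPLATE = """KafkaClient {{
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="{keytab}"
  principal="{principal}";
}};
"""

# Client settings for SASL over plaintext with Kerberos (GSSAPI).
SECURITY_PROPERTIES = {
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.kerberos.service.name": "kafka",
    "sasl.mechanism": "GSSAPI",
}


def render_jaas(keytab: str, user: str, realm: str) -> str:
    """Return the KafkaClient section for a keytab-based Kerberos login."""
    return JAAS_TEMPLATE.format(keytab=keytab, principal=f"{user}@{realm}")


def render_properties(props: dict) -> str:
    """Return props as key=value lines (Java .properties format)."""
    return "".join(f"{key}={value}\n" for key, value in props.items())


def write_client_files(directory: str, keytab: str, user: str, realm: str) -> None:
    """Write jaas.conf and kafka_security.properties into one directory."""
    target = Path(directory)
    target.mkdir(parents=True, exist_ok=True)
    (target / "jaas.conf").write_text(render_jaas(keytab, user, realm))
    (target / "kafka_security.properties").write_text(
        render_properties(SECURITY_PROPERTIES))
```

For example, write_client_files(".", "/home/alice/alice.keytab", "alice", "EXAMPLE.COM") writes both files into the current directory. Generating the files also avoids a common pitfall: curly quotes copied from a web page make the JAAS file unparseable.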
Next, we export KAFKA_OPTS so that the Kafka command-line tools pick up jaas.conf and krb5.conf:
export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf"
Then we can write to and read from the Kafka topic in the terminal:
For writing:
/bin/kafka-console-producer --broker-list ${KAFKA_BROKERS_WITH_PORTS} --topic ${TOPIC_NAME} --producer.config kafka_security.properties
For reading:
/bin/kafka-console-consumer --bootstrap-server ${KAFKA_BROKERS_WITH_PORTS} --topic ${TOPIC_NAME} --from-beginning --consumer.config kafka_security.properties
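The same two commands can be driven from Python, which is handy when scripting smoke tests. This is a hedged sketch: console_command, kafka_env and run_consumer are hypothetical helpers, the /bin prefix simply copies the paths above, and the commands must be run from the directory that holds jaas.conf and kafka_security.properties.

```python
import os
import subprocess

# Same JVM flags as the KAFKA_OPTS export above.
KAFKA_OPTS = ("-Djava.security.auth.login.config=jaas.conf "
              "-Djava.security.krb5.conf=/etc/krb5.conf")


def console_command(role: str, brokers: str, topic: str,
                    props_file: str = "kafka_security.properties",
                    bin_dir: str = "/bin") -> list:
    """Build argv for kafka-console-producer or kafka-console-consumer."""
    if role == "producer":
        return [f"{bin_dir}/kafka-console-producer", "--broker-list", brokers,
                "--topic", topic, "--producer.config", props_file]
    if role == "consumer":
        return [f"{bin_dir}/kafka-console-consumer", "--bootstrap-server", brokers,
                "--topic", topic, "--from-beginning", "--consumer.config", props_file]
    raise ValueError(f"unknown role: {role!r}")


def kafka_env() -> dict:
    """Copy of the current environment with KAFKA_OPTS set for Kerberos."""
    env = dict(os.environ)
    env["KAFKA_OPTS"] = KAFKA_OPTS
    return env


def run_consumer(brokers: str, topic: str) -> None:
    """Run the console consumer; it keeps reading until interrupted."""
    subprocess.run(console_command("consumer", brokers, topic),
                   env=kafka_env(), check=True)
```

Passing the environment explicitly keeps KAFKA_OPTS scoped to the Kafka tools instead of exporting it for the whole shell session.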
Hope everything worked!
Let’s do the same thing using Spark.
The challenge here is that Spark must access Kafka not only from the application driver but also from every executor. This means each executor needs to obtain a Kerberos ticket using our keytab, and we have to give Spark the right configuration to make that happen.
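First, we need the same jaas.conf. Here keyTab is just the file name, because Spark ships the keytab into each executor's working directory:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="${YOUR_KEYTAB_FILE}"
  principal="${USER_NAME}@${REALM}";
};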
Before launching Spark, we also need to export the variable:
export SPARK_KAFKA_VERSION=0.10
In the Spark code we access Kafka with these options (the first five are mandatory):
kafka.bootstrap.servers=${KAFKA_BROKERS_WITH_PORTS}
kafka.security.protocol=SASL_PLAINTEXT
kafka.sasl.kerberos.service.name=kafka
kafka.sasl.mechanism=GSSAPI
subscribe=${TOPIC_NAME}
startingOffsets=latest
maxOffsetsPerTrigger=1000
You can pass these options as a map to:
spark.readStream.
format("kafka").
options(myOptionsMap).
load()
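As far as I know, Spark does not reject a misspelled source option (for example maxOffestPerTrigger); it is most likely just ignored. A small pre-flight check on the options map can catch that. This is a hedged Python sketch: the broker and topic values are hypothetical, MANDATORY follows the "first five" rule above, and KNOWN_SOURCE_OPTIONS only covers the options used in this article, so extend it if you use others (for example subscribePattern).

```python
# Hypothetical options map mirroring the list above; replace the values.
KAFKA_SOURCE_OPTIONS = {
    "kafka.bootstrap.servers": "broker1:9092,broker2:9092",
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.sasl.kerberos.service.name": "kafka",
    "kafka.sasl.mechanism": "GSSAPI",
    "subscribe": "my_topic",
    "startingOffsets": "latest",
    "maxOffsetsPerTrigger": "1000",
}

# The five options this article treats as mandatory.
MANDATORY = (
    "kafka.bootstrap.servers",
    "kafka.security.protocol",
    "kafka.sasl.kerberos.service.name",
    "kafka.sasl.mechanism",
    "subscribe",
)

# Non-"kafka." options used in this article; anything else is flagged.
KNOWN_SOURCE_OPTIONS = {"subscribe", "startingOffsets", "maxOffsetsPerTrigger"}


def check_options(options: dict) -> list:
    """Return problems: missing mandatory keys, then unrecognized names."""
    problems = [f"missing: {key}" for key in MANDATORY if key not in options]
    for key in options:
        if not key.startswith("kafka.") and key not in KNOWN_SOURCE_OPTIONS:
            problems.append(f"unknown option: {key}")
    return problems
```

In PySpark the checked map can be passed as keyword arguments: spark.readStream.format("kafka").options(**KAFKA_SOURCE_OPTIONS).load().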
Before starting Spark, we can define a shell variable with the JVM options:
JAVA_OPTIONS="-Djava.security.auth.login.config=jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf"
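We also need two copies of the user's keytab with different names: one is passed to YARN via spark.yarn.keytab, the other is shipped to the executors with --files, and Spark on YARN refuses to add two files with the same name to the distributed cache. If we already have one copy, we can create the second one with: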
cp $USER_NAME.keytab ${USER_NAME}_2.keytab
To launch the Spark application, run:
spark2-submit \
  --master yarn \
  --conf "spark.yarn.keytab=${USER_NAME}_2.keytab" \
  --conf "spark.yarn.principal=$USER_NAME@$REALM" \
  --conf "spark.driver.extraJavaOptions=$JAVA_OPTIONS" \
  --conf "spark.executor.extraJavaOptions=$JAVA_OPTIONS" \
  --class "org.example.MyClass" \
  --jars spark-sql-kafka-0-10_2.11-2.4.0.jar \
  --files "jaas.conf,${USER_NAME}.keytab" \
  my_spark.jar
Or you can use the same configurations with spark-shell or pyspark.
Note: spark.yarn.keytab and spark.yarn.principal allow Spark to access HDFS. To allow Spark to access Kafka, we set spark.driver.extraJavaOptions and spark.executor.extraJavaOptions and ship, with --files, jaas.conf (which the Java options point to) and ${USER_NAME}.keytab (which jaas.conf points to), so every executor receives its own copy of these files for authentication. For the Spark Kafka dependency we provide the spark-sql-kafka jar that matches our Spark and Scala versions. We can also use --packages instead of --jars.
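To make the mapping in the note concrete, here is a hedged Python sketch that builds the same spark2-submit argument list and refuses to proceed if both keytabs share a file name. spark_submit_args and the example values (alice, EXAMPLE.COM) are hypothetical; the flags are exactly the ones in the command above.

```python
import os
import shlex

JAVA_OPTIONS = ("-Djava.security.auth.login.config=jaas.conf "
                "-Djava.security.krb5.conf=/etc/krb5.conf")


def spark_submit_args(user, realm, main_class, app_jar, yarn_keytab, kafka_keytab,
                      kafka_jar="spark-sql-kafka-0-10_2.11-2.4.0.jar"):
    """Build spark2-submit argv: YARN keytab for HDFS, shipped keytab for Kafka."""
    # The YARN keytab and the --files keytab must not share a file name.
    if os.path.basename(yarn_keytab) == os.path.basename(kafka_keytab):
        raise ValueError("yarn_keytab and kafka_keytab need different file names")
    return [
        "spark2-submit",
        "--master", "yarn",
        "--conf", f"spark.yarn.keytab={yarn_keytab}",  # HDFS access
        "--conf", f"spark.yarn.principal={user}@{realm}",
        "--conf", f"spark.driver.extraJavaOptions={JAVA_OPTIONS}",
        "--conf", f"spark.executor.extraJavaOptions={JAVA_OPTIONS}",
        "--class", main_class,
        "--jars", kafka_jar,
        "--files", f"jaas.conf,{kafka_keytab}",  # copied to every executor
        app_jar,
    ]


if __name__ == "__main__":
    args = spark_submit_args("alice", "EXAMPLE.COM", "org.example.MyClass",
                             "my_spark.jar", "alice_2.keytab", "alice.keytab")
    # shlex.join quotes the Java options, which contain a space.
    print(shlex.join(args))
```

Keeping the command as a list (for subprocess.run) sidesteps the shell quoting of $JAVA_OPTIONS entirely; shlex.join is only needed when you want a line to paste into a terminal.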
Hope everything worked!
Let’s do the same trick in PySpark using Jupyter Notebook.
To set shell environment variables from Python, we use os.environ.
import os

# Must be set before the SparkSession (and its JVM) is created.
os.environ['SPARK_KAFKA_VERSION'] = '0.10'
Then we configure the Spark session (replace the ${...} placeholders with your own values):
from pyspark.sql import SparkSession

# ${...} are placeholders: Python does not expand them.
# ${KEYTAB} is the keytab named in jaas.conf, not the _2 copy used by YARN.
spark = SparkSession.builder. \
    config('spark.yarn.keytab', '${USER_NAME}_2.keytab'). \
    config('spark.yarn.principal', '${USER_NAME}@${REALM}'). \
    config('spark.jars', 'spark-sql-kafka-0-10_2.11-2.4.0.jar'). \
    config('spark.driver.extraJavaOptions', '-Djava.security.auth.login.config=jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf'). \
    config('spark.executor.extraJavaOptions', '-Djava.security.auth.login.config=jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf'). \
    config('spark.files', 'jaas.conf,${KEYTAB}'). \
    appName('KafkaSpark'). \
    getOrCreate()
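If you reuse this setup across notebooks, it may help to keep the Kerberos settings in one dict. This is a hedged sketch: kerberos_spark_conf and apply_conf are hypothetical helpers, and the keytab naming follows the _2 convention above. The settings only take effect if no SparkSession exists yet in the kernel, because getOrCreate() returns an already running session and JVM options cannot change once the driver JVM has started.

```python
# Shared JVM flags, so driver and executor options cannot drift apart.
JVM_OPTS = ("-Djava.security.auth.login.config=jaas.conf "
            "-Djava.security.krb5.conf=/etc/krb5.conf")


def kerberos_spark_conf(user: str, realm: str, kafka_keytab: str,
                        kafka_jar: str = "spark-sql-kafka-0-10_2.11-2.4.0.jar") -> dict:
    """Return Spark settings for YARN plus Kerberos-protected Kafka."""
    return {
        "spark.yarn.keytab": f"{user}_2.keytab",  # copy used by YARN
        "spark.yarn.principal": f"{user}@{realm}",
        "spark.jars": kafka_jar,
        "spark.driver.extraJavaOptions": JVM_OPTS,
        "spark.executor.extraJavaOptions": JVM_OPTS,
        "spark.files": f"jaas.conf,{kafka_keytab}",  # keytab named in jaas.conf
    }


def apply_conf(builder, conf: dict):
    """Call builder.config(key, value) for every entry and return the builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

Usage in the notebook: spark = apply_conf(SparkSession.builder, kerberos_spark_conf("alice", "EXAMPLE.COM", "alice.keytab")).appName("KafkaSpark").getOrCreate(), where alice and EXAMPLE.COM are example values.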
Now we can connect to Kafka:
kafka_raw = spark.readStream. \
    format('kafka'). \
    option('kafka.bootstrap.servers', '${KAFKA_BROKERS_WITH_PORTS}'). \
    option('kafka.security.protocol', 'SASL_PLAINTEXT'). \
    option('kafka.sasl.kerberos.service.name', 'kafka'). \
    option('kafka.sasl.mechanism', 'GSSAPI'). \
    option('startingOffsets', 'earliest'). \
    option('maxOffsetsPerTrigger', 10). \
    option('subscribe', '${TOPIC_NAME}'). \
    load()
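To look at the data, we can write the stream to the console sink (its output goes to the driver's stdout, i.e. the terminal running Jupyter, not the notebook cell):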
query = kafka_raw. \
writeStream. \
format("console"). \
start()
That's it. I hope you found all the configuration you need to access Kerberos-protected Kafka whichever way you like.