How to use TensorFlow with Redpanda for real-time machine learning

'Hello world!' with TensorFlow and Redpanda

According to TensorFlow's website:

TensorFlow is an end-to-end open-source platform for machine learning. It has a comprehensive, flexible ecosystem of tools, libraries, and community resources that lets researchers push the state-of-the-art in ML and developers easily build and deploy ML-powered applications.

Practically everyone imports TensorFlow as tf, and so does the code below. The tensorflow-io package provides Apache Kafka® wrappers for producing and consuming TensorFlow's native record format straight from Kafka, without intermediaries. Because Redpanda is a drop-in replacement for Kafka and is Kafka API-compatible, you can use TensorFlow with Redpanda, too (minus the need for ZooKeeper® or a JVM, of course).
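
For a taste before the full tutorial, here is a minimal "hello world" sketch, assuming a local Redpanda broker on 127.0.0.1:9092 and a hypothetical topic named hello-topic: it produces a single record with kafka-python, then reads the topic back as a TensorFlow dataset.

from kafka import KafkaProducer
import tensorflow_io as tfio

# Produce one record; the broker is the local Redpanda instance, which speaks
# the Kafka wire protocol on the usual port 9092.
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
producer.send('hello-topic', key=b'greeting', value=b'Hello world!')
producer.flush()

# Read partition 0 of the topic from offset 0 as a dataset; each item carries
# the record value as .message and the record key as .key.
ds = tfio.IODataset.from_kafka('hello-topic', partition=0, offset=0)
for item in ds:
    print(item.message.numpy(), item.key.numpy())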

Here's how to integrate Redpanda with TensorFlow to get you started in building real-time machine learning projects.

Install Redpanda

In this tutorial, I’ll only cover the Ubuntu install, but you can find all of our quickstart documentation on our website. To install Redpanda on Ubuntu, run:

curl -1sLf \
  'https://packages.vectorized.io/nzc4ZYQK3WRGd9sy/redpanda/cfg/setup/bash.deb.sh' \
  | sudo -E bash

sudo apt-get install redpanda && sudo systemctl start redpanda
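
The tutorial's producer writes to topics named susy-train and susy-test. Redpanda can auto-create these on first write, but if topic auto-creation is disabled in your setup, pre-create them with rpk (on older rpk releases the subcommand is rpk api topic create instead):

rpk topic create susy-train
rpk topic create susy-test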

Install deps on your machine

pip3 install --user kafka-python tensorflow-io scikit-learn pandas

Copy this gist

I copied this gist from the main tensorflow/io repository on GitHub:

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))


COLUMNS = [
          #  labels
           'class',
          #  low-level features
           'lepton_1_pT',
           'lepton_1_eta',
           'lepton_1_phi',
           'lepton_2_pT',
           'lepton_2_eta',
           'lepton_2_phi',
           'missing_energy_magnitude',
           'missing_energy_phi',
          #  high-level derived features
           'MET_rel',
           'axial_MET',
           'M_R',
           'M_TR_2',
           'R',
           'MT2',
           'S_R',
           'M_Delta_R',
           'dPhi_r_b',
           'cos(theta_r1)'
           ]

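# Read the first 100,000-row chunk of the (much larger) SUSY dataset; the file
# has no header row, so the column names come from COLUMNS above.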
susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
print(susy_df.head())

# Number of datapoints and columns
print(len(susy_df), len(susy_df.columns))

# Number of datapoints belonging to each class (0: background noise, 1: signal)
print(len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1]))


train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

# The labels are set as the Kafka message keys so that the data can be stored
# across multiple partitions, enabling efficient retrieval via consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))

NUM_COLUMNS = len(x_train_df.columns)
print(len(x_train), len(y_train), len(x_test), len(y_test))

def error_callback(exc):
    raise Exception('Error while sending data to kafka: {0}'.format(str(exc)))

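# Publish each (message, key) pair to the given topic with kafka-python's
# KafkaProducer, pointed at the local Redpanda broker on the usual Kafka port.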
def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))


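# Each Kafka item carries a CSV feature row as the message and the label as
# the key; decode both back into tensors for training.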
def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
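# Stream partition 0 of the training topic from offset 0 up to its current
# tail, then shuffle, decode, and batch it like any other tf.data pipeline.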
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)


OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy()  # the final layer is a sigmoid, so the model outputs probabilities, not logits
METRICS=['accuracy']
EPOCHS=10


# design/build the model
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())


# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)

# fit the model
model.fit(train_ds, epochs=EPOCHS)

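# Consume the test topic as a member of the consumer group "testcg".
# stream_timeout (in milliseconds) bounds how long to keep waiting for new
# messages before the stream ends.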
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)

res = model.evaluate(test_ds)
print("test loss, test acc:", res)


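# KafkaBatchIODataset yields a sequence of mini-datasets, one per batch of
# newly arrived messages, which makes it a natural fit for online training
# as fresh records land in the topic.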
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=30000, # in milliseconds; set to -1 to block indefinitely
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)


def write_to_kafka_after_sleep(topic_name, items):
  time.sleep(30)
  print("#"*100)
  print("Writing messages into topic: {0} after a nice sleep !".format(topic_name))
  print("#"*100)
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name,
                  key=key.encode('utf-8'),
                  value=message.encode('utf-8')
                  ).add_errback(error_callback)
    count+=1
  producer.flush()
  print("#"*100)
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))
  print("#"*100)

def decode_kafka_online_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)


thread = threading.Thread(target=write_to_kafka_after_sleep,
                          args=("susy-train", zip(x_train, y_train)))
thread.daemon = True
thread.start()

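# Incrementally fit the model on each mini-dataset as it arrives; meanwhile,
# the background thread above republishes the training data after 30 seconds,
# simulating a live stream of new records.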
for mini_ds in online_train_ds:
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  model.fit(mini_ds, epochs=3)

Download the data

The script reads SUSY.csv.gz from your working directory, so fetch the dataset before running it:

curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
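
As an optional sanity check that the download landed, print the first row; it is data rather than a header, which is why the script passes header=None to read_csv:

zcat SUSY.csv.gz | head -n 1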

Profit!

I had to install the CUDA libs to get my GPU humming. (As the log below shows, this particular run still fell back to the CPU because of a kernel/driver version mismatch, but training runs fine either way.) After that quick install, I was up and running:

❯ python3 main.py
2021-01-11 22:23:02.910794: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
tensorflow-io version: 0.17.0
tensorflow version: 2.4.0
Number of training samples:  60000
Number of testing sample:  40000
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test
2021-01-11 22:23:10.847745: I tensorflow_io/core/kernels/cpu_check.cc:128] Your CPU supports instructions that this TensorFlow IO binary was not compiled to use: AVX2 FMA
2021-01-11 22:23:10.902048: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-11 22:23:10.902534: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-11 22:23:10.928441: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-11 22:23:10.928471: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: darktower
2021-01-11 22:23:10.928483: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: darktower
2021-01-11 22:23:10.928573: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 460.27.4
2021-01-11 22:23:10.928598: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 455.28.0
2021-01-11 22:23:10.928608: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 455.28.0 does not match DSO version 460.27.4 -- cannot find working devices in this configuration
2021-01-11 22:23:10.931185: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-11 22:23:11.935134: I tensorflow_io/core/kernels/kafka_kernels.cc:349] Kafka tail: 59449
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
dense (Dense)                (None, 128)               2432      
_________________________________________________________________
dropout (Dropout)            (None, 128)               0         
_________________________________________________________________
dense_1 (Dense)              (None, 256)               33024     
_________________________________________________________________
dropout_1 (Dropout)          (None, 256)               0         
_________________________________________________________________
dense_2 (Dense)              (None, 128)               32896     
_________________________________________________________________
dropout_2 (Dropout)          (None, 128)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 1)                 129       
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
2021-01-11 22:23:12.036891: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-01-11 22:23:12.038427: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2894530000 Hz
Epoch 1/10
2021-01-11 22:23:12.283066: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 0
      1/Unknown - 0s 415ms/step - loss: 0.7726 - accuracy: 0.37502021-01-11 22:23:12.518990: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 1024
     29/Unknown - 1s 20ms/step - loss: 0.6915 - accuracy: 0.54202021-01-11 22:23:13.043534: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 2048
     46/Unknown - 2s 25ms/step - loss: 0.6721 - accuracy: 0.57392021-01-11 22:23:13.575452: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 3072
     62/Unknown - 2s 27ms/step - loss: 0.6572 - accuracy: 0.59512021-01-11 22:23:14.102275: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 4096
     78/Unknown - 3s 28ms/step - loss: 0.6444 - accuracy: 0.61152021-01-11 22:23:14.638575: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 5120
     93/Unknown - 3s 29ms/step - loss: 0.6345 - accuracy: 0.62332021-01-11 22:23:15.167787: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 6144
    109/Unknown - 4s 30ms/step - loss: 0.6251 - accuracy: 0.63442021-01-11 22:23:15.696971: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 7168
    126/Unknown - 4s 30ms/step - loss: 0.6165 - accuracy: 0.64432021-01-11 22:23:16.213187: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 8192
    128/Unknown - 5s 33ms/step - loss: 0.6155 - accuracy: 0.64542021-01-11 22:23:16.733228: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 9216
    144/Unknown - 5s 33ms/step - loss: 0.6086 - accuracy: 0.65312021-01-11 22:23:17.252568: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 10240
    160/Unknown - 6s 33ms/step - loss: 0.6024 - accuracy: 0.65982021-01-11 22:23:17.771701: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 11264
    190/Unknown - 6s 31ms/step - loss: 0.5925 - accuracy: 0.67032021-01-11 22:23:18.295851: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 12288
    192/Unknown - 7s 33ms/step - loss: 0.5920 - accuracy: 0.67092021-01-11 22:23:18.815791: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 13312
    222/Unknown - 7s 31ms/step - loss: 0.5842 - accuracy: 0.67902021-01-11 22:23:19.343171: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 14336
    224/Unknown - 8s 33ms/step - loss: 0.5837 - accuracy: 0.67952021-01-11 22:23:19.857738: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 15360
    254/Unknown - 8s 31ms/step - loss: 0.5770 - accuracy: 0.68632021-01-11 22:23:20.379426: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 16384
    270/Unknown - 9s 31ms/step - loss: 0.5737 - accuracy: 0.68952021-01-11 22:23:20.899491: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 17408
    286/Unknown - 9s 31ms/step - loss: 0.5706 - accuracy: 0.69252021-01-11 22:23:21.420403: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 18432
    302/Unknown - 10s 31ms/step - loss: 0.5677 - accuracy: 0.69522021-01-11 22:23:21.941930: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 19456
    304/Unknown - 10s 33ms/step - loss: 0.5674 - accuracy: 0.69562021-01-11 22:23:22.461011: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 20480

Note: I timed myself copying the gists into my terminal, and it took a bit less than 3 minutes.

To learn more about what you can do with Redpanda, watch our YouTube videos, join our Slack community, or download the binary from GitHub. Follow us on Twitter via @redpandadata or follow me personally at @emaxerrno.