Introduction
Large Language Models (LLMs) are in high demand for tasks such as language translation, text summarization, and sentiment analysis, and they need a lot of compute power to train. Llama 1, for example, used 2048 NVIDIA A100 80GB GPUs and trained for 21 days, at an estimated cost north of $5 million. Getting your hands on big enough GPUs can be logistically challenging and expensive, and the shortage of GPU chips is a further bottleneck for many organizations. So instead of scaling vertically, an idea that is gaining traction is to scale LLM training horizontally, i.e. to use distributed computing frameworks to run training jobs for large models across multiple GPU-equipped instances. In this blog post, we will explore how Kubernetes, Ray, and Kubeflow can be combined to create an efficient and cost-effective platform for training ML models.
Goal
This post aims to demonstrate how we can build a Kubernetes cluster that orchestrates workloads that need GPU instances. Such a cluster can be used to increase the utilization of on-prem GPU resources or to run these workloads on different or specialized cloud providers. For example, we can use GPU instances provided by specialized cloud vendors like Genesis Cloud. Open-source tools like Ray and Kubeflow can efficiently distribute and orchestrate training jobs to maximize performance and reduce costs. In the following sections, we will go through step-by-step instructions for setting up a multi-GPU Kubernetes cluster and demonstrate how to run a machine learning training job on this cluster. Alternatively, you can take a look at the related GitHub repo.
What are Kubernetes and Kubeflow
Kubernetes is an open-source container orchestration system that simplifies the deployment, scaling, and management of containerized applications. It is widely used in the industry right now. Kubeflow is a machine learning toolkit that has tight integration with Kubernetes. It provides a collection of open-source tools for building and deploying machine learning workflows at scale. Together, these technologies provide a powerful platform for building and deploying machine learning applications in a scalable manner.
What are Ray and KubeRay
Ray is an open-source framework for scaling complex workloads. It is used heavily for AI and Python applications. With its simple and intuitive API, Ray makes the distributed training and serving of ML models easy. One of the unique features of Ray is its ability to efficiently manage resources in a distributed environment, such as scaling the number of worker nodes up and down. This makes it an ideal choice for building distributed ML applications. The KubeRay Operator is a package that allows you to easily orchestrate Ray jobs on Kubernetes. By using KubeRay, you can take advantage of the benefits of Ray on your existing Kubernetes infrastructure.
Next, we will go through how to set up and use KubeRay to run ML training jobs on our own Kubernetes cluster.
Bringing it All Together
To build our Kubernetes cluster, we will use K3s, a lightweight open-source Kubernetes distribution that is easy to install and requires minimal resources. The cluster will contain GPU as well as CPU-only instances. On Genesis Cloud, you would pay $0.70/hr for each NVIDIA RTX 3090 GPU and $0.10/hr per 2 vCPUs for the CPU-only instances. Once we have our Kubernetes cluster up and running, we will:
install the NVIDIA GPU Operator
install the KubeRay Operator
install Kubeflow
create a Ray Cluster
run MNIST model training
The architecture looks as follows:
Let's get started
Everything in this field is moving very fast. Below are the versions of the tools that were used for this demo:
Kubernetes 1.25
Python 3.8
Ray 2.6
Kubeflow 1.7
Ubuntu 20.04
KubeRay 0.6.0
NVIDIA GPU Operator v23.6.0
Demo tested on Genesis Cloud with NVIDIA RTX3090 GPUs
To keep the blog post short, we will go through only the main steps required to create the architecture described above. Detailed steps and explanations are provided in the repository. Tools like curl, git, make, kustomize, and helm need to be installed on your local system.
To create the K3s cluster, we start by installing K3s on one of the instances. This will be our main instance. It can be an instance without GPUs because it will mostly run Kubernetes administrative tasks. For this post, I will be using a configuration with 4 vCPUs, 8 GiB of memory, and an 80 GiB SSD.
curl -sfL https://get.k3s.io | sh -
After K3s is installed, we need to note the IP address of the instance and the token that allows other instances to join the cluster. You can get the token by running:
sudo cat /var/lib/rancher/k3s/server/node-token
Next, we will add the rest of the instances in the cluster. The worker nodes are configured with 2 RTX 3090 GPUs each:
curl -sfL https://get.k3s.io | K3S_URL=https://<MAIN_NODE_IP>:6443 K3S_TOKEN=<K3S_TOKEN> sh -
where MAIN_NODE_IP is the IP address of the main node and K3S_TOKEN is the token we retrieved in the previous step.
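With the node shapes fixed, it can help to estimate what the cluster costs per hour. The sketch below is only a back-of-the-envelope calculation from the prices quoted earlier in this post. It assumes that CPU pricing scales linearly with vCPUs, that GPU workers are billed per GPU only, and that the cluster has two worker nodes; the worker count and the function name are hypothetical, chosen for illustration.

```python
# Hourly prices quoted in this post (USD).
GPU_PRICE_PER_HOUR = 0.70         # one NVIDIA RTX 3090
CPU_PRICE_PER_2_VCPU_HOUR = 0.10  # CPU-only instances, per 2 vCPUs


def cluster_cost_per_hour(main_vcpus: int, workers: int, gpus_per_worker: int) -> float:
    """Estimate the hourly cluster cost under the assumptions stated above."""
    # Assumption: CPU pricing scales linearly with the number of vCPUs.
    cpu_cost = (main_vcpus / 2) * CPU_PRICE_PER_2_VCPU_HOUR
    # Assumption: GPU workers are billed per GPU, with nothing extra for their CPUs.
    gpu_cost = workers * gpus_per_worker * GPU_PRICE_PER_HOUR
    return round(cpu_cost + gpu_cost, 2)


# Hypothetical cluster: the 4 vCPU main node plus two workers with 2 GPUs each.
hourly = cluster_cost_per_hour(main_vcpus=4, workers=2, gpus_per_worker=2)
print(f"~${hourly:.2f}/hr, ~${hourly * 8:.2f} for an 8-hour training day")
```

Real invoices will differ (storage, traffic, and billing granularity are ignored here), so treat the result as a rough planning number.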
NVIDIA GPU Operator
Once we have our instances joined in the cluster, we will install the NVIDIA GPU Operator. The purpose of the GPU Operator is to make our hardware GPUs visible to the Kubernetes cluster. You can install it as follows:
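# Register NVIDIA's Helm repository so that the nvidia/gpu-operator chart can be resolved
helm repo add nvidia https://helm.ngc.nvidia.com/nvidia
helm repo update
# driver and toolkit are disabled here, which assumes the NVIDIA driver and
# container toolkit are already installed on the GPU instances
helm install --wait --generate-name \
  -n gpu-operator --create-namespace \
  nvidia/gpu-operator \
  --set driver.enabled=false \
  --set toolkit.enabled=false \
  --kubeconfig /etc/rancher/k3s/k3s.yaml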
Now, our cluster can orchestrate workloads that require GPUs. We can inspect the output of the describe nodes command for "nvidia.com/gpu" allocatable resources.
kubectl describe nodes
The output should contain a snippet like this:
Capacity:
  cpu: 8
  ephemeral-storage: 79066Mi
  hugepages-1Gi: 0
  hugepages-2Mi: 0
  memory: 49426832Ki
  nvidia.com/gpu: 2
  pods: 110
Allocatable:
  cpu: 8
  ephemeral-storage: 78761374454
  hugepages-1Gi: 0
  hugepages-2Mi: 0
  memory: 49426832Ki
  nvidia.com/gpu: 2
  pods: 110
System Info:
In the snippet, we can see how many CPUs are available on the instance, the amount of memory, and the number of GPUs, and for each of them, how many are allocatable for Kubernetes workloads. The value of 110 pods is the default Kubernetes limit for the maximum number of pods on a node.
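Reading the describe output by eye works for a couple of nodes; for a larger cluster, the same check can be scripted. The sketch below is one possible way to do it, not part of the original setup: it parses the JSON form of the node list (as produced by kubectl get nodes -o json) and reports the allocatable nvidia.com/gpu count per node. The function names and the sample node names are made up for illustration, and kubectl is assumed to be on the PATH and pointed at the cluster.

```python
import json
import subprocess


def gpus_per_node(node_list: dict) -> dict:
    """Map node name -> allocatable nvidia.com/gpu count from a parsed NodeList."""
    result = {}
    for node in node_list.get("items", []):
        name = node["metadata"]["name"]
        allocatable = node.get("status", {}).get("allocatable", {})
        # Kubernetes reports quantities as strings; nodes without GPUs lack the key.
        result[name] = int(allocatable.get("nvidia.com/gpu", "0"))
    return result


def fetch_node_list() -> dict:
    """Fetch the node list via kubectl (assumed to be configured for the cluster)."""
    completed = subprocess.run(
        ["kubectl", "get", "nodes", "-o", "json"],
        check=True, capture_output=True, text=True,
    )
    return json.loads(completed.stdout)


# Sample data shaped like `kubectl get nodes -o json`; the node names are made up.
sample = {
    "items": [
        {"metadata": {"name": "main"},
         "status": {"allocatable": {"cpu": "4"}}},
        {"metadata": {"name": "gpu-worker-1"},
         "status": {"allocatable": {"cpu": "8", "nvidia.com/gpu": "2"}}},
    ]
}
print(gpus_per_node(sample))
```

Against a live cluster, you would call gpus_per_node(fetch_node_list()) instead of passing the sample.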
KubeRay operator
Next, installing the KubeRay Operator is straightforward:
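# Register the KubeRay Helm repository so that the kuberay/kuberay-operator chart can be resolved
helm repo add kuberay https://ray-project.github.io/kuberay-helm/
helm repo update
helm upgrade --install \
  kuberay-operator kuberay/kuberay-operator \
  --namespace kuberay-operator \
  --create-namespace \
  --version 0.6.0 \
  --kubeconfig /etc/rancher/k3s/k3s.yaml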
Kubeflow
After installing the KubeRay Operator, we will proceed to install Kubeflow.
git clone https://github.com/data-max-hq/manifests.git
cd manifests/
while ! kustomize build example | awk '!/well-defined/' | sudo k3s kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done
Ray Cluster
Finally, we will create a Ray Cluster and start using it from Kubeflow.
kubectl apply -f https://raw.githubusercontent.com/data-max-hq/cost-effective-ml/main/k3s/ray-cluster.yaml
The cluster is ready when the head pod is in the Running state. Check the status by running:
kubectl get pods -n kubeflow-user-example-com
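If you want to automate this readiness check, for example in a setup script, a small helper can inspect the JSON form of the same command (adding -o json). The sketch below is an illustration under assumptions: it relies on the ray.io/node-type label that KubeRay is understood to put on its pods, and the function name and sample pod names are hypothetical.

```python
def head_pod_running(pod_list: dict) -> bool:
    """Return True if the Ray head pod in a parsed PodList is in the Running phase."""
    for pod in pod_list.get("items", []):
        labels = pod.get("metadata", {}).get("labels", {})
        # Assumption: KubeRay labels the head pod with ray.io/node-type=head.
        if labels.get("ray.io/node-type") == "head":
            return pod.get("status", {}).get("phase") == "Running"
    # No head pod found at all.
    return False


# Sample data shaped like `kubectl get pods -o json`; the pod names are made up.
sample_pods = {
    "items": [
        {"metadata": {"name": "example-cluster-head-abcde",
                      "labels": {"ray.io/node-type": "head"}},
         "status": {"phase": "Running"}},
        {"metadata": {"name": "example-cluster-worker-fghij",
                      "labels": {"ray.io/node-type": "worker"}},
         "status": {"phase": "Pending"}},
    ]
}
print("head pod running:", head_pod_running(sample_pods))
```

Only the head pod is checked here; workers may still be starting while the head is already reachable.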
To check that everything is running as expected, you can run the following code on the cluster, either from a Notebook on Kubeflow or directly from a pod. First, install ray:
pip install ray==2.6.1
and then run:
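import ray

# Connect to the running Ray cluster; replace <RAY_ADDRESS> with its address.
ray.init(address="<RAY_ADDRESS>")


@ray.remote(num_gpus=2)
def check_gpus():
    # Return the IDs of the GPUs that Ray assigned to this task.
    return ray.get_gpu_ids()


# Wait for the task to finish, then print its result on the driver.
print(ray.get(check_gpus.remote()))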
The expected response looks like below, showing the ids of the allocated GPUs:
[0, 1]
To run distributed model training, we will now use Ray Train and ScalingConfig to allocate the required resources and abstract away the specific framework complexity for the training job.
We will go through an example of running an MNIST training job with Ray. First, install the required packages:
pip install ray==2.6.1 tensorflow==2.12.1 pyarrow tblib
The training code looks as follows:
import argparse
from filelock import FileLock
import json
import os

import numpy as np
from ray.air.result import Result
import tensorflow as tf

from ray.train.tensorflow import TensorflowTrainer
from ray.air.integrations.keras import ReportCheckpointCallback
from ray.air.config import ScalingConfig


def mnist_dataset(batch_size: int) -> tf.data.Dataset:
    with FileLock(os.path.expanduser("~/.mnist_lock")):
        (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()
        .batch(batch_size)
    )
    return train_dataset


def build_cnn_model() -> tf.keras.Model:
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )
    return model


def train_func(config: dict):
    per_worker_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)
    steps_per_epoch = config.get("steps_per_epoch", 70)

    tf_config = json.loads(os.environ["TF_CONFIG"])
    num_workers = len(tf_config["cluster"]["worker"])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_worker_batch_size * num_workers
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_cnn_model()
        learning_rate = config.get("lr", 0.001)
        multi_worker_model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
            metrics=["accuracy"],
        )

    history = multi_worker_model.fit(
        multi_worker_dataset,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        callbacks=[ReportCheckpointCallback()],
    )
    results = history.history
    return results


def train_tensorflow_mnist(
    num_workers: int = 2, use_gpu: bool = False, epochs: int = 4
) -> Result:
    config = {"lr": 1e-3, "batch_size": 64, "epochs": epochs}
    trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
    )
    results = trainer.fit()
    return results
The code looks very similar to regular MNIST training code. The difference is that we use the TensorflowTrainer class to tell Ray that this is a TensorFlow training task, and the ScalingConfig to tell Ray how many workers are required and whether they will use GPUs.
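To make the effect of the worker count concrete, the arithmetic inside train_func can be worked through on its own. The sketch below is plain Python, independent of Ray and TensorFlow; it reuses the default values from the training code (batch size 64, 70 steps per epoch) with 2 workers and 3 epochs. The helper name training_budget is hypothetical.

```python
MNIST_TRAIN_SIZE = 60_000  # number of images in the MNIST training split


def training_budget(per_worker_batch_size: int, num_workers: int,
                    steps_per_epoch: int, epochs: int) -> dict:
    """Work out how many samples a run processes for a given scaling setup."""
    # Same formula as in train_func: the global batch grows with the worker count.
    global_batch_size = per_worker_batch_size * num_workers
    samples_per_epoch = global_batch_size * steps_per_epoch
    return {
        "global_batch_size": global_batch_size,
        "samples_per_epoch": samples_per_epoch,
        "total_samples": samples_per_epoch * epochs,
        # Share of the training split seen in one "epoch" of this run.
        "fraction_of_mnist_per_epoch": samples_per_epoch / MNIST_TRAIN_SIZE,
    }


budget = training_budget(per_worker_batch_size=64, num_workers=2,
                         steps_per_epoch=70, epochs=3)
for key, value in budget.items():
    print(f"{key}: {value}")
```

With these settings one "epoch" covers 8,960 samples, roughly 15% of the training split, because the dataset repeats and steps_per_epoch decides where an epoch ends. Adding workers increases the global batch, and therefore the samples processed per step, without changing the per-worker batch size.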
We can trigger the training task with:
train_tensorflow_mnist(num_workers=2, use_gpu=True, epochs=3)
In the console output, we will see that the task is using the specified number of workers, each with one GPU. In our run, the trainer used 2 GPUs to run training, as defined by the num_workers parameter.
There are more examples of model training for HuggingFace Transformers or PyTorch models in the Ray Train documentation.
At the end of these steps, we have built a powerful and dynamic platform for training ML models. Integrating Ray, Kubeflow, and Kubernetes together brings in the benefits of each technology while still being a flexible and cost-effective solution.
In Part 2 of the series, we will look into training an LLM and adding autoscaling so that you don't have to pay for GPUs while the cluster is idle.
Benefits and Challenges
While there are a lot of benefits in this setup, there are also some challenges.
Benefits include:
Flexibility: By using our own Kubernetes cluster, we are not bound to any specific cloud provider and can take advantage of the benefits of specialized cloud providers.
Scalability: Kubernetes, Ray, and Kubeflow provide a powerful platform for building and deploying machine learning applications in a scalable manner.
Cost-effectiveness: By using a combination of CPU and GPU instances in the same cluster, we can optimize our platform for both cost and performance.
Ease of management: Kubernetes simplifies the deployment, scaling, and management of containerized applications, making it easy to manage a complex distributed platform like the one we have built.
Efficient resource management: Ray's ability to efficiently manage resources in a distributed environment allows us to optimize our platform for large machine learning training tasks.
Challenges:
Complexity: Managing your own Kubernetes cluster can be demanding, especially when you need redundancy and high availability.
Version sensitivity: Ray is very sensitive to the Python version and to its own library version, and there are multiple moving parts in this setup. For instance, if we are running the code from a notebook, there is the Python version used by the notebook kernel and the Ray version installed in the notebook, but also the Python and Ray versions installed in the Ray Cluster. They all need to be the same down to the minor version.
Costly data transfers: depending on your cloud provider, you might be charged for egress data transfers. This makes it more expensive to move data between different cloud providers.
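The version-sensitivity challenge above is easy to check for before submitting a job. The sketch below compares the Python and Ray versions of the client (for example, a notebook kernel) with those of the cluster, down to the minor version. It is a minimal illustration: the helper names are hypothetical, and the cluster-side versions are hard-coded sample values that you would replace with what your Ray Cluster image actually ships.

```python
import sys
from importlib import metadata


def same_up_to(a: str, b: str, parts: int = 2) -> bool:
    """Compare the first `parts` dot-separated components of two version strings."""
    return a.split(".")[:parts] == b.split(".")[:parts]


def local_versions() -> dict:
    """Collect the Python and Ray versions of the current environment."""
    python_version = ".".join(str(p) for p in sys.version_info[:3])
    try:
        ray_version = metadata.version("ray")
    except metadata.PackageNotFoundError:
        ray_version = None  # Ray is not installed in this environment.
    return {"python": python_version, "ray": ray_version}


def mismatches(client: dict, cluster: dict) -> list:
    """List the components whose versions differ (or are missing) between the two sides."""
    problems = []
    for name in ("python", "ray"):
        c, s = client.get(name), cluster.get(name)
        if c is None or s is None or not same_up_to(c, s):
            problems.append(f"{name}: client={c} cluster={s}")
    return problems


# Hypothetical cluster-side versions, in line with the versions listed in this post.
cluster_versions = {"python": "3.8.17", "ray": "2.6.1"}
print(mismatches(local_versions(), cluster_versions) or "versions match")
```

Passing parts=3 to same_up_to makes the comparison stricter if you also want patch versions to agree.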
If you are already using Kubernetes and either have your own GPUs or want to use GPUs from a specialized cloud provider, this setup makes it easy to run workloads on GPUs using tools that you are already familiar with, while keeping costs low (at least lower than renting GPUs from traditional cloud providers) and increasing GPU utilization.
Summary
In this blog post, we have demonstrated how to build a powerful platform for training large machine learning models at scale. Such a platform is also needed when fine-tuning large open language models. By leveraging Kubernetes, Kubeflow, and Ray, we have created a hybrid Kubernetes cluster that provides a flexible, scalable, and cost-effective solution for ML training. We have provided step-by-step instructions for setting up the platform and discussed the benefits of each technology. By following the steps outlined in this blog post, you can build your own LLM training platform that is optimized for both cost and performance.