Introduction
You have a dataset in AWS S3 and want to download it to Crusoe infrastructure for training. This guide copies the data onto a Crusoe shared disk using parallel rclone worker pods on Kubernetes.
Prerequisites
- Kubernetes API access
- Kubeconfig
- AWS Credentials
- Crusoe Shared Disk
- Python 3 with the boto3 package installed
Summary
The script provided below performs the following functions:
- Creates a generic Kubernetes secret to store the AWS credentials.
- Lists all files in the AWS S3 bucket/path assigned to the S3_BUCKET variable.
- Splits the resulting manifest into multiple parts based on the file list and total size, so each part covers roughly the same amount of data (see the sketch after this list).
- Copies the split manifest files to the shared disk, so each worker can read its manifest from the disk.
- Launches rclone workers; each worker picks one manifest file and downloads the files listed in it.
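For intuition, here is a minimal, self-contained sketch of the greedy size-balancing used for the split (the file list is a made-up toy example; the full script below builds it from the real S3 metadata):

# Toy illustration of the greedy split: repeatedly assign the next-largest
# file to whichever part is currently smallest, keeping part sizes balanced.
files = [{"key": "a", "size": 900}, {"key": "b", "size": 600},
         {"key": "c", "size": 500}, {"key": "d", "size": 400}]
num_parts = 2
parts, sizes = [[] for _ in range(num_parts)], [0] * num_parts
for f in sorted(files, key=lambda x: x["size"], reverse=True):
    idx = sizes.index(min(sizes))  # index of the currently smallest part
    parts[idx].append(f["key"])
    sizes[idx] += f["size"]
print(parts, sizes)  # [['a', 'd'], ['b', 'c']] [1300, 1100]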
Step-by-Step Instructions
- Export the AWS access key and secret key as environment variables:
export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
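Optionally, sanity-check the exported credentials before proceeding. A minimal check, assuming boto3 is installed (pip install boto3):

# Prints the caller's IAM ARN if the exported credentials are valid;
# boto3 reads AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY from the environment.
import boto3

print(boto3.client("sts").get_caller_identity()["Arn"])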
- Create a Crusoe shared disk to hold the downloaded data, using the PersistentVolumeClaim below:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: sharedfs-mount
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: crusoe-csi-driver-fs-sc
  resources:
    requests:
      storage: 1000Ti
  volumeMode: Filesystem
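Save the manifest to a file (the filename below is just an example) and apply it, then confirm the PVC exists:

kubectl apply -f sharedfs-pvc.yaml
kubectl get pvc sharedfs-mount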
- Modify the following Python script to set the necessary variables (NUM_PODS, S3_BUCKET, PVC_NAME, and so on) and save it as rclone-download.py:
import boto3
import subprocess
import os
import json
import time
import logging
from botocore.exceptions import ClientError

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

## CONFIGURATION ##
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
NUM_PODS = 8                        # Decides number of parallel workstreams
SECRET_NAME = "aws-credentials"     # Name of the K8s secret
PVC_NAME = "sharedfs-mount"         # Name of the shared disk to be mounted
S3_BUCKET = "crusoe-solutions-engineering/c4-dataset/en"  # AWS bucket/path
MASTER_POD_NAME = "rclone-master"   # Name of the master pod
MANIFEST_DIR = "/manifests"         # Mount path to save manifest files on the shared disk
INSTANCE_CLASS = "s1a"              # Type of instance to use for scheduling rclone workers

## Create the K8s secret ##
def create_k8s_secret():
    logging.info(f"Creating or updating Kubernetes secret '{SECRET_NAME}'...")
    cmd = [
        "kubectl", "create", "secret", "generic", SECRET_NAME,
        f"--from-literal=AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}",
        f"--from-literal=AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}",
        "--dry-run=client", "-o", "yaml"
    ]
    proc = subprocess.run(cmd, capture_output=True, check=True)
    subprocess.run(["kubectl", "apply", "-f", "-"], input=proc.stdout, check=True)
    logging.info("Kubernetes secret created or updated.")

## List the files in your AWS S3 bucket/path and save them as a manifest ##
## The manifest holds every file and its size from the configured bucket ##
def list_s3_files(bucket_name, prefix=''):
    logging.info(f"Listing files in S3 bucket {bucket_name}, prefix: {prefix}...")
    try:
        session = boto3.Session(
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY
        )
        s3 = session.client('s3')
        paginator = s3.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
        files = []
        for page in pages:
            for obj in page.get('Contents', []):
                files.append({'key': obj['Key'], 'size': obj['Size']})
        logging.info(f"Found {len(files)} files in S3 bucket {bucket_name}, prefix: {prefix}.")
        return files
    except ClientError as e:
        logging.error(f"Error listing files: {e}")
        return []

## Split the manifest into multiple parts so each worker downloads roughly the same amount of data ##
def split_manifests(files, num_manifests):
    logging.info(f"Splitting into {num_manifests} balanced manifests...")
    files.sort(key=lambda x: x['size'], reverse=True)
    manifests = [[] for _ in range(num_manifests)]
    manifest_sizes = [0] * num_manifests
    manifest_files_content = {}
    for file in files:
        # Greedy balancing: assign the next-largest file to the smallest part
        idx = manifest_sizes.index(min(manifest_sizes))
        manifests[idx].append(file['key'])
        manifest_sizes[idx] += file['size']
    for i, manifest in enumerate(manifests):
        content = "\n".join(manifest) + "\n"
        filename = f"manifest-part-{i}.txt"
        manifest_files_content[filename] = content
        logging.info(f"{filename}: {len(manifest)} files, {manifest_sizes[i]} bytes")
    return manifest_files_content

## Create the rclone-master pod and copy the split manifest files to the shared disk ##
def create_master_pod_and_copy_manifests(manifest_files_content):
    logging.info(f"Creating rclone-master pod '{MASTER_POD_NAME}'...")
    master_pod_manifest = {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": MASTER_POD_NAME},
        "spec": {
            "volumes": [
                {
                    "name": "myvolume",
                    "persistentVolumeClaim": {"claimName": PVC_NAME, "readOnly": False}
                }
            ],
            "containers": [
                {
                    "name": "rclone-master",
                    "image": "busybox",
                    "command": ["/bin/sh", "-c", "while true; do sleep 3600; done"],
                    "volumeMounts": [{"mountPath": MANIFEST_DIR, "name": "myvolume"}]
                }
            ]
        }
    }
    subprocess.run(
        ["kubectl", "apply", "-f", "-"],
        input=json.dumps(master_pod_manifest).encode(), check=True
    )
    logging.info(f"rclone-master pod '{MASTER_POD_NAME}' created.")
    logging.info(f"Waiting for pod '{MASTER_POD_NAME}' to be running...")
    subprocess.run([
        "kubectl", "wait", "--for=condition=Ready",
        f"pod/{MASTER_POD_NAME}", "--timeout=600s"
    ], check=True)
    logging.info(f"Pod '{MASTER_POD_NAME}' is running.")
    logging.info(f"Copying manifest files to pod '{MASTER_POD_NAME}:{MANIFEST_DIR}'...")
    for filename, content in manifest_files_content.items():
        with open(filename, "w") as tmp_file:
            tmp_file.write(content)
        subprocess.run([
            "kubectl", "cp", filename, f"{MASTER_POD_NAME}:{MANIFEST_DIR}/{filename}"
        ], check=True)
        os.remove(filename)
        print(f" Copied '{filename}' to '{MASTER_POD_NAME}:{MANIFEST_DIR}'")
    logging.info("Manifest files copied to the rclone-master pod.")

## Create rclone-workers equal to the NUM_PODS variable, all mounting the same shared disk ##
## Each worker pod gets assigned one of the unique manifests created previously ##
def launch_worker_pods(num_pods, bucket_name):
    logging.info(f"Launching {num_pods} rclone-worker pods...")
    for i in range(num_pods):
        pod_name = f"rclone-worker-{i}"
        manifest_file = f"/data/manifest-part-{i}.txt"
        rclone_command = [
            "sh", "-c",
            (
                "mkdir -p /root/.config/rclone && "
                "echo '[s3]\n"
                "type = s3\n"
                "provider = AWS\n"
                "env_auth = true' > /root/.config/rclone/rclone.conf && "
                f"rclone copy --files-from {manifest_file} s3:{bucket_name} /data "
                "--s3-chunk-size=64M --fast-list --buffer-size=32M --transfers=40 --multi-thread-streams=40 -P"
            )
        ]
        overrides = {
            "spec": {
                "hostNetwork": True,
                "nodeSelector": {"crusoe.ai/instance.class": INSTANCE_CLASS},
                "containers": [
                    {
                        "name": "rclone-worker",
                        "image": "rclone/rclone",
                        "command": rclone_command,
                        "volumeMounts": [{"mountPath": "/data", "name": "myvolume"}],
                        "env": [
                            {
                                "name": "AWS_ACCESS_KEY_ID",
                                "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "AWS_ACCESS_KEY_ID"}}
                            },
                            {
                                "name": "AWS_SECRET_ACCESS_KEY",
                                "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "AWS_SECRET_ACCESS_KEY"}}
                            }
                        ]
                    }
                ],
                "volumes": [
                    {
                        "name": "myvolume",
                        "persistentVolumeClaim": {"claimName": PVC_NAME, "readOnly": False}
                    }
                ]
            }
        }
        logging.info(f"Launching pod {pod_name} with manifest '{manifest_file}'...")
        subprocess.run([
            "kubectl", "run", pod_name, "--restart=Never",
            "--overrides", json.dumps(overrides),
            "--image=rclone/rclone", "--labels=app=rclone-worker"
        ], check=True)
    logging.info("All worker pods launched.")

## Wait for worker pods to complete the download ##
def wait_for_pods_completion(label_selector="app=rclone-worker", poll_interval=15):
    logging.info("Waiting for rclone-worker pods to complete...")
    backoff = poll_interval
    while True:
        result = subprocess.run(
            ["kubectl", "get", "pods", "-l", label_selector, "-o", "json"],
            capture_output=True, text=True, check=True
        )
        pods = json.loads(result.stdout)["items"]
        incomplete = [
            p["metadata"]["name"] for p in pods
            if p["status"]["phase"] not in ("Succeeded", "Failed")
        ]
        if not incomplete:
            logging.info("All pods have completed.")
            break
        logging.info(f"Still running: {incomplete}")
        time.sleep(backoff)
        backoff = min(backoff * 2, 10)  # cap the polling interval at 10 seconds

## Clean up the worker pods after the download completes ##
def delete_pods(label_selector="app=rclone-worker"):
    logging.info("Deleting all rclone-worker pods...")
    subprocess.run(
        ["kubectl", "delete", "pod", "-l", label_selector],
        check=True
    )

## Delete the master pod after the download completes ##
def delete_master_pod():
    logging.info(f"Deleting rclone-master pod '{MASTER_POD_NAME}'...")
    try:
        subprocess.run(
            ["kubectl", "delete", "pod", MASTER_POD_NAME],
            stderr=subprocess.PIPE, check=True
        )
        logging.info(f"rclone-master pod '{MASTER_POD_NAME}' deleted.")
    except subprocess.CalledProcessError as e:
        if e.stderr and b"not found" in e.stderr:
            logging.info(f"rclone-master pod '{MASTER_POD_NAME}' not found.")
        else:
            raise

def main():
    create_k8s_secret()
    parts = S3_BUCKET.split('/')
    bucket_name = parts[0]
    prefix = '/'.join(parts[1:]) + '/' if len(parts) > 1 else ''
    files = list_s3_files(bucket_name, prefix)
    manifest_files_content = split_manifests(files, NUM_PODS)
    create_master_pod_and_copy_manifests(manifest_files_content)
    time.sleep(5)
    launch_worker_pods(NUM_PODS, bucket_name)
    wait_for_pods_completion()
    delete_master_pod()
    delete_pods()

if __name__ == "__main__":
    main()
- Run the script:
python3 rclone-download.py
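While the script runs, you can watch the workers from a second terminal, for example:

kubectl get pods -l app=rclone-worker -w
kubectl logs -f rclone-worker-0

The rclone -P flag makes each worker log its transfer progress, so the logs show per-worker throughput and ETA.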
Additional Resources
- The output of the script should look like the following:
# python3 rclone-download.py
2025-05-15 17:18:39,010 - INFO - Creating or updating Kubernetes secret 'aws-credentials'...
secret/aws-credentials configured
2025-05-15 17:18:40,200 - INFO - Kubernetes secret created or updated.
2025-05-15 17:18:40,201 - INFO - Listing files in S3 bucket crusoe-solutions-engineering, prefix: c4-dataset/en/...
2025-05-15 17:18:41,505 - INFO - Found 1032 files in S3 bucket crusoe-solutions-engineering, prefix: c4-dataset/en/.
2025-05-15 17:18:41,507 - INFO - Splitting into 8 balanced manifests...
2025-05-15 17:18:41,509 - INFO - manifest-part-0.txt: 129 files, 40888132483 bytes
2025-05-15 17:18:41,509 - INFO - manifest-part-1.txt: 129 files, 40888167248 bytes
2025-05-15 17:18:41,509 - INFO - manifest-part-2.txt: 129 files, 40888209571 bytes
2025-05-15 17:18:41,509 - INFO - manifest-part-3.txt: 129 files, 40887191536 bytes
2025-05-15 17:18:41,509 - INFO - manifest-part-4.txt: 129 files, 40888113404 bytes
2025-05-15 17:18:41,509 - INFO - manifest-part-5.txt: 129 files, 40888203247 bytes
2025-05-15 17:18:41,509 - INFO - manifest-part-6.txt: 129 files, 40888273042 bytes
2025-05-15 17:18:41,509 - INFO - manifest-part-7.txt: 129 files, 40888232487 bytes
2025-05-15 17:18:41,509 - INFO - Creating rclone-master pod 'rclone-master'...
pod/rclone-master created
2025-05-15 17:18:42,705 - INFO - rclone-master pod 'rclone-master' created.
2025-05-15 17:18:42,705 - INFO - Waiting for pod 'rclone-master' to be running...
pod/rclone-master condition met
2025-05-15 17:19:00,344 - INFO - Pod 'rclone-master' is running.
2025-05-15 17:19:00,344 - INFO - Copying manifest files to pod 'rclone-master:/manifests'...
Copied 'manifest-part-0.txt' to 'rclone-master:/manifests'
Copied 'manifest-part-1.txt' to 'rclone-master:/manifests'
Copied 'manifest-part-2.txt' to 'rclone-master:/manifests'
Copied 'manifest-part-3.txt' to 'rclone-master:/manifests'
Copied 'manifest-part-4.txt' to 'rclone-master:/manifests'
Copied 'manifest-part-5.txt' to 'rclone-master:/manifests'
Copied 'manifest-part-6.txt' to 'rclone-master:/manifests'
Copied 'manifest-part-7.txt' to 'rclone-master:/manifests'
2025-05-15 17:19:17,450 - INFO - Manifest files copied to the rclone-master pod.
2025-05-15 17:19:22,456 - INFO - Launching 8 rclone-worker pods...
2025-05-15 17:19:22,456 - INFO - Launching pod rclone-worker-0 with manifest '/data/manifest-part-0.txt'...
pod/rclone-worker-0 created
2025-05-15 17:19:23,064 - INFO - Launching pod rclone-worker-1 with manifest '/data/manifest-part-1.txt'...
pod/rclone-worker-1 created
2025-05-15 17:19:23,702 - INFO - Launching pod rclone-worker-2 with manifest '/data/manifest-part-2.txt'...
pod/rclone-worker-2 created
2025-05-15 17:19:24,317 - INFO - Launching pod rclone-worker-3 with manifest '/data/manifest-part-3.txt'...
pod/rclone-worker-3 created
2025-05-15 17:19:24,986 - INFO - Launching pod rclone-worker-4 with manifest '/data/manifest-part-4.txt'...
pod/rclone-worker-4 created
2025-05-15 17:19:25,611 - INFO - Launching pod rclone-worker-5 with manifest '/data/manifest-part-5.txt'...
pod/rclone-worker-5 created
2025-05-15 17:19:26,276 - INFO - Launching pod rclone-worker-6 with manifest '/data/manifest-part-6.txt'...
pod/rclone-worker-6 created
2025-05-15 17:19:26,867 - INFO - Launching pod rclone-worker-7 with manifest '/data/manifest-part-7.txt'...
pod/rclone-worker-7 created
2025-05-15 17:19:27,497 - INFO - All worker pods launched.
2025-05-15 17:19:27,498 - INFO - Waiting for rclone-worker pods to complete...
2025-05-15 17:19:28,343 - INFO - Still running: ['rclone-worker-0', 'rclone-worker-1', 'rclone-worker-2', 'rclone-worker-3', 'rclone-worker-4', 'rclone-worker-5', 'rclone-worker-6', 'rclone-worker-7']
2025-05-15 17:19:44,233 - INFO - Still running: ['rclone-worker-0', 'rclone-worker-1', 'rclone-worker-2', 'rclone-worker-3', 'rclone-worker-4', 'rclone-worker-5', 'rclone-worker-6', 'rclone-worker-7']
2025-05-15 17:19:55,204 - INFO - Still running: ['rclone-worker-0', 'rclone-worker-1', 'rclone-worker-2', 'rclone-worker-3', 'rclone-worker-4', 'rclone-worker-5', 'rclone-worker-6', 'rclone-worker-7']
2025-05-15 17:20:06,117 - INFO - Still running: ['rclone-worker-1', 'rclone-worker-2', 'rclone-worker-3', 'rclone-worker-4', 'rclone-worker-5', 'rclone-worker-6', 'rclone-worker-7']
2025-05-15 17:20:17,078 - INFO - Still running: ['rclone-worker-2', 'rclone-worker-3', 'rclone-worker-4', 'rclone-worker-5', 'rclone-worker-6', 'rclone-worker-7']
2025-05-15 17:20:28,469 - INFO - Still running: ['rclone-worker-3']
2025-05-15 17:20:39,439 - INFO - All pods have completed.
2025-05-15 17:20:39,440 - INFO - Deleting rclone-master pod 'rclone-master'...
pod "rclone-master" deleted
2025-05-15 17:21:10,647 - INFO - rclone-master pod 'rclone-master' deleted.
2025-05-15 17:21:10,647 - INFO - Deleting all rclone-worker pods...
pod "rclone-worker-0" deleted
pod "rclone-worker-1" deleted
pod "rclone-worker-2" deleted
pod "rclone-worker-3" deleted
pod "rclone-worker-4" deleted
pod "rclone-worker-5" deleted
pod "rclone-worker-6" deleted
pod "rclone-worker-7" deleted
- With 8 pods running in parallel and spread across 2 s1a.160x nodes, we measured the peak cumulative bandwidth across all the rclone workers.
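To spot-check the downloaded data after the run, you can start a throwaway pod with the same shared disk mounted; a sketch, assuming the PVC name from the manifest above (the pod and volume names are arbitrary):

kubectl run pvc-verify --rm -it --restart=Never --image=busybox \
  --overrides='{"spec":{"volumes":[{"name":"myvolume","persistentVolumeClaim":{"claimName":"sharedfs-mount"}}],"containers":[{"name":"pvc-verify","image":"busybox","command":["sh","-c","du -sh /data; ls /data | head"],"volumeMounts":[{"mountPath":"/data","name":"myvolume"}]}]}}'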