Prerequisites
- Basic understanding of Python and containers
- Python installation (3.8+)
- Access to a Kubernetes cluster (minikube or kind works fine) and kubectl
- VS Code or preferred IDE
What you'll learn
- Understand what Kubernetes Deployments are and why they matter
- Create and manage Deployments from Python with the official client
- Debug common issues
- Write clean, Pythonic deployment code
Introduction
Welcome to the world of Kubernetes Deployments! Today we're diving into one of the most powerful tools for managing containerized applications at scale.
A Kubernetes Deployment is like a smart assistant that manages your applications 24/7. It keeps your apps running, handles updates smoothly, and — paired with an autoscaler — scales with demand. Whether you're deploying a simple web app or a complex microservices architecture, Deployments are central to the Kubernetes ecosystem.
By the end of this tutorial, you'll be confidently deploying and managing Python applications in Kubernetes. Let's get started!
Understanding Kubernetes Deployments
What are Kubernetes Deployments?
Think of a Kubernetes Deployment as a smart restaurant manager. Just like a manager ensures there are always enough waiters during busy hours and replaces any who call in sick, a Deployment ensures your application always has the right number of running instances (pods) and replaces any that fail.
In Python terms, a Deployment is a declarative way to manage your application's lifecycle. You tell Kubernetes what you want (e.g., "I want 3 instances of my Flask app running"), and it figures out how to make that happen and keep it that way. This means you can:
- Automatically restart crashed applications
- Scale up during high traffic
- Roll out updates without downtime
Why Use Kubernetes Deployments?
Here's why developers love Deployments:
- Self-Healing: failed pods are automatically replaced
- Rolling Updates: deploy new versions without downtime
- Easy Scaling: handle traffic spikes by changing a single replica count
- Version Control: roll back to previous revisions instantly
Real-world example: imagine running an e-commerce site during Black Friday. With Deployments (plus a HorizontalPodAutoscaler), your app scales out to absorb the traffic surge and recovers automatically if any instance crashes.
Basic Syntax and Usage
Creating Your First Deployment
Let's start with a simple Python Flask application:
# First, let's create a simple Flask app (app.py)
from flask import Flask, jsonify
import os
import socket

app = Flask(__name__)

@app.route('/')
def hello():
    # Return some useful info about our pod
    return jsonify({
        'message': 'Hello from Kubernetes!',
        'hostname': socket.gethostname(),
        'version': '1.0',
        'pod_ip': os.environ.get('POD_IP', 'unknown')
    })

@app.route('/health')
def health():
    # Health check endpoint for Kubernetes
    return jsonify({'status': 'healthy'}), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Now, let's create a Dockerfile:
# Dockerfile for our Python app
FROM python:3.9-slim
WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy our app
COPY app.py .

# Expose port and run
EXPOSE 5000
CMD ["python", "app.py"]
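The Dockerfile copies a requirements.txt that isn't shown above; for this app it only needs Flask (the version bound here is just an example, not a requirement of the tutorial):

```text
flask>=2.0
```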
Deployment Manifest
Here's how to define a Kubernetes Deployment using Python's kubernetes client:
# deployment.py - Create a Deployment programmatically
from kubernetes import client, config

def create_deployment():
    # Load Kubernetes config
    config.load_incluster_config()  # Use this inside the cluster
    # config.load_kube_config()     # Use this for local development

    # Create API instance
    apps_v1 = client.AppsV1Api()

    # Define the deployment
    deployment = client.V1Deployment(
        metadata=client.V1ObjectMeta(
            name="flask-app",
            labels={"app": "flask-app"}
        ),
        spec=client.V1DeploymentSpec(
            replicas=3,  # Run 3 instances
            selector=client.V1LabelSelector(
                match_labels={"app": "flask-app"}
            ),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(
                    labels={"app": "flask-app"}
                ),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name="flask-app",
                            image="myregistry/flask-app:1.0",
                            ports=[client.V1ContainerPort(container_port=5000)],
                            env=[
                                # Pass the pod IP as an environment variable
                                client.V1EnvVar(
                                    name="POD_IP",
                                    value_from=client.V1EnvVarSource(
                                        field_ref=client.V1ObjectFieldSelector(
                                            field_path="status.podIP"
                                        )
                                    )
                                )
                            ],
                            # Health checks
                            liveness_probe=client.V1Probe(
                                http_get=client.V1HTTPGetAction(
                                    path="/health",
                                    port=5000
                                ),
                                initial_delay_seconds=30,
                                period_seconds=10
                            ),
                            readiness_probe=client.V1Probe(
                                http_get=client.V1HTTPGetAction(
                                    path="/health",
                                    port=5000
                                ),
                                initial_delay_seconds=5,
                                period_seconds=5
                            )
                        )
                    ]
                )
            )
        )
    )

    # Create the deployment
    apps_v1.create_namespaced_deployment(
        namespace="default",
        body=deployment
    )
    print("Deployment created successfully!")
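The typed-model style above is explicit but verbose; create_namespaced_deployment also accepts plain dicts shaped like YAML manifests (Example 1 below uses that form). As a sketch, here is the same flask-app Deployment built as a dict — the names, image, and port are the ones used above, and the function name is ours:

```python
def flask_app_manifest(replicas=3, image="myregistry/flask-app:1.0"):
    """Build the flask-app Deployment as a plain, YAML-shaped dict."""
    labels = {"app": "flask-app"}
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "flask-app", "labels": labels},
        "spec": {
            "replicas": replicas,
            # The selector must match the pod template's labels
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "containers": [{
                        "name": "flask-app",
                        "image": image,
                        "ports": [{"containerPort": 5000}],
                    }]
                },
            },
        },
    }

manifest = flask_app_manifest()
print(manifest["spec"]["replicas"])  # → 3
```

Either form can be passed as `body=` to the same API call; the dict form is handy when your manifests already live as YAML files.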
Practical Examples
Example 1: E-Commerce App with Auto-Scaling
Let's build a scalable e-commerce backend:
# ecommerce_deployment.py
from kubernetes import client, config

class EcommerceDeployment:
    def __init__(self, namespace="ecommerce"):
        config.load_kube_config()
        self.apps_v1 = client.AppsV1Api()
        self.autoscaling_v1 = client.AutoscalingV1Api()
        self.namespace = namespace  # assumes the namespace already exists

    def deploy_app(self, name, image, replicas=2):
        # Create a deployment for a microservice (plain-dict manifest style)
        deployment = {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {
                "name": name,
                "labels": {"app": name, "tier": "backend"}
            },
            "spec": {
                "replicas": replicas,
                "selector": {"matchLabels": {"app": name}},
                "template": {
                    "metadata": {"labels": {"app": name}},
                    "spec": {
                        "containers": [{
                            "name": name,
                            "image": image,
                            "ports": [{"containerPort": 8000}],
                            "resources": {
                                # Resource management (requests are required
                                # for CPU-based autoscaling to work)
                                "requests": {
                                    "memory": "128Mi",
                                    "cpu": "100m"
                                },
                                "limits": {
                                    "memory": "256Mi",
                                    "cpu": "200m"
                                }
                            },
                            "env": [
                                {"name": "SERVICE_NAME", "value": name},
                                {"name": "ENVIRONMENT", "value": "production"}
                            ]
                        }]
                    }
                }
            }
        }

        # Deploy it!
        self.apps_v1.create_namespaced_deployment(
            namespace=self.namespace,
            body=deployment
        )
        print(f"Deployed {name} successfully!")

        # Create a HorizontalPodAutoscaler
        self.create_autoscaler(name)

    def create_autoscaler(self, name):
        # Auto-scale based on CPU usage
        hpa = client.V1HorizontalPodAutoscaler(
            metadata=client.V1ObjectMeta(name=f"{name}-hpa"),
            spec=client.V1HorizontalPodAutoscalerSpec(
                scale_target_ref=client.V1CrossVersionObjectReference(
                    api_version="apps/v1",
                    kind="Deployment",
                    name=name
                ),
                min_replicas=2,
                max_replicas=10,
                target_cpu_utilization_percentage=70
            )
        )
        self.autoscaling_v1.create_namespaced_horizontal_pod_autoscaler(
            namespace=self.namespace,
            body=hpa
        )
        print(f"Created autoscaler for {name}!")

    def deploy_full_stack(self):
        # Deploy all microservices
        services = [
            ("cart-service", "myregistry/cart:1.0", 3),
            ("product-service", "myregistry/products:1.0", 2),
            ("order-service", "myregistry/orders:1.0", 2),
            ("payment-service", "myregistry/payment:1.0", 4)
        ]
        for name, image, replicas in services:
            self.deploy_app(name, image, replicas)
            print(f"{name} is up and running!")

# Let's deploy!
ecommerce = EcommerceDeployment()
ecommerce.deploy_full_stack()
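Resource strings like "100m" and "128Mi" are easy to mistype, and a typo only surfaces when the API server rejects the manifest. A small validator lets you sanity-check configs before submitting them — this is a sketch covering only the common suffixes (Kubernetes accepts more, e.g. decimal SI units like "M" and "G"):

```python
def parse_cpu(value: str) -> float:
    """Parse a CPU quantity ('100m' or '2') into cores."""
    if value.endswith("m"):
        return int(value[:-1]) / 1000.0  # millicores
    return float(value)

def parse_memory(value: str) -> int:
    """Parse a binary memory quantity ('128Mi', '1Gi', ...) into bytes."""
    units = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4}
    for suffix, factor in units.items():
        if value.endswith(suffix):
            return int(value[:-2]) * factor
    return int(value)  # plain bytes

print(parse_cpu("100m"))      # → 0.1
print(parse_memory("128Mi"))  # → 134217728
```

Running every requests/limits value through such a parser in a pre-deploy check catches "128mi" or "100" (100 whole cores!) before they reach the cluster.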
Example 2: Game Server Fleet Management
Managing game servers with intelligent deployment strategies:
# game_server_manager.py
from kubernetes import client, config
from datetime import datetime
import time

class GameServerFleet:
    def __init__(self):
        config.load_kube_config()
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()

    def create_game_server(self, region, server_type="standard"):
        # Define the game server deployment
        deployment_name = f"game-server-{region}-{int(time.time())}"

        # Different configurations for different server types
        configs = {
            "standard": {"cpu": "500m", "memory": "1Gi", "players": 50},
            "premium": {"cpu": "1000m", "memory": "2Gi", "players": 100},
            "tournament": {"cpu": "2000m", "memory": "4Gi", "players": 200}
        }
        # Don't call this "config" -- that would shadow the imported module
        server_config = configs.get(server_type, configs["standard"])

        deployment = client.V1Deployment(
            metadata=client.V1ObjectMeta(
                name=deployment_name,
                labels={
                    "app": "game-server",
                    "region": region,
                    "type": server_type,
                    # Label values can't contain ":", so isoformat() is
                    # invalid here -- use a label-safe timestamp
                    "created": datetime.now().strftime("%Y%m%d-%H%M%S")
                }
            ),
            spec=client.V1DeploymentSpec(
                replicas=1,
                selector=client.V1LabelSelector(
                    match_labels={"app": deployment_name}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": deployment_name}
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="game-server",
                                image="myregistry/game-server:latest",
                                ports=[
                                    client.V1ContainerPort(container_port=7777),  # Game port
                                    client.V1ContainerPort(container_port=8080)   # API port
                                ],
                                env=[
                                    client.V1EnvVar(name="SERVER_REGION", value=region),
                                    client.V1EnvVar(name="MAX_PLAYERS", value=str(server_config["players"])),
                                    client.V1EnvVar(name="SERVER_TYPE", value=server_type)
                                ],
                                resources=client.V1ResourceRequirements(
                                    requests={
                                        "cpu": server_config["cpu"],
                                        "memory": server_config["memory"]
                                    },
                                    limits={
                                        "cpu": server_config["cpu"],
                                        "memory": server_config["memory"]
                                    }
                                ),
                                # Health checks for game servers
                                liveness_probe=client.V1Probe(
                                    tcp_socket=client.V1TCPSocketAction(port=7777),
                                    initial_delay_seconds=60,
                                    period_seconds=30
                                ),
                                readiness_probe=client.V1Probe(
                                    http_get=client.V1HTTPGetAction(
                                        path="/api/ready",
                                        port=8080
                                    ),
                                    initial_delay_seconds=30,
                                    period_seconds=10
                                )
                            )
                        ],
                        # Place game servers on dedicated nodes
                        node_selector={"node-type": "game-server"},
                        # Prefer spreading servers across nodes
                        affinity=client.V1Affinity(
                            pod_anti_affinity=client.V1PodAntiAffinity(
                                preferred_during_scheduling_ignored_during_execution=[
                                    client.V1WeightedPodAffinityTerm(
                                        weight=100,
                                        pod_affinity_term=client.V1PodAffinityTerm(
                                            label_selector=client.V1LabelSelector(
                                                match_labels={"app": "game-server"}
                                            ),
                                            topology_key="kubernetes.io/hostname"
                                        )
                                    )
                                ]
                            )
                        )
                    )
                )
            )
        )

        # Deploy the game server
        self.apps_v1.create_namespaced_deployment(
            namespace="game-servers",
            body=deployment
        )
        print(f"Game server {deployment_name} deployed in {region}!")
        return deployment_name
    def scale_for_event(self, event_type="normal"):
        # Scale game servers based on events
        scaling_profiles = {
            "normal": {"us-east": 5, "eu-west": 3, "asia": 2},
            "weekend": {"us-east": 10, "eu-west": 8, "asia": 5},
            "tournament": {"us-east": 20, "eu-west": 15, "asia": 10}
        }
        profile = scaling_profiles.get(event_type, scaling_profiles["normal"])
        for region, count in profile.items():
            # Deploy servers for each region
            for i in range(count):
                server_type = "tournament" if event_type == "tournament" else "standard"
                self.create_game_server(region, server_type)
                time.sleep(2)  # Stagger deployments
        print(f"Scaled fleet for {event_type} event!")

    def get_server_status(self):
        # Monitor all game servers (filters on the Deployments' own labels)
        deployments = self.apps_v1.list_namespaced_deployment(
            namespace="game-servers",
            label_selector="app=game-server"
        )
        print("Game Server Fleet Status:")
        for deployment in deployments.items:
            region = deployment.metadata.labels.get("region", "unknown")
            server_type = deployment.metadata.labels.get("type", "standard")
            ready = deployment.status.ready_replicas or 0
            total = deployment.spec.replicas
            status = "OK" if ready == total else "DEGRADED"
            print(f"[{status}] {deployment.metadata.name}: {ready}/{total} ready ({region}, {server_type})")

# Manage your game server fleet!
fleet = GameServerFleet()
fleet.scale_for_event("tournament")
fleet.get_server_status()
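Because scale_for_event creates servers one API call at a time, it is useful to compute the full plan up front — for logging, capacity checks, or a dry run. This sketch mirrors the scaling profiles above as a pure function (no cluster needed):

```python
def plan_fleet(event_type="normal"):
    """Return the list of (region, server_type) servers an event needs."""
    scaling_profiles = {
        "normal": {"us-east": 5, "eu-west": 3, "asia": 2},
        "weekend": {"us-east": 10, "eu-west": 8, "asia": 5},
        "tournament": {"us-east": 20, "eu-west": 15, "asia": 10},
    }
    profile = scaling_profiles.get(event_type, scaling_profiles["normal"])
    # Tournament events get tournament-sized servers, as in scale_for_event
    server_type = "tournament" if event_type == "tournament" else "standard"
    return [(region, server_type)
            for region, count in profile.items()
            for _ in range(count)]

plan = plan_fleet("tournament")
print(len(plan))  # → 45 servers (20 + 15 + 10)
```

scale_for_event could then iterate over plan_fleet(event_type) instead of duplicating the profile logic inline.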
Advanced Concepts
Advanced Deployment Strategies
When you're ready to level up, implement sophisticated deployment patterns:
# advanced_deployments.py
from kubernetes import client, config
import time

class AdvancedDeploymentManager:
    def __init__(self):
        config.load_kube_config()
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()
    def blue_green_deployment(self, app_name, new_image, namespace="default"):
        """
        Blue-Green deployment strategy
        """
        # Create a "green" deployment alongside the existing "blue"
        green_deployment = f"{app_name}-green"
        blue_deployment = f"{app_name}-blue"
        print(f"Starting Blue-Green deployment for {app_name}")

        # Deploy the green version
        deployment = client.V1Deployment(
            metadata=client.V1ObjectMeta(
                name=green_deployment,
                labels={"app": app_name, "version": "green"}
            ),
            spec=client.V1DeploymentSpec(
                replicas=3,
                selector=client.V1LabelSelector(
                    match_labels={"app": app_name, "version": "green"}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": app_name, "version": "green"}
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name=app_name,
                                image=new_image,
                                ports=[client.V1ContainerPort(container_port=8080)]
                            )
                        ]
                    )
                )
            )
        )
        self.apps_v1.create_namespaced_deployment(namespace=namespace, body=deployment)
        print("Green deployment created")

        # Wait for the green deployment to be ready
        # (wait_for_deployment and switch_service_to_green are helper
        # methods left for you to implement)
        self.wait_for_deployment(green_deployment, namespace)

        # Switch traffic to green
        self.switch_service_to_green(app_name, namespace)

        # Clean up the blue deployment
        time.sleep(30)  # Grace period
        self.apps_v1.delete_namespaced_deployment(
            name=blue_deployment,
            namespace=namespace
        )
        print("Blue-Green deployment completed!")
    def canary_deployment(self, app_name, new_image, canary_percentage=10):
        """
        Canary deployment with gradual rollout
        """
        print(f"Starting Canary deployment for {app_name}")

        # Create the canary deployment
        canary_name = f"{app_name}-canary"
        stable_name = f"{app_name}-stable"  # assumed to exist already

        # Calculate replicas based on percentage
        total_replicas = 10
        canary_replicas = max(1, total_replicas * canary_percentage // 100)
        stable_replicas = total_replicas - canary_replicas

        # Deploy the canary version
        canary_deployment = self.create_versioned_deployment(
            canary_name, new_image, canary_replicas, "canary"
        )

        # Gradually increase canary traffic
        steps = [10, 25, 50, 100]
        for percentage in steps:
            print(f"Canary at {percentage}%")
            canary_replicas = total_replicas * percentage // 100
            stable_replicas = total_replicas - canary_replicas

            # Update replica counts (scale_deployment is a helper to implement)
            self.scale_deployment(canary_name, canary_replicas)
            self.scale_deployment(stable_name, stable_replicas)

            # Monitor metrics (check_canary_health / rollback_canary are
            # also helpers left for you to implement)
            if not self.check_canary_health(canary_name):
                print("Canary failed health checks, rolling back!")
                self.rollback_canary(app_name)
                return False
            time.sleep(60)  # Monitor for 1 minute at each stage

        print("Canary deployment successful!")
        return True
    def create_versioned_deployment(self, name, image, replicas, version):
        """
        Create a deployment with version labels
        """
        deployment = client.V1Deployment(
            metadata=client.V1ObjectMeta(
                name=name,
                labels={"app": name.split("-")[0], "version": version}
            ),
            spec=client.V1DeploymentSpec(
                replicas=replicas,
                selector=client.V1LabelSelector(
                    match_labels={"app": name.split("-")[0], "version": version}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": name.split("-")[0], "version": version},
                        annotations={
                            "prometheus.io/scrape": "true",
                            "prometheus.io/port": "9090"
                        }
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="app",
                                image=image,
                                ports=[
                                    client.V1ContainerPort(container_port=8080),
                                    client.V1ContainerPort(container_port=9090, name="metrics")
                                ],
                                # Advanced probes
                                startup_probe=client.V1Probe(
                                    http_get=client.V1HTTPGetAction(path="/startup", port=8080),
                                    failure_threshold=30,
                                    period_seconds=10
                                ),
                                liveness_probe=client.V1Probe(
                                    http_get=client.V1HTTPGetAction(path="/health", port=8080),
                                    initial_delay_seconds=0,
                                    period_seconds=10,
                                    timeout_seconds=5,
                                    success_threshold=1,
                                    failure_threshold=3
                                ),
                                readiness_probe=client.V1Probe(
                                    http_get=client.V1HTTPGetAction(path="/ready", port=8080),
                                    initial_delay_seconds=0,
                                    period_seconds=5,
                                    timeout_seconds=3,
                                    success_threshold=1,
                                    failure_threshold=3
                                )
                            )
                        ]
                    )
                ),
                # Update strategy
                strategy=client.V1DeploymentStrategy(
                    type="RollingUpdate",
                    rolling_update=client.V1RollingUpdateDeployment(
                        max_surge="25%",
                        max_unavailable=0  # Zero downtime (an int, or a percentage string like "25%")
                    )
                ),
                # Track deployment progress
                progress_deadline_seconds=600,
                revision_history_limit=10
            )
        )
        return self.apps_v1.create_namespaced_deployment(
            namespace="default",
            body=deployment
        )
# Use advanced deployments
manager = AdvancedDeploymentManager()
manager.canary_deployment("my-app", "myregistry/my-app:2.0")
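The blue-green and canary examples above both call helpers they leave undefined (switch_service_to_green, scale_deployment, and friends). Two of the pure pieces can be sketched and unit-tested without a cluster: the Service-selector patch that flips traffic, and the canary/stable replica split. The wiring shown in comments is an assumption about how you would connect them, not code from the examples:

```python
def service_selector_patch(app_name: str, version: str) -> dict:
    """Patch body that repoints a Service's selector at one version's pods."""
    return {"spec": {"selector": {"app": app_name, "version": version}}}

def split_replicas(total: int, canary_percentage: int) -> tuple:
    """Split a fixed replica pool between canary and stable (canary >= 1)."""
    canary = max(1, total * canary_percentage // 100)
    return canary, total - canary

# switch_service_to_green would apply the patch to an existing Service, e.g.:
#   core_v1.patch_namespaced_service(app_name, namespace,
#                                    service_selector_patch(app_name, "green"))
print(service_selector_patch("my-app", "green"))
print(split_replicas(10, 25))  # → (2, 8)
```

Isolating the replica arithmetic also makes the `//` rounding and the max(1, ...) floor easy to verify at the edges (e.g. a 10% canary of 10 replicas is exactly 1 pod).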
GitOps-Style Deployment Management
Implement GitOps patterns with Python:
# gitops_deployment.py
import git  # GitPython
import yaml
from kubernetes import client, config
from datetime import datetime
import hashlib
import json

class GitOpsDeploymentManager:
    def __init__(self, repo_url, branch="main"):
        self.repo_url = repo_url
        self.branch = branch
        config.load_kube_config()
        self.apps_v1 = client.AppsV1Api()
        self.repo = None
    def sync_deployments(self):
        """
        Sync deployments from a Git repository
        """
        # Clone or update the repository (update_repo, find_deployment_files,
        # and get_current_commit are helpers left for you to implement)
        self.update_repo()

        # Find all deployment files
        deployment_files = self.find_deployment_files()
        for file_path in deployment_files:
            print(f"Processing {file_path}")
            self.apply_deployment(file_path)

    def apply_deployment(self, file_path):
        """
        Apply a deployment from a YAML file with GitOps annotations
        """
        with open(file_path, 'r') as f:
            deployment_yaml = yaml.safe_load(f)

        # Add GitOps metadata
        deployment_yaml['metadata']['annotations'] = {
            'gitops.io/repo': self.repo_url,
            'gitops.io/branch': self.branch,
            'gitops.io/path': file_path,
            'gitops.io/commit': self.get_current_commit(),
            'gitops.io/sync-time': datetime.now().isoformat()
        }

        # Check whether the deployment already exists
        name = deployment_yaml['metadata']['name']
        namespace = deployment_yaml['metadata'].get('namespace', 'default')
        try:
            existing = self.apps_v1.read_namespaced_deployment(name, namespace)
            # Update if changed
            if self.has_changed(existing, deployment_yaml):
                self.apps_v1.patch_namespaced_deployment(
                    name=name,
                    namespace=namespace,
                    body=deployment_yaml
                )
                print(f"Updated deployment: {name}")
            else:
                print(f"No changes for: {name}")
        except client.exceptions.ApiException as e:
            if e.status == 404:
                # Create a new deployment
                self.apps_v1.create_namespaced_deployment(
                    namespace=namespace,
                    body=deployment_yaml
                )
                print(f"Created deployment: {name}")
            else:
                raise

    def has_changed(self, existing, desired):
        """
        Check whether the deployment needs updating
        """
        # Serialize the live spec back to camelCase Kubernetes JSON first:
        # to_dict() emits snake_case keys, which would never match the YAML.
        # (Server-side defaulted fields can still cause spurious diffs.)
        existing_spec = client.ApiClient().sanitize_for_serialization(existing.spec)
        existing_hash = hashlib.sha256(
            json.dumps(existing_spec, sort_keys=True).encode()
        ).hexdigest()
        desired_hash = hashlib.sha256(
            json.dumps(desired['spec'], sort_keys=True).encode()
        ).hexdigest()
        return existing_hash != desired_hash

# Implement the GitOps workflow
gitops = GitOpsDeploymentManager("https://github.com/myorg/k8s-configs")
gitops.sync_deployments()
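has_changed relies on json.dumps(..., sort_keys=True) so that key order never affects the hash — without it, two semantically identical specs could hash differently. The property is easy to check in isolation with the stdlib alone (this sketch repeats just the hashing core, not the Kubernetes wiring):

```python
import hashlib
import json

def spec_hash(spec: dict) -> str:
    """Stable hash of a spec dict: key order must not affect the result."""
    return hashlib.sha256(json.dumps(spec, sort_keys=True).encode()).hexdigest()

# Same spec, different key order -- the hashes must match
a = {"replicas": 3, "selector": {"matchLabels": {"app": "web"}}}
b = {"selector": {"matchLabels": {"app": "web"}}, "replicas": 3}
print(spec_hash(a) == spec_hash(b))  # → True
```

Note that hashing only detects differences; it cannot tell you *which* field changed, so real GitOps tools usually do a structural diff as well.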
Common Pitfalls and Solutions
Pitfall 1: Resource Limits Not Set
# Wrong way - no resource limits!
container = client.V1Container(
    name="my-app",
    image="myapp:latest"
    # This pod could consume all node resources!
)

# Correct way - always set resource limits!
container = client.V1Container(
    name="my-app",
    image="myapp:latest",
    resources=client.V1ResourceRequirements(
        requests={
            "memory": "256Mi",  # Minimum guaranteed
            "cpu": "250m"       # 250 millicores
        },
        limits={
            "memory": "512Mi",  # Maximum allowed
            "cpu": "500m"       # Prevents resource hogging
        }
    )
)
Pitfall 2: Missing Health Checks
# Dangerous - no health checks!
container = client.V1Container(
    name="api-server",
    image="api:1.0",
    ports=[client.V1ContainerPort(container_port=8080)]
)
# Kubernetes won't know if your app is healthy!

# Safe - comprehensive health checks!
container = client.V1Container(
    name="api-server",
    image="api:1.0",
    ports=[client.V1ContainerPort(container_port=8080)],
    # Startup probe for slow-starting apps
    startup_probe=client.V1Probe(
        http_get=client.V1HTTPGetAction(
            path="/startup",
            port=8080
        ),
        failure_threshold=30,
        period_seconds=10
    ),
    # Liveness probe to detect hangs
    liveness_probe=client.V1Probe(
        http_get=client.V1HTTPGetAction(
            path="/health",
            port=8080
        ),
        initial_delay_seconds=0,
        period_seconds=10,
        timeout_seconds=5,
        failure_threshold=3
    ),
    # Readiness probe for load balancing
    readiness_probe=client.V1Probe(
        http_get=client.V1HTTPGetAction(
            path="/ready",
            port=8080
        ),
        initial_delay_seconds=0,
        period_seconds=5,
        timeout_seconds=3,
        failure_threshold=3
    )
)
print("Your app now has proper health monitoring!")
Best Practices
- Use Declarative Configuration: define the desired state and let Kubernetes handle the rest
- Set Resource Limits: always specify CPU and memory requests and limits to prevent resource starvation
- Implement Health Checks: use all three probe types (startup, liveness, readiness)
- Label Everything: use consistent labeling for easier management and selection
- Rolling Updates: configure proper update strategies to ensure zero downtime
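Consistent labels pay off when you query: the API's label_selector parameter takes a comma-separated key=value string. A small helper (a sketch; equality selectors only, not the full set-based syntax) builds one from a dict:

```python
def build_label_selector(labels: dict) -> str:
    """Render {'app': 'web', 'tier': 'backend'} as 'app=web,tier=backend'."""
    return ",".join(f"{key}={value}" for key, value in sorted(labels.items()))

# Usable anywhere the client takes label_selector, e.g. (wiring assumed):
#   apps_v1.list_namespaced_deployment(ns, label_selector=build_label_selector(
#       {"app": "flask-app", "tier": "backend"}))
print(build_label_selector({"tier": "backend", "app": "web"}))  # → app=web,tier=backend
```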
Hands-On Exercise
Challenge: Build a Multi-Environment Deployment System
Create a Python-based deployment system that manages applications across different environments:
Requirements:
- Deploy to dev, staging, and production environments
- Environment-specific configurations (replicas, resources)
- Implement RBAC for different teams
- Schedule deployments with maintenance windows
- Each environment needs different scaling policies!
Bonus Points:
- Add a deployment approval workflow
- Implement automatic rollback on failures
- Create a deployment metrics dashboard
Solution
Click to see solution
# Multi-environment deployment system!
from kubernetes import client, config
from datetime import datetime
import time
from typing import List

class MultiEnvironmentDeploymentSystem:
    def __init__(self):
        config.load_kube_config()
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()
        self.rbac_v1 = client.RbacAuthorizationV1Api()

        # Environment configurations
        self.environments = {
            "dev": {
                "namespace": "development",
                "replicas": 1,
                "cpu_request": "100m",
                "memory_request": "128Mi",
                "cpu_limit": "200m",
                "memory_limit": "256Mi",
                "autoscale": False
            },
            "staging": {
                "namespace": "staging",
                "replicas": 2,
                "cpu_request": "250m",
                "memory_request": "256Mi",
                "cpu_limit": "500m",
                "memory_limit": "512Mi",
                "autoscale": True,
                "min_replicas": 2,
                "max_replicas": 5
            },
            "production": {
                "namespace": "production",
                "replicas": 3,
                "cpu_request": "500m",
                "memory_request": "512Mi",
                "cpu_limit": "1000m",
                "memory_limit": "1Gi",
                "autoscale": True,
                "min_replicas": 3,
                "max_replicas": 20,
                "maintenance_window": {
                    "start_hour": 2,  # 2 AM
                    "end_hour": 4     # 4 AM
                }
            }
        }
    def deploy_to_environment(self, app_name: str, image: str, env: str):
        """
        Deploy an application to a specific environment.

        (in_maintenance_window, wait_for_maintenance_window, extract_version,
        get_pod_spec_extras, get_deployment_strategy, create_autoscaler, and
        monitor_deployment are helper methods left for you to implement.)
        """
        if env not in self.environments:
            raise ValueError(f"Unknown environment: {env}")
        env_config = self.environments[env]

        # Check the maintenance window for production
        if env == "production" and not self.in_maintenance_window(env_config):
            print("Waiting for maintenance window...")
            self.wait_for_maintenance_window(env_config)

        print(f"Deploying {app_name} to {env} environment")

        # Create the deployment
        deployment = client.V1Deployment(
            metadata=client.V1ObjectMeta(
                name=f"{app_name}-{env}",
                namespace=env_config["namespace"],
                labels={
                    "app": app_name,
                    "environment": env,
                    "version": self.extract_version(image),
                    "managed-by": "multi-env-system"
                },
                annotations={
                    "deployed-at": datetime.now().isoformat(),
                    "deployed-by": "python-deployment-system"
                }
            ),
            spec=client.V1DeploymentSpec(
                replicas=env_config["replicas"],
                selector=client.V1LabelSelector(
                    match_labels={"app": app_name, "environment": env}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": app_name, "environment": env}
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name=app_name,
                                image=image,
                                ports=[client.V1ContainerPort(container_port=8080)],
                                env=self.get_environment_variables(env),
                                resources=client.V1ResourceRequirements(
                                    requests={
                                        "cpu": env_config["cpu_request"],
                                        "memory": env_config["memory_request"]
                                    },
                                    limits={
                                        "cpu": env_config["cpu_limit"],
                                        "memory": env_config["memory_limit"]
                                    }
                                ),
                                # Environment-specific probes
                                liveness_probe=self.get_probe_for_env(env, "liveness"),
                                readiness_probe=self.get_probe_for_env(env, "readiness")
                            )
                        ],
                        # Production gets extra pod-spec settings
                        **self.get_pod_spec_extras(env)
                    )
                ),
                strategy=self.get_deployment_strategy(env)
            )
        )

        # Create or update the deployment
        try:
            self.apps_v1.create_namespaced_deployment(
                namespace=env_config["namespace"],
                body=deployment
            )
            print(f"Created deployment in {env}")
        except client.exceptions.ApiException as e:
            if e.status == 409:  # Already exists
                self.apps_v1.patch_namespaced_deployment(
                    name=f"{app_name}-{env}",
                    namespace=env_config["namespace"],
                    body=deployment
                )
                print(f"Updated deployment in {env}")
            else:
                raise

        # Create an autoscaler if needed
        if env_config.get("autoscale", False):
            self.create_autoscaler(app_name, env, env_config)

        # Set up RBAC for the environment
        self.setup_rbac(app_name, env)

        # Monitor deployment progress
        self.monitor_deployment(app_name, env, env_config["namespace"])
    def get_environment_variables(self, env: str) -> List[client.V1EnvVar]:
        """
        Environment-specific variables
        """
        base_vars = [
            client.V1EnvVar(name="ENVIRONMENT", value=env),
            client.V1EnvVar(name="LOG_LEVEL", value="DEBUG" if env == "dev" else "INFO"),
            client.V1EnvVar(name="METRICS_ENABLED", value="true" if env != "dev" else "false")
        ]
        if env == "production":
            base_vars.extend([
                client.V1EnvVar(name="ENABLE_PROFILING", value="true"),
                client.V1EnvVar(name="ALERT_CHANNEL", value="#prod-alerts")
            ])
        return base_vars

    def get_probe_for_env(self, env: str, probe_type: str) -> client.V1Probe:
        """
        Environment-specific health checks
        """
        if env == "dev":
            # Relaxed probes for development
            return client.V1Probe(
                http_get=client.V1HTTPGetAction(path="/health", port=8080),
                initial_delay_seconds=30,
                period_seconds=30,
                failure_threshold=5
            )
        elif env == "production":
            # Strict probes for production
            # (success_threshold must stay 1 for liveness probes)
            return client.V1Probe(
                http_get=client.V1HTTPGetAction(path="/health", port=8080),
                initial_delay_seconds=10,
                period_seconds=5,
                timeout_seconds=3,
                failure_threshold=2,
                success_threshold=2 if probe_type == "readiness" else 1
            )
        else:
            # Standard probes for staging
            return client.V1Probe(
                http_get=client.V1HTTPGetAction(path="/health", port=8080),
                initial_delay_seconds=15,
                period_seconds=10,
                failure_threshold=3
            )
    def setup_rbac(self, app_name: str, env: str):
        """
        Set up role-based access control
        """
        # Define roles for each environment
        role_rules = {
            "dev": [
                client.V1PolicyRule(
                    api_groups=["*"],
                    resources=["*"],
                    verbs=["*"]  # Developers have full access in dev
                )
            ],
            "staging": [
                client.V1PolicyRule(
                    api_groups=["apps", ""],
                    resources=["deployments", "pods", "services"],
                    verbs=["get", "list", "watch", "update", "patch"]
                )
            ],
            "production": [
                client.V1PolicyRule(
                    api_groups=["apps", ""],
                    resources=["deployments", "pods", "services"],
                    verbs=["get", "list", "watch"]  # Read-only in prod
                )
            ]
        }
        role_name = f"{app_name}-{env}-role"

        # Create the role
        role = client.V1Role(
            metadata=client.V1ObjectMeta(name=role_name),
            rules=role_rules.get(env, role_rules["production"])
        )
        try:
            self.rbac_v1.create_namespaced_role(
                namespace=self.environments[env]["namespace"],
                body=role
            )
            print(f"Created RBAC role for {env}")
        except client.exceptions.ApiException as e:
            if e.status != 409:  # 409 = role already exists
                raise
    def promote_deployment(self, app_name: str, from_env: str, to_env: str):
        """
        Promote a deployment from one environment to another
        """
        print(f"Promoting {app_name} from {from_env} to {to_env}")

        # Get the source deployment
        source_deployment = self.apps_v1.read_namespaced_deployment(
            name=f"{app_name}-{from_env}",
            namespace=self.environments[from_env]["namespace"]
        )

        # Extract the image
        image = source_deployment.spec.template.spec.containers[0].image

        # Deploy to the target environment
        self.deploy_to_environment(app_name, image, to_env)
        print(f"Successfully promoted {app_name} to {to_env}!")
    def rollback_deployment(self, app_name: str, env: str):
        """
        Roll back a deployment to its previous version.

        Note: the old DeploymentRollback API (apps/v1beta1) was removed from
        Kubernetes. With apps/v1 you roll back by restoring the pod template
        from the previous ReplicaSet, which is what `kubectl rollout undo`
        does under the hood.
        """
        deployment_name = f"{app_name}-{env}"
        namespace = self.environments[env]["namespace"]

        # Find the ReplicaSets owned by this Deployment, newest revision first
        replica_sets = [
            rs for rs in self.apps_v1.list_namespaced_replica_set(namespace).items
            if rs.metadata.owner_references and any(
                ref.name == deployment_name for ref in rs.metadata.owner_references
            )
        ]
        replica_sets.sort(
            key=lambda rs: int(rs.metadata.annotations["deployment.kubernetes.io/revision"]),
            reverse=True
        )
        if len(replica_sets) < 2:
            print("No previous revision to roll back to")
            return

        previous_rs = replica_sets[1]
        print(f"Rolling back {deployment_name} to the previous revision")

        # Restore the previous pod template
        # (a production version would strip the pod-template-hash label first)
        self.apps_v1.patch_namespaced_deployment(
            name=deployment_name,
            namespace=namespace,
            body={"spec": {"template": previous_rs.spec.template}}
        )
        print("Rollback completed!")

# Test the multi-environment system!
deployment_system = MultiEnvironmentDeploymentSystem()

# Deploy to dev first
deployment_system.deploy_to_environment("my-app", "myregistry/my-app:1.0", "dev")

# After testing, promote to staging
deployment_system.promote_deployment("my-app", "dev", "staging")

# Finally promote to production (waits for the maintenance window)
deployment_system.promote_deployment("my-app", "staging", "production")
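deploy_to_environment defers to in_maintenance_window, which the solution leaves to implement. The core is simple clock arithmetic; this sketch also handles windows that wrap past midnight (e.g. 22:00-02:00), which goes beyond the 2-4 AM window configured above and is our own assumption:

```python
def in_window(hour: int, start_hour: int, end_hour: int) -> bool:
    """True if `hour` falls inside [start_hour, end_hour), wrapping midnight."""
    if start_hour <= end_hour:
        return start_hour <= hour < end_hour
    # Window wraps past midnight, e.g. 22:00 -> 02:00
    return hour >= start_hour or hour < end_hour

print(in_window(3, 2, 4))    # → True  (inside the 2-4 AM window)
print(in_window(5, 2, 4))    # → False
print(in_window(23, 22, 2))  # → True  (wrapping window)
```

The class method would wrap this, something like `in_window(datetime.now().hour, window["start_hour"], window["end_hour"])`, and wait_for_maintenance_window could poll it with time.sleep.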
Key Takeaways
You've mastered Kubernetes Deployments! Here's what you can now do:
- Create and manage Deployments with Python's kubernetes client
- Implement advanced deployment strategies like Blue-Green and Canary
- Set up auto-scaling for handling traffic spikes
- Configure health checks to ensure high availability
- Build multi-environment deployment systems with RBAC
Remember: Kubernetes Deployments are your Swiss Army knife for managing containerized applications. They handle the complexity so you can focus on building great software!
Next Steps
Congratulations! You've conquered Kubernetes Deployments!
Here's what to do next:
- Practice with the exercises above using a local Kubernetes cluster (minikube or kind)
- Build a complete CI/CD pipeline that uses these deployment strategies
- Move on to our next tutorial: Kubernetes Services and Networking
- Share your Kubernetes journey with the community!
Remember: every Kubernetes expert started where you are now. Keep deploying, keep learning, and most importantly, have fun!
Happy Kubernetes adventures!