Files
simulation-project/src/simulation.py
2025-05-06 11:56:51 +02:00

318 lines
12 KiB
Python

import random
import os
import time
from heapq import heappush, heappop
import matplotlib.pyplot as plt
from scipy.stats import t
import numpy as np
from multiprocessing import Pool
class Event:
def __init__(self, event_type, request):
self.event_type = event_type # 'request', 'router_finish', 'process_finish'
self.request = request
class Request:
def __init__(self, category, arrival_time):
self.category = category
self.arrival_time = arrival_time
class Simulation:
def __init__(self, C, lambda_val):
# C clusters of K servers
self.C = C
self.K = 12 // C
self.occupied_servers = [0] * self.C
# service rate exponential distribution parameter
service_rates = {1: 4/20, 2:7/20, 3:10/20, 6:14/20}
self.service_rate = service_rates[C]
# router request processing time
self.router_processing_time = (C - 1) / C
# λ
self.lambda_val = lambda_val
self.router_state = 'idle' # 'idle', 'processing', 'blocked'
self.event_queue = [] # (time, Event)
self.current_time = 0.0
self.router_queue = []
self.total_requests = 0
self.lost_requests = 0
self.loss_rate = 0
self.response_times = []
def next_request(self):
# exponential distribution, parameter λ
interval = random.expovariate(self.lambda_val)
new_time = self.current_time + interval
arrival_time = new_time
category = random.randint(0, self.C-1) if self.C>1 else 0
request = Request(category, arrival_time)
request_event = Event("request", request)
heappush(self.event_queue, (arrival_time, request_event))
def handle_request(self, request, max_loss_value):
self.total_requests += 1
if len(self.router_queue) == 0 and self.router_state == "idle":
self.router_process(request)
elif ((len(self.router_queue) + (self.router_state == "processing")) < 100):
self.router_queue.append(request)
else:
self.lost_requests += 1
self.loss_rate = self.lost_requests / self.total_requests
if self.loss_rate > max_loss_value :
raise ValueError("lossrate too high")
def router_process(self, request):
if self.router_state == "idle":
self.router_state = 'processing'
router_finish = Event("router_finish", request)
finish_time = self.current_time + self.router_processing_time
heappush(self.event_queue, (finish_time, router_finish))
else:
raise RuntimeError("shouldn't reach this branch")
def router_process_finish(self, request):
# send the request to a free server
if self.occupied_servers[request.category] < self.K:
self.router_state = "idle"
self.occupied_servers[request.category] += 1
self.process_request(request)
else:
self.router_state = "blocked"
self.router_queue.insert(0, request)
# router process next request
self.router_process_next()
def router_process_next(self):
if (len(self.router_queue) > 0) and (self.router_state == "idle"):
self.router_process(self.router_queue.pop(0))
def process_request(self, request):
interval = random.expovariate(self.service_rate)
finish_time = self.current_time + interval
process_finish = Event("process_finish", request)
heappush(self.event_queue, (finish_time, process_finish))
def process_request_finish(self, request):
self.response_times.append(self.current_time - request.arrival_time)
self.occupied_servers[request.category] -= 1
if (self.router_state == "blocked") and (request.category == self.router_queue[0].category):
self.process_request(self.router_queue.pop(0))
self.occupied_servers[request.category] += 1
self.router_state = "idle"
self.router_process_next()
def run(self, max_time, max_loss_value):
# first request
self.next_request()
while (self.current_time <= max_time):
current_event = heappop(self.event_queue)
self.current_time = current_event[0]
match current_event[1].event_type:
case "request":
self.next_request()
self.handle_request(current_event[1].request, max_loss_value)
case "router_finish":
self.router_process_finish(current_event[1].request)
case "process_finish":
self.process_request_finish(current_event[1].request)
case _ :
raise RuntimeError("shouldn't reach this branch")
def run_single_simulation(args):
c, lambda_val, simulation_time, max_loss_value = args
# for different seed in each process
random.seed(time.time() + os.getpid())
try:
sim = Simulation(c, lambda_val)
sim.run(simulation_time, max_loss_value)
if len(sim.response_times) > 0:
run_mean = sum(sim.response_times) / len(sim.response_times)
loss_rate = sim.loss_rate
return (run_mean, loss_rate)
else:
return None
except ValueError: # Loss rate too high
return None
def simulation_wrapper1(simulation_time, num_runs, min_runs, confidence_level, lambda_vals, ax_rt):
C_values = [1, 2, 3, 6]
mean_at_lambda1 = {}
loss_data = {}
with Pool() as pool: # pool of workers
for c in C_values:
lambda_points = []
means = []
ci_lower = []
ci_upper = []
losses = []
print(f"\nProcessing C={c}")
for lambda_val in lambda_vals:
# run num_runs simulation for each lambda
args_list = [(c, lambda_val, simulation_time, 0.05) for _ in range(num_runs)]
results = pool.map(run_single_simulation, args_list)
# collect results from successful simulations
successful_results = [res for res in results if res is not None]
run_results = [res[0] for res in successful_results]
loss_rates = [res[1] for res in successful_results]
n = len(run_results)
# reject if not enough successful run
if n >= min_runs:
# statistics
mean_rt = np.mean(run_results)
std_dev = np.std(run_results, ddof=1)
# confidence interval
t_value = t.ppf((1 + confidence_level)/2, n-1)
ci = t_value * std_dev / np.sqrt(n)
# loss rate
mean_loss = np.mean(loss_rates)
# store results
lambda_points.append(lambda_val)
means.append(mean_rt)
ci_lower.append(mean_rt - ci)
ci_upper.append(mean_rt + ci)
losses.append(mean_loss)
print(f"C={c}, λ={lambda_val:.2f}, Mean RT={mean_rt:.2f} ± {ci:.2f}, Mean Loss Rate={mean_loss:.2%}")
elif n > 0:
print(f"λ={lambda_val:.2f} skipped - only {n} successful run(s)")
continue
else:
print(f"Stopped at λ={lambda_val:.2f} - no successful run")
break
ax_rt.plot(lambda_points, means, label=f'C={c}')
ax_rt.fill_between(lambda_points, ci_lower, ci_upper, alpha=0.2)
# store response time for lamba = 1
if 1 in lambda_points:
idx = lambda_points.index(1)
mean_at_lambda1[c] = (means[idx], ci_lower[idx], ci_upper[idx])
loss_data[c] = (np.array(lambda_points), np.array(losses))
# plot curves
if len(lambda_vals)>1:
ax_rt.set_xlim(left=0)
ax_rt.set_xlabel('Arrival Rate (λ)')
ax_rt.set_ylabel('Mean Response Time')
ax_rt.set_title(f'Mean Response Time vs Arrival Rate ({num_runs} runs, {int(confidence_level*100)}% CI)')
ax_rt.legend()
ax_rt.grid(True)
return loss_data, mean_at_lambda1
def simulation_wrapper2(simulation_time, num_runs, min_runs, confidence_level, lambda_vals, max_loss_value=0.1):
fig, (ax_rt, ax_loss) = plt.subplots(2,1, figsize=(12,10), gridspec_kw={'height_ratios': [4, 3], 'hspace': 0.4}, sharex=False)
loss_data, mean_at_lambda1 = simulation_wrapper1(simulation_time, num_runs, min_runs, confidence_level, lambda_vals, ax_rt)
if len(lambda_vals) > 1:
for c, (lams, losses) in loss_data.items():
lambda_vals2 = []
if len(lams)==0:
print(f"C={c}: no data at all.")
continue
pos_idx = np.where(losses > 0)[0]
if pos_idx.size > 0:
first_lambda = lams[pos_idx[0]]
else:
first_lambda = None
last_lambda = lams[-1]
print(f"C={c:>1} first λ with loss > 0: "
f"{first_lambda if first_lambda is not None else 'never'}; "
f" last λ with data: {last_lambda}")
lambda_vals2.extend([last_lambda + i/1000 for i in range(-100,51)])
lambda_points = []
losses = []
ci_lower = []
ci_upper = []
print(f"\nProcessing C={c}")
with Pool() as pool:
for lambda_val in lambda_vals2:
args_list = [(c, lambda_val, simulation_time, max_loss_value) for _ in range(num_runs)]
results = pool.map(run_single_simulation, args_list)
successful_results = [res for res in results if res is not None]
loss_rates = [res[1] for res in successful_results]
n = len(loss_rates)
if n >= min_runs:
# statistics
mean_loss = np.mean(loss_rates)
std_dev = np.std(loss_rates, ddof=1)
# confidence interval
t_value = t.ppf((1 + confidence_level)/2, n-1)
ci = t_value * std_dev / np.sqrt(n)
# store results
lambda_points.append(lambda_val)
losses.append(mean_loss)
ci_lower.append(mean_loss - ci)
ci_upper.append(mean_loss + ci)
print(f"C={c}, λ={lambda_val:.4f}, Mean Loss Rate={mean_loss:.2%} ± {ci:.2f}")
elif n > 0:
print(f"λ={lambda_val:.4f} skipped - only {n} successful run(s)")
continue
else:
print(f"Stopped at λ={lambda_val:.4f} - no successful run")
break
ax_loss.plot(lambda_points, losses, label=f'C={c}')
ax_loss.fill_between(lambda_points, ci_lower, ci_upper, alpha=0.2)
# determine optimal C for lamba = 1
if mean_at_lambda1:
sorted_C = sorted(mean_at_lambda1.items(), key=lambda item: item[1][0])
(best_C, (mean1, lower1, upper1)), (_, (_, lower2, _)) = sorted_C[:2]
if upper1 < lower2:
print(f"Optimal C at λ=1 is {best_C} with Mean RT = {mean1:.2f} (non-overlapping CIs)")
else:
print("Confidence intervals overlap between the two best C.")
else:
print("\nNo valid λ=1 data for any C.")
#plot curves
if len(lambda_vals) > 1:
ax_loss.set_xlim(left=2)
ax_loss.set_xlabel('Arrival Rate (λ)')
ax_loss.set_ylabel('Mean Loss Rate')
ax_loss.set_title(f'Mean Loss Rate vs Arrival Rate ({num_runs} runs, {int(confidence_level*100)}% CI)')
ax_loss.legend()
ax_loss.grid(True)
plt.show()