separate files

This commit is contained in:
2025-05-06 09:12:01 +02:00
parent 29ea7b229e
commit ba7b0fa91f
3 changed files with 225 additions and 220 deletions

1
src/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
__pycache__/

View File

@@ -1,224 +1,14 @@
#!/bin/python3
import random
import os
import time
from heapq import heappush, heappop
import matplotlib.pyplot as plt
from scipy.stats import t
import numpy as np
from multiprocessing import Pool
import argparse
from simulation import simulation_wrapper
class Event:
    """A discrete simulation event queued in the event heap.

    event_type is one of 'request', 'router_finish', 'process_finish';
    request is the Request this event refers to.
    """
    def __init__(self, event_type, request):
        self.event_type = event_type  # 'request', 'router_finish', 'process_finish'
        self.request = request
    def __lt__(self, other):
        # Tie-breaker for the (time, Event) heap entries: when two events
        # carry the exact same timestamp, heappush/heappop fall back to
        # comparing the Event objects, which raised TypeError without this.
        # The ordering between simultaneous events is arbitrary but stable.
        return self.event_type < other.event_type
if __name__ == "__main__":
    # NOTE(review): this guard appears mid-file, before the class definitions
    # below — it looks like a merge/diff artifact. The parser built here is
    # only consumed by the second __main__ guard at the bottom of the file.
    parser = argparse.ArgumentParser(description='Simulation server cluster')
    parser.add_argument('--simulation_time', type=int, default=500, help='runtime of each individual simulation')
    parser.add_argument('--num_runs', type=int, default=10, help='number of simulations to run with a fixed set of parameters')
    parser.add_argument('--min_runs', type=int, default=5, help='minimum number of successful runs needed to calculate statistics')
    parser.add_argument('--confidence_level', type=float, default=0.95, help='confidence level')
class Request:
    """A client request tagged with its target server category and arrival time."""
    def __init__(self, category, arrival_time):
        self.category = category          # index of the server cluster (0..C-1)
        self.arrival_time = arrival_time  # simulation time of arrival
    def __repr__(self):
        # Debug-friendly representation; not relied on by the simulation logic.
        return f"Request(category={self.category}, arrival_time={self.arrival_time})"
class Simulation:
    """Event-driven model of a router dispatching requests to C clusters of K servers.

    Events live in a min-heap keyed by timestamp. The router takes
    (C-1)/C time units per request, holds at most 100 requests (waiting
    queue plus the one in flight) and blocks when the target cluster is
    full. run() raises ValueError (via handle_request) once the observed
    loss rate exceeds 5%.
    """
    def __init__(self, C, lambda_val):
        # C clusters of K servers
        self.C = C
        self.K = 12 // C  # 12 servers in total, split evenly across clusters
        self.occupied_servers = [0] * self.C  # busy-server count per cluster
        # service rate exponential distribution parameter
        service_rates = {1: 4/20, 2:7/20, 3:10/20, 6:14/20}
        self.service_rate = service_rates[C]
        # router request processing time
        self.router_processing_time = (C - 1) / C
        # λ
        self.lambda_val = lambda_val
        self.router_state = 'idle' # 'idle', 'processing', 'blocked'
        self.event_queue = [] # (time, Event)
        self.current_time = 0.0
        self.router_queue = []  # FIFO of requests waiting for the router
        self.total_requests = 0
        self.lost_requests = 0
        self.loss_rate = 0
        self.response_times = []  # per-request (finish - arrival) durations
    def next_request(self):
        """Schedule the next arrival: exponential inter-arrival time, uniform category."""
        # exponential distribution, parameter λ
        interval = random.expovariate(self.lambda_val)
        new_time = self.current_time + interval
        arrival_time = new_time
        category = random.randint(0, self.C-1) if self.C>1 else 0
        request = Request(category, arrival_time)
        request_event = Event("request", request)
        heappush(self.event_queue, (arrival_time, request_event))
    def handle_request(self, request):
        """Accept, queue, or drop an arriving request.

        Raises ValueError once the running loss rate exceeds 5%.
        """
        self.total_requests += 1
        if len(self.router_queue) == 0 and self.router_state == "idle":
            self.router_process(request)
        elif ((len(self.router_queue) + (self.router_state == "processing")) < 100):
            # capacity 100 counts the request currently being routed as well
            self.router_queue.append(request)
        else:
            self.lost_requests += 1
            self.loss_rate = self.lost_requests / self.total_requests
            if self.loss_rate > 0.05 :
                raise ValueError("lossrate too high")
    def router_process(self, request):
        """Start routing `request`; schedules the matching 'router_finish' event."""
        if self.router_state == "idle":
            self.router_state = 'processing'
            router_finish = Event("router_finish", request)
            finish_time = self.current_time + self.router_processing_time
            heappush(self.event_queue, (finish_time, router_finish))
        else:
            raise RuntimeError("shouldn't reach this branch")
    def router_process_finish(self, request):
        """Routing done: hand the request to its cluster, or block if the cluster is full."""
        # send the request to a free server
        if self.occupied_servers[request.category] < self.K:
            self.router_state = "idle"
            self.occupied_servers[request.category] += 1
            self.process_request(request)
        else:
            # target cluster full: requeue at the head and stall the router
            self.router_state = "blocked"
            self.router_queue.insert(0, request)
        # router process next request
        self.router_process_next()
    def router_process_next(self):
        """Feed the router the next queued request whenever it is idle."""
        if (len(self.router_queue) > 0) and (self.router_state == "idle"):
            self.router_process(self.router_queue.pop(0))
    def process_request(self, request):
        """Start service for `request`; schedules the matching 'process_finish' event."""
        interval = random.expovariate(self.service_rate)
        finish_time = self.current_time + interval
        process_finish = Event("process_finish", request)
        heappush(self.event_queue, (finish_time, process_finish))
    def process_request_finish(self, request):
        """A server finished: record the response time, free the server, unblock the router."""
        self.response_times.append(self.current_time - request.arrival_time)
        self.occupied_servers[request.category] -= 1
        # if the router is blocked waiting on this very cluster, the freed
        # server immediately takes the head-of-queue request (it was already
        # routed once, so it skips a second router pass)
        if (self.router_state == "blocked") and (request.category == self.router_queue[0].category):
            self.process_request(self.router_queue.pop(0))
            self.occupied_servers[request.category] += 1
            self.router_state = "idle"
            self.router_process_next()
    def run(self, max_time):
        """Main event loop: pop events in timestamp order until max_time is passed."""
        # first request
        self.next_request()
        while (self.current_time <= max_time):
            current_event = heappop(self.event_queue)
            self.current_time = current_event[0]
            match current_event[1].event_type:
                case "request":
                    # each arrival schedules its successor, so the heap never drains
                    self.next_request()
                    self.handle_request(current_event[1].request)
                case "router_finish":
                    self.router_process_finish(current_event[1].request)
                case "process_finish":
                    self.process_request_finish(current_event[1].request)
                case _ :
                    raise RuntimeError("shouldn't reach this branch")
def run_single_simulation(args):
    """Worker entry point for the process pool.

    args is a (C, lambda, simulation_time) tuple. Returns a
    (mean_response_time, loss_rate) pair on success, or None when the run
    completed no requests or aborted because the loss rate grew too high.
    """
    cluster_count, arrival_rate, max_time = args
    # re-seed so each pool worker draws an independent random stream
    random.seed(time.time() + os.getpid())
    try:
        sim = Simulation(cluster_count, arrival_rate)
        sim.run(max_time)
    except ValueError:
        # loss rate exceeded the 5% threshold mid-run
        return None
    samples = sim.response_times
    if not samples:
        return None
    return (sum(samples) / len(samples), sim.loss_rate)
def simulation_wrapper(simulation_time=1000, num_runs=12, min_runs=5, confidence_level=0.95):
    """Sweep λ for each cluster count C and plot mean response time with CI bands.

    Parameters (defaults preserve the previous hard-coded behavior, and the
    function now also accepts the four values the __main__ guard passes in):
        simulation_time: runtime of each individual simulation run.
        num_runs: simulations launched per (C, λ) point.
        min_runs: minimum successful runs required to keep a data point.
        confidence_level: two-sided Student-t confidence level for the band.
    """
    C_values = [1, 2, 3, 6]
    lambda_vals = [l/100 for l in range(1, 301)] # λ from 0.01 to 3.00
    plt.figure(figsize=(12, 8))
    with Pool() as pool: # pool of workers
        for c in C_values:
            lambda_points = []
            means = []
            ci_lower = []
            ci_upper = []
            print(f"\nProcessing C={c}")
            for lambda_val in lambda_vals:
                # run num_runs simulations for each lambda
                args_list = [(c, lambda_val, simulation_time) for _ in range(num_runs)]
                results = pool.map(run_single_simulation, args_list)
                # collect results from successful simulations
                successful_results = [res for res in results if res is not None]
                run_results = [res[0] for res in successful_results]
                loss_rates = [res[1] for res in successful_results]
                # reject the point if there were not enough successful runs
                if len(run_results) >= min_runs:
                    # statistics
                    mean_rt = np.mean(run_results)
                    std_dev = np.std(run_results, ddof=1)
                    n = len(run_results)
                    # two-sided Student-t confidence interval
                    t_value = t.ppf((1 + confidence_level)/2, n-1)
                    ci = t_value * std_dev / np.sqrt(n)
                    # loss rate
                    mean_loss = np.mean(loss_rates)
                    # store results
                    lambda_points.append(lambda_val)
                    means.append(mean_rt)
                    ci_lower.append(mean_rt - ci)
                    ci_upper.append(mean_rt + ci)
                    print(f"C={c}, λ={lambda_val:.2f}, Mean RT={mean_rt:.2f} ± {ci:.2f}, Loss Rate={mean_loss:.2%}")
                elif len(run_results) > 0:
                    print(f"λ={lambda_val:.2f} skipped - only {len(run_results)} successful run(s)")
                    continue
                else:
                    # higher λ values would only be more overloaded: stop this C
                    print(f"Stopped at λ={lambda_val:.2f} - no successful run")
                    break
            plt.plot(lambda_points, means, label=f'C={c}')
            plt.fill_between(lambda_points, ci_lower, ci_upper, alpha=0.2)
    plt.xlabel('Arrival Rate (λ)')
    plt.ylabel('Mean Response Time')
    # title now reflects the actual confidence level instead of hard-coding 95%
    plt.title(f'Mean Response Time vs Arrival Rate ({num_runs} runs, {confidence_level:.0%} CI)')
    plt.legend()
    plt.grid(True)
    plt.show()
if __name__ == '__main__':
    # Parse the CLI flags first, then run exactly one sweep with them.
    # Previously simulation_wrapper() was also invoked once with no
    # arguments BEFORE parsing, triggering a full default sweep and then
    # a second sweep — and the no-argument call clashed with the
    # four-parameter signature imported from simulation.py.
    args = parser.parse_args()
    simulation_wrapper(args.simulation_time, args.num_runs, args.min_runs, args.confidence_level)

214
src/simulation.py Normal file
View File

@@ -0,0 +1,214 @@
import random
import os
import time
from heapq import heappush, heappop
import matplotlib.pyplot as plt
from scipy.stats import t
import numpy as np
from multiprocessing import Pool
class Event:
    """A discrete simulation event queued in the event heap.

    event_type is one of 'request', 'router_finish', 'process_finish';
    request is the Request this event refers to.
    """
    def __init__(self, event_type, request):
        self.event_type = event_type  # 'request', 'router_finish', 'process_finish'
        self.request = request
    def __lt__(self, other):
        # Tie-breaker for the (time, Event) heap entries: when two events
        # carry the exact same timestamp, heappush/heappop fall back to
        # comparing the Event objects, which raised TypeError without this.
        # The ordering between simultaneous events is arbitrary but stable.
        return self.event_type < other.event_type
class Request:
    """A single client request: the server category it targets and when it arrived."""
    def __init__(self, category, arrival_time):
        # plain data holder; both attributes are written once here and
        # read later by Simulation
        self.arrival_time = arrival_time
        self.category = category
class Simulation:
    """Event-driven model of a router dispatching requests to C clusters of K servers.

    Events live in a min-heap keyed by timestamp. The router takes
    (C-1)/C time units per request, holds at most 100 requests (waiting
    queue plus the one in flight) and blocks when the target cluster is
    full. run() raises ValueError (via handle_request) once the observed
    loss rate exceeds 5%.
    """
    def __init__(self, C, lambda_val):
        # C clusters of K servers
        self.C = C
        self.K = 12 // C  # 12 servers in total, split evenly across clusters
        self.occupied_servers = [0] * self.C  # busy-server count per cluster
        # service rate exponential distribution parameter
        service_rates = {1: 4/20, 2:7/20, 3:10/20, 6:14/20}
        self.service_rate = service_rates[C]
        # router request processing time
        self.router_processing_time = (C - 1) / C
        # λ
        self.lambda_val = lambda_val
        self.router_state = 'idle' # 'idle', 'processing', 'blocked'
        self.event_queue = [] # (time, Event)
        self.current_time = 0.0
        self.router_queue = []  # FIFO of requests waiting for the router
        self.total_requests = 0
        self.lost_requests = 0
        self.loss_rate = 0
        self.response_times = []  # per-request (finish - arrival) durations
    def next_request(self):
        """Schedule the next arrival: exponential inter-arrival time, uniform category."""
        # exponential distribution, parameter λ
        interval = random.expovariate(self.lambda_val)
        new_time = self.current_time + interval
        arrival_time = new_time
        category = random.randint(0, self.C-1) if self.C>1 else 0
        request = Request(category, arrival_time)
        request_event = Event("request", request)
        heappush(self.event_queue, (arrival_time, request_event))
    def handle_request(self, request):
        """Accept, queue, or drop an arriving request.

        Raises ValueError once the running loss rate exceeds 5%.
        """
        self.total_requests += 1
        if len(self.router_queue) == 0 and self.router_state == "idle":
            self.router_process(request)
        elif ((len(self.router_queue) + (self.router_state == "processing")) < 100):
            # capacity 100 counts the request currently being routed as well
            self.router_queue.append(request)
        else:
            self.lost_requests += 1
            self.loss_rate = self.lost_requests / self.total_requests
            if self.loss_rate > 0.05 :
                raise ValueError("lossrate too high")
    def router_process(self, request):
        """Start routing `request`; schedules the matching 'router_finish' event."""
        if self.router_state == "idle":
            self.router_state = 'processing'
            router_finish = Event("router_finish", request)
            finish_time = self.current_time + self.router_processing_time
            heappush(self.event_queue, (finish_time, router_finish))
        else:
            raise RuntimeError("shouldn't reach this branch")
    def router_process_finish(self, request):
        """Routing done: hand the request to its cluster, or block if the cluster is full."""
        # send the request to a free server
        if self.occupied_servers[request.category] < self.K:
            self.router_state = "idle"
            self.occupied_servers[request.category] += 1
            self.process_request(request)
        else:
            # target cluster full: requeue at the head and stall the router
            self.router_state = "blocked"
            self.router_queue.insert(0, request)
        # router process next request
        self.router_process_next()
    def router_process_next(self):
        """Feed the router the next queued request whenever it is idle."""
        if (len(self.router_queue) > 0) and (self.router_state == "idle"):
            self.router_process(self.router_queue.pop(0))
    def process_request(self, request):
        """Start service for `request`; schedules the matching 'process_finish' event."""
        interval = random.expovariate(self.service_rate)
        finish_time = self.current_time + interval
        process_finish = Event("process_finish", request)
        heappush(self.event_queue, (finish_time, process_finish))
    def process_request_finish(self, request):
        """A server finished: record the response time, free the server, unblock the router."""
        self.response_times.append(self.current_time - request.arrival_time)
        self.occupied_servers[request.category] -= 1
        # if the router is blocked waiting on this very cluster, the freed
        # server immediately takes the head-of-queue request (it was already
        # routed once, so it skips a second router pass)
        if (self.router_state == "blocked") and (request.category == self.router_queue[0].category):
            self.process_request(self.router_queue.pop(0))
            self.occupied_servers[request.category] += 1
            self.router_state = "idle"
            self.router_process_next()
    def run(self, max_time):
        """Main event loop: pop events in timestamp order until max_time is passed."""
        # first request
        self.next_request()
        while (self.current_time <= max_time):
            current_event = heappop(self.event_queue)
            self.current_time = current_event[0]
            match current_event[1].event_type:
                case "request":
                    # each arrival schedules its successor, so the heap never drains
                    self.next_request()
                    self.handle_request(current_event[1].request)
                case "router_finish":
                    self.router_process_finish(current_event[1].request)
                case "process_finish":
                    self.process_request_finish(current_event[1].request)
                case _ :
                    raise RuntimeError("shouldn't reach this branch")
def run_single_simulation(args):
    """Worker entry point for the process pool.

    args is a (C, lambda, simulation_time) tuple. Returns a
    (mean_response_time, loss_rate) pair on success, or None when the run
    completed no requests or aborted because the loss rate grew too high.
    """
    cluster_count, arrival_rate, max_time = args
    # re-seed so each pool worker draws an independent random stream
    random.seed(time.time() + os.getpid())
    try:
        sim = Simulation(cluster_count, arrival_rate)
        sim.run(max_time)
    except ValueError:
        # loss rate exceeded the 5% threshold mid-run
        return None
    samples = sim.response_times
    if not samples:
        return None
    return (sum(samples) / len(samples), sim.loss_rate)
def simulation_wrapper(simulation_time, num_runs, min_runs, confidence_level):
    """Sweep λ for each cluster count C and plot mean response time with CI bands.

    Parameters:
        simulation_time: runtime of each individual simulation run.
        num_runs: simulations launched per (C, λ) point.
        min_runs: minimum successful runs required to keep a data point.
        confidence_level: two-sided Student-t confidence level for the band.
    """
    C_values = [1, 2, 3, 6]
    lambda_vals = [l/100 for l in range(1, 301)] # λ from 0.01 to 3.00
    plt.figure(figsize=(12, 8))
    with Pool() as pool: # pool of workers
        for c in C_values:
            lambda_points = []
            means = []
            ci_lower = []
            ci_upper = []
            print(f"\nProcessing C={c}")
            for lambda_val in lambda_vals:
                # run num_runs simulations for each lambda
                args_list = [(c, lambda_val, simulation_time) for _ in range(num_runs)]
                results = pool.map(run_single_simulation, args_list)
                # collect results from successful simulations
                successful_results = [res for res in results if res is not None]
                run_results = [res[0] for res in successful_results]
                loss_rates = [res[1] for res in successful_results]
                # reject the point if there were not enough successful runs
                if len(run_results) >= min_runs:
                    # statistics
                    mean_rt = np.mean(run_results)
                    std_dev = np.std(run_results, ddof=1)
                    n = len(run_results)
                    # two-sided Student-t confidence interval
                    t_value = t.ppf((1 + confidence_level)/2, n-1)
                    ci = t_value * std_dev / np.sqrt(n)
                    # loss rate
                    mean_loss = np.mean(loss_rates)
                    # store results
                    lambda_points.append(lambda_val)
                    means.append(mean_rt)
                    ci_lower.append(mean_rt - ci)
                    ci_upper.append(mean_rt + ci)
                    print(f"C={c}, λ={lambda_val:.2f}, Mean RT={mean_rt:.2f} ± {ci:.2f}, Loss Rate={mean_loss:.2%}")
                elif len(run_results) > 0:
                    print(f"λ={lambda_val:.2f} skipped - only {len(run_results)} successful run(s)")
                    continue
                else:
                    # higher λ values would only be more overloaded: stop this C
                    print(f"Stopped at λ={lambda_val:.2f} - no successful run")
                    break
            plt.plot(lambda_points, means, label=f'C={c}')
            plt.fill_between(lambda_points, ci_lower, ci_upper, alpha=0.2)
    plt.xlabel('Arrival Rate (λ)')
    plt.ylabel('Mean Response Time')
    # title now reflects the actual confidence_level argument instead of
    # hard-coding "95%" regardless of what the caller passed in
    plt.title(f'Mean Response Time vs Arrival Rate ({num_runs} runs, {confidence_level:.0%} CI)')
    plt.legend()
    plt.grid(True)
    plt.show()