diff --git a/src/.gitignore b/src/.gitignore new file mode 100644 index 0000000..c18dd8d --- /dev/null +++ b/src/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/src/main.py b/src/main.py index 8bab16d..369c53b 100644 --- a/src/main.py +++ b/src/main.py @@ -1,224 +1,14 @@ #!/bin/python3 -import random -import os -import time -from heapq import heappush, heappop -import matplotlib.pyplot as plt -from scipy.stats import t -import numpy as np -from multiprocessing import Pool +import argparse +from simulation import simulation_wrapper -class Event: - def __init__(self, event_type, request): - self.event_type = event_type # 'request', 'router_finish', 'process_finish' - self.request = request +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Simulation server cluster') + parser.add_argument('--simulation_time', type=int, default=500, help='runtime of each individual simulation') + parser.add_argument('--num_runs', type=int, default=10, help='number of simulations to run with a fixed set of parameters') + parser.add_argument('--min_runs', type=int, default=5, help='minimum number of successful runs needed to calculate statistics') + parser.add_argument('--confidence_level', type=float, default=0.95, help='confidence level') -class Request: - def __init__(self, category, arrival_time): - self.category = category - self.arrival_time = arrival_time - - -class Simulation: - def __init__(self, C, lambda_val): - # C clusters of K servers - self.C = C - self.K = 12 // C - self.occupied_servers = [0] * self.C - # service rate exponential distribution parameter - service_rates = {1: 4/20, 2:7/20, 3:10/20, 6:14/20} - self.service_rate = service_rates[C] - # router request processing time - self.router_processing_time = (C - 1) / C - # λ - self.lambda_val = lambda_val - - self.router_state = 'idle' # 'idle', 'processing', 'blocked' - - self.event_queue = [] # (time, Event) - self.current_time = 0.0 - - self.router_queue = [] - - self.total_requests = 0 
- self.lost_requests = 0 - self.loss_rate = 0 - self.response_times = [] - - def next_request(self): - # exponential distribution, parameter λ - interval = random.expovariate(self.lambda_val) - new_time = self.current_time + interval - arrival_time = new_time - - category = random.randint(0, self.C-1) if self.C>1 else 0 - - request = Request(category, arrival_time) - request_event = Event("request", request) - - heappush(self.event_queue, (arrival_time, request_event)) - - def handle_request(self, request): - self.total_requests += 1 - if len(self.router_queue) == 0 and self.router_state == "idle": - self.router_process(request) - elif ((len(self.router_queue) + (self.router_state == "processing")) < 100): - self.router_queue.append(request) - else: - self.lost_requests += 1 - self.loss_rate = self.lost_requests / self.total_requests - if self.loss_rate > 0.05 : - raise ValueError("lossrate too high") - - def router_process(self, request): - if self.router_state == "idle": - self.router_state = 'processing' - router_finish = Event("router_finish", request) - finish_time = self.current_time + self.router_processing_time - heappush(self.event_queue, (finish_time, router_finish)) - else: - raise RuntimeError("shouldn't reach this branch") - - - def router_process_finish(self, request): - # send the request to a free server - if self.occupied_servers[request.category] < self.K: - self.router_state = "idle" - self.occupied_servers[request.category] += 1 - self.process_request(request) - else: - self.router_state = "blocked" - self.router_queue.insert(0, request) - - # router process next request - self.router_process_next() - - def router_process_next(self): - if (len(self.router_queue) > 0) and (self.router_state == "idle"): - self.router_process(self.router_queue.pop(0)) - - def process_request(self, request): - interval = random.expovariate(self.service_rate) - finish_time = self.current_time + interval - process_finish = Event("process_finish", request) - 
heappush(self.event_queue, (finish_time, process_finish)) - - def process_request_finish(self, request): - self.response_times.append(self.current_time - request.arrival_time) - self.occupied_servers[request.category] -= 1 - - if (self.router_state == "blocked") and (request.category == self.router_queue[0].category): - self.process_request(self.router_queue.pop(0)) - self.occupied_servers[request.category] += 1 - self.router_state = "idle" - self.router_process_next() - - - def run(self, max_time): - # first request - self.next_request() - - while (self.current_time <= max_time): - current_event = heappop(self.event_queue) - self.current_time = current_event[0] - match current_event[1].event_type: - case "request": - self.next_request() - self.handle_request(current_event[1].request) - case "router_finish": - self.router_process_finish(current_event[1].request) - case "process_finish": - self.process_request_finish(current_event[1].request) - case _ : - raise RuntimeError("shouldn't reach this branch") - - - -def run_single_simulation(args): - c, lambda_val, simulation_time = args - # for different seed in each process - random.seed(time.time() + os.getpid()) - try: - sim = Simulation(c, lambda_val) - sim.run(simulation_time) - if len(sim.response_times) > 0: - run_mean = sum(sim.response_times) / len(sim.response_times) - loss_rate = sim.loss_rate - return (run_mean, loss_rate) - else: - return None - except ValueError: # Loss rate too high - return None - -def simulation_wrapper(): - C_values = [1, 2, 3, 6] - simulation_time = 1000 - num_runs = 12 - min_runs = 5 - confidence_level = 0.95 - - lambda_vals = [l/100 for l in range(1, 301)] # λ from 0.01 to 3.00 - - plt.figure(figsize=(12, 8)) - - with Pool() as pool: # pool of workers - for c in C_values: - lambda_points = [] - means = [] - ci_lower = [] - ci_upper = [] - print(f"\nProcessing C={c}") - - for lambda_val in lambda_vals: - # run num_runs simulation for each lambda - args_list = [(c, lambda_val, 
simulation_time) for _ in range(num_runs)] - results = pool.map(run_single_simulation, args_list) - - # collect results from successful simulations - successful_results = [res for res in results if res is not None] - run_results = [res[0] for res in successful_results] - loss_rates = [res[1] for res in successful_results] - - # reject if not enough successful run - if len(run_results) >= min_runs: - # statistics - mean_rt = np.mean(run_results) - std_dev = np.std(run_results, ddof=1) - n = len(run_results) - - # confidence interval - t_value = t.ppf((1 + confidence_level)/2, n-1) - ci = t_value * std_dev / np.sqrt(n) - - # loss rate - mean_loss = np.mean(loss_rates) - - # store results - lambda_points.append(lambda_val) - means.append(mean_rt) - ci_lower.append(mean_rt - ci) - ci_upper.append(mean_rt + ci) - - print(f"C={c}, λ={lambda_val:.2f}, Mean RT={mean_rt:.2f} ± {ci:.2f}, Loss Rate={mean_loss:.2%}") - elif len(run_results) > 0: - print(f"λ={lambda_val:.2f} skipped - only {len(run_results)} successful run(s)") - continue - else: - print(f"Stopped at λ={lambda_val:.2f} - no successful run") - break - - - plt.plot(lambda_points, means, label=f'C={c}') - plt.fill_between(lambda_points, ci_lower, ci_upper, alpha=0.2) - - plt.xlabel('Arrival Rate (λ)') - plt.ylabel('Mean Response Time') - plt.title(f'Mean Response Time vs Arrival Rate ({num_runs} runs, 95% CI)') - plt.legend() - plt.grid(True) - plt.show() - - -if __name__ == '__main__': - simulation_wrapper() + args = parser.parse_args() + simulation_wrapper(args.simulation_time, args.num_runs, args.min_runs, args.confidence_level) diff --git a/src/simulation.py b/src/simulation.py new file mode 100644 index 0000000..ed2f221 --- /dev/null +++ b/src/simulation.py @@ -0,0 +1,214 @@ +import random +import os +import time +from heapq import heappush, heappop +import matplotlib.pyplot as plt +from scipy.stats import t +import numpy as np +from multiprocessing import Pool + +class Event: + def __init__(self, event_type, 
request): + self.event_type = event_type # 'request', 'router_finish', 'process_finish' + self.request = request + +class Request: + def __init__(self, category, arrival_time): + self.category = category + self.arrival_time = arrival_time + + +class Simulation: + def __init__(self, C, lambda_val): + # C clusters of K servers + self.C = C + self.K = 12 // C + self.occupied_servers = [0] * self.C + # service rate exponential distribution parameter + service_rates = {1: 4/20, 2:7/20, 3:10/20, 6:14/20} + self.service_rate = service_rates[C] + # router request processing time + self.router_processing_time = (C - 1) / C + # λ + self.lambda_val = lambda_val + + self.router_state = 'idle' # 'idle', 'processing', 'blocked' + + self.event_queue = [] # (time, Event) + self.current_time = 0.0 + + self.router_queue = [] + + self.total_requests = 0 + self.lost_requests = 0 + self.loss_rate = 0 + self.response_times = [] + + def next_request(self): + # exponential distribution, parameter λ + interval = random.expovariate(self.lambda_val) + new_time = self.current_time + interval + arrival_time = new_time + + category = random.randint(0, self.C-1) if self.C>1 else 0 + + request = Request(category, arrival_time) + request_event = Event("request", request) + + heappush(self.event_queue, (arrival_time, request_event)) + + def handle_request(self, request): + self.total_requests += 1 + if len(self.router_queue) == 0 and self.router_state == "idle": + self.router_process(request) + elif ((len(self.router_queue) + (self.router_state == "processing")) < 100): + self.router_queue.append(request) + else: + self.lost_requests += 1 + self.loss_rate = self.lost_requests / self.total_requests + if self.loss_rate > 0.05 : + raise ValueError("lossrate too high") + + def router_process(self, request): + if self.router_state == "idle": + self.router_state = 'processing' + router_finish = Event("router_finish", request) + finish_time = self.current_time + self.router_processing_time + 
heappush(self.event_queue, (finish_time, router_finish)) + else: + raise RuntimeError("shouldn't reach this branch") + + + def router_process_finish(self, request): + # send the request to a free server + if self.occupied_servers[request.category] < self.K: + self.router_state = "idle" + self.occupied_servers[request.category] += 1 + self.process_request(request) + else: + self.router_state = "blocked" + self.router_queue.insert(0, request) + + # router process next request + self.router_process_next() + + def router_process_next(self): + if (len(self.router_queue) > 0) and (self.router_state == "idle"): + self.router_process(self.router_queue.pop(0)) + + def process_request(self, request): + interval = random.expovariate(self.service_rate) + finish_time = self.current_time + interval + process_finish = Event("process_finish", request) + heappush(self.event_queue, (finish_time, process_finish)) + + def process_request_finish(self, request): + self.response_times.append(self.current_time - request.arrival_time) + self.occupied_servers[request.category] -= 1 + + if (self.router_state == "blocked") and (request.category == self.router_queue[0].category): + self.process_request(self.router_queue.pop(0)) + self.occupied_servers[request.category] += 1 + self.router_state = "idle" + self.router_process_next() + + + def run(self, max_time): + # first request + self.next_request() + + while (self.current_time <= max_time): + current_event = heappop(self.event_queue) + self.current_time = current_event[0] + match current_event[1].event_type: + case "request": + self.next_request() + self.handle_request(current_event[1].request) + case "router_finish": + self.router_process_finish(current_event[1].request) + case "process_finish": + self.process_request_finish(current_event[1].request) + case _ : + raise RuntimeError("shouldn't reach this branch") + + + +def run_single_simulation(args): + c, lambda_val, simulation_time = args + # for different seed in each process + 
random.seed(time.time() + os.getpid()) + try: + sim = Simulation(c, lambda_val) + sim.run(simulation_time) + if len(sim.response_times) > 0: + run_mean = sum(sim.response_times) / len(sim.response_times) + loss_rate = sim.loss_rate + return (run_mean, loss_rate) + else: + return None + except ValueError: # Loss rate too high + return None + +def simulation_wrapper(simulation_time, num_runs, min_runs, confidence_level): + C_values = [1, 2, 3, 6] + + lambda_vals = [l/100 for l in range(1, 301)] # λ from 0.01 to 3.00 + + plt.figure(figsize=(12, 8)) + + with Pool() as pool: # pool of workers + for c in C_values: + lambda_points = [] + means = [] + ci_lower = [] + ci_upper = [] + print(f"\nProcessing C={c}") + + for lambda_val in lambda_vals: + # run num_runs simulation for each lambda + args_list = [(c, lambda_val, simulation_time) for _ in range(num_runs)] + results = pool.map(run_single_simulation, args_list) + + # collect results from successful simulations + successful_results = [res for res in results if res is not None] + run_results = [res[0] for res in successful_results] + loss_rates = [res[1] for res in successful_results] + + # reject if not enough successful run + if len(run_results) >= min_runs: + # statistics + mean_rt = np.mean(run_results) + std_dev = np.std(run_results, ddof=1) + n = len(run_results) + + # confidence interval + t_value = t.ppf((1 + confidence_level)/2, n-1) + ci = t_value * std_dev / np.sqrt(n) + + # loss rate + mean_loss = np.mean(loss_rates) + + # store results + lambda_points.append(lambda_val) + means.append(mean_rt) + ci_lower.append(mean_rt - ci) + ci_upper.append(mean_rt + ci) + + print(f"C={c}, λ={lambda_val:.2f}, Mean RT={mean_rt:.2f} ± {ci:.2f}, Loss Rate={mean_loss:.2%}") + elif len(run_results) > 0: + print(f"λ={lambda_val:.2f} skipped - only {len(run_results)} successful run(s)") + continue + else: + print(f"Stopped at λ={lambda_val:.2f} - no successful run") + break + + + plt.plot(lambda_points, means, label=f'C={c}') + 
plt.fill_between(lambda_points, ci_lower, ci_upper, alpha=0.2) + + plt.xlabel('Arrival Rate (λ)') + plt.ylabel('Mean Response Time') + plt.title(f'Mean Response Time vs Arrival Rate ({num_runs} runs, {confidence_level:.0%} CI)') + plt.legend() + plt.grid(True) + plt.show()