#!/usr/bin/env python3
"""Toy CSS (Content Scramble System) stream cipher and a 2^16 key-recovery attack.

The cipher combines two LFSRs (17 and 25 bits).  Each keystream byte is the
8-bit sum of one byte from each register plus a carry taken from the previous
addition.  The attack enumerates the 2^16 possible seeds of the 17-bit
register and derives the 25-bit register from known keystream bytes.

Run as a script to execute the demonstrations at the bottom of the file.
"""
from math import log2
import multiprocessing
from random import randint

# Only the reusable API is exported.  The ``test_*`` demonstrations are left
# out on purpose so that ``from <module> import *`` does not pull
# pytest-collectable names into the importing module.
__all__ = [
    "lfsr",
    "css_encrypt",
    "gen_6_bytes",
    "attack_worker",
    "attack",
    "safer_attack_worker",
    "safer_attack",
]

# Feedback tap positions of the two registers (indices into the state list).
_TAPS17 = [0, 14]
_TAPS25 = [0, 3, 4, 12]
# Number of candidate seeds for the 17-bit register (16 key bits + a fixed 1).
_SEED_SPACE = 2**16


class lfsr(object):
    """Linear feedback shift register over bits stored as a list of 0/1 ints.

    ``state[0]`` is the next output bit; ``taps`` are the state indices that
    are summed modulo 2 to produce the bit appended at the end of the state.
    """

    def __init__(self, state, taps):
        """Store the initial state and taps.

        Raises:
            ValueError: if a tap index does not fit inside ``state``.
        """
        # A tap equal to len(state) is already out of range for indexing,
        # hence ``>=`` (a plain ``>`` would defer the failure to shift()).
        if max(taps) >= len(state):
            raise ValueError("inconsistent taps and initial state")
        self.state = state
        self.taps = taps

    # getters and setters (private attributes)
    def get_state(self):
        return self.__state

    def set_state(self, value):
        self.__state = value

    state = property(fget=get_state, fset=set_state, doc="lfsr state (current bits value in lfsr)")

    def get_taps(self):
        return self.__taps

    def set_taps(self, valeur):
        self.__taps = valeur

    taps = property(fget=get_taps, fset=set_taps, doc="lfsr taps (bit positions affecting next state)")

    def shift(self):
        """Calculate the next state and return the output bit.

        A new state list is bound on every call, so references to previous
        states held by callers are never modified.
        """
        feedback = sum(self.state[tap] for tap in self.taps) % 2
        output = self.state[0]
        self.state = self.state[1:] + [feedback]
        return output


def _next_byte(register):
    """Clock ``register`` eight times; the first output bit is the LSB."""
    return sum(register.shift() << position for position in range(8))


def _bits_lsb_first(value):
    """Return the 8 bits of ``value`` as a '0'/'1' string, LSB first."""
    return format(value, "08b")[::-1]


def _register_bytes(bits, taps, count):
    """Return the first ``count`` output bytes of an LFSR seeded with ``bits``."""
    register = lfsr(bits, taps)
    return [_next_byte(register) for _ in range(count)]


def test_lfsr17():
    """Check that a random non-zero 17-bit state visits 2^17-1 distinct states.

    Returns:
        True if all generated states are different, False otherwise.
    """
    print("test lfsr17")
    key = [randint(0, 1) for _ in range(17)]  # random initial state
    while key == [0] * 17:  # initial state shouldn't be 0
        key = [randint(0, 1) for _ in range(17)]
    lfsr17 = lfsr(key, _TAPS17)
    states = [lfsr17.state]
    for _ in range(2**17 - 2):
        lfsr17.shift()
        states.append(lfsr17.state)
    # After sorting, two identical states are necessarily adjacent.
    sorted_states = sorted(states, key=tuple)
    for current, following in zip(sorted_states, sorted_states[1:]):
        if current == following:
            print(f'state {current} appears at least 2 times')
            return False
    n_state = len(sorted_states)
    n_state_log = log2(n_state + 1)
    print(f'all {n_state} = 2^({n_state_log})-1 generated states are different')
    return True


def css_encrypt(text, key):
    """Encrypt (or decrypt: the operation is its own inverse) ``text``.

    Args:
        text: ``bytes``, or a non-negative ``int`` that is converted to its
            minimal big-endian byte string (leading zero bytes are lost).
        key: list of 40 bits; the first 16 seed the 17-bit register and the
            remaining ones seed the 25-bit register, each followed by a 1.

    Returns:
        ``bytes`` of the same length as the input byte string.

    Raises:
        TypeError: if ``text`` is neither ``bytes`` nor ``int``.
    """
    lfsr17 = lfsr(key[:16] + [1], _TAPS17)
    lfsr25 = lfsr(key[16:] + [1], _TAPS25)
    if isinstance(text, int):
        Bytes = text.to_bytes((text.bit_length() + 7) // 8, 'big')
    elif isinstance(text, bytes):
        Bytes = text
    else:
        raise TypeError("input text should be bytes or integer")

    cipher = bytearray()
    carry = 0
    for byte in Bytes:
        x = _next_byte(lfsr17)
        y = _next_byte(lfsr25)
        cipher.append(((x + y + carry) % 256) ^ byte)
        # NOTE: the carry is derived from x + y only (the incoming carry is
        # not counted); the attack code below relies on this exact rule.
        carry = 1 if x + y > 255 else 0
    return bytes(cipher)


def test_encrypt():
    """Encrypt a fixed message with the all-zero key and decrypt it again."""
    print("test encryption: text 0xffffffffff, key 0x0 ∈ {0, 1}^40")
    cipher_bytes = css_encrypt(0xffffffffff, [0] * 40)
    # byteorder is passed explicitly: it only has a default since Python 3.11.
    cipher = int.from_bytes(cipher_bytes, 'big')
    print(f'cipher: {hex(cipher)}')
    # Decrypt the bytes themselves so leading zero bytes cannot be dropped.
    clear = int.from_bytes(css_encrypt(cipher_bytes, [0] * 40), 'big')
    print(f'decrypted: {hex(clear)}')
    print(f'original text and decrypted message are the same: {clear==0xffffffffff}')


def gen_6_bytes(key=[randint(0, 1) for _ in range(40)]):
    """Return the first 6 keystream bytes for ``key`` (encryption of zeros).

    The default key is drawn once, when the function is defined; it is never
    mutated here.
    """
    text = b'\x00\x00\x00\x00\x00\x00'
    return css_encrypt(text, key)


def _candidate_key(seed, Bytes):
    """Return the 40-bit key string matching ``Bytes`` for this seed, or None.

    ``seed`` gives the 16 key bits of the 17-bit register.  The first three
    bytes of the 25-bit register are deduced from ``Bytes[0:3]``, which fixes
    its 24 key bits; the candidate is kept only if it also reproduces
    ``Bytes[3:6]``.
    """
    lfsr17 = lfsr([int(bit) for bit in format(seed, "016b")] + [1], _TAPS17)
    x = [_next_byte(lfsr17) for _ in range(3)]

    # calculate corresponding output bytes from lfsr 25
    y = []
    carry = 0
    for position in range(3):
        value = (Bytes[position] - (x[position] + carry)) % 256
        y.append(value)
        carry = 1 if x[position] + value > 255 else 0

    seed25 = "".join(_bits_lsb_first(value) for value in y)
    lfsr25 = lfsr([int(bit) for bit in seed25] + [1], _TAPS25)
    # first 3 bytes from lfsr 25 are already known
    for _ in range(24):
        lfsr25.shift()

    # check if we can generate the next 3 output bytes correctly
    for position in range(3, 6):
        x_byte = _next_byte(lfsr17)
        y_byte = _next_byte(lfsr25)
        if (x_byte + y_byte + carry) % 256 != Bytes[position]:
            return None
        carry = 1 if x_byte + y_byte > 255 else 0

    return _bits_lsb_first(x[0]) + _bits_lsb_first(x[1]) + seed25


def attack_worker(start, end, Bytes, result_queue, stop_event):
    """Search seeds in ``range(start, end)`` and report the first key found.

    The key (a 40-character '0'/'1' string) is put on ``result_queue`` and
    ``stop_event`` is set so the other workers stop.
    """
    for i in range(start, end):
        if stop_event.is_set():  # stop key search as soon as one worker finds a solution
            return
        key = _candidate_key(i, Bytes)
        if key is not None:
            # if these bytes are correct the key found is most likely the correct one
            result_queue.put(key)
            stop_event.set()
            return


def _search_seed_space(worker, Bytes, result_queue, *extra_args):
    """Run ``worker`` over the seed space, one process per core, and wait.

    The space is cut into equal chunks; the last process also takes the
    remainder of the division.
    """
    num_cores = multiprocessing.cpu_count()
    chunk_size = _SEED_SPACE // num_cores
    processes = []
    for i in range(num_cores):
        start = i * chunk_size
        end = _SEED_SPACE if i == num_cores - 1 else start + chunk_size
        process = multiprocessing.Process(
            target=worker,
            args=(start, end, Bytes, result_queue, *extra_args),
        )
        processes.append(process)
        process.start()
    for process in processes:
        process.join()


def attack(Bytes=gen_6_bytes()):
    """Recover a key from the first 6 keystream bytes.

    Returns:
        The key as a list of 40 bits.

    Raises:
        ValueError: if no seed reproduces ``Bytes`` (previously this blocked
            forever waiting on the result queue).
    """
    result_queue = multiprocessing.Queue()
    stop_event = multiprocessing.Event()
    _search_seed_space(attack_worker, Bytes, result_queue, stop_event)
    if result_queue.empty():
        raise ValueError("no key reproduces the given keystream bytes")
    key = result_queue.get()
    print(f'key found: \t{key}')
    return [int(bit) for bit in key]


def test_attack(n=1):
    """Run ``attack`` against ``n`` random keys and print a summary."""
    success = 0
    failed = []  # kept across iterations so every failure is reported
    print(f'testing attack in 2^16 against CSS {n} times (keys randomly generated each time)\n')
    for _ in range(n):
        key = [randint(0, 1) for _ in range(40)]
        key_string = ''.join(str(bit) for bit in key)
        print(f'key generated: \t{key_string}')
        Bytes = gen_6_bytes(key)
        found_key = attack(Bytes)
        if found_key == key:
            success += 1
        else:
            failed.append(key)
        print()
    print(f'{success}/{n} success')
    if failed:
        print("fails:")
        for key in failed:
            print("".join(str(bit) for bit in key))


def test_fail():
    """Show two keys whose keystreams share their first 6 bytes."""
    # happened with 74/36000 keys, probability of ~0.0021
    key = [1, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 1]
    key2 = [1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0]
    print(f'both of these keys seem to output the same first 6 bytes, it only differs starting from byte 7:\n{"".join(str(bit) for bit in key)}\n{"".join(str(bit) for bit in key2)}\n')

    x = _register_bytes(key[:16] + [1], _TAPS17, 7)
    y = _register_bytes(key[16:] + [1], _TAPS25, 7)
    x2 = _register_bytes(key2[:16] + [1], _TAPS17, 7)
    y2 = _register_bytes(key2[16:] + [1], _TAPS25, 7)
    print(f'LFSR outputs with 1st key: lfsr17: {x} lfsr25: {y}')
    print(f'LFSR outputs with 2nd key: lfsr17: {x2} lfsr25: {y2}\n')

    c = 0
    c2 = 0
    for i in range(7):
        s = x[i] + y[i] + c
        s2 = x2[i] + y2[i] + c2
        # Same carry rule as css_encrypt, so these are the real keystream bytes.
        c = 1 if x[i] + y[i] > 255 else 0
        c2 = 1 if x2[i] + y2[i] > 255 else 0
        print(f'8bits addition at step {i} equal for both keys: {s%256}={s2%256} : {s%256==s2%256}')

    text = 0xffffffffffff
    cipher = int.from_bytes(css_encrypt(text, key), 'big')
    # The second ciphertext must use key2, otherwise the comparison is vacuous.
    cipher2 = int.from_bytes(css_encrypt(text, key2), 'big')
    print(f'\nencrypting 0xffffffffffff (6 bytes)\nwith 1st key: {hex(cipher)}\nwith 2nd key: {hex(cipher2)}\nboth outputs are equal: {cipher==cipher2}\n')


def safer_attack_worker(start, end, Bytes, result_queue):
    """Search seeds in ``range(start, end)`` and report every matching key.

    Unlike ``attack_worker`` the search never stops early, so colliding keys
    are all put on ``result_queue``.
    """
    for i in range(start, end):
        key = _candidate_key(i, Bytes)
        if key is not None:
            result_queue.put(key)


def safer_attack(ask_queue, answer_queue, Bytes=gen_6_bytes()):
    """Recover a key from 6 keystream bytes, asking for a 7th on collision.

    Protocol: a boolean is put on ``ask_queue`` (True when a 7th keystream
    byte is needed, which is then read from ``answer_queue``); the result is
    put on ``answer_queue`` as a list of 40 bits, or ``None`` when no
    candidate fits (previously nothing was sent and the caller hung).
    """
    result_queue = multiprocessing.Queue()
    _search_seed_space(safer_attack_worker, Bytes, result_queue)

    keys = []
    while not result_queue.empty():
        key = result_queue.get()
        print(f'key found: \t{key}')
        keys.append(key)

    if not keys:
        ask_queue.put(False)
        answer_queue.put(None)
        return
    if len(keys) == 1:
        ask_queue.put(False)
        answer_queue.put([int(bit) for bit in keys[0]])
        return

    print("collision detected")
    ask_queue.put(True)
    Byte7 = answer_queue.get()
    for key_str in keys:
        key = [int(bit) for bit in key_str]
        # Encrypting zeros yields the keystream; its 7th byte settles the tie.
        if css_encrypt(b'\x00' * 7, key)[-1] == Byte7:
            print(f'kept key: \t{key_str}')
            answer_queue.put(key)
            return
    answer_queue.put(None)


def test_safer_attack(n=1):
    """Run ``safer_attack`` against ``n`` random keys and print a summary."""
    success = 0
    failed = []  # kept across iterations so every failure is reported
    ask_queue = multiprocessing.Queue()
    answer_queue = multiprocessing.Queue()
    print(f'testing safer attack in 2^16 against CSS {n} times (keys randomly generated each time)\n')
    for _ in range(n):
        # To exercise the collision path, use one of the two keys of test_fail.
        key = [randint(0, 1) for _ in range(40)]
        key_string = ''.join(str(bit) for bit in key)
        print(f'key generated: \t{key_string}')
        Bytes = gen_6_bytes(key)
        process = multiprocessing.Process(target=safer_attack, args=(ask_queue, answer_queue, Bytes))
        process.start()
        additional_byte = ask_queue.get()
        if additional_byte:  # give access to an additional byte if asked
            print("giving an additional byte")
            text = b'\x00\x00\x00\x00\x00\x00\x00'
            cipher = css_encrypt(text, key)
            # NOTE(review): answer_queue carries traffic in both directions;
            # this relies on the child already waiting for the byte.
            answer_queue.put(cipher[-1])
        found_key = answer_queue.get()
        process.join()
        if found_key == key:
            success += 1
        else:
            failed.append(key)
        print()
    print(f'{success}/{n} success')
    if failed:
        print("fails:")
        for key in failed:
            print("".join(str(bit) for bit in key))


# The guard keeps the demonstrations from running again in every child
# process when multiprocessing re-imports this module (spawn start method).
if __name__ == "__main__":
    test_lfsr17()
    print("\n")
    test_encrypt()
    print("\n")
    # time to test attack with 100 keys
    # CPU: 6 cores at 3.8GHz
    # ~65 seconds
    test_attack(5)
    print("\n")
    test_fail()
    print("\n")
    test_safer_attack(5)