CSS-cipher/main.py

404 lines
14 KiB
Python
Raw Normal View History

2024-04-13 00:28:03 +02:00
#!/usr/bin/env python3
2024-04-21 15:28:24 +02:00
from random import randint
from math import log2
2024-04-24 22:52:01 +02:00
import multiprocessing
2024-04-13 00:28:03 +02:00
class lfsr(object):
def __init__(self, state, taps):
2024-04-26 12:11:26 +02:00
if max(taps) > len(state):
raise ValueError("inconsistent taps and initial state")
else:
self.state = state
self.taps = taps
2024-04-13 00:28:03 +02:00
# getters and setters (private attributes)
def get_state(self):
return self.__state
def set_state(self, value):
self.__state = value
state = property(fget=get_state, fset=set_state, doc="lfsr state (current bits value in lfsr)")
def get_taps(self):
return self.__taps
def set_taps(self, valeur):
self.__taps = valeur
taps = property(fget=get_taps, fset=set_taps, doc="lfsr taps (bit positions affecting next state)")
def shift(self):
'''
calculate next state and return the output bit
'''
feedback = sum(self.state[tap] for tap in self.taps) % 2
2024-04-21 17:39:00 +02:00
output = self.state[0]
self.state = self.state[1:] + [feedback]
2024-04-13 00:28:03 +02:00
return output
2024-04-21 15:28:24 +02:00
def test_lfsr17():
print("test lfsr17")
2024-04-24 22:52:01 +02:00
key = [randint(0, 1) for _ in range(17)] # random initial state
while key == [0]*17: # initial state shouldn't be 0
key = [randint(0, 1) for _ in range(17)]
2024-04-21 17:39:00 +02:00
taps = [0, 14]
2024-04-21 15:28:24 +02:00
lfsr17 = lfsr(key, taps)
states = [lfsr17.state]
for _ in range(2**17-2):
lfsr17.shift()
states.append(lfsr17.state)
sorted_states = sorted(states, key=lambda x: tuple(x))
for i in range(2**17-2):
if sorted_states[i] == sorted_states[i+1]: # compare each state with the next state in the sorted list, if 2 states are identical they should be next to each other
print(f'state {sorted_states[i]} appears at least 2 times')
return False
n_state = len(sorted_states)
n_state_log = log2(n_state+1)
print(f'all {n_state} = 2^({n_state_log})-1 generated states are different')
2024-04-21 15:28:24 +02:00
return True
2024-04-21 17:39:00 +02:00
def css_encrypt(text, key):
taps17 = [0, 14]
lfsr17 = lfsr((key[:16]+[1]), taps17)
taps25 = [0, 3, 4, 12]
lfsr25 = lfsr((key[16:]+[1]), taps25)
cipher = 0
2024-04-21 17:39:00 +02:00
carry = 0
if isinstance(text, int):
Bytes = text.to_bytes((text.bit_length() + 7) // 8, 'big')
elif isinstance(text, bytes):
Bytes = text
else:
raise TypeError("input text should be bytes or integer")
for byte in Bytes:
2024-04-21 17:39:00 +02:00
x_b2 = ""
y_b2 = ""
# Generate bytes from lfsr17 and lfsr25
for _ in range(8):
x_b2 += str(lfsr17.shift())
y_b2 += str(lfsr25.shift())
x = int(x_b2[::-1], 2)
y = int(y_b2[::-1], 2)
2024-04-21 17:39:00 +02:00
z = (x + y + carry) % 256
carry = 1 if x + y > 255 else 0
cipher_byte = z ^ byte
2024-04-24 22:52:01 +02:00
cipher = (cipher << 8) | cipher_byte
2024-04-24 22:52:01 +02:00
cipher_bytes = b'\x00'*(len(Bytes) - len(cipher.to_bytes((cipher.bit_length() + 7) // 8, 'big'))) +cipher.to_bytes((cipher.bit_length() + 7) // 8, 'big') # padding
return cipher_bytes
2024-04-21 17:39:00 +02:00
def test_encrypt():
print("test encryption: text 0xffffffffff, key 0x0 ∈ {0, 1}^40")
cipher = int.from_bytes(css_encrypt(0xffffffffff, [0]*40))
print(f'cipher: {hex(cipher)}')
clear = int.from_bytes(css_encrypt(cipher, [0]*40))
print(f'decrypted: {hex(clear)}')
print(f'original text and decrypted message are the same: {clear==0xffffffffff}')
2024-04-22 19:39:22 +02:00
def gen_6_bytes(key=[randint(0, 1) for _ in range(40)]):
text = b'\x00\x00\x00\x00\x00\x00'
cipher = css_encrypt(text, key)
return cipher
2024-04-23 10:21:26 +02:00
def attack_worker(start, end, Bytes, result_queue, stop_event):
2024-04-22 19:08:27 +02:00
taps17 = [0, 14]
taps25 = [0, 3, 4, 12]
2024-04-23 10:21:26 +02:00
for i in range(start, end):
2024-04-26 12:11:26 +02:00
if stop_event.is_set(): # stop key search as soon as one worker finds a solution
2024-04-23 10:21:26 +02:00
return
2024-04-22 19:08:27 +02:00
lfsr17_init = [int(bit) for bit in bin(i)[2:].zfill(16)]+[1]
lfsr17 = lfsr(lfsr17_init, taps17)
x = []
2024-04-26 12:11:26 +02:00
# generate first 3 output bytes from lfsr 17
2024-04-22 19:08:27 +02:00
for _ in range(3):
x_bin = ""
for _ in range(8):
x_bin += str(lfsr17.shift())
x.append(int(x_bin[::-1], 2))
2024-04-26 12:11:26 +02:00
# calculate corresponding output bytes from lfsr 25
2024-04-22 19:08:27 +02:00
y = [(Bytes[0]-x[0])%256]
c=1 if x[0]+y[0]>255 else 0
y.append((Bytes[1]-(x[1]+c))%256)
c=1 if x[1]+y[1]>255 else 0
y.append((Bytes[2]-(x[2]+c))%256)
2024-04-26 12:11:26 +02:00
2024-04-22 19:08:27 +02:00
lfsr25_init = [int(bit) for bit in (bin(y[0])[2:].zfill(8)[::-1] + bin(y[1])[2:].zfill(8)[::-1] + bin(y[2])[2:].zfill(8)[::-1] ) ]+[1]
lfsr25 = lfsr(lfsr25_init, taps25)
2024-04-26 12:11:26 +02:00
# first 3 bytes from lfsr 25 are already known
2024-04-22 19:08:27 +02:00
for _ in range(24):
lfsr25.shift()
2024-04-26 12:11:26 +02:00
# generate next 3 bytes from lfsr 25 and lfsr 17
2024-04-22 19:08:27 +02:00
for _ in range(3):
x_bin = ""
y_bin = ""
for _ in range(8):
x_bin += str(lfsr17.shift())
y_bin += str(lfsr25.shift())
x.append(int(x_bin[::-1], 2))
y.append(int(y_bin[::-1], 2))
2024-04-26 12:11:26 +02:00
# check if we can generate the next 3 output bytes z4,z5,z6 correctly
2024-04-22 19:08:27 +02:00
c=1 if x[2]+y[2]>255 else 0
z4 = (x[3]+y[3]+c)%256
c=1 if x[3]+y[3]>255 else 0
z5 = (x[4]+y[4]+c)%256
c=1 if x[4]+y[4]>255 else 0
z6 = (x[5]+y[5]+c)%256
2024-04-26 12:11:26 +02:00
if z4 == Bytes[3] and z5 == Bytes[4] and z6 == Bytes[5]: # if these bytes are correct the key found is most likely the correct one
2024-04-22 19:08:27 +02:00
key = bin(x[0])[2:].zfill(8)[::-1] + bin(x[1])[2:].zfill(8)[::-1] + bin(y[0])[2:].zfill(8)[::-1] + bin(y[1])[2:].zfill(8)[::-1] + bin(y[2])[2:].zfill(8)[::-1]
2024-04-23 10:21:26 +02:00
result_queue.put(key)
stop_event.set()
return
def attack(Bytes=gen_6_bytes()):
result_queue = multiprocessing.Queue()
stop_event = multiprocessing.Event()
processes = []
num_cores = multiprocessing.cpu_count()
max_upper_limit = 2**16
chunk_size = max_upper_limit // num_cores
2024-04-23 12:45:11 +02:00
for i in range(num_cores-1):
2024-04-23 10:21:26 +02:00
start = i * chunk_size
end = start + chunk_size
process = multiprocessing.Process(target=attack_worker, args=(start, end, Bytes, result_queue, stop_event))
processes.append(process)
process.start()
2024-04-23 12:45:11 +02:00
# last process
start = (num_cores-1) * chunk_size
end = max_upper_limit
process = multiprocessing.Process(target=attack_worker, args=(start, end, Bytes, result_queue, stop_event))
processes.append(process)
process.start()
2024-04-23 10:21:26 +02:00
for process in processes:
process.join()
key = result_queue.get()
print(f'key found: \t{key}')
return [int(bit) for bit in key]
2024-04-22 19:08:27 +02:00
2024-04-22 19:39:22 +02:00
def test_attack(n=1):
success = 0
print(f'testing attack in 2^16 against CSS {n} times (keys randomly generated each time)\n')
for _ in range(n):
key = [randint(0, 1) for _ in range(40)]
key_string = ''.join(str(bit) for bit in key)
print(f'key generated: \t{key_string}')
Bytes = gen_6_bytes(key)
found_key = attack(Bytes)
failed = []
2024-04-22 19:39:22 +02:00
if found_key == key:
success += 1
else:
failed.append(key)
2024-04-22 19:39:22 +02:00
print()
print(f'{success}/{n} success')
if len(failed)>0:
print("fails:")
for key in failed:
print("".join(str(bit) for bit in key))
def test_fail():
2024-04-24 22:52:01 +02:00
# happened with 74/36000 keys, probability of ~0.0021
taps17 = [0, 14]
taps25 = [0, 3, 4, 12]
key = [1, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 1]
key2 = [1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0]
print(f'both of these keys seem to output the same first 6 bytes, it only differs starting from byte 7:\n{"".join(str(bit) for bit in key)}\n{"".join(str(bit) for bit in key2)}\n')
lfsr17 = lfsr(key[:16]+[1], taps17)
lfsr172 = lfsr(key2[:16]+[1], taps17)
lfsr25 = lfsr(key[16:]+[1], taps25)
lfsr252 = lfsr(key2[16:]+[1], taps25)
x = []
y = []
x2 = []
y2=[]
for _ in range(7):
x_bin = ""
y_bin = ""
x_bin2 = ""
y_bin2 = ""
for _ in range(8):
x_bin += str(lfsr17.shift())
y_bin += str(lfsr25.shift())
x_bin2 += str(lfsr172.shift())
y_bin2 += str(lfsr252.shift())
x.append(int(x_bin[::-1], 2))
y.append(int(y_bin[::-1], 2))
x2.append(int(x_bin2[::-1], 2))
y2.append(int(y_bin2[::-1], 2))
print(f'LFSR outputs with 1st key: lfsr17: {x} lfsr25: {y}')
print(f'LFSR outputs with 2nd key: lfsr17: {x2} lfsr25: {y2}\n')
c = 0
c2 = 0
for i in range(7):
s = x[i]+y[i]+c
if s>255:
c=1
else:
c=0
s2 = x2[i]+y2[i]+c2
if s2>255:
c2=1
else:
c2=0
print(f'8bits addition at step {i} equal for both keys: {s%256}={s2%256} : {s%256==s2%256}')
text = 0xffffffffffff
cipher = int.from_bytes(css_encrypt(text, key))
cipher2 = int.from_bytes(css_encrypt(text, key))
print(f'\nencrypting 0xffffffffffff (6 bytes)\nwith 1st key: {hex(cipher)}\nwith 2nd key: {hex(cipher2)}\nboth outputs are equal: {cipher==cipher2}\n')
2024-04-22 19:39:22 +02:00
2024-04-22 19:08:27 +02:00
2024-04-24 22:52:01 +02:00
def safer_attack_worker(start, end, Bytes, result_queue):
taps17 = [0, 14]
taps25 = [0, 3, 4, 12]
for i in range(start, end):
lfsr17_init = [int(bit) for bit in bin(i)[2:].zfill(16)]+[1]
lfsr17 = lfsr(lfsr17_init, taps17)
x = []
for _ in range(3):
x_bin = ""
for _ in range(8):
x_bin += str(lfsr17.shift())
x.append(int(x_bin[::-1], 2))
y = [(Bytes[0]-x[0])%256]
c=1 if x[0]+y[0]>255 else 0
y.append((Bytes[1]-(x[1]+c))%256)
c=1 if x[1]+y[1]>255 else 0
y.append((Bytes[2]-(x[2]+c))%256)
lfsr25_init = [int(bit) for bit in (bin(y[0])[2:].zfill(8)[::-1] + bin(y[1])[2:].zfill(8)[::-1] + bin(y[2])[2:].zfill(8)[::-1] ) ]+[1]
lfsr25 = lfsr(lfsr25_init, taps25)
for _ in range(24):
lfsr25.shift()
for _ in range(3):
x_bin = ""
y_bin = ""
for _ in range(8):
x_bin += str(lfsr17.shift())
y_bin += str(lfsr25.shift())
x.append(int(x_bin[::-1], 2))
y.append(int(y_bin[::-1], 2))
c=1 if x[2]+y[2]>255 else 0
z4 = (x[3]+y[3]+c)%256
c=1 if x[3]+y[3]>255 else 0
z5 = (x[4]+y[4]+c)%256
c=1 if x[4]+y[4]>255 else 0
z6 = (x[5]+y[5]+c)%256
if z4 == Bytes[3] and z5 == Bytes[4] and z6 == Bytes[5]:
key = bin(x[0])[2:].zfill(8)[::-1] + bin(x[1])[2:].zfill(8)[::-1] + bin(y[0])[2:].zfill(8)[::-1] + bin(y[1])[2:].zfill(8)[::-1] + bin(y[2])[2:].zfill(8)[::-1]
result_queue.put(key)
def safer_attack(ask_queue, answer_queue, Bytes=gen_6_bytes()):
result_queue = multiprocessing.Queue()
processes = []
num_cores = multiprocessing.cpu_count()
max_upper_limit = 2**16
chunk_size = max_upper_limit // num_cores
for i in range(num_cores-1):
start = i * chunk_size
end = start + chunk_size
process = multiprocessing.Process(target=safer_attack_worker, args=(start, end, Bytes, result_queue))
processes.append(process)
process.start()
# last process
start = (num_cores-1) * chunk_size
end = max_upper_limit
process = multiprocessing.Process(target=safer_attack_worker, args=(start, end, Bytes, result_queue))
processes.append(process)
process.start()
for process in processes:
process.join()
keys = []
while not result_queue.empty():
key = result_queue.get()
print(f'key found: \t{key}')
keys.append(key)
if len(keys)==1:
ask_queue.put(False)
answer_queue.put([int(bit) for bit in keys[0]])
else:
print("collision detected")
ask_queue.put(True)
Byte7 = answer_queue.get()
taps17 = [0, 14]
taps25 = [0, 3, 4, 12]
for key_str in keys:
key = [int(bit) for bit in key_str]
lfsr17 = lfsr(key[:16]+[1], taps17)
lfsr25 = lfsr(key[16:]+[1], taps25)
for _ in range(40):
lfsr17.shift()
lfsr25.shift()
x = []
y = []
for _ in range(2):
x_bin = ""
y_bin = ""
for _ in range(8):
x_bin += str(lfsr17.shift())
y_bin += str(lfsr25.shift())
x.append(int(x_bin[::-1], 2))
y.append(int(y_bin[::-1], 2))
c=1 if x[0]+y[0]>255 else 0
z7 = (x[1]+y[1]+c)%256
if z7 == Byte7:
print(f'kept key: \t{key_str}')
answer_queue.put(key)
return
def test_safer_attack(n=1):
success = 0
ask_queue = multiprocessing.Queue()
answer_queue = multiprocessing.Queue()
print(f'testing safer attack in 2^16 against CSS {n} times (keys randomly generated each time)\n')
for _ in range(n):
key = [randint(0, 1) for _ in range(40)]
# key = [1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0]
key_string = ''.join(str(bit) for bit in key)
print(f'key generated: \t{key_string}')
Bytes = gen_6_bytes(key)
process = multiprocessing.Process(target=safer_attack, args=(ask_queue, answer_queue, Bytes))
process.start()
failed = []
additional_byte = ask_queue.get()
if additional_byte: # give access to an additional byte if asked
print("giving an additional byte")
text = b'\x00\x00\x00\x00\x00\x00\x00'
cipher = css_encrypt(text, key)
answer_queue.put(cipher[-1])
found_key = answer_queue.get()
if found_key == key:
success += 1
else:
failed.append(key)
print()
print(f'{success}/{n} success')
if len(failed)>0:
print("fails:")
for key in failed:
print("".join(str(bit) for bit in key))
2024-04-21 15:28:24 +02:00
test_lfsr17()
print("\n")
test_encrypt()
print("\n")
# time to test attack with 100 keys
# CPU: 6 cores at 3.8GHz
# ~65 seconds
2024-04-26 12:11:26 +02:00
test_attack(5)
print("\n")
test_fail()
2024-04-24 22:52:01 +02:00
print("\n")
test_safer_attack(5)