2024-04-13 00:28:03 +02:00
#!/usr/bin/env python3
2024-04-21 15:28:24 +02:00
from random import randint
from math import log2
2024-04-24 22:52:01 +02:00
import multiprocessing
2024-04-13 00:28:03 +02:00
class lfsr ( object ) :
def __init__ ( self , state , taps ) :
2024-04-26 12:11:26 +02:00
if max ( taps ) > len ( state ) :
raise ValueError ( " inconsistent taps and initial state " )
else :
self . state = state
self . taps = taps
2024-04-13 00:28:03 +02:00
# getters and setters (private attributes)
def get_state ( self ) :
return self . __state
def set_state ( self , value ) :
self . __state = value
state = property ( fget = get_state , fset = set_state , doc = " lfsr state (current bits value in lfsr) " )
def get_taps ( self ) :
return self . __taps
def set_taps ( self , valeur ) :
self . __taps = valeur
taps = property ( fget = get_taps , fset = set_taps , doc = " lfsr taps (bit positions affecting next state) " )
def shift ( self ) :
'''
calculate next state and return the output bit
'''
feedback = sum ( self . state [ tap ] for tap in self . taps ) % 2
2024-04-21 17:39:00 +02:00
output = self . state [ 0 ]
self . state = self . state [ 1 : ] + [ feedback ]
2024-04-13 00:28:03 +02:00
return output
2024-04-21 15:28:24 +02:00
def test_lfsr17 ( ) :
2024-04-21 17:48:31 +02:00
print ( " test lfsr17 " )
2024-04-24 22:52:01 +02:00
key = [ randint ( 0 , 1 ) for _ in range ( 17 ) ] # random initial state
while key == [ 0 ] * 17 : # initial state shouldn't be 0
key = [ randint ( 0 , 1 ) for _ in range ( 17 ) ]
2024-04-21 17:39:00 +02:00
taps = [ 0 , 14 ]
2024-04-21 15:28:24 +02:00
lfsr17 = lfsr ( key , taps )
states = [ lfsr17 . state ]
for _ in range ( 2 * * 17 - 2 ) :
lfsr17 . shift ( )
states . append ( lfsr17 . state )
sorted_states = sorted ( states , key = lambda x : tuple ( x ) )
for i in range ( 2 * * 17 - 2 ) :
if sorted_states [ i ] == sorted_states [ i + 1 ] : # compare each state with the next state in the sorted list, if 2 states are identical they should be next to each other
print ( f ' state { sorted_states [ i ] } appears at least 2 times ' )
return False
n_state = len ( sorted_states )
n_state_log = log2 ( n_state + 1 )
2024-04-21 17:48:31 +02:00
print ( f ' all { n_state } = 2^( { n_state_log } )-1 generated states are different ' )
2024-04-21 15:28:24 +02:00
return True
2024-04-21 17:39:00 +02:00
def css_encrypt ( text , key ) :
taps17 = [ 0 , 14 ]
lfsr17 = lfsr ( ( key [ : 16 ] + [ 1 ] ) , taps17 )
taps25 = [ 0 , 3 , 4 , 12 ]
lfsr25 = lfsr ( ( key [ 16 : ] + [ 1 ] ) , taps25 )
2024-04-22 12:54:38 +02:00
cipher = 0
2024-04-21 17:39:00 +02:00
carry = 0
2024-04-22 12:54:38 +02:00
if isinstance ( text , int ) :
Bytes = text . to_bytes ( ( text . bit_length ( ) + 7 ) / / 8 , ' big ' )
elif isinstance ( text , bytes ) :
Bytes = text
else :
raise TypeError ( " input text should be bytes or integer " )
for byte in Bytes :
2024-04-21 17:39:00 +02:00
x_b2 = " "
y_b2 = " "
# Generate bytes from lfsr17 and lfsr25
for _ in range ( 8 ) :
x_b2 + = str ( lfsr17 . shift ( ) )
y_b2 + = str ( lfsr25 . shift ( ) )
2024-04-21 17:48:31 +02:00
x = int ( x_b2 [ : : - 1 ] , 2 )
y = int ( y_b2 [ : : - 1 ] , 2 )
2024-04-21 17:39:00 +02:00
z = ( x + y + carry ) % 256
carry = 1 if x + y > 255 else 0
cipher_byte = z ^ byte
2024-04-24 22:52:01 +02:00
2024-04-22 12:54:38 +02:00
cipher = ( cipher << 8 ) | cipher_byte
2024-04-24 22:52:01 +02:00
cipher_bytes = b ' \x00 ' * ( len ( Bytes ) - len ( cipher . to_bytes ( ( cipher . bit_length ( ) + 7 ) / / 8 , ' big ' ) ) ) + cipher . to_bytes ( ( cipher . bit_length ( ) + 7 ) / / 8 , ' big ' ) # padding
2024-04-22 12:54:38 +02:00
return cipher_bytes
2024-04-21 17:39:00 +02:00
2024-04-21 17:48:31 +02:00
def test_encrypt ( ) :
print ( " test encryption: text 0xffffffffff, key 0x0 ∈ { 0, 1}^40 " )
2024-04-24 09:55:49 +02:00
cipher = int . from_bytes ( css_encrypt ( 0xffffffffff , [ 0 ] * 40 ) )
2024-04-21 17:48:31 +02:00
print ( f ' cipher: { hex ( cipher ) } ' )
2024-04-22 12:54:38 +02:00
clear = int . from_bytes ( css_encrypt ( cipher , [ 0 ] * 40 ) )
2024-04-21 17:48:31 +02:00
print ( f ' decrypted: { hex ( clear ) } ' )
print ( f ' original text and decrypted message are the same: { clear == 0xffffffffff } ' )
2024-04-22 19:39:22 +02:00
def gen_6_bytes ( key = [ randint ( 0 , 1 ) for _ in range ( 40 ) ] ) :
2024-04-22 12:54:38 +02:00
text = b ' \x00 \x00 \x00 \x00 \x00 \x00 '
cipher = css_encrypt ( text , key )
return cipher
2024-04-23 10:21:26 +02:00
def attack_worker ( start , end , Bytes , result_queue , stop_event ) :
2024-04-22 19:08:27 +02:00
taps17 = [ 0 , 14 ]
taps25 = [ 0 , 3 , 4 , 12 ]
2024-04-23 10:21:26 +02:00
for i in range ( start , end ) :
2024-04-26 12:11:26 +02:00
if stop_event . is_set ( ) : # stop key search as soon as one worker finds a solution
2024-04-23 10:21:26 +02:00
return
2024-04-22 19:08:27 +02:00
lfsr17_init = [ int ( bit ) for bit in bin ( i ) [ 2 : ] . zfill ( 16 ) ] + [ 1 ]
lfsr17 = lfsr ( lfsr17_init , taps17 )
x = [ ]
2024-04-26 12:11:26 +02:00
# generate first 3 output bytes from lfsr 17
2024-04-22 19:08:27 +02:00
for _ in range ( 3 ) :
x_bin = " "
for _ in range ( 8 ) :
x_bin + = str ( lfsr17 . shift ( ) )
x . append ( int ( x_bin [ : : - 1 ] , 2 ) )
2024-04-26 12:11:26 +02:00
# calculate corresponding output bytes from lfsr 25
2024-04-22 19:08:27 +02:00
y = [ ( Bytes [ 0 ] - x [ 0 ] ) % 256 ]
c = 1 if x [ 0 ] + y [ 0 ] > 255 else 0
y . append ( ( Bytes [ 1 ] - ( x [ 1 ] + c ) ) % 256 )
c = 1 if x [ 1 ] + y [ 1 ] > 255 else 0
y . append ( ( Bytes [ 2 ] - ( x [ 2 ] + c ) ) % 256 )
2024-04-26 12:11:26 +02:00
2024-04-22 19:08:27 +02:00
lfsr25_init = [ int ( bit ) for bit in ( bin ( y [ 0 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( y [ 1 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( y [ 2 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] ) ] + [ 1 ]
lfsr25 = lfsr ( lfsr25_init , taps25 )
2024-04-26 12:11:26 +02:00
# first 3 bytes from lfsr 25 are already known
2024-04-22 19:08:27 +02:00
for _ in range ( 24 ) :
lfsr25 . shift ( )
2024-04-26 12:11:26 +02:00
# generate next 3 bytes from lfsr 25 and lfsr 17
2024-04-22 19:08:27 +02:00
for _ in range ( 3 ) :
x_bin = " "
y_bin = " "
for _ in range ( 8 ) :
x_bin + = str ( lfsr17 . shift ( ) )
y_bin + = str ( lfsr25 . shift ( ) )
x . append ( int ( x_bin [ : : - 1 ] , 2 ) )
y . append ( int ( y_bin [ : : - 1 ] , 2 ) )
2024-04-26 12:11:26 +02:00
# check if we can generate the next 3 output bytes z4,z5,z6 correctly
2024-04-22 19:08:27 +02:00
c = 1 if x [ 2 ] + y [ 2 ] > 255 else 0
z4 = ( x [ 3 ] + y [ 3 ] + c ) % 256
c = 1 if x [ 3 ] + y [ 3 ] > 255 else 0
z5 = ( x [ 4 ] + y [ 4 ] + c ) % 256
c = 1 if x [ 4 ] + y [ 4 ] > 255 else 0
z6 = ( x [ 5 ] + y [ 5 ] + c ) % 256
2024-04-26 12:11:26 +02:00
if z4 == Bytes [ 3 ] and z5 == Bytes [ 4 ] and z6 == Bytes [ 5 ] : # if these bytes are correct the key found is most likely the correct one
2024-04-22 19:08:27 +02:00
key = bin ( x [ 0 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( x [ 1 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( y [ 0 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( y [ 1 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( y [ 2 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ]
2024-04-23 10:21:26 +02:00
result_queue . put ( key )
stop_event . set ( )
return
def attack ( Bytes = gen_6_bytes ( ) ) :
result_queue = multiprocessing . Queue ( )
stop_event = multiprocessing . Event ( )
processes = [ ]
num_cores = multiprocessing . cpu_count ( )
max_upper_limit = 2 * * 16
chunk_size = max_upper_limit / / num_cores
2024-04-23 12:45:11 +02:00
for i in range ( num_cores - 1 ) :
2024-04-23 10:21:26 +02:00
start = i * chunk_size
end = start + chunk_size
process = multiprocessing . Process ( target = attack_worker , args = ( start , end , Bytes , result_queue , stop_event ) )
processes . append ( process )
process . start ( )
2024-04-23 12:45:11 +02:00
# last process
start = ( num_cores - 1 ) * chunk_size
end = max_upper_limit
process = multiprocessing . Process ( target = attack_worker , args = ( start , end , Bytes , result_queue , stop_event ) )
processes . append ( process )
process . start ( )
2024-04-23 10:21:26 +02:00
for process in processes :
process . join ( )
2024-04-24 09:55:49 +02:00
key = result_queue . get ( )
print ( f ' key found: \t { key } ' )
return [ int ( bit ) for bit in key ]
2024-04-22 19:08:27 +02:00
2024-04-22 19:39:22 +02:00
def test_attack ( n = 1 ) :
success = 0
print ( f ' testing attack in 2^16 against CSS { n } times (keys randomly generated each time) \n ' )
for _ in range ( n ) :
key = [ randint ( 0 , 1 ) for _ in range ( 40 ) ]
key_string = ' ' . join ( str ( bit ) for bit in key )
print ( f ' key generated: \t { key_string } ' )
Bytes = gen_6_bytes ( key )
found_key = attack ( Bytes )
2024-04-24 09:55:49 +02:00
failed = [ ]
2024-04-22 19:39:22 +02:00
if found_key == key :
success + = 1
2024-04-24 09:55:49 +02:00
else :
failed . append ( key )
2024-04-22 19:39:22 +02:00
print ( )
print ( f ' { success } / { n } success ' )
2024-04-24 09:55:49 +02:00
if len ( failed ) > 0 :
print ( " fails: " )
for key in failed :
print ( " " . join ( str ( bit ) for bit in key ) )
def test_fail ( ) :
2024-04-24 22:52:01 +02:00
# happened with 74/36000 keys, probability of ~0.0021
2024-04-24 09:55:49 +02:00
taps17 = [ 0 , 14 ]
taps25 = [ 0 , 3 , 4 , 12 ]
key = [ 1 , 0 , 1 , 0 , 1 , 1 , 0 , 1 , 0 , 0 , 1 , 0 , 1 , 0 , 0 , 0 , 1 , 0 , 0 , 0 , 0 , 0 , 1 , 0 , 1 , 0 , 1 , 0 , 1 , 0 , 0 , 0 , 0 , 0 , 1 , 0 , 0 , 0 , 1 , 1 ]
key2 = [ 1 , 0 , 0 , 0 , 1 , 1 , 1 , 0 , 1 , 1 , 0 , 0 , 1 , 0 , 0 , 0 , 1 , 0 , 1 , 0 , 0 , 0 , 0 , 1 , 0 , 1 , 1 , 0 , 1 , 0 , 0 , 0 , 0 , 0 , 1 , 1 , 1 , 1 , 1 , 0 ]
print ( f ' both of these keys seem to output the same first 6 bytes, it only differs starting from byte 7: \n { " " . join ( str ( bit ) for bit in key ) } \n { " " . join ( str ( bit ) for bit in key2 ) } \n ' )
lfsr17 = lfsr ( key [ : 16 ] + [ 1 ] , taps17 )
lfsr172 = lfsr ( key2 [ : 16 ] + [ 1 ] , taps17 )
lfsr25 = lfsr ( key [ 16 : ] + [ 1 ] , taps25 )
lfsr252 = lfsr ( key2 [ 16 : ] + [ 1 ] , taps25 )
x = [ ]
y = [ ]
x2 = [ ]
y2 = [ ]
for _ in range ( 7 ) :
x_bin = " "
y_bin = " "
x_bin2 = " "
y_bin2 = " "
for _ in range ( 8 ) :
x_bin + = str ( lfsr17 . shift ( ) )
y_bin + = str ( lfsr25 . shift ( ) )
x_bin2 + = str ( lfsr172 . shift ( ) )
y_bin2 + = str ( lfsr252 . shift ( ) )
x . append ( int ( x_bin [ : : - 1 ] , 2 ) )
y . append ( int ( y_bin [ : : - 1 ] , 2 ) )
x2 . append ( int ( x_bin2 [ : : - 1 ] , 2 ) )
y2 . append ( int ( y_bin2 [ : : - 1 ] , 2 ) )
print ( f ' LFSR outputs with 1st key: lfsr17: { x } lfsr25: { y } ' )
print ( f ' LFSR outputs with 2nd key: lfsr17: { x2 } lfsr25: { y2 } \n ' )
c = 0
c2 = 0
for i in range ( 7 ) :
s = x [ i ] + y [ i ] + c
if s > 255 :
c = 1
else :
c = 0
s2 = x2 [ i ] + y2 [ i ] + c2
if s2 > 255 :
c2 = 1
else :
c2 = 0
print ( f ' 8bits addition at step { i } equal for both keys: { s % 256 } = { s2 % 256 } : { s % 256 == s2 % 256 } ' )
text = 0xffffffffffff
cipher = int . from_bytes ( css_encrypt ( text , key ) )
cipher2 = int . from_bytes ( css_encrypt ( text , key ) )
print ( f ' \n encrypting 0xffffffffffff (6 bytes) \n with 1st key: { hex ( cipher ) } \n with 2nd key: { hex ( cipher2 ) } \n both outputs are equal: { cipher == cipher2 } \n ' )
2024-04-22 19:39:22 +02:00
2024-04-22 19:08:27 +02:00
2024-04-24 22:52:01 +02:00
def safer_attack_worker ( start , end , Bytes , result_queue ) :
taps17 = [ 0 , 14 ]
taps25 = [ 0 , 3 , 4 , 12 ]
for i in range ( start , end ) :
lfsr17_init = [ int ( bit ) for bit in bin ( i ) [ 2 : ] . zfill ( 16 ) ] + [ 1 ]
lfsr17 = lfsr ( lfsr17_init , taps17 )
x = [ ]
for _ in range ( 3 ) :
x_bin = " "
for _ in range ( 8 ) :
x_bin + = str ( lfsr17 . shift ( ) )
x . append ( int ( x_bin [ : : - 1 ] , 2 ) )
y = [ ( Bytes [ 0 ] - x [ 0 ] ) % 256 ]
c = 1 if x [ 0 ] + y [ 0 ] > 255 else 0
y . append ( ( Bytes [ 1 ] - ( x [ 1 ] + c ) ) % 256 )
c = 1 if x [ 1 ] + y [ 1 ] > 255 else 0
y . append ( ( Bytes [ 2 ] - ( x [ 2 ] + c ) ) % 256 )
lfsr25_init = [ int ( bit ) for bit in ( bin ( y [ 0 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( y [ 1 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( y [ 2 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] ) ] + [ 1 ]
lfsr25 = lfsr ( lfsr25_init , taps25 )
for _ in range ( 24 ) :
lfsr25 . shift ( )
for _ in range ( 3 ) :
x_bin = " "
y_bin = " "
for _ in range ( 8 ) :
x_bin + = str ( lfsr17 . shift ( ) )
y_bin + = str ( lfsr25 . shift ( ) )
x . append ( int ( x_bin [ : : - 1 ] , 2 ) )
y . append ( int ( y_bin [ : : - 1 ] , 2 ) )
c = 1 if x [ 2 ] + y [ 2 ] > 255 else 0
z4 = ( x [ 3 ] + y [ 3 ] + c ) % 256
c = 1 if x [ 3 ] + y [ 3 ] > 255 else 0
z5 = ( x [ 4 ] + y [ 4 ] + c ) % 256
c = 1 if x [ 4 ] + y [ 4 ] > 255 else 0
z6 = ( x [ 5 ] + y [ 5 ] + c ) % 256
if z4 == Bytes [ 3 ] and z5 == Bytes [ 4 ] and z6 == Bytes [ 5 ] :
key = bin ( x [ 0 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( x [ 1 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( y [ 0 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( y [ 1 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ] + bin ( y [ 2 ] ) [ 2 : ] . zfill ( 8 ) [ : : - 1 ]
result_queue . put ( key )
def safer_attack ( ask_queue , answer_queue , Bytes = gen_6_bytes ( ) ) :
result_queue = multiprocessing . Queue ( )
processes = [ ]
num_cores = multiprocessing . cpu_count ( )
max_upper_limit = 2 * * 16
chunk_size = max_upper_limit / / num_cores
for i in range ( num_cores - 1 ) :
start = i * chunk_size
end = start + chunk_size
process = multiprocessing . Process ( target = safer_attack_worker , args = ( start , end , Bytes , result_queue ) )
processes . append ( process )
process . start ( )
# last process
start = ( num_cores - 1 ) * chunk_size
end = max_upper_limit
process = multiprocessing . Process ( target = safer_attack_worker , args = ( start , end , Bytes , result_queue ) )
processes . append ( process )
process . start ( )
for process in processes :
process . join ( )
keys = [ ]
while not result_queue . empty ( ) :
key = result_queue . get ( )
print ( f ' key found: \t { key } ' )
keys . append ( key )
if len ( keys ) == 1 :
ask_queue . put ( False )
answer_queue . put ( [ int ( bit ) for bit in keys [ 0 ] ] )
else :
print ( " collision detected " )
ask_queue . put ( True )
Byte7 = answer_queue . get ( )
taps17 = [ 0 , 14 ]
taps25 = [ 0 , 3 , 4 , 12 ]
for key_str in keys :
key = [ int ( bit ) for bit in key_str ]
lfsr17 = lfsr ( key [ : 16 ] + [ 1 ] , taps17 )
lfsr25 = lfsr ( key [ 16 : ] + [ 1 ] , taps25 )
for _ in range ( 40 ) :
lfsr17 . shift ( )
lfsr25 . shift ( )
x = [ ]
y = [ ]
for _ in range ( 2 ) :
x_bin = " "
y_bin = " "
for _ in range ( 8 ) :
x_bin + = str ( lfsr17 . shift ( ) )
y_bin + = str ( lfsr25 . shift ( ) )
x . append ( int ( x_bin [ : : - 1 ] , 2 ) )
y . append ( int ( y_bin [ : : - 1 ] , 2 ) )
c = 1 if x [ 0 ] + y [ 0 ] > 255 else 0
z7 = ( x [ 1 ] + y [ 1 ] + c ) % 256
if z7 == Byte7 :
print ( f ' kept key: \t { key_str } ' )
answer_queue . put ( key )
return
def test_safer_attack ( n = 1 ) :
success = 0
ask_queue = multiprocessing . Queue ( )
answer_queue = multiprocessing . Queue ( )
print ( f ' testing safer attack in 2^16 against CSS { n } times (keys randomly generated each time) \n ' )
for _ in range ( n ) :
key = [ randint ( 0 , 1 ) for _ in range ( 40 ) ]
# key = [1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0]
key_string = ' ' . join ( str ( bit ) for bit in key )
print ( f ' key generated: \t { key_string } ' )
Bytes = gen_6_bytes ( key )
process = multiprocessing . Process ( target = safer_attack , args = ( ask_queue , answer_queue , Bytes ) )
process . start ( )
failed = [ ]
additional_byte = ask_queue . get ( )
if additional_byte : # give access to an additional byte if asked
print ( " giving an additional byte " )
text = b ' \x00 \x00 \x00 \x00 \x00 \x00 \x00 '
cipher = css_encrypt ( text , key )
answer_queue . put ( cipher [ - 1 ] )
found_key = answer_queue . get ( )
if found_key == key :
success + = 1
else :
failed . append ( key )
print ( )
print ( f ' { success } / { n } success ' )
if len ( failed ) > 0 :
print ( " fails: " )
for key in failed :
print ( " " . join ( str ( bit ) for bit in key ) )
2024-04-21 15:28:24 +02:00
test_lfsr17 ( )
2024-04-24 09:55:49 +02:00
print ( " \n " )
2024-04-21 17:48:31 +02:00
test_encrypt ( )
2024-04-24 09:55:49 +02:00
print ( " \n " )
# time to test attack with 100 keys
# CPU: 6 cores at 3.8GHz
# ~65 seconds
2024-04-26 12:11:26 +02:00
test_attack ( 5 )
2024-04-24 09:55:49 +02:00
print ( " \n " )
test_fail ( )
2024-04-24 22:52:01 +02:00
print ( " \n " )
test_safer_attack ( 5 )