1085 lines
39 KiB
Python
Executable File
1085 lines
39 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
"""
|
|
* File: claes
|
|
* Version : 1.0
|
|
* License : BSD
|
|
*
|
|
* Copyright (c) 2022
|
|
* Ralf Senderek, Ireland. All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions
|
|
* are met:
|
|
*
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
* 3. All advertising materials mentioning features or use of this software
|
|
* must display the following acknowledgement:
|
|
* This product includes software developed by Ralf Senderek.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
|
|
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
|
|
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
|
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
|
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
|
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
|
* SUCH DAMAGE.
|
|
*
|
|
"""
|
|
|
|
|
|
import sys, os
|
|
from binascii import *
|
|
|
|
# error return codes
# (these values are passed to exit() throughout the script)
ERR_CL = -1
OK = 0
ERR_NOBYTES = 1
ERR_PERM = 2
ERR_PASSWORD = 3
ERR_INSTALL = 4
ERR_WRONGKEY = 5
ERR_DECODE = 6
ERR_SIZE = 7
ERR_ENCRYPT = 8
ERR_DECRYPT = 9
ERR_CORRUPT = 10
ERR_INCOMPLETE = 11
ERR_INPUT = 12
ERR_DATATYPE = 13

# The cryptlib bindings are mandatory: if the import fails for any reason the
# installation hint is printed and the script exits with ERR_INSTALL.
try:
    from cryptlib_py import *
except:
    ERR_IMPORT = """
The python3 library is not installed. You need to install
the packages cryptlib-python3 and cryptlib.
You will find them for a variety of operating systems here:
https://senderek.ie/cryptlib
or in the Fedora repository.
"""
    print( ERR_IMPORT )
    exit( ERR_INSTALL )

# Module-level state. The option parsing further down overwrites several of
# these (DEBUG, Mode, ENVELOPE, KEY128, DECRYPTION, MaxBytes, MaxBufferSize).
Version = "1.0"
DEBUG = False
# set to True by the analyze_* functions when the input is not ASCII armoured
BINARY = False

# upper bound for the number of input bytes read; the envelope buffer is
# sized 4000 bytes larger than that
MaxBytes = 150000000
MaxBufferSize = MaxBytes + 4000
# "file" or "stdin"
Source = "file"
# "pgp", "cms" or "openssl"
Mode = "pgp"
# input read from stdin (str) / input read from a file (bytes)
Text = ""
InputBytes = ""
AESblocksize = 16
NumberOfBlocks = 0

# we try to read all bytes in a single pass
ReadMoreBytes = False
FileName = ""
MinPasswordLength = 8
MaxPasswordLength = 64
DECRYPTION = False
# True: use cryptlib envelopes (pgp/cms); False: use a bare AES context (openssl)
ENVELOPE = True
KEY128 = False

# The passphrase is read through this external helper; refuse to run without it.
ASKPASS = "/bin/systemd-ask-password"
if not os.path.isfile(ASKPASS) :
    print ("Error: Please install " + ASKPASS + " to ensure safe password input")
    exit(ERR_INSTALL)
|
|
|
|
#-------------------------------------------------#
|
|
def print_help():
    """Write the usage, dependency and interoperability notes to stdout."""
    # NOTE(review): this text was recovered from a copy in which leading and
    # repeated blanks appear to be collapsed -- confirm the column layout
    # against the original file.
    usage_text = """
claes encrypts or decrypts data in OpenPGP, CMS or OpenSSL format using files or standard input
with a passphrase-based AES cipher.

usage: claes [-debug] [-cms | -openssl [-128]] [OPTION] [FILE | -]

If no FILE or "-" is given, data is read from standard input. The input size is limited to 150 MByte.

Options are:
-help display this message
-version display version information
-debug print debugging information to stderr
-cms produce CMS enveloped and encrypted data instead of OpenPGP
-openssl produce encrypted data using pbkdf2 in openssl format
-128 forces the use of 128 bit AES keys with -openssl
(256 bits is the default)
-decrypt decrypts an encrypted message (default is encrypt)

Full documentation <https://senderek.ie/cryptlib/tools>

This program depends on two packages providing the cryptlib shared object library
and the python3-bindings to this library.

You can download both packages in RPM and DEB format at:
https://senderek.ie/cryptlib/downloads

Or in FEDORA you can install the packages cryptlib and cryptlib-python3
directly from the repository.

In addition the program /bin/systemd-ask-password is used to read sensible data from
stdin. This program is part of the systemd package.

INTEROPERABILITY

gpg2: Without any options claes produces OpenPGP (base64-encoded) encrypted messages using
AES-128.
It can decrypt any message produced by GnuPG with the following ciphers:
AES, AES192, AES256, 3DES and CAST-128.

openssl: In OpenSSL mode claes writes (base64-encoded) encrypted messages in the proprietary
OpenSSL format using AES256 as the default.

These messages can be decrypted with openssl :
openssl aes-256-cbc -pbkdf2 -d -a -in FILE.asc

The use of AES-128 can be forced by the additional option -128.
"""
    # same output as print(): the text followed by one newline
    sys.stdout.write( usage_text + "\n" )
|
|
|
|
#-----------------------------------------------------------#
|
|
def print_debug ( message ):
    """Write "Debug: " followed by message and a newline to stderr.

    Nothing is written unless the module-level DEBUG flag is true.
    """
    if not DEBUG:
        return
    stream = sys.stderr
    stream.write( "Debug: " )
    stream.write( message + "\n" )
|
|
|
|
#-----------------------------------------------------------#
|
|
def get_proper_filename():
    """Choose the output file name and store it in the global OutFilename.

    Encrypting a file appends ".cms" (when Mode contains "cms"), ".gpg"
    (BINARY set) or ".asc"; encrypting standard input always writes to
    "./claes.asc".  Decrypting a file strips a trailing ".asc", ".cms",
    ".gpg" or ".pgp", otherwise the user is prompted for a name.  When the
    chosen file already exists the user may decline overwriting and type
    another name; that replacement name is not checked again.
    """
    # NOTE(review): nesting was rebuilt from a whitespace-stripped copy; the
    # position of the "Overwrite" checks should be confirmed against the
    # original file.
    global OutFilename

    from_file = (Source == "file")

    if DECRYPTION :
        if from_file :
            if FileName.endswith( (".asc", ".cms", ".gpg", ".pgp") ) :
                OutFilename = FileName[:-4]
            else:
                # get a proper file name to write to
                OutFilename = input("File name to write : ")
        if os.path.isfile(OutFilename) :
            answer = input("Overwrite " + OutFilename + " ? [y/n] ")
            if answer != "y" :
                OutFilename = input("Filename to write : ")
        return

    if not from_file :
        # source is standard input, write to claes.asc
        OutFilename = "./claes.asc"
        return

    if "cms" in Mode :
        extension = ".cms"
    elif BINARY :
        extension = ".gpg"
    else:
        extension = ".asc"
    OutFilename = FileName + extension

    if os.path.isfile(OutFilename) :
        answer = input("Overwrite " + OutFilename + " ? [y/n] ")
        if answer != "y" :
            OutFilename = input("File name to write : ")
|
|
|
|
#-----------------------------------------------------------#
|
|
def unix (command) :
    """Run command through os.popen and return everything it wrote to stdout.

    On platforms where os.name is not "posix" nothing is executed and the
    function returns None.
    """
    if os.name != "posix" :
        return None
    with os.popen(command, "r") as stream :
        return stream.read()
|
|
|
|
#-----------------------------------------------#
|
|
def crc24(data):
    """Return the 24-bit CRC of data as an int.

    data is an iterable of byte values (bytes or bytearray).  The register
    starts at 0xB704CE and is reduced with the polynomial 0x1864CFB, one bit
    at a time, most significant bit first.
    """
    register = 0xB704CE
    for octet in data :
        # fold the next byte into the top eight bits of the register
        register ^= octet << 16
        for _ in range(8) :
            register <<= 1
            if register & 0x1000000 :
                register ^= 0x1864CFB
    return register & 0xFFFFFF
|
|
|
|
#-----------------------------------------------#
|
|
def crc24_encoding(buff):
    """Return the checksum line for buff: "=" + base64(CRC-24) + newline.

    buff is the byte buffer handed to crc24(); the 24-bit result is
    serialised as three big-endian bytes before base64 encoding.  The
    trailing newline comes from b2a_base64.
    """
    checksum_bytes = crc24( buff ).to_bytes(3, 'big')
    encoded = b2a_base64( checksum_bytes ).decode()
    return "=" + encoded
|
|
|
|
#-----------------------------------------------#
|
|
def write_pgp_message(buff, pathname):
    """Write the encrypted bytes in buff to pathname as an armoured PGP message.

    The file consists of the BEGIN line, a "Version:" header built from the
    globals Version and CryptlibVersion, an empty line, the base64 text
    wrapped at 64 characters, the CRC-24 line from crc24_encoding() and the
    END line.  Afterwards the file mode is set to 0600.

    Failures are reported on stdout and not raised (best effort, as before).
    """
    try:
        # the context manager closes (and flushes) the file even on errors
        with open(pathname, 'w') as F :
            F.write("-----BEGIN PGP MESSAGE-----\n")
            F.write("Version: claes " + Version + " with cryptlib "+ CryptlibVersion +"\n\n")
            # drop the newline that b2a_base64 appends
            ASCII = b2a_base64(buff)[:-1]
            for offset in range(0, len(ASCII), 64) :
                if offset :
                    F.write("\n")
                F.write(ASCII[offset:offset+64].decode())
            F.write("\n")
            F.write(crc24_encoding(buff))
            F.write("-----END PGP MESSAGE-----\n")
        # os.chmod instead of a shell "chmod" string: works for path names
        # containing blanks or shell metacharacters and cannot be abused to
        # inject commands
        os.chmod(pathname, 0o600)
    except Exception:
        print (str( sys.exc_info()[0]) )
        print ("Error: cannot write to " + pathname)
|
|
|
|
#-----------------------------------------------#
|
|
def write_openssl_message(salt, buff, pathname):
    """Write salt and encrypted bytes to pathname in base64 OpenSSL format.

    The encoded payload is b'Salted__' + salt + buff, base64 encoded and
    wrapped at 64 characters.  Afterwards the file mode is set to 0600.

    Failures are reported on stdout and not raised (best effort, as before).
    """
    try:
        OUT = bytearray(b'Salted__')
        OUT.extend(salt)
        OUT.extend(buff)
        # b2a_base64 ends its output with a newline, which terminates the
        # last line of the file
        ASCII = b2a_base64(OUT)
        # the context manager closes (and flushes) the file even on errors
        with open(pathname, 'w') as F :
            for offset in range(0, len(ASCII), 64) :
                if offset :
                    F.write("\n")
                F.write(ASCII[offset:offset+64].decode())
        # os.chmod instead of a shell "chmod" string: works for path names
        # containing blanks or shell metacharacters and cannot be abused to
        # inject commands
        os.chmod(pathname, 0o600)
    except Exception:
        print (str( sys.exc_info()[0]) )
        print ("Error: cannot write to " + pathname)
|
|
|
|
#-----------------------------------------------#
|
|
def write_cms_message(buff, pathname):
    """Write the CMS encrypted bytes in buff to pathname with BEGIN/END lines.

    The base64 text is wrapped at 64 characters and framed by
    "-----BEGIN CMS-----" and "-----END CMS-----".  Afterwards the file
    mode is set to 0600.

    Failures are reported on stdout and not raised (best effort, as before).
    """
    try:
        # b2a_base64 ends its output with a newline, which terminates the
        # last payload line in front of the END line
        ASCII = b2a_base64(buff)
        # the context manager closes (and flushes) the file even on errors
        with open(pathname, 'w') as F :
            F.write("-----BEGIN CMS-----\n")
            for offset in range(0, len(ASCII), 64) :
                if offset :
                    F.write("\n")
                F.write(ASCII[offset:offset+64].decode())
            F.write("-----END CMS-----\n")
        # os.chmod instead of a shell "chmod" string: works for path names
        # containing blanks or shell metacharacters and cannot be abused to
        # inject commands
        os.chmod(pathname, 0o600)
    except Exception:
        print (str( sys.exc_info()[0]) )
        print ("Error: cannot write to " + pathname)
|
|
|
|
#-----------------------------------------------#
|
|
def analyze_PGP_data( data ):
    """Extract the base64 payload and checksum from an armoured PGP message.

    data is the complete input as bytes or bytearray.

    Returns
      * data itself, with the global BINARY set to True, when no
        "-----BEGIN PGP MESSAGE-----" line is present (input is treated as
        binary OpenPGP data);
      * an empty bytearray when the BEGIN line is present but the END line
        is missing;
      * otherwise a bytearray holding the base64 text without line breaks,
        followed by ":" and four checksum characters.  When the message
        carries no "=XXXX" checksum line the four characters are "none".

    Header lines after the BEGIN line (any line containing ":", such as
    "Version:" or "Comment:") are skipped, and both LF and CRLF line endings
    are accepted.
    """
    global BINARY

    print ("Trying to read OpenPGP data")
    PGP_BEGIN = b"-----BEGIN PGP MESSAGE-----"
    PGP_END = b"-----END PGP MESSAGE-----"

    begin_at = data.find( PGP_BEGIN )
    if begin_at < 0 :
        # data is probably binary input
        print_debug("Found binary input data.")
        BINARY = True
        return data

    ASCII = bytearray()
    body_at = begin_at + len(PGP_BEGIN)
    end_at = data.find( PGP_END, body_at )
    if end_at < 0 :
        # truncated armour: nothing usable between the markers
        return ASCII

    # the first element is the remainder of the BEGIN line itself
    lines = data[body_at:end_at].split(b"\n")[1:]

    # skip armour headers; base64 text never contains a colon
    first = 0
    while first < len(lines) and b":" in lines[first] :
        first = first + 1

    # strip carriage returns / blanks and drop empty lines
    payload = [ line.strip() for line in lines[first:] ]
    payload = [ line for line in payload if line ]

    # the checksum line is "=" followed by four base64 characters; a
    # payload line can end with "=" padding but never starts with it
    CRC = b"none"
    if payload and len(payload[-1]) == 5 and payload[-1].startswith(b"=") :
        CRC = payload.pop()[1:]

    for line in payload :
        ASCII.extend( line )
    ASCII.append(58)
    ASCII.extend( CRC )
    return ASCII
|
|
|
|
#-----------------------------------------------#
|
|
def analyze_CMS_data( data ):
    """Extract the base64 payload from CMS or OpenSSL formatted input.

    data is the complete input as bytes or bytearray.  The global Mode
    selects the format.

    Mode == "openssl":
      * input starting with the raw bytes "Sal" (the beginning of the
        binary "Salted__" header) is returned unchanged and the global
        BINARY is set to True;
      * otherwise a bytearray with all newline bytes removed is returned.

    Any other mode (CMS):
      * the text between "-----BEGIN CMS-----" and "-----END CMS-----" is
        returned without line breaks, followed by the marker ":none"
        (CMS carries no checksum line);
      * an empty bytearray is returned when either line is missing.
    """
    global BINARY

    print ("Trying to read CMS data")

    if (Mode == "openssl") :
        # slicing instead of indexing: input shorter than three bytes
        # must not raise
        if data[:3] == b"Sal" :
            print_debug("Found binary data")
            # use the original data input
            BINARY = True
            return data
        return bytearray( octet for octet in data if octet != 10 )

    CMS_BEGIN = b"-----BEGIN CMS-----"
    CMS_END = b"-----END CMS-----"
    ASCII = bytearray()

    begin_at = data.find( CMS_BEGIN )
    if begin_at < 0 :
        return ASCII
    # skip the marker and the line break that follows it
    start = begin_at + len(CMS_BEGIN) + 1
    end_at = data.find( CMS_END, start )
    if end_at < 0 :
        return ASCII

    # copy the payload without line feeds and carriage returns
    ASCII.extend( octet for octet in data[start:end_at] if octet not in (10, 13) )
    ASCII.extend( b":none" )
    return ASCII
|
|
|
|
#-----------------------------------------------------------#
|
|
def get_random_bytes ( num ):
    """Return num bytes of filler data used to overwrite sensitive buffers.

    random.randbytes is used when it is available.  If that fails for any
    reason, a buffer of num blanks is encrypted in place with a freshly
    generated AES key in a cryptlib context and returned as a bytearray.
    """
    # this function does not need to produce cryptographically secure random numbers
    try:
        import random
        return random.randbytes( num )
    except:
        filler = bytearray(b' '*num)
        scratch_context = int( cryptCreateContext( cryptUser, CRYPT_ALGO_AES ) )
        cryptSetAttribute( scratch_context, CRYPT_CTXINFO_MODE, CRYPT_MODE_CFB )
        cryptGenerateKey( scratch_context )
        cryptEncrypt( scratch_context, filler )
        cryptDestroyContext( scratch_context )
        return filler
|
|
|
|
#-----------------------------------------------------------#
|
|
def pbkdf2 ( salt ):
    """Derive AES key and IV from the global password with PBKDF2-HMAC-SHA256.

    salt is the salt as bytes or bytearray.

    Returns a bytearray holding the key followed by the 16 byte IV:
    48 bytes (32 byte key) by default, 32 bytes (16 byte key) when the
    global KEY128 is set.  Returns "" when the global Mode is not "openssl"
    or salt is empty.
    """
    from hashlib import pbkdf2_hmac

    iterations = 10000
    # the password is available globally
    if ( Mode != "openssl" ) or not salt :
        return ""

    # Derive only as many bytes as are used.  The 32 byte result equals the
    # first 32 bytes of the 48 byte result, so the keys are unchanged while
    # the second 10000-iteration run is avoided.
    if (KEY128) :
        # 128 bit AES key + IV
        length = 32
    else:
        # 256 bit AES key + IV (the default)
        length = 48

    KeyandIV = bytearray()
    KeyandIV.extend( pbkdf2_hmac('sha256', password, salt, iterations, length) )
    return KeyandIV
|
|
|
|
#-----------------------------------------------------------#
|
|
def envelope_info():
    """Print algorithm name, key size in bits and mode of the global Envelope.

    The three values are queried from cryptlib and printed on one line,
    separated by blanks.
    """
    # NOTE(review): the blank-filled buffer literals below were recovered
    # from a copy in which repeated blanks appear to be collapsed; confirm
    # their intended length against the original file.
    RESULT = bytearray()
    ALGO = bytearray(b' ')
    cryptGetAttributeString( Envelope, CRYPT_CTXINFO_NAME_ALGO, ALGO )
    # copy the name up to the first blank; the bounds test comes first so
    # that a buffer without any blank cannot raise IndexError
    i = 0
    while ( (i < len(ALGO)) and (ALGO[i] != 32) ) :
        RESULT.append(ALGO[i])
        i = i + 1
    RESULT.extend(b' ')
    # the key size is multiplied by 8 to print it in bits
    KEYSIZE = cryptGetAttribute( Envelope, CRYPT_CTXINFO_KEYSIZE )
    RESULT.extend(str(KEYSIZE*8).encode())
    RESULT.extend(b' ')
    MODE = bytearray(b' ')
    cryptGetAttributeString( Envelope, CRYPT_CTXINFO_NAME_MODE, MODE )
    RESULT.extend(MODE)
    print(RESULT.decode())
|
|
|
|
#-----------------------------------------------------------#
|
|
def context_info():
    """Print algorithm name, key size in bits and mode of the global AESContext.

    The three values are queried from cryptlib and printed on one line,
    separated by blanks.
    """
    # NOTE(review): the blank-filled buffer literals below were recovered
    # from a copy in which repeated blanks appear to be collapsed; confirm
    # their intended length against the original file.
    RESULT = bytearray()
    ALGO = bytearray(b' ')
    cryptGetAttributeString( AESContext, CRYPT_CTXINFO_NAME_ALGO, ALGO )
    # copy the name up to the first blank; the bounds test comes first so
    # that a buffer without any blank cannot raise IndexError
    i = 0
    while ( (i < len(ALGO)) and (ALGO[i] != 32) ) :
        RESULT.append(ALGO[i])
        i = i + 1
    RESULT.extend(b' ')
    # the key size is multiplied by 8 to print it in bits
    KEYSIZE = cryptGetAttribute( AESContext, CRYPT_CTXINFO_KEYSIZE )
    RESULT.extend(str(KEYSIZE*8).encode())
    RESULT.extend(b' ')
    MODE = bytearray(b' ')
    cryptGetAttributeString( AESContext, CRYPT_CTXINFO_NAME_MODE, MODE )
    RESULT.extend(MODE)
    print(RESULT.decode())
|
|
|
|
#-----------------------------------------------------------#
|
|
def clean_envelope():
    """Best-effort cleanup before the program terminates.

    Replaces the global password with filler bytes, destroys the global
    Envelope and shuts cryptlib down.  Errors never propagate, but they are
    reported through print_debug instead of being dropped silently.
    """
    global password
    try:
        print_debug("Cleaning envelope before exit")
        password = get_random_bytes( len(password) )
        cryptDestroyEnvelope( Envelope )
        cryptEnd()
    except Exception as error :
        # cleanup must not mask the exit code chosen by the caller
        print_debug( "Cleanup incomplete: " + str(error) )
|
|
|
|
#-----------------------------------------------------------#
|
|
def clean_context():
    """Best-effort cleanup before the program terminates.

    Replaces the global password with filler bytes, destroys the global
    AESContext and shuts cryptlib down.  Errors never propagate, but they
    are reported through print_debug instead of being dropped silently.
    """
    global password
    try:
        print_debug("Cleaning context before exit")
        password = get_random_bytes( len(password) )
        cryptDestroyContext( AESContext )
        cryptEnd()
    except Exception as error :
        # cleanup must not mask the exit code chosen by the caller
        print_debug( "Cleanup incomplete: " + str(error) )
|
|
|
|
|
|
#############################################################
|
|
# Command line handling. Every recognised option sets its module-level flag
# and is removed from sys.argv, so that afterwards at most the input file name
# (or "-") is left as sys.argv[1].
# NOTE(review): block nesting in this section was rebuilt from a
# whitespace-stripped copy -- confirm against the original file, in particular
# whether "-128" is only honoured together with "-openssl" and which "if" the
# final "else" belongs to.
if ( len(sys.argv) > 1 ):
    # legitimate options or a file name is in the parameter list

    if "-debug" in sys.argv :
        DEBUG = True
        sys.argv.remove( "-debug" )

    if "-cms" in sys.argv :
        Mode = "cms"
        sys.argv.remove( "-cms" )

    if "-openssl" in sys.argv :
        # OpenSSL format is produced with a bare AES context, not an envelope
        Mode = "openssl"
        LowLevelCrypto = True
        ENVELOPE = False
        sys.argv.remove( "-openssl" )
        if "-128" in sys.argv :
            KEY128 = True
            sys.argv.remove( "-128" )

    if "-help" in sys.argv :
        print_help()
        exit( OK )

    if "-version" in sys.argv :
        print ( Version )
        exit( OK )

    if "-decrypt" in sys.argv :
        # decryption accepts more input bytes than encryption
        DECRYPTION = True
        MaxBytes = 200000000
        MaxBufferSize = MaxBytes + 4000
        sys.argv.remove( "-decrypt" )

    # all options are processed

# Read the input: standard input is read as text into Text, a file is read
# as raw bytes into InputBytes.
if len(sys.argv) > 1 :
    if sys.argv[1] == "-":
        try:
            Text = sys.stdin.read( MaxBytes )
            Source = "stdin"
            FileName = "-"
        except:
            exit ( ERR_PERM )

    elif os.path.isfile(sys.argv[1]) :
        FileName = sys.argv[1]
        try:
            F = open( FileName, "rb" )
            InputBytes = F.read( MaxBytes )
            F.close()
        except:
            print( "cannot open file " + str(FileName) )
            exit ( ERR_PERM )

    else:
        print ("No such file: " + sys.argv[1] )
        exit ( ERR_INPUT )
else:
    # no file argument: read from standard input
    try:
        Text = sys.stdin.read( MaxBytes )
        Source = "stdin"
        FileName = "-"
    except:
        exit ( ERR_PERM )
|
|
|
|
|
|
# Start from the input name; get_proper_filename() replaces the global
# OutFilename depending on mode and direction.
OutFilename = FileName
get_proper_filename ()

##### Begin Cryptlib code #####
cryptInit()

# get Cryptlib Version
# (CryptlibVersion is written into the "Version:" header by write_pgp_message)
Major = cryptGetAttribute(CRYPT_UNUSED, CRYPT_OPTION_INFO_MAJORVERSION)
Minor = cryptGetAttribute(CRYPT_UNUSED, CRYPT_OPTION_INFO_MINORVERSION)
Step = cryptGetAttribute(CRYPT_UNUSED, CRYPT_OPTION_INFO_STEPPING)
CryptlibVersion = str(Major)+"."+str(Minor)+"."+str(Step)

cryptUser = CRYPT_UNUSED

# Encryption creates an envelope of the requested format, decryption lets
# cryptlib detect the format. The envelope is created in openssl mode as
# well, although that mode works on an AES context.
if (not DECRYPTION) :
    if Mode == "pgp" :
        Envelope_object = cryptCreateEnvelope( cryptUser, CRYPT_FORMAT_PGP )
    else:
        Envelope_object = cryptCreateEnvelope( cryptUser, CRYPT_FORMAT_CMS )
else:
    Envelope_object = cryptCreateEnvelope( cryptUser, CRYPT_FORMAT_AUTO )

# now an Envelope_object exists
# all later cryptlib calls use the integer handle
try:
    Envelope = int( Envelope_object )
except:
    print ("Cryptlib error.")
    cryptEnd()
    exit (ERR_CL)
|
|
|
|
# get Data. Data must be modifiable Buffer
# Whichever of Text / InputBytes was filled is copied into Data; the source
# variable is then rebound to filler bytes of the same length.
Data = bytearray()
if Text:
    Data.extend( Text.encode() )
    # randomize Text
    Text = get_random_bytes( len (Text))
else:
    Data.extend( InputBytes )
    # randomize InputBytes
    InputBytes = get_random_bytes( len (InputBytes) )

# read a user-supplied passphrase of sufficient quality
# because either encryption or decryption will use it anyway

password = bytearray()
password.extend( unix(ASKPASS).encode() )
# NOTE(review): the length is tested before the last byte is removed below,
# so the limits apply to the helper's output including its final byte
# (presumably a newline -- confirm).
if len( password ) >= MinPasswordLength and len( password ) <= MaxPasswordLength :
    password = password[:-1]
    if ( ENVELOPE and (not DECRYPTION) ) :
        try:
            # add the encryption password to the envelope
            cryptSetAttributeString( Envelope, CRYPT_ENVINFO_PASSWORD, password )
        except CryptException as e :
            status, message = e.args
            if (status == CRYPT_ERROR_WRONGKEY) :
                print("Error: " + message)
                clean_envelope()
                exit( ERR_WRONGKEY )
        # randomize password buffer as it is no longer needed
        # (only on this path: decryption and openssl mode use it later)
        password = get_random_bytes( len(password) )
        print_debug (str(len(password)) + " bytes used as password")
else:
    # this message is also shown when the passphrase is too long
    print ("Error: Your password must have at least " + str(MinPasswordLength) + " characters. Nothing done.")
    clean_envelope()
    # terminate the program
    exit (ERR_PASSWORD)
|
|
|
|
# Main work: encrypt Data and write the armoured result, or decode and decrypt
# Data and write the clear text. Envelope mode (pgp/cms) pushes the data
# through the cryptlib envelope; openssl mode uses an AES-CBC context keyed
# from pbkdf2().
# NOTE(review): block nesting in this section was rebuilt from a
# whitespace-stripped copy -- confirm against the original file.
if (not DECRYPTION) :
    # ENCRYPTION
    # expand the internal buffer which is set to 32K by default. This limits the input data size.
    try:
        cryptSetAttribute( Envelope, CRYPT_ATTRIBUTE_BUFFERSIZE, MaxBufferSize )
    except CryptException as e :
        status, message = e.args
        if status != CRYPT_ENVELOPE_RESOURCE :
            print( "Error: cannot set BufferSize to " + str(MaxBufferSize) )
            clean_envelope()
            exit( ERR_SIZE )

    # encrypt the input
    if Data :
        print ( "Performing encryption of input data" )
    else:
        print( "Your message is empty. Nothing to encrypt." )
        clean_envelope()
        exit( ERR_SIZE )

    Buffer = bytearray()
    if (ENVELOPE) :
        # the password has been supplied already to the envelope
        if (len( Data ) < MaxBytes) :
            # we only need one pass, no looping required
            print_debug ( "processing " + str( len( Data)) + " bytes of input data")
            cryptSetAttribute( Envelope, CRYPT_ENVINFO_DATASIZE, len( Data ) )
            try:
                bytesCopied = cryptPushData( Envelope, Data )
            except CryptException as e :
                status, message = e.args
                if status != CRYPT_ENVELOPE_RESOURCE :
                    print( "Your message is too large. The limit is " + str(MaxBufferSize-4000) )
                    clean_envelope()
                    exit( ERR_SIZE )

            print_debug ("Pushed " +str(bytesCopied) + " bytes into the envelope")

            if len( Data ) != bytesCopied :
                # user information and proceed
                print ( "Error: message did not fit into the envelope completely." )

            try:
                cryptFlushData( Envelope )
            except CryptException as e :
                status, message = e.args
                if status != CRYPT_ENVELOPE_RESOURCE :
                    print( "Encryption error: " + message )
                    clean_envelope()
                    exit( ERR_ENCRYPT )

            # randomize cleartext data
            Data = get_random_bytes( len (Data) )

            # prepare the cryptogram
            DataBufferSize = MaxBytes
            envelopedData = bytearray( b' ' * DataBufferSize )
            bytesCopied = cryptPopData( Envelope, envelopedData, DataBufferSize )
            print_debug ("Retrieving " + str(bytesCopied) + " encrypted bytes from envelope")
            Buffer = envelopedData[:bytesCopied]
            # Buffer holds the encrypted data
        else:
            print_debug( "Input is too big" )
            ReadMoreBytes = True
            clean_envelope()
            exit( ERR_SIZE )

    else:
        # USE ONLY CRYPT-CONTEXT and NO ENVELOPES
        if (Mode == "openssl") :
            # get an AEScontext from a password and salt
            crypt_object = cryptCreateContext( cryptUser , CRYPT_ALGO_AES )
            AESContext = int ( crypt_object )
            # get 8 bytes of random data for the salt
            SALT = bytearray()
            # USE internal cryptContext to generate SALT
            # (a fixed 8 byte text is encrypted with a throw-away generated key)
            try:
                SaltBuffer = bytearray(b'Cryptlib')
                SaltContext_object = cryptCreateContext( cryptUser, CRYPT_ALGO_AES )
                SaltContext = int( SaltContext_object )
                cryptSetAttribute( SaltContext, CRYPT_CTXINFO_MODE, CRYPT_MODE_CFB )
                cryptGenerateKey( SaltContext )
                cryptEncrypt( SaltContext, SaltBuffer )
                cryptDestroyContext( SaltContext )
                SALT.extend(SaltBuffer[:8])
            except CryptException as e :
                status, message = e.args
                print( "CL random failed" + message )

            # pbkdf2() returns key followed by IV, or "" when SALT is empty
            keyandiv = pbkdf2( SALT )
            sessionkey = keyandiv[:32]
            iv = keyandiv[32:]
            if (KEY128) :
                sessionkey = keyandiv[:16]
                iv = keyandiv[16:]

            # make sure that len(Data) is a multiple of the AES Blocksize
            # use PKCS#7 padding
            NumberOfBlocks = int( len(Data)/AESblocksize )
            reminder = int ( AESblocksize - (len(Data) % AESblocksize) )
            for i in range(reminder) :
                Data.append( reminder )
            NumberOfBlocks = NumberOfBlocks + 1

            if (sessionkey) :
                cryptSetAttribute( AESContext, CRYPT_CTXINFO_MODE, CRYPT_MODE_CBC )
                cryptSetAttribute( AESContext, CRYPT_CTXINFO_KEYSIZE, AESblocksize )
                cryptSetAttributeString( AESContext, CRYPT_CTXINFO_KEY, sessionkey )
                cryptSetAttributeString( AESContext, CRYPT_CTXINFO_IV, iv )
                # encrypt Data in the context
                try:
                    status = cryptEncrypt( AESContext, Data )
                except CryptException as e :
                    status, message = e.args
                    print_debug( "Encryption error while encrypting data ...")
                    print_debug(str(status) + message )
                    clean_context()
                    exit( ERR_ENCRYPT )

                # if encryption was successful, the encrypted data is in-place
                cryptDestroyContext( AESContext )
                print_debug( "Retrieving " + str(len(Data)) + " encrypted bytes." )

                Buffer = Data
                # Buffer holds the encrypted data

    if ( Buffer ) :
        # write the encrypted Buffer into the file system
        if Mode == "pgp" :
            print("writing " + OutFilename)
            write_pgp_message(Buffer , OutFilename)
        elif Mode == "openssl" :
            print("writing " + OutFilename)
            write_openssl_message(SALT, Buffer , OutFilename)
        else :
            print("writing " + OutFilename)
            write_cms_message(Buffer , OutFilename)
    else:
        # ENCRYPTION failed
        print ("Encryption failed.")
        clean_context()
        exit ( ERR_ENCRYPT )

else:
    # DECRYPTION
    print ("Performing decryption of input data")
    # get the raw data from ascii armoured Data
    print_debug( "decrypting " + str(len(Data)) + " bytes of input" )
    # analyze_CMS_data also handles openssl mode
    if ( Mode == "pgp" ) :
        Data = analyze_PGP_data( Data )
    else:
        Data = analyze_CMS_data( Data )

    # expand the internal buffer which is set to 32K by default. This limits the input data size
    try:
        cryptSetAttribute( Envelope, CRYPT_ATTRIBUTE_BUFFERSIZE, MaxBufferSize )
    except CryptException as e :
        status, message = e.args
        if status != CRYPT_ENVELOPE_RESOURCE :
            print( "Cannot set BufferSize to " + str(MaxBufferSize) )
            clean_envelope()
            exit( ERR_SIZE )

    Cleartext = bytearray()
    if (ENVELOPE) :
        # the password has been supplied but not added to the decryption envelope

        if len( Data ) <= MaxBytes :
            # we only need one pass, no looping required
            if (not BINARY) :
                # the analyze_* functions append ":" plus four characters
                ASCII = Data[:-5]
                CRC = Data[-4:]

                # base64 decode ASCII
                try:
                    Buffer = a2b_base64( ASCII )
                except:
                    print ( "Error: cannot decode message block" )
                    clean_envelope()
                    exit (ERR_DECODE)

            else:
                # binary input
                Buffer = Data

            print_debug ( "Processing " + str( len( Buffer ) ) + " bytes of input data" )

            if ( (Mode == "pgp") and (not BINARY) ) :
                # check the CRC24 on the blob.
                # a mismatch is only reported; decryption continues
                Checksum = bytearray()
                S = crc24_encoding( Buffer )[1:-1]
                Checksum.extend( S.encode() )
                if ( Checksum != CRC ) :
                    print( "Integrity check failed." )

            # decrypt the buffer
            # push buffer into envelope
            if ( len(Buffer) > 0 ) :
                try:
                    bytesCopied = cryptPushData( Envelope, Buffer )
                except CryptException as e :
                    # catch the advisory exception, that the key is still missing
                    status, message = e.args
                    if status != CRYPT_ENVELOPE_RESOURCE :
                        print( "Decryption error while pushing bytes into the envelope. " + message )
                        print( "Possibly inconsistent PGP message or unsupported crypto." )
                        clean_envelope()
                        exit( ERR_DECRYPT )
            else:
                # nothing to decrypt
                print( "Error: no valid input found." )
                clean_envelope()
                exit( ERR_DECODE )

            try:
                status = cryptFlushData( Envelope )
                print_debug( "Flushed." )

            except CryptException as e :
                status, message = e.args
                print_debug( "Flushing data ... " + message )
                if (status == CRYPT_ERROR_WRONGKEY) :
                    print( "Error: " + message )
                    clean_envelope()
                    exit( ERR_WRONGKEY )

            # insert the password
            try:
                cryptSetAttributeString( Envelope, CRYPT_ENVINFO_PASSWORD, password )
                # randomize password buffer
                password = get_random_bytes( len(password) )
                print_debug ( str(len(password)) + " bytes used as password" )
            except CryptException as e :
                status, message = e.args
                if (status == CRYPT_ERROR_WRONGKEY) :
                    print( "Error: " + message )
                    clean_envelope()
                    exit( ERR_WRONGKEY )

            # check the ALGO used
            envelope_info()

            DataBufferSize = MaxBytes
            Cleartext = bytearray( b' ' * DataBufferSize )

            try:
                bytesCopied = cryptPopData( Envelope, Cleartext, DataBufferSize )
            except CryptException as e :
                status, message = e.args
                print( "Decryption error: " + message )
                cryptDestroyEnvelope( Envelope )
                cryptEnd()
                exit( ERR_DECRYPT )

            print_debug ( str(bytesCopied) + " decrypted bytes retrieved from envelope" )

            Cleartext = Cleartext[:bytesCopied]
            # Cleartext holds the decrypted data

            ContentType = bytearray()
            ContentType = cryptGetAttribute( Envelope, CRYPT_ENVINFO_CONTENTTYPE )

            if (ContentType != CRYPT_CONTENT_DATA) :
                if (ContentType == CRYPT_CONTENT_COMPRESSEDDATA) :
                    # decompress by passing the data through a second envelope
                    print_debug( "Found compressed data." )
                    try:
                        DeCompress_object = cryptCreateEnvelope( cryptUser, CRYPT_FORMAT_AUTO )
                        DeCompressEnvelope = int ( DeCompress_object )
                        bytesCopied = cryptPushData( DeCompressEnvelope, Cleartext )
                        cryptFlushData( DeCompressEnvelope )
                        bytesCopied = cryptPopData( DeCompressEnvelope, Cleartext, DataBufferSize )
                        cryptDestroyEnvelope( DeCompressEnvelope )
                        Cleartext = Cleartext[:bytesCopied]
                        print_debug( "Decompression successful" )
                    except CryptException as e :
                        status, message = e.args
                        print( "Decryption error: " + message )
                        print_debug( "Decompression FAILED." )
                        clean_envelope()
                        exit ( ERR_DECRYPT )
                else:
                    print_debug("Other non-ordinary data found. Aborting")
                    clean_envelope()
                    exit ( ERR_DATATYPE )

        else:
            print_debug( "Input is too large." )
            ReadMoreBytes = True
            clean_envelope()
            exit( ERR_SIZE )

    else:
        # USE ONLY A AES CONTEXT TO DECRYPT Data
        if ( Mode == "openssl" ) :
            # get an AEScontext from a password and salt
            crypt_object = cryptCreateContext( cryptUser , CRYPT_ALGO_AES )
            AESContext = int ( crypt_object )

            # decode the input
            Buffer = bytearray(b' '*len(Data))
            if ( not BINARY ) :
                try:
                    Buffer = a2b_base64( Data )
                except:
                    print ( "Error: cannot decode message block" )
                    exit ( ERR_DECODE )
            else:
                Buffer = Data

            print_debug ( "Processing " + str( len( Buffer ) ) + " bytes of input data" )

            # read the salt from Buffer
            # layout: b'Salted__' (8 bytes), salt (8 bytes), cipher text
            SALT = bytearray()
            CryptoBuffer = bytearray()
            if ( ( Buffer[:8] == b'Salted__' ) and (len (Buffer) > 1) ) :
                SALT = Buffer[8:16]
                i = 0
                for i in range(len(Buffer)-16) :
                    CryptoBuffer.append(Buffer[i+16])
            else:
                print( "Decryption error: inconsistent encrypted message format." )

            # with an empty SALT pbkdf2() returns "", so the block below is
            # skipped and Cleartext stays empty
            keyandiv = pbkdf2( SALT )
            sessionkey = keyandiv[:32]
            iv = keyandiv[32:]
            if ( KEY128 ) :
                sessionkey = keyandiv[:16]
                iv = keyandiv[16:]

            if (sessionkey) :
                cryptSetAttribute( AESContext, CRYPT_CTXINFO_MODE, CRYPT_MODE_CBC )
                cryptSetAttribute( AESContext, CRYPT_CTXINFO_KEYSIZE, AESblocksize )
                cryptSetAttributeString( AESContext, CRYPT_CTXINFO_IV, iv )
                cryptSetAttributeString( AESContext, CRYPT_CTXINFO_KEY, sessionkey )

                # decrypt Data in the context
                try:
                    if ( (len(CryptoBuffer) % AESblocksize ) != 0 ) :
                        print( "This input data may be corrupt" )
                        clean_context()
                        exit ( ERR_CORRUPT )

                    cryptDecrypt( AESContext, CryptoBuffer )
                    # CryptoBuffer holds the decrypted clear text and must be randomized below
                except CryptException as e :
                    status, message = e.args
                    print_debug( "Decryption error while decrypting data ... " + message )
                    clean_context()
                    exit( ERR_DECRYPT )

                context_info()
                # if decryption was successful, the decrypted data is in-place
                cryptDestroyContext( AESContext )
                print_debug ( "Retrieving " + str(len(CryptoBuffer)) + " encrypted bytes." )

                # use PKCS#7 padding to remove padding bytes from the tail of the clear text
                # (only the last byte is inspected; a value above 16 aborts)
                Last = int( CryptoBuffer[ len(CryptoBuffer) -1 ] )
                if Last > 16 :
                    print ( "This data is not AES-256 encrypted." )
                    exit ( ERR_DECRYPT )
                for X in range( Last ) :
                    del CryptoBuffer[ len(CryptoBuffer) -1 ]

                Cleartext = CryptoBuffer
                # randomize Buffer
                # (this rebinds the name; Cleartext still refers to the clear text)
                CryptoBuffer = get_random_bytes( len (CryptoBuffer) )
                # Cleartext holds the decrypted data

    if (Cleartext) :

        # write decrypted bytes to file system
        try:
            F = open( OutFilename, "wb" )
            F.write( Cleartext )
            F.close()
            unix("chmod 600 " + OutFilename)
            print("Clear text written to " + OutFilename)
        except:
            print( "Error: cannot write decrypted bytes to " + OutFilename )
            clean_envelope()
            exit( ERR_PERM )
    else:
        # DECRYPTION failed.
        print( "Decryption failed." )
        clean_envelope()
        exit ( ERR_DECRYPT )

    # randomize Data
    Cleartext = get_random_bytes( len (Cleartext) )
|
|
|
|
# normal clean up
# randomize the password buffer, because it is no longer needed
password = get_random_bytes( len(password) )
del password

# Release the envelope and shut cryptlib down; a failure here is reported
# and turns the exit status into ERR_INCOMPLETE.
try:
    cryptDestroyEnvelope( Envelope )
    cryptEnd()
except CryptException as e :
    status, message = e.args
    print("Error: " + message)
    exit ( ERR_INCOMPLETE )

exit( OK )
|
|
|