Skip to content
Snippets Groups Projects
Commit e545ad7e authored by Richard van der Hoff's avatar Richard van der Hoff
Browse files

Outbound group session support in the python wrappers

parent c0585541
No related branches found
No related tags found
No related merge requests found
*.pyc
/*.account
/*.session
/*.group_session
from .account import Account
from .session import Session
from .outbound_group_session import OutboundGroupSession
......@@ -8,7 +8,7 @@ import yaml
from . import *
if __name__ == '__main__':
def build_arg_parser():
parser = argparse.ArgumentParser()
parser.add_argument("--key", help="Account encryption key", default="")
commands = parser.add_subparsers()
......@@ -206,5 +206,41 @@ if __name__ == '__main__':
decrypt.set_defaults(func=do_decrypt)
outbound_group = commands.add_parser("outbound_group", help="Create an outbound group session")
outbound_group.add_argument("session_file", help="Local group session file")
outbound_group.set_defaults(func=do_outbound_group)
group_encrypt = commands.add_parser("group_encrypt", help="Encrypt a group message")
group_encrypt.add_argument("session_file", help="Local group session file")
group_encrypt.add_argument("plaintext_file", help="Plaintext",
type=argparse.FileType('rb'), default=sys.stdin)
group_encrypt.add_argument("message_file", help="Message",
type=argparse.FileType('wb'), default=sys.stdout)
group_encrypt.set_defaults(func=do_group_encrypt)
return parser
def do_outbound_group(args):
if os.path.exists(args.session_file):
sys.stderr.write("Session %r file already exists" % (
args.session_file,
))
sys.exit(1)
session = OutboundGroupSession()
with open(args.session_file, "wb") as f:
f.write(session.pickle(args.key))
def do_group_encrypt(args):
session = OutboundGroupSession()
with open(args.session_file, "rb") as f:
session.unpickle(args.key, f.read())
plaintext = args.plaintext_file.read()
message = session.encrypt(plaintext)
with open(args.session_file, "wb") as f:
f.write(session.pickle(args.key))
args.message_file.write(message)
if __name__ == '__main__':
parser = build_arg_parser()
args = parser.parse_args()
args.func(args)
import json
from ._base import *
lib.olm_outbound_group_session_size.argtypes = []
lib.olm_outbound_group_session_size.restype = c_size_t
lib.olm_outbound_group_session.argtypes = [c_void_p]
lib.olm_outbound_group_session.restype = c_void_p
lib.olm_outbound_group_session_last_error.argtypes = [c_void_p]
lib.olm_outbound_group_session_last_error.restype = c_char_p
def outbound_group_session_errcheck(res, func, args):
if res == ERR:
raise OlmError("%s: %s" % (
func.__name__, lib.olm_outbound_group_session_last_error(args[0])
))
return res
def outbound_group_session_function(func, *types):
func.argtypes = (c_void_p,) + types
func.restypes = c_size_t
func.errcheck = outbound_group_session_errcheck
outbound_group_session_function(
lib.olm_pickle_outbound_group_session, c_void_p, c_size_t, c_void_p, c_size_t
)
outbound_group_session_function(
lib.olm_unpickle_outbound_group_session, c_void_p, c_size_t, c_void_p, c_size_t
)
outbound_group_session_function(lib.olm_init_outbound_group_session_random_length)
outbound_group_session_function(lib.olm_init_outbound_group_session, c_void_p, c_size_t)
outbound_group_session_function(lib.olm_group_encrypt_message_length, c_size_t)
outbound_group_session_function(lib.olm_group_encrypt,
c_void_p, c_size_t, # Plaintext
c_void_p, c_size_t, # Message
)
class OutboundGroupSession(object):
def __init__(self):
self.buf = create_string_buffer(lib.olm_outbound_group_session_size())
self.ptr = lib.olm_outbound_group_session(self.buf)
random_length = lib.olm_init_outbound_group_session_random_length(self.ptr)
random = read_random(random_length)
random_buffer = create_string_buffer(random)
lib.olm_init_outbound_group_session(self.ptr, random_buffer, random_length)
def pickle(self, key):
key_buffer = create_string_buffer(key)
pickle_length = lib.olm_pickle_outbound_group_session_length(self.ptr)
pickle_buffer = create_string_buffer(pickle_length)
lib.olm_pickle_outbound_group_session(
self.ptr, key_buffer, len(key), pickle_buffer, pickle_length
)
return pickle_buffer.raw
def unpickle(self, key, pickle):
key_buffer = create_string_buffer(key)
pickle_buffer = create_string_buffer(pickle)
lib.olm_unpickle_outbound_group_session(
self.ptr, key_buffer, len(key), pickle_buffer, len(pickle)
)
def encrypt(self, plaintext):
message_length = lib.olm_group_encrypt_message_length(
self.ptr, len(plaintext)
)
message_buffer = create_string_buffer(message_length)
plaintext_buffer = create_string_buffer(plaintext)
lib.olm_group_encrypt(
self.ptr,
plaintext_buffer, len(plaintext),
message_buffer, message_length,
)
return message_buffer.raw
......@@ -4,11 +4,13 @@ OLM="python -m olm"
ALICE_ACCOUNT=alice.account
ALICE_SESSION=alice.session
ALICE_GROUP_SESSION=alice.group_session
BOB_ACCOUNT=bob.account
BOB_SESSION=bob.session
rm $ALICE_ACCOUNT $BOB_ACCOUNT
rm $ALICE_SESSION $BOB_SESSION
rm $ALICE_GROUP_SESSION
$OLM create_account $ALICE_ACCOUNT
$OLM create_account $BOB_ACCOUNT
......@@ -20,3 +22,9 @@ BOB_ONE_TIME_KEY="$($OLM keys --json $BOB_ACCOUNT | jq -r '.one_time_keys.curve2
$OLM outbound $ALICE_ACCOUNT $ALICE_SESSION "$BOB_IDENTITY_KEY" "$BOB_ONE_TIME_KEY"
echo "Hello world" | $OLM encrypt $ALICE_SESSION - - | $OLM inbound $BOB_ACCOUNT $BOB_SESSION - -
### group sessions
$OLM outbound_group $ALICE_GROUP_SESSION
echo "Hello world" | $OLM group_encrypt $ALICE_GROUP_SESSION - -
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment