diff -uNr a/blatta/Makefile b/blatta/Makefile --- a/blatta/Makefile 10b85dc5440c229443df0b04e40877901c042af961ffb0e3ab384ca7cf30f8e49c382c792207c067fdde84f1f25afb9807151abfa2c8cf2404c5964e819dd6ad +++ b/blatta/Makefile 0502acc5ec5c4b3f49962e0b52cc2ed7e31317f5057c772ab503acd4d0e0e007ace8c05fb9c1cb122a33f7b62cf1c5bfb370c5379d273b201015409281b59626 @@ -1,6 +1,6 @@ -VERSION := 9973 +VERSION := `cat VERSION` -DISTFILES = Makefile blatta README.txt lib migrations tests test_net_configs start_test_net.sh +DISTFILES = Makefile VERSION blatta README.txt lib migrations tests test_net_configs start_test_net.sh all: echo "Nothing to do." diff -uNr a/blatta/README.txt b/blatta/README.txt --- a/blatta/README.txt 391ac9b9fa75d5cb9dbeef2facdff1349cfa379504f906b42ebfd6366a63f9e073bc3652bb1f926754c49ccf773b6bf00f91a4afb6e869dd4040c37b70b95827 +++ b/blatta/README.txt 406509e8f3c1ed2099f5c227646af37c29f8d14d0d4303e2fec2602d9bb8bd5a0ad90374173b9cdc49edaf59fd979dca5015449620b692d8715bc87f6ebe0c75 @@ -7,7 +7,6 @@ Notably missing: -- Address Cast - Key Offer message support - Key Slice message support @@ -34,5 +33,4 @@ NOTES: To run the unit tests, you'll need to run: -pip install mock -Please note however that the unit tests are currently borked. \ No newline at end of file +pip install mock \ No newline at end of file diff -uNr a/blatta/VERSION b/blatta/VERSION --- a/blatta/VERSION false +++ b/blatta/VERSION e939319655cbd7d710f012c0986cbe8b900a3a82ef28006dbdddb03efc993211886356770f832525ea2e0ef08d92cb1bb7d34588446c7c834b2c24a9198a22f7 @@ -0,0 +1 @@ +9972 diff -uNr a/blatta/blatta b/blatta/blatta --- a/blatta/blatta 77dd6b050743ba29828e1b62771447365a30cd54e0e56d5dbf1634cd44e7e537b1fc8703d4a9bf8b4ba4bfe86298a5e4e2a8271c159f867fbeb3a487c3f05a94 +++ b/blatta/blatta b3e185895d51d98185cbdd7c548b3ac8a91a9da5287a5b211ba159634b10b9fda7583c5e460e130eafd0715f9d0103a797402b1aa694ba917fe9dcea0593ef7f @@ -4,7 +4,8 @@ import re import sys import logging -from lib.station import Station, VERSION +from lib.station import Station +from lib.version import VERSION from optparse import OptionParser @@ -63,7 +64,7 @@ (options, args) = op.parse_args(argv[1:]) if options.station_handle is None: - op.error("Please specify a handle") + op.error("Please specify station-handle") if options.channel_name is None: options.channel_name = "#pest" log_format = "%(levelname)s %(asctime)s: %(message)s" diff -uNr a/blatta/lib/address_cast.py b/blatta/lib/address_cast.py --- a/blatta/lib/address_cast.py false +++ b/blatta/lib/address_cast.py c38d152f4b5c292255d08c410e2b1ffb814082fd766eedb2be209bf0037e9b3b62e3666cbf01cdc3651b3be73598bcf4fc39b583fb2afee62ce519ee6f1d023e @@ -0,0 +1,159 @@ +import base64 +import hashlib +import hmac +import logging +import os +import struct +import time + +from message_exception import MessageException, DUPLICATE_PACKET +from serpent import Serpent, serpent_cbc_decrypt, serpent_cbc_encrypt +from message import Message, MESSAGE_PACKET_FORMAT +from commands import ADDRESS_CAST +from prod import get_ascii_address +from prod import OUTGOING +from prod import INCOMING + +BLACK_PACKET_FORMAT = "<272s48s4s" +RED_PACKET_FORMAT = "<16sL6s246s" +LOGGING_FORMAT = u"[%s:%d %s] %s %s %s %s" +NONCE_LENGTH = 16 +RED_PACKET_PADDING_LENGTH = 246 +PAYLOAD_PADDING_LENGTH = 4 + + +class AddressCast(Message): + def __init__(self, message, state): + super(AddressCast, self).__init__(message, state) + self.command = ADDRESS_CAST + self.address = message.get('external_address') + self.bounces = message.get('bounces', 0) + self.long_buffer = message.get('long_buffer') + self.origin_peer = None + + def inflate(self, message): + self.peer = message.get('peer') + self.message_bytes = message.get('bytes') + (int_ts, self_chain, net_chain, speaker, body) = self._unpack_generic_message(self.message_bytes) + self.timestamp = int_ts + self.speaker = speaker + self.body = body + self.message_hash = self.gen_hash(self.message_bytes) + self.long_buffer = message.get('long_buffer') + if self.long_buffer.has(self.message_hash): + raise MessageException(DUPLICATE_PACKET, self.metadata, self) + + # if the signature checks out for a peer with no AT entry, decrypt + for peer in self.state.get_keyed_peers(exclude_addressless=False): + if peer: + ( + black_packet_bytes, + signature, + ) = struct.unpack(BLACK_PACKET_FORMAT, body)[0:2] + + for key in peer.keys: + key_bytes = base64.b64decode(key) + signing_key = key_bytes[:32] + cipher_key = key_bytes[32:] + signature_check = hmac.new(signing_key, + black_packet_bytes, + hashlib.sha384).digest() + + if signature_check != signature: + continue + + # try to decrypt black packet + Serpent(cipher_key) + red_packet_bytes = serpent_cbc_decrypt(cipher_key, black_packet_bytes) + + try: + (command, address) = struct.unpack(RED_PACKET_FORMAT, red_packet_bytes)[1:3] + self.origin_peer = peer + self.address = address + self.log_incoming(message.get('peer'), peer.handles[0], get_ascii_address(address)) + return self + except NameError as ne: + # This message was not intended for us. + # We must forward it on if the bounce count is below the cutoff + pass + + self.log_incoming(message.get('peer'), "N/A", "N/A") + return self + + # Increment bounce count and send to everyone except originating peer + def forward(self): + # forward to everyone except the station from which we received this + for peer in self.state.get_keyed_peers(exclude_addressless=True, exclude_ids=[self.peer.peer_id]): + self.message_bytes = self.get_message_bytes() + signed_packet_bytes = self.pack(peer, self.command, self.bounces, self.message_bytes) + peer.send(signed_packet_bytes) + + # send to every peer with an address in the AT + def broadcast(self): + self.timestamp = int(time.time()) + + # for each *cold* peer we have a key for + cold_peers = self.state.get_cold_peers() + cold_peer_ids = map(lambda p: p.peer_id, cold_peers) + for peer in self.state.get_keyed_peers(exclude_addressless=True, exclude_ids=cold_peer_ids): + for cold_peer in cold_peers: + # parse peer key + key_bytes = base64.b64decode(cold_peer.get_key()) + signing_key = key_bytes[:32] + cipher_key = key_bytes[32:] + + # build the red packet + red_packet_bytes = struct.pack( + RED_PACKET_FORMAT, + os.urandom(NONCE_LENGTH), + 0, + self.address, + os.urandom(RED_PACKET_PADDING_LENGTH) + ) + # form the black packet by encrypting and signing the red packet + black_packet_bytes = serpent_cbc_encrypt(cipher_key, red_packet_bytes) + + # sign and pack the black address cast packet + seal = hmac.new(signing_key, black_packet_bytes, hashlib.sha384).digest() + self.body = struct.pack( + BLACK_PACKET_FORMAT, + black_packet_bytes, + seal, + os.urandom(PAYLOAD_PADDING_LENGTH) + ) + + # send off the black packet + self.message_bytes = self.get_message_bytes() + self.message_hash = hashlib.sha256(self.message_bytes).digest() + self.long_buffer.intern(self) + signed_packet_bytes = self.pack(peer, self.command, self.bounces, self.message_bytes) + peer.send(signed_packet_bytes) + self.log_outgoing(red_packet_bytes, cold_peer, peer) + + + def log_incoming(self, peer, origin_peer_handle, address): + logging.info(LOGGING_FORMAT % ( + peer.address, + peer.port, + peer.handles[0], + INCOMING, + "ADDRESS_CAST", + origin_peer_handle, + address)) + + def log_outgoing(self, red_packet_bytes, cold_peer, peer): + ( + nonce, + command, + pest_address, + padding, + ) = struct.unpack(RED_PACKET_FORMAT, red_packet_bytes) + + logging.info(LOGGING_FORMAT % ( + peer.address, + peer.port, + peer.handles[0], + OUTGOING, + "ADDRESS_CAST", + cold_peer.handles[0], + get_ascii_address(pest_address))) diff -uNr a/blatta/lib/broadcast.py b/blatta/lib/broadcast.py --- a/blatta/lib/broadcast.py 3a3d991b01ef999bffc5b634a877aeae8b7602c71e37c0f9b86b7b3b5f6b9308faf1455ce1608aaf5e36ed16d5aed522f89c39cd122c5279cfcdb6e88c1f1566 +++ b/blatta/lib/broadcast.py ea2fdb263b51b3677a909f3088f997307db1bc43aa70c77b1ebcf000a37a6eabb509fcdea575f87327cce81f8983ec81f5a486d3246bc43fc27d5c7e6bf0e64b @@ -4,10 +4,11 @@ import time import hashlib from commands import BROADCAST -from message import MAX_SPEAKER_SIZE, MESSAGE_PACKET_FORMAT +from message import MAX_SPEAKER_SIZE, MESSAGE_PACKET_FORMAT from text_message import TextMessage from message_exception import MessageException, OUT_OF_ORDER_BOTH, OUT_OF_ORDER_SELF, OUT_OF_ORDER_NET + class Broadcast(TextMessage): def __init__(self, message, state): super(Broadcast, self).__init__(message, state) @@ -63,9 +64,7 @@ # if we are not rebroadcasting we need to set the timestamp self.timestamp = int(time.time()) - target_peer = None - - self.message_bytes = self.get_message_bytes(target_peer) + self.message_bytes = self.get_message_bytes() self.message_hash = hashlib.sha256(self.message_bytes).digest() self.long_buffer.intern(self) @@ -80,7 +79,7 @@ except UnicodeDecodeError: pass - def get_message_bytes(self, peer=None): + def get_message_bytes(self): speaker = self._pad(self.speaker, MAX_SPEAKER_SIZE) # let's generate the self_chain value from the last message or set it to zero if @@ -92,11 +91,11 @@ body = self.body.encode('utf-8') message_bytes = struct.pack(MESSAGE_PACKET_FORMAT, - self.timestamp, - self.self_chain, - self.net_chain, - speaker.encode('ascii'), - body) + self.timestamp, + self.self_chain, + self.net_chain, + speaker.encode('ascii'), + body) return message_bytes # we already have message bytes here since this message came from the long buffer @@ -118,4 +117,4 @@ signed_packet_bytes = self.pack(peer, self.command, self.bounces, self.message_bytes) peer.send(signed_packet_bytes) - self.log_outgoing(peer) \ No newline at end of file + self.log_outgoing(peer) diff -uNr a/blatta/lib/client.py b/blatta/lib/client.py --- a/blatta/lib/client.py 45faf4d7f86b7a5877e472ce0e9a90a75d5c25bf996a68e47af6c5240165d5cb7312cd52077475d5eefbb5b6cece928f9b5288a105bc160507817035e75c3b2a +++ b/blatta/lib/client.py 390f36b85d76d3b68b3d6f3c08c625c6889cc5c3efe4003f9950dbbf4aee383a2971750e3068826e6d7c62ea5c76fdadf7188cc2795e7d580eff3861019be090 @@ -9,7 +9,7 @@ from message import PEST_VERSION from broadcast import Broadcast from direct import Direct -from station import VERSION +from version import VERSION from funcs import * from commands import BROADCAST @@ -136,7 +136,7 @@ return if self.nickname and self.user: self.reply("001 %s :Hi, welcome to PestNet" % self.nickname) - self.reply("002 %s :Your host is %s, running Blatta %d and Pest 0x%X" + self.reply("002 %s :Your host is %s, running Blatta %s and Pest 0x%X" % (self.nickname, server.name, VERSION, PEST_VERSION)) self.reply("003 %s :This server was created %s" % (self.nickname, datetime.datetime.now())) @@ -243,20 +243,22 @@ self.message_channel( channel, command, "%s :%s" % (channel.name, message)) # send the channel message to peers as well - Broadcast( - { - "speaker": self.server.station.station_handle, - "body": message, - "long_buffer": self.server.station.long_buffer - }, - self.state).send() - else: - Direct({ - "speaker": self.server.station.station_handle, - "handle": targetname, - "body": message, - "long_buffer": self.server.station.long_buffer - }, self.state).send() + for chunk in split_string_max_octet_size(message): + Broadcast( + { + "speaker": self.server.station.station_handle, + "body": chunk, + "long_buffer": self.server.station.long_buffer + }, + self.state).send() + else: + for chunk in split_string_max_octet_size(message): + Direct({ + "speaker": self.server.station.station_handle, + "handle": targetname, + "body": chunk, + "long_buffer": self.server.station.long_buffer + }, self.state).send() def part_handler(): if len(arguments) < 1: diff -uNr a/blatta/lib/commands.py b/blatta/lib/commands.py --- a/blatta/lib/commands.py 6f21158501f8a41b44d3dc3712385bd438432c829392fc1f2683765cd16541ab35d072d69d3da739436c4b0aab686590ce00227878ba3837c54c16652eb3a313 +++ b/blatta/lib/commands.py 643a3f26591fe32d7079dc65b679fb199b65c460ec249e4a8bc82d3d85063ba689040a9c08ac959068404d599fefe663ce73ce3a0788a8e29e00ba40d8ab390b @@ -10,6 +10,8 @@ PROD = 0x02 GETDATA = 0x03 IGNORE = 0xFF +ADDRESS_CAST = 0xFE + COMMAND_LABELS = { BROADCAST: "BROADCAST", DIRECT: "DIRECT", diff -uNr a/blatta/lib/direct.py b/blatta/lib/direct.py --- a/blatta/lib/direct.py 3bfa841aa4cc82f768b5764f59f492eba94253dea97388e5cf59cab5c02371f91615b92cec99f05642929622bbc6ff679fcee9b5bc5f618ff02f069570d90f15 +++ b/blatta/lib/direct.py 076a4f946db1a7cfc61ac5b457506149fcefa04f30c704588930ba12cc6084f854d75d19835fdd48735b6a6258816be60b58b4daed91dc90e30fecf50d39f286 @@ -45,7 +45,7 @@ return self def check_order(self): - if not (self.long_buffer.has(self.self_chain)): + if not self.long_buffer.has(self.self_chain): raise MessageException(OUT_OF_ORDER_SELF, self.metadata, self) def send(self): diff -uNr a/blatta/lib/funcs.py b/blatta/lib/funcs.py --- a/blatta/lib/funcs.py 26f103d62d35c8d8e94a6537afea06a33b8cf920ebedbf6b2a38fafb9f3be31379386194dc70c0a2e87b01f6725b43d3ad182037f6cd1d9db1b8f36cf8483542 +++ b/blatta/lib/funcs.py 1045575ab9da3266342161bf636e274d16d9066171a0d276e526ab50476571a6d7101ea07c85e918c34a1413a1a3ec98e94353d45308fe86748e8e3846ebd2c1 @@ -9,3 +9,20 @@ def irc_lower(s): return string.translate(s, _ircstring_translation) +# TODO: figure out how to use unicodedata.combining to go by character group +def split_string_max_octet_size(ustring, maxsize=324): + begin = 0 + total = 0 + results = [] + i = 0 + for character in ustring: + character_size = len(character.encode('utf-8')) + total += character_size + if total > maxsize: + results.append(ustring[begin:i]) + total = character_size + begin = i + if i == len(ustring) - 1: + results.append(ustring[begin:]) + i += 1 + return results diff -uNr a/blatta/lib/getdata.py b/blatta/lib/getdata.py --- a/blatta/lib/getdata.py bc2ec2467f2fbc977de8db86e9f5420416d5471e13322227c12fe65efd17ea2790693a3e4e0a5d2c9d94067cef2928480330b6ff08b57b5f2963f049a7a176ee +++ b/blatta/lib/getdata.py 8c49d1624d8db20a5f19f8f6d7218ed4811ea54126f6aa4ccd0fec99aed9667d6d7c6add3a1c153b8a88ffacdeaee41040dd4777c46ec38fc2acee8c02278d34 @@ -44,7 +44,7 @@ return int_ts, self_chain, net_chain, speaker, body def send(self): - self.message_bytes = self.get_message_bytes(self.target_peer) + self.message_bytes = self.get_message_bytes() self.message_hash = hashlib.sha256(self.message_bytes).digest() signed_packet_bytes = self.pack(self.target_peer, self.command, diff -uNr a/blatta/lib/long_buffer.py b/blatta/lib/long_buffer.py --- a/blatta/lib/long_buffer.py fb8262acad87a5206c853b146c7cfd9afa5e2eb1a4ee6fb99443df10904f0fb76998678dbdf15634b01b06d490694e9be682d4936dc65a05258631b9408aa19c +++ b/blatta/lib/long_buffer.py a089e5b9b37b0a2c81dea0ff9c871b7e3a6c6e1cb7f8be98f58ac7cf2f14d7ab74d74da758657765e98a392edc515504886fb05415407c4276dda683d6aaa18e @@ -9,6 +9,7 @@ class LongBuffer(object): def __init__(self, state): self.state = state + self.cache = {} def exhume(self, message_hash): command, message_bytes = self.state.get_message(message_hash) @@ -23,12 +24,29 @@ if self.has(message.message_hash): return - self.state.log_message(message) + self.cache[message.message_hash] = { + 'timestamp': time.time(), + 'value': message + } + self.evict_expired() + if message.command in [BROADCAST, DIRECT]: + self.state.log_message(message) def has(self, message_hash): if EMPTY_CHAIN == message_hash: return True + if message_hash in self.cache: + return True + if self.state.log_has_message(message_hash): return True - return False \ No newline at end of file + return False + + def evict_expired(self): + working_cache = self.cache.copy() + for key in working_cache: + if time.time() - \ + working_cache.get(key)['timestamp'] > \ + float(self.state.get_knob('long_buffer_cache_expiration_seconds')): + self.cache.pop(key) diff -uNr a/blatta/lib/message.py b/blatta/lib/message.py --- a/blatta/lib/message.py c05cc17b3b9440b910618cc3bd90ce2ed1d597e3b0b468db76092889c23da0a6bbc3c3d36244dd9927647f662354bfd0d20975e67cd6245611d2a1814d76c310 +++ b/blatta/lib/message.py 853561293347d88a3da3bafbd9dafb48c23eeda3e05d89587ff485a129565d9e30b231d50902bd16040a0de8e6ccf37e98b4be342c318ecec9883525629a6fc3 @@ -122,7 +122,7 @@ def gen_hash(cls, message_bytes): return hashlib.sha256(message_bytes).digest() - def get_message_bytes(self, peer=None): + def get_message_bytes(self): speaker = Message._pad(self.speaker, MAX_SPEAKER_SIZE) self.self_chain = self.net_chain = EMPTY_CHAIN message_bytes = struct.pack(MESSAGE_PACKET_FORMAT, diff -uNr a/blatta/lib/message_factory.py b/blatta/lib/message_factory.py --- a/blatta/lib/message_factory.py eda14ff2551455b30e4e3b37da7ef74a2731a1a72177cd49beea37948f9e3637d7b37fc7ae2a47e2c65e219717fcfb15ba0fe029be57fa77a67e9b1d8dba527c +++ b/blatta/lib/message_factory.py 845a88cabc0a29e8faa9ddfd059dde6069e8d249c85b3c47b661599930d2f9cfaaa3727208cc006bee887fa717409c085ba7255d9d41fde3fe640ec18ef539f9 @@ -4,6 +4,7 @@ import struct from prod import Prod +from address_cast import AddressCast from direct import Direct from broadcast import Broadcast from ignore import Ignore @@ -15,7 +16,7 @@ import logging #command codes -from commands import BROADCAST, DIRECT, PROD, GETDATA, IGNORE +from commands import BROADCAST, DIRECT, PROD, GETDATA, IGNORE, ADDRESS_CAST PACKET_SIZE = 496 MAX_SPEAKER_SIZE = 32 @@ -31,6 +32,7 @@ DIRECT: Direct, PROD: Prod, GETDATA: GetData, + ADDRESS_CAST: AddressCast, IGNORE: Ignore } @@ -78,7 +80,7 @@ raise MessageException(UNSUPPORTED_VERSION, metadata) try: - return MESSAGE_TYPES[command]({}, state).inflate({ + return MESSAGE_TYPES[command]({ 'metadata': metadata }, state).inflate({ 'peer': peer, 'bounces': bounces, 'bytes': message_bytes, diff -uNr a/blatta/lib/peer.py b/blatta/lib/peer.py --- a/blatta/lib/peer.py fe90ac2751047b99329795b037a23029a032d043d37d85bec59b25c3296f757dcb6a6db92b6a4a837f959965e8bc22f48c888d7fc09eaab9124b14c3797d09d7 +++ b/blatta/lib/peer.py 0f3b93f5f5012ff26ab129e720c7fbace4bb3f09e53059a8a77fc454b716988ebf40f91dc7f1d6665edb662433cf7671726bb1743a4380b390cf4ba75791f8b7 @@ -1,6 +1,7 @@ import traceback import logging + class Peer(object): def __init__(self, socket, peer_entry): self.handles = peer_entry["handles"] @@ -8,7 +9,7 @@ self.peer_id = peer_entry["peer_id"] self.address = peer_entry["address"] self.port = peer_entry["port"] - self.socket = socket + self.socket = socket self.forked = peer_entry.get("forked") def get_key(self): @@ -18,19 +19,18 @@ return None def send(self, signed_packet_bytes): - if self.get_key() != None and self.address != None and self.port != None: + if self.get_key() is not None and self.address is not None and self.port is not None: try: - self.socket.sendto(signed_packet_bytes, (self.address, self.port)) + self.socket.sendto(signed_packet_bytes, (self.address, self.port)) except Exception as ex: - stack = traceback.format_exc() - logging.debug(stack) + stack = traceback.format_exc() + logging.debug(stack) else: logging.debug("Discarding message to unknown handle or handle with no key: %s" % self.handles[0]) - def handle(self): if self.handles: if len(self.handles) > 0: return self.handles[0] - return None \ No newline at end of file + return None diff -uNr a/blatta/lib/prod.py b/blatta/lib/prod.py --- a/blatta/lib/prod.py 5e6ef345f4b955800bd47da1d475019db2695e2d2bffdf5c60a2dab97788ba536bb50aee3cf68e8963e4f4fa599d814e2462f20dab8a207f37047912a8d6b12a +++ b/blatta/lib/prod.py 5153bdf007a9ef0d7cbbe66aadf4726f22ca85999b3f89b130bf62633dcb5224000c59048d348386a798ef5370a700e23f9b50d5e8c40ae5ee1ac85bac827067 @@ -16,6 +16,21 @@ INCOMING = "->" OUTGOING = "<-" + +def get_ascii_address(bytes): + port, address_bytes = struct.unpack(PEST_ADDRESS, bytes) + dotted_quad = socket.inet_ntoa(address_bytes) + return "%s:%d" % (dotted_quad,port) + + +def get_pest_address_bytes(peer): + if peer.address in "localhost": + ip_bytes = socket.inet_aton("127.0.0.1") + else: + ip_bytes = socket.inet_aton(peer.address) + return struct.pack(PEST_ADDRESS, peer.port, ip_bytes) + + class Prod(Message): def __init__(self, message, state): super(Prod, self).__init__(message, state) @@ -58,7 +73,7 @@ self.pest_address = address return self - def send(self): + def broadcast(self): if not self.speaker: logging.error("aborting message send due speaker not being set") return @@ -67,6 +82,10 @@ self.prepare_and_send(peer) self.log_outgoing(peer) + def send(self, peer): + self.prepare_and_send(peer) + self.log_outgoing(peer) + def reply(self): self.prepare_and_send(self.prompt.peer) self.log_outgoing(self.prompt.peer) @@ -74,25 +93,13 @@ def prepare_and_send(self, peer): self.set_body(peer) self.timestamp = int(time.time()) - self.message_bytes = self.get_message_bytes(peer) + self.message_bytes = self.get_message_bytes() self.message_hash = hashlib.sha256(self.message_bytes).digest() signed_packet_bytes = self.pack(peer, self.command, self.bounces, self.message_bytes) peer.send(signed_packet_bytes) - def get_pest_address_bytes(self, peer): - if peer.address in "localhost": - ip_bytes = socket.inet_aton("127.0.0.1") - else: - ip_bytes = socket.inet_aton(peer.address) - return struct.pack(PEST_ADDRESS, peer.port, ip_bytes) - - def get_ascii_address(self, bytes): - port, address_bytes = struct.unpack(PEST_ADDRESS, bytes) - dotted_quad = socket.inet_ntoa(address_bytes) - return "%s:%d" % (dotted_quad,port) - def set_body(self, peer): - address = self.get_pest_address_bytes(peer) + address = get_pest_address_bytes(peer) self.body = struct.pack(PROD_MESSAGE_PACKET_FORMAT, self.flag, address, @@ -121,7 +128,7 @@ "PROD", flag, self._unpad(banner, 'utf-8'), - self.get_ascii_address(pest_address), + get_ascii_address(pest_address), binascii.hexlify(net_chain), binascii.hexlify(broadcast_self_chain), binascii.hexlify(handle_self_chain))) @@ -135,7 +142,7 @@ "PROD", self.flag, self.banner, - self.get_ascii_address(self.pest_address), + get_ascii_address(self.pest_address), binascii.hexlify(self.net_chain), binascii.hexlify(self.broadcast_self_chain), binascii.hexlify(self.handle_self_chain))) diff -uNr a/blatta/lib/server.py b/blatta/lib/server.py --- a/blatta/lib/server.py 19cbd806a9339a48f0ec7ef807db62967ac90844ad3d08ade9c07079fd1f6cf16622bc3b8f521919697647d70e12a48844befb9efb6f9c2bc99e4db3c45402e7 +++ b/blatta/lib/server.py b9b367ef99ff3627ea41e12893e777a7a2e000c3fcbcb917c2c66646a65d0c1713d4ebf08323a55770e44d42f9e07a00669f07a6c7da4da262eed5e751fc2151 @@ -2,6 +2,8 @@ import select import socket import time +import random + from funcs import * from client import Client from channel import Channel @@ -106,11 +108,12 @@ last_order_buffer_check = time.time() last_presence_check = time.time() last_prod = time.time() + last_address_cast = time.time() initial_prod_sent = False while True: # we don't want to be listening for client connections if there's already a client connected - if self.client == None: + if self.client is None: input_sockets = serversockets else: input_sockets = [self.client.socket] @@ -120,7 +123,7 @@ # handle tcp socket events (iwtd, owtd, ewtd) = select.select(input_sockets, output_sockets, [], .2) for x in iwtd: - if self.client != None: + if self.client is not None: self.client.socket_readable_notification() else: try: @@ -171,6 +174,13 @@ self.station.report_presence() last_presence_check = now + # broadcast address cast packets + if last_address_cast + 1 < now: + range_limit = self.station.state.get_knob('address_cast_interval_seconds') + if random.randint(1, range_limit) == range_limit: + self.station.send_address_cast() + last_address_cast = now + # send prod if self.station.state.get_knob('prod_interval_seconds') == None: if not initial_prod_sent: diff -uNr a/blatta/lib/state.py b/blatta/lib/state.py --- a/blatta/lib/state.py 867d1eb25895e5de2b1fba2fd754f2b83cc00d98169c90709c1f07a99b18b147e3948b503670aae252e1d694df7b88a47c2cff7aca750142af808db8f6d4ecb4 +++ b/blatta/lib/state.py 454c23bdcd92e53c4b7ba9b14e84f4f66958b70723a10a6079a151b8c5d404b8c0f748b10da5a7297496205ccbd5e87875009c0d34fcc9674095459694374bd8 @@ -10,16 +10,17 @@ import datetime import time import caribou -from station import VERSION +from version import VERSION from itertools import chain -KNOBS=( +KNOBS = ( { 'max_bounces': 3, 'embargo_interval_seconds': 1, 'rubbish_interval_seconds': 10, 'nick': '', + 'long_buffer_cache_expiration_seconds': 3600, 'order_buffer_check_seconds': 180, 'order_buffer_expiration_seconds': 120, 'short_buffer_expiration_seconds': 1, @@ -29,10 +30,13 @@ 'peer_away_interval_seconds': 10 * 60, 'presence_check_seconds': 5, 'prod_interval_seconds': 0, - 'banner': "Blatta %d" % (VERSION), + 'address_cast_interval_seconds': 65, + 'cold_peer_seconds': 62, + 'banner': "Blatta %s" % (VERSION), } ) + class State(object): def __init__(self, station, db_path=None): self.station = station @@ -80,7 +84,7 @@ cursor.execute("create table if not exists knobs(\ name text not null,\ value text not null)") - + # migrate the db if necessary if db_path: caribou.upgrade(db_path, "migrations") @@ -92,7 +96,8 @@ def update_handle_self_chain(self, handle, message_hash): cursor = self.cursor() - cursor.execute("insert into handle_self_chain(handle, message_hash) values(?, ?)", (handle, buffer(message_hash))) + cursor.execute("insert into handle_self_chain(handle, message_hash) values(?, ?)", + (handle, buffer(message_hash))) self.conn.commit() def get_handle_self_chain(self, handle): @@ -132,14 +137,14 @@ def get_knobs(self): cursor = self.cursor() results = cursor.execute("select name, value from knobs order by name asc").fetchall() - knobs = {} + knobs = {} for result in results: knobs[result[0]] = result[1] for key in KNOBS.keys(): if not knobs.get(key): knobs[key] = KNOBS[key] return knobs - + def get_knob(self, knob_name): cursor = self.cursor() result = cursor.execute("select value from knobs where name=?", (knob_name,)).fetchone() @@ -174,18 +179,18 @@ order by updated_at desc").fetchall() else: result = cursor.execute("select handle_id from handles where handle=?", - (handle,)).fetchone() + (handle,)).fetchone() if None != result: handle_id = result[0] else: return [] results = cursor.execute("select handle_id, address, port, updated_at, strftime('%s', updated_at) from at \ where handle_id=? order by updated_at desc", - (handle_id,)).fetchall() + (handle_id,)).fetchall() for result in results: handle_id, address, port, updated_at_utc, updated_at_unixtime = result h = cursor.execute("select handle from handles where handle_id=?", - (handle_id,)).fetchone()[0] + (handle_id,)).fetchone()[0] if updated_at_utc: if '.' not in updated_at_utc: updated_at_utc = updated_at_utc + '.0' @@ -208,43 +213,44 @@ cursor = self.cursor() wot = imp.load_source('wot', at_path) for peer in wot.peers: - results = cursor.execute("select * from handles where handle=? limit 1", - (peer["name"],)).fetchall() - if len(results) == 0: - key = peer["key"] - port = peer["port"] - address = peer["address"] - cursor.execute("insert into wot(peer_id) values(null)") - peer_id = cursor.lastrowid - cursor.execute("insert into handles(peer_id, handle) values(?, ?)", - (peer_id, peer["name"])) + results = cursor.execute('select * from handles where handle=? limit 1', + (peer["name"],)).fetchall() + if 0 == len(results): + key = peer.get('key') + port = peer.get('port') + address = peer.get("address") + cursor.execute('insert into wot(peer_id) values(null)') + peer_id = cursor.lastrowid + cursor.execute('insert into handles(peer_id, handle) values(?, ?)', + (peer_id, peer.get("name"))) handle_id = cursor.lastrowid - cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)", - (handle_id, peer["address"], peer["port"], None)) - cursor.execute("insert into keys(peer_id, key) values(?, ?)", - (peer_id, key)) + if port is not None and address is not None: + cursor.execute('insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)', + (handle_id, address, port, None)) + cursor.execute('insert into keys(peer_id, key) values(?, ?)', + (peer_id, key)) self.conn.commit() def update_at(self, peer, set_active_at=True): cursor = self.cursor() row = cursor.execute("select handle_id from handles where handle=?", - (peer["handle"],)).fetchone() + (peer["handle"],)).fetchone() if row != None: handle_id = row[0] else: raise Exception("handle not found") - + at_entry = cursor.execute("select handle_id, address, port from at where handle_id=?", - (handle_id,)).fetchone() + (handle_id,)).fetchone() # if there are no AT entries for this handle, insert one timestamp = datetime.datetime.utcnow() if set_active_at else None if at_entry == None: cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)", - (handle_id, - peer["address"], - peer["port"], - timestamp)) + (handle_id, + peer["address"], + peer["port"], + timestamp)) logging.debug("inserted new at entry for %s: %s:%d" % ( peer['handle'], peer['address'], @@ -257,39 +263,37 @@ address = ?,\ port = ?\ where handle_id=?", - (timestamp, - peer["address"], - peer["port"], - handle_id)) + (timestamp, + peer["address"], + peer["port"], + handle_id)) except sqlite3.IntegrityError: cursor.execute("delete from at where handle_id=?", (handle_id,)) - self.conn.commit() def add_peer(self, handle): cursor = self.cursor() cursor.execute("insert into wot(peer_id) values(null)") - peer_id = cursor.lastrowid + peer_id = cursor.lastrowid cursor.execute("insert into handles(peer_id, handle) values(?, ?)", - (peer_id, handle)) + (peer_id, handle)) self.conn.commit() - def remove_peer(self, handle): cursor = self.cursor() # get peer id result = cursor.execute("select peer_id from handles where handle=?", - (handle,)).fetchone() + (handle,)).fetchone() if result == None: raise Exception("handle not found") else: peer_id = result[0] # get all aliases - handle_ids = self.get_handle_ids_for_peer(peer_id) + handle_ids = self.get_handle_ids_for_peer(peer_id) for handle_id in handle_ids: # delete at entries for each alias cursor.execute("delete from at where handle_id=?", (handle_id,)) @@ -298,18 +302,17 @@ # delete all keys for peer id cursor.execute("delete from keys where peer_id=?", (peer_id,)) - + # delete peer from wot cursor.execute("delete from wot where peer_id=?", (peer_id,)) self.conn.commit() - def add_key(self, handle, key): cursor = self.cursor() peer_id = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0] if peer_id != None: cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) - self.conn.commit() + self.conn.commit() def remove_key(self, key): cursor = self.cursor() @@ -319,13 +322,13 @@ def get_handle_ids_for_peer(self, peer_id): cursor = self.cursor() return list(chain.from_iterable(cursor.execute("select handle_id from handles where peer_id=?", - (peer_id,)).fetchall())) - + (peer_id,)).fetchall())) + def get_peer_handles(self): cursor = self.cursor() handles = self.listify(cursor.execute("select handle from handles").fetchall()) return handles - + def get_peers(self): cursor = self.cursor() peers = [] @@ -333,8 +336,10 @@ for handle in handles: peer = self.get_peer_by_handle(handle[0]) - if not (self.is_duplicate(peers, peer)): - peers.append(peer) + # leaving this check in here to help detect corrupt db + if (self.is_duplicate(peers, peer)): + logging.error("Peer with duplicate address or handle id detected: %s" % peer.handles[0]) + peers.append(peer) return peers def listify(self, results): @@ -366,10 +371,11 @@ return None, None - def get_keyed_peers(self, exclude_addressless=False, exclude_ids=[]): + def get_keyed_peers(self, exclude_addressless=False, exclude_ids=[], no_at_only=False): cursor = self.cursor() peer_ids = self.listify(cursor.execute("select peer_id from keys\ - where peer_id not in (%s) order by random()" % ','.join('?'*len(exclude_ids)), + where peer_id not in (%s) order by random()" % ','.join( + '?' * len(exclude_ids)), exclude_ids).fetchall()) peers = [] for peer_id in peer_ids: @@ -377,14 +383,32 @@ handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0] peer = self.get_peer_by_handle(handle) if self.is_duplicate(peers, peer): + logging.error("Peer with duplicate address or handle id detected: %s" % peer.handles[0]) + if exclude_addressless and (peer.address is None or peer.port is None): continue - if exclude_addressless and (peer.address == None or peer.port == None): + if no_at_only and (peer.address is not None or peer.port is not None): continue peers.append(peer) except: pass return peers + def get_cold_peers(self): + cold_peer_seconds = self.get_knob("cold_peer_seconds") + all_peers = self.get_peers() + cold_peers = [] + + # peers with keys and no address + for peer in all_peers: + current_time = time.time() + if peer.address is None and len(peer.keys) > 0: + cold_peers.append(peer) + elif peer.address and len(peer.keys) > 0: + at = self.get_at(peer.handles[0])[0] + if at['active_at_unixtime'] < (current_time - cold_peer_seconds): + cold_peers.append(peer) + + return cold_peers def handle_is_online(self, handle): # last rubbish message from peer associated with handle is @@ -415,7 +439,8 @@ ) raw_messages = cursor.execute("select message_bytes from log where created_at > ?", (dt,)).fetchall() for message_bytes in raw_messages: - int_ts, self_chain, net_chain, speaker, body = Message({})._unpack_generic_message(message_bytes[0][:], unpad_body=True) + int_ts, self_chain, net_chain, speaker, body = Message({})._unpack_generic_message(message_bytes[0][:], + unpad_body=True) if speaker == handle: return False return True @@ -431,24 +456,24 @@ peer_id = handle_info[1] address = cursor.execute("select address, port from at where handle_id=?\ order by updated_at desc limit 1", - (handle_info[0],)).fetchone() + (handle_info[0],)).fetchone() handles = self.listify(cursor.execute("select handle from handles where peer_id=?", - (peer_id,)).fetchall()) + (peer_id,)).fetchall()) keys = self.listify(cursor.execute("select key from keys where peer_id=?\ order by random()", - (peer_id,)).fetchall()) + (peer_id,)).fetchall()) return Peer(self.station.socket, { - "handles": handles, - "peer_id": handle_info[1], - "address": address[0] if address else None, - "port": address[1] if address else None, - "keys": keys - }) + "handles": handles, + "peer_id": handle_info[1], + "address": address[0] if address else None, + "port": address[1] if address else None, + "keys": keys + }) def is_duplicate(self, peers, peer): for existing_peer in peers: if (not existing_peer.address is None - and existing_peer.address == peer.address - and existing_peer.port == peer.port): - return True - return False + and existing_peer.address == peer.address + and existing_peer.port == peer.port): + return True + return False diff -uNr a/blatta/lib/station.py b/blatta/lib/station.py --- a/blatta/lib/station.py cd39da2074b824e921d385060ae9e183a21494cca0bbf4d2bd806f758b81553b160fc3af3d911c640ce9cd87065dbf3117a6fa6844e9e80830d5201ed0635c33 +++ b/blatta/lib/station.py f6af75aae5a5533728cde6a1fe9c3cc3d589dbe5bc2f2db41b16cb908f6abeabcc77fee321133e8d3f4d0549f9105250afbf73d6c34b1c182e58277619c77694 @@ -2,7 +2,6 @@ from fake_orphan import FakeOrphan -VERSION = 9973 STATUS_ONLINE = 0 STATUS_AWAY = 1 @@ -12,17 +11,20 @@ from state import State from getdata import GetData -from message_exception import MessageException, STALE_PACKET, OUT_OF_ORDER_NET, OUT_OF_ORDER_SELF, OUT_OF_ORDER_BOTH,\ - DUPLICATE_PACKET, MALFORMED_PACKET, INVALID_SIGNATURE, INVALID_HANDLE_ENCODING, UNSUPPORTED_VERSION, UNSUPPORTED_COMMAND +from address_cast import AddressCast +from message_exception import MessageException, STALE_PACKET, OUT_OF_ORDER_NET, OUT_OF_ORDER_SELF, OUT_OF_ORDER_BOTH, \ + DUPLICATE_PACKET, MALFORMED_PACKET, INVALID_SIGNATURE, INVALID_HANDLE_ENCODING, UNSUPPORTED_VERSION, \ + UNSUPPORTED_COMMAND from ignore import Ignore from prod import Prod, PROD_REPLY, PROD_PROMPT from server import Server from long_buffer import LongBuffer from order_buffer import OrderBuffer from short_buffer import ShortBuffer -from commands import BROADCAST, DIRECT, GETDATA, IGNORE, PROD +from commands import BROADCAST, DIRECT, GETDATA, IGNORE, PROD, ADDRESS_CAST from message_factory import MessageFactory from client_placeholder import ClientPlaceholder +from prod import get_ascii_address class Station(object): def __init__(self, cmd_line_options): @@ -42,10 +44,12 @@ BROADCAST: self.handle_broadcast, GETDATA: self.handle_getdata, IGNORE: self.handle_ignore, - PROD: self.handle_prod + PROD: self.handle_prod, + ADDRESS_CAST: self.handle_address_cast } self.presence = {} self.last_delivered = 0 + self.last_external_address = None def start(self): self.server.start() @@ -105,9 +109,11 @@ port = packet_info[1] packet_sample = packet_info[2] if error_code == STALE_PACKET: - logging.debug("[%s:%d] -> stale packet: %s" % (address, port, binascii.hexlify(message_or_exception.object.message_hash))) + logging.debug("[%s:%d] -> stale packet: %s" % ( + address, port, binascii.hexlify(message_or_exception.object.message_hash))) elif error_code == DUPLICATE_PACKET: - logging.debug("[%s:%d] -> duplicate packet: %s" % (address, port, binascii.hexlify(message_or_exception.object.message_hash))) + logging.debug("[%s:%d] -> duplicate packet: %s" % ( + address, port, binascii.hexlify(message_or_exception.object.message_hash))) elif error_code == MALFORMED_PACKET: logging.debug("[%s:%d] -> malformed packet: %s" % (address, port, packet_sample)) elif error_code == INVALID_SIGNATURE: @@ -172,6 +178,11 @@ return def handle_prod(self, prod): + self.conditionally_update_at(prod, prod.metadata.get('address')) + + # we need to "refresh" the peer in case the peer for this + # prod doesn't have an address set yet + prod.peer = self.state.get_peer_by_handle(prod.peer.handles[0]) prod.log_incoming(prod.peer) # reply to prod if necessary @@ -185,16 +196,19 @@ self.state ).reply() + # update last_external_address for future Address Cast messages + self.last_external_address = prod.pest_address + # request missing chain tips for chain in ['broadcast_self_chain', 'handle_self_chain', 'net_chain']: tip = getattr(prod, chain) if not self.long_buffer.has(tip): if not self.getdata_requests.get(tip): self.getdata_requests[getattr(prod, chain)] = time.time() - if chain in ['brodcast_self_chain', 'handle_self_chain']: - self.order_buffer.add(FakeOrphan({ 'self_chain': tip }, self.state)) + if chain in ['broadcast_self_chain', 'handle_self_chain']: + self.order_buffer.add(FakeOrphan({'self_chain': tip}, self.state)) else: - self.order_buffer.add(FakeOrphan({ 'net_chain': tip }, self.state)) + self.order_buffer.add(FakeOrphan({'net_chain': tip}, self.state)) GetData( { 'speaker': self.station_handle, @@ -204,6 +218,23 @@ self.state ).send() + def handle_address_cast(self, address_cast): + self.long_buffer.intern(address_cast) + if address_cast.address: + address_tuple = get_ascii_address(address_cast.address).split(':') + handle_to_update = address_cast.origin_peer.handles[0] + self.state.update_at({ + "handle": handle_to_update, + "address": address_tuple[0], + "port": int(address_tuple[1]) + }) + target_peer = self.state.get_peer_by_handle(handle_to_update) + Prod({ + "speaker": self.station_handle, + "flag": PROD_PROMPT + }, self.state).send(target_peer) + else: + self.rebroadcast(address_cast) def check_order_buffer(self): messages = self.order_buffer.dequeue_and_order_mature_messages() @@ -238,14 +269,14 @@ def report_presence(self): # if handle isn't in the presence dict, check if rubbish received and send /join if so for handle in self.state.get_peer_handles(): - if self.state.handle_is_online(handle): - if self.presence.get(handle) is None: - self.presence[handle] = STATUS_ONLINE - self.get_client().send_join(handle) - else: - if self.presence.get(handle): - del self.presence[handle] - self.get_client().send_part(handle) + if self.state.handle_is_online(handle): + if self.presence.get(handle) is None: + self.presence[handle] = STATUS_ONLINE + self.get_client().send_join(handle) + else: + if self.presence.get(handle): + del self.presence[handle] + self.get_client().send_part(handle) # if handle IS in the presence dict, check last message received from handle and send /away for handle in self.presence.keys(): @@ -278,10 +309,11 @@ self.clean_getdata_requests() def clean_getdata_requests(self): - for message_hash in self.getdata_requests.keys(): + get_data_requests_copy = self.getdata_requests.copy() + for message_hash in get_data_requests_copy: if (self.getdata_requests.get(message_hash) < time.time() - float(self.state.get_knob('getdata_requests_expiration_seconds'))): - del self.getdata_requests[message_hash] + self.getdata_requests.pop(message_hash) def deliver(self, message): # it's possible that these messages are from an order buffer @@ -290,7 +322,6 @@ if self.long_buffer.has(message.message_hash): return - # send to the irc client # emit a replay warning if this message is a getdata response and older than the last @@ -336,7 +367,15 @@ Prod({ "speaker": self.station_handle, "flag": PROD_PROMPT - }, self.state).send() + }, self.state).broadcast() + + def send_address_cast(self): + if self.last_external_address is not None: + AddressCast({ + "speaker": self.station_handle, + "external_address": self.last_external_address, + "long_buffer": self.long_buffer + }, self.state).broadcast() def get_client(self): if self.client: diff -uNr a/blatta/lib/version.py b/blatta/lib/version.py --- a/blatta/lib/version.py false +++ b/blatta/lib/version.py e69fdd48ec3063f5219e609c910aee1037989d049320af26ae03ced7e7b5769e83edac66ba021d3b8f6eea30df051e7ed91a5ad230b3bc8115e13b6a656af5f2 @@ -0,0 +1,6 @@ +import os + +current_path = os.path.dirname(os.path.realpath(__file__)) +with open(os.path.join(current_path, '../', 'VERSION')) as version_file: + VERSION = version_file.read().strip() + diff -uNr a/blatta/test_net_configs/ac/a.py b/blatta/test_net_configs/ac/a.py --- a/blatta/test_net_configs/ac/a.py false +++ b/blatta/test_net_configs/ac/a.py 51c4e3fdb422ec6b89624144ffe99b0bfc34b713a4d35f54941427ebc1b09f1ce51f8cb058165aaa29acadaf519b7134a9c21e17d7c8d7f42aae06cecfb279d1 @@ -0,0 +1,12 @@ +peers = [ + { + 'address': 'localhost', + 'key': '58bc4NyvMjasIXvsOvPxugaMpFS6tme+xJleOEwVn4iv2IuLUNAfHrkFCeL/Q4m/13Q5gfZxDbVEOtjQe+zW6Q==', + 'name': 'awt_b', + 'port': 7779 + }, + { + 'key': 'oVIZ+U9F1b0YI9QdLVt2If/qLxoHG/2NCmgXq7HyaYASNn3zQeXTR/4Tz8z9MB6gOkwu+5+LH8L+MsyyQ0nhdA==', + 'name': 'awt_c' + } +] diff -uNr a/blatta/test_net_configs/ac/b.py b/blatta/test_net_configs/ac/b.py --- a/blatta/test_net_configs/ac/b.py false +++ b/blatta/test_net_configs/ac/b.py 8869517ffced618bb8197669aa4124f29e450ca8f2e6451db8c7cb62d09f6ee4b6724dbd0a511851bcca1d392a4ab6c6a5a1fc1feb3f61e6f637a33becd6e251 @@ -0,0 +1,12 @@ +peers = [ + { 'address': 'localhost', + 'name': 'awt_a', + 'port': 7778, + 'key': '58bc4NyvMjasIXvsOvPxugaMpFS6tme+xJleOEwVn4iv2IuLUNAfHrkFCeL/Q4m/13Q5gfZxDbVEOtjQe+zW6Q==' + }, + { 'address': 'localhost', + 'key': '8ugkh+G1NC45DhPPtvPCI/78+fvV8K3v2XaQXvLGpJzeXy2IEA5ZnIo3PGU30+25JxAr0KV+InoqBa0VpY+zCA==', + 'name': 'awt_c', + 'port': 7780 + } +] diff -uNr a/blatta/test_net_configs/ac/c.py b/blatta/test_net_configs/ac/c.py --- a/blatta/test_net_configs/ac/c.py false +++ b/blatta/test_net_configs/ac/c.py 311e0fd18e8f51b3fc06bf6f5703b800bf304b3ef942cdbe83c1ce1519b49046575a929150897c94e021e384a8d3ea599cce74ea0e3b127b44f23c9beb0be6d7 @@ -0,0 +1,12 @@ +peers = [ + { + 'address': 'localhost', + 'name': 'awt_b', + 'port': 7779, + 'key': '8ugkh+G1NC45DhPPtvPCI/78+fvV8K3v2XaQXvLGpJzeXy2IEA5ZnIo3PGU30+25JxAr0KV+InoqBa0VpY+zCA==' + }, + { + 'key': 'oVIZ+U9F1b0YI9QdLVt2If/qLxoHG/2NCmgXq7HyaYASNn3zQeXTR/4Tz8z9MB6gOkwu+5+LH8L+MsyyQ0nhdA==', + 'name': 'awt_a' + } +] diff -uNr a/blatta/tests/test_broadcast.py b/blatta/tests/test_broadcast.py --- a/blatta/tests/test_broadcast.py 29ea5e81951450a33616be0d9609ca1391f9a92a45378aba6e6da21058d4ad588fef569cff2b4aee6b98be416aec7b6ce01bb09716a19975189bdb2a1281c04f +++ b/blatta/tests/test_broadcast.py 14f264146715836e1685a00f132d8c74288e10cdda49075bd02a4ab36459f89b757474f7efd2ca95bfd574438950a0c0d1e733704a1733cc3987dc2318f8b438 @@ -2,10 +2,11 @@ import helper from mock import Mock from lib.state import State -from lib.message import Message +from lib.message import Message, EMPTY_CHAIN from lib.broadcast import Broadcast from lib.long_buffer import LongBuffer from lib.order_buffer import OrderBuffer +from lib.message_factory import MessageFactory class TestMessage(unittest.TestCase): def setUp(self): @@ -50,20 +51,17 @@ 'handle': 'bob', 'speaker': 'alice', 'body': 'm1', - 'long_buffer': LongBuffer(self.alice_state) + 'long_buffer': LongBuffer(self.alice_state), + 'self_chain': EMPTY_CHAIN, + 'net_chain': EMPTY_CHAIN }, self.alice_state) message.send() bob = self.alice_state.get_peer_by_handle('bob') - message_bytes = message.get_message_bytes(bob) - black_packet = Message.pack(bob, message.command, message.bounces, message_bytes) + black_packet = Message.pack(bob, message.command, message.bounces, message.message_bytes) self.bob_socket.sendto.called_once_with(black_packet, (bob.address, bob.port)) # now bob must unpack the black packet alice = self.bob_state.get_peer_by_handle('alice') - received_message = Message.unpack(alice, - black_packet, - LongBuffer(self.bob_state), - OrderBuffer(self.bob_state), - {}) - self.assertEqual(message.body, received_message['body']) + received_message = MessageFactory().inflate(alice, black_packet, LongBuffer(self.bob_state), OrderBuffer(self.bob_state), {}) + self.assertEqual(message.body, received_message.body) diff -uNr a/blatta/tests/test_direct.py b/blatta/tests/test_direct.py --- a/blatta/tests/test_direct.py b5e72914e0e830b2ebab8b1052667ed3a36f72acd69312f87516894510aca9ebf0a629f502b02b960e7aa4bc383af801575b9ea0fd7860c26b9029b9af2d0540 +++ b/blatta/tests/test_direct.py eb0ea3e1c17bc3488917e7866cc7bfc26242011ba891b87288af5d79c0dbdb43b0b343269ec2d68330fa80d57b83ede4a16efef7163debcbe944c3fa9b2d154a @@ -10,6 +10,8 @@ from lib.direct import Direct from lib.long_buffer import LongBuffer from lib.order_buffer import OrderBuffer +from lib.message_factory import MessageFactory + class TestMessage(unittest.TestCase): def setUp(self): @@ -65,9 +67,11 @@ # now bob must unpack the black packet alice = self.bob_state.get_peer_by_handle('alice') - received_message = Message.unpack(alice, - black_packet, - LongBuffer(self.bob_state), - OrderBuffer(self.bob_state), - {}) - self.assertEqual(message.body, received_message['body']) \ No newline at end of file + received_message = MessageFactory().inflate( + alice, + black_packet, + LongBuffer(self.bob_state), + OrderBuffer(self.bob_state), + {} + ) + self.assertEqual(message.body, received_message.body) \ No newline at end of file diff -uNr a/blatta/tests/test_getdata.py b/blatta/tests/test_getdata.py --- a/blatta/tests/test_getdata.py c50220a20c89e9c56b3083422835887c8c5346354f2a10a117e873f3287a3f44e6f9ace31e46045b149a0a310a77c86e4506e682ccd944d2b88c2c484336f10b +++ b/blatta/tests/test_getdata.py deeec283af9d8cbe6abd297679817ddfead34efa0651fe25cf0831fa3408047c19d38f795c8bfe5e7954f06537ef37faa34e0d8dffe7e88e1fd0caf1724c4364 @@ -8,6 +8,8 @@ from lib.order_buffer import OrderBuffer from lib.state import State from lib.direct import Direct +from lib.message_factory import MessageFactory +from lib.message_exception import MessageException, OUT_OF_ORDER_SELF import helper class TestGetData(unittest.TestCase): @@ -70,47 +72,60 @@ }, self.bob_state) # we need to send these messages to get them into the log alice = self.bob_state.get_peer_by_handle('alice') - m1.message_bytes = m1.get_message_bytes(alice) m1.send() - m2.message_bytes = m2.get_message_bytes(alice) m2.send() - m3.message_bytes = m3.get_message_bytes(alice) m3.send() # now let's compile the black packet so alice can # unpack it and get a message we can pass to GetData() - m1_message_bytes = m1.get_message_bytes(alice) + m1_message_bytes = m1.message_bytes m1_black_packet = Message.pack(alice, m1.command, m1.bounces, m1_message_bytes) # we use m3 because if we used m2 there would be no break, # and if we used m1 it would be considered the first message - m3_message_bytes = m3.get_message_bytes(alice) + m3_message_bytes = m3.message_bytes # TODO: something strange going on here with the message bytes causing the logger to barf m3_black_packet = Message.pack(alice, m3.command, m3.bounces, m3_message_bytes) # we need bob's peer object to know what key to use to decrypt bob = self.alice_state.get_peer_by_handle('bob') - m1_received = Message.unpack(bob, - m1_black_packet, - LongBuffer(self.alice_state), - OrderBuffer(self.alice_state), - {}, - self.alice_state) - - m3_received = Message.unpack(bob, - m3_black_packet, - LongBuffer(self.alice_state), - OrderBuffer(self.alice_state), - {}, - self.alice_state) - gd_message = GetData(m3_received, 'self_chain', self.alice_state) + m1_received = MessageFactory().inflate( + bob, + m1_black_packet, + LongBuffer(self.alice_state), + OrderBuffer(self.alice_state), + {}, + self.alice_state + ) + + try: + MessageFactory().inflate( + bob, + m3_black_packet, + LongBuffer(self.alice_state), + OrderBuffer(self.alice_state), + {}, + self.alice_state + ) + except MessageException as me: + assert me.error_code is OUT_OF_ORDER_SELF + m3_received = me.object + + # gd_message = GetData(m3_received, 'self_chain', self.alice_state) + gd_message = GetData({ + 'speaker': 'alice', + 'body': m3_received.self_chain, + 'target_peer': bob + }) gd_message.send() # rebuild the black packet so we can compare with what was actually sent - gd_black_packet = Message.pack(bob, - gd_message.command, - gd_message.bounces, - gd_message.get_message_bytes(bob)) + gd_black_packet = Message.pack( + bob, + gd_message.command, + gd_message.bounces, + gd_message.message_bytes + ) self.alice_socket.sendto.called_once_with(gd_black_packet, (bob.address, bob.port)) \ No newline at end of file diff -uNr a/blatta/tests/test_station.py b/blatta/tests/test_station.py --- a/blatta/tests/test_station.py 90668d46c39a64258b943cf8ff176e84b0b98eb662b8e6193b717f6582bfeae981698754ff6f4aa7efda8f672ded907ee0680a5f3c175e3848718c712a200f2d +++ b/blatta/tests/test_station.py cd0f45e2780ceb2a6693d25eee911f1453ee251e9410c86f04cd1b620c8d4c41a27a38763171e9efe835dff9f775b838f5eefb0344a43f9ba04cf804d2ea189a @@ -27,7 +27,8 @@ 'channel_name', 'password', 'motd', - 'listen']) + 'listen', + 'station_handle']) options = Options( None, None, @@ -37,6 +38,7 @@ None, None, None, + None, None ) self.station = Station(options) @@ -77,7 +79,7 @@ pass def test_clean_getdata_requests_clears_expired_hashes(self): - self.station.getdata_requests["abc"] = time.time() - 15 + self.station.getdata_requests["abc"] = time.time() - 10000 self.station.clean_getdata_requests() self.assertEqual(len(self.station.getdata_requests), 0) @@ -111,7 +113,7 @@ self.station.rebroadcast.assert_called_once_with(low_bounce_message) def test_embargo_queue_cleared(self): - self.skipTest("the embargo queue is now th short buffer") + self.skipTest("the embargo queue is now the short buffer") peer = Mock() peer.handles = ["a", "b"] message = Mock() @@ -143,7 +145,7 @@ self.assertEqual(message.prefix, "c[a]") def test_in_wot_hearsay_prefix_under_four(self): - self.skipTest("the embargo queue is now th short buffer") + self.skipTest("the embargo queue is now the short buffer") peer1 = Mock() peer1.handles = ["a", "b"] peer2 = Mock() @@ -177,7 +179,7 @@ self.assertEqual(message_via_peer1.prefix, "c[a|d|f]") def test_in_wot_hearsay_prefix_more_than_three(self): - self.skipTest("the embargo queue is now th short buffer") + self.skipTest("the embargo queue is now the short buffer") peer1 = Mock() peer1.handles = ["a", "b"] peer2 = Mock()