# Manta Python
# Manta Protocol Implementation for Python
# Copyright (C) 2018-2019 Alessandro ViganĂ²
# from __future__ import annotations
"""
Library that implements a basic a Manta :term:`Payment Processor`.
"""
from abc import abstractmethod
import base64
from dataclasses import dataclass
from decimal import Decimal
import logging
import traceback
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Set
import attr
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.serialization import load_pem_private_key
import paho.mqtt.client as mqtt
from .base import MantaComponent
from .dispatcher import Dispatcher
from .messages import (
PaymentRequestMessage,
MerchantOrderRequestMessage,
PaymentRequestEnvelope,
Destination,
PaymentMessage,
AckMessage,
Status,
Merchant,
)
logger = logging.getLogger(__name__)
class Conf(NamedTuple):
url: str
nano_address: str
key_file: str
nano_euro: float
class SessionDoesNotExist(Exception):
pass
[docs]@dataclass()
class TransactionState:
# noinspection PyUnresolvedReferences
"""
Transaction State represents state of a Transaction
Args:
txid: Transaction ID
session_id: Session ID of transaction
application: Application ID of transaction
order: Merchant order for transaction
payment_request: Last payment request of transaction
payment_message: Last payment message of transaction
ack: Last ack of transaction
wallet_request: Last wallet request of transaction
notify: callback to be called when attributes of transaction change
"""
txid: int
session_id: str
application: str
order: MerchantOrderRequestMessage
payment_request: Optional[PaymentRequestMessage] = None
payment_message: Optional[PaymentMessage] = None
ack: Optional[AckMessage] = None
wallet_request: Optional[str] = None
notify: Optional[Callable[[int, str, Any], None]] = None
def __setattr__(self, key, value):
if callable(self.notify):
self.notify(self.txid, key, value)
super().__setattr__(key, value)
[docs]class TXStorage:
"""
Storage for Active Session Data of Transaction
This is an abstract class to be implemented in subclassing.
This is meant to be used with external storage like a database.
Payproc will use a memory TXStorage implementation by default, which is not persistent.
Transaction with status 'PAID' or 'INVALID' must not be present
"""
[docs] @abstractmethod
def create(
self,
txid: int,
session_id: str,
application: str,
order: MerchantOrderRequestMessage,
ack: AckMessage = None,
) -> TransactionState:
"""
Create a new transaction
Args:
txid: txid of Transaction
session_id: session_id of Transaction
application: application_id of Transaction
order: Merchant Order of Transaction
ack: First Ack Message (ie status=NEW)
Returns:
"""
pass
[docs] @abstractmethod
def get_state_for_session(self, session_id: str) -> TransactionState:
"""
Get state for transaction with session_id
Args:
session_id: session_id of transaction
Returns: state of transaction
"""
pass
[docs] @abstractmethod
def session_exists(self, session_id: str) -> bool:
"""
Check if session exist
Args:
session_id: session_id of transaction
Returns: true if session exist
"""
pass
def __iter__(self):
return self
def __next__(self):
pass
@abstractmethod
def __len__(self):
pass
[docs]class TXStorageMemory(TXStorage):
"""
Implmentation of TXStorage as memory storage
"""
states: Dict[str, TransactionState]
def __init__(self):
self.states = {}
def _on_notify(self, txid, key, value):
if key == "ack":
value: AckMessage
if value.status in [Status.PAID, Status.INVALID]:
session_id = next(
key
for key, state in self.states.items()
if state.txid == int(value.txid)
)
del self.states[session_id]
[docs] def create(
self,
txid: int,
session_id: str,
application: str,
order: MerchantOrderRequestMessage,
ack: AckMessage = None,
) -> TransactionState:
tx = TransactionState(
txid=txid,
session_id=session_id,
application=application,
order=order,
ack=ack,
notify=self._on_notify,
)
self.states[session_id] = tx
return tx
[docs] def get_state_for_session(self, session_id: str) -> TransactionState:
return self.states[session_id]
[docs] def session_exists(self, session_id: str) -> bool:
return session_id in self.states
def __iter__(self):
return iter(self.states.items())
def __len__(self):
return len(self.states)
def generate_crypto_legacy_url(crypto: str, address: str, amount: float) -> str:
if crypto == "btc":
return "bitcoin:{}?amount={}".format(address, amount)
return ""
[docs]class PayProc(MantaComponent):
# noinspection PyUnresolvedReferences
"""
Manta Protocol Payment Processor Implementation
Args:
key_file: File name of PEM private key of Payment Processor. This
will be used to sign messages
certificate: File name of Manta Certificate Authority, IE mantaproto
host: MQTT Broker host
starting_txid: Transaction ID are progressive, starting from the
one specified
tx_storage: TXStorage instance to store session information
mqtt_options: A Dict of options to be passed to MQTT Client (like
username, password)
port: MQTT Broker port number. Specified only if it's different
than the default of 1883
Attributes:
get_destinations: Callback function to retrieve list of Destination
get_supported_cryptos: Callback function to retrieve list of
supported cryptos
"""
key: RSAPrivateKey
certificate: str
# str is txid
on_processed_order: Optional[
Callable[[str, MerchantOrderRequestMessage, AckMessage], None]
] = None
# on_processed_get_payment(txid, crypto, payment_request_message)
on_processed_get_payment: Optional[
Callable[[str, str, PaymentRequestMessage], None]
] = None
# str is txid
on_processed_payment: Optional[
Callable[[str, PaymentMessage, AckMessage], None]
] = None
on_processed_confirmation: Optional[Callable[[str, AckMessage], None]] = None
tx_storage: TXStorage
dispatcher: Dispatcher
txid: int
def __init__(
self,
key_file: str,
cert_file: str = None,
host: str = "localhost",
starting_txid: int = 0,
tx_storage: TXStorage = None,
mqtt_options: Dict[str, Any] = None,
port: int = 1883,
) -> None:
self.txid = starting_txid
self.tx_storage = tx_storage if tx_storage is not None else TXStorageMemory()
self.dispatcher = Dispatcher(self)
mqtt_options = mqtt_options if mqtt_options else {}
self.mqtt_client = mqtt.Client(**mqtt_options)
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.enable_logger()
self.host = host
self.port = port
with open(key_file, "rb") as myfile:
key_data = myfile.read()
self.key = PayProc.key_from_keydata(key_data)
if cert_file is not None:
with open(cert_file, "r") as cfile:
self.certificate = cfile.read()
else:
self.certificate = ""
def get_merchant(application: str) -> Merchant:
raise NotImplementedError
def get_destinations(
application: str, merchant_order: MerchantOrderRequestMessage
) -> List[Destination]:
raise NotImplementedError
def get_supported_cryptos(
application: str, merchant_order: MerchantOrderRequestMessage
) -> Set[str]:
raise NotImplementedError
self.get_merchant: Callable[[str], Merchant] = get_merchant
# get_destinations(application: str, merchant_order: MerchantOrderRequestMessage)
self.get_destinations: Callable[
[str, MerchantOrderRequestMessage], List[Destination]
] = get_destinations
# get_supported_cryptos(application: str, merchant_order: MerchantOrderRequestMessage)
self.get_supported_cryptos: Callable[
[str, MerchantOrderRequestMessage], Set[str]
] = get_supported_cryptos
[docs] def run(self):
"""
Start processing network requests. This starts the :term:`MQTT`
client processing loop in another thread.
"""
self.mqtt_client.connect(host=self.host, port=self.port)
self.mqtt_client.loop_start()
[docs] @staticmethod
def key_from_keydata(key_data: bytes) -> RSAPrivateKey:
"""
Given a string containing the key encoded in PEM format, loads it.
Args:
key_data: string containing the key in PEM format
Returns:
a ready key object
"""
return load_pem_private_key(key_data, password=None, backend=default_backend())
[docs] def sign(self, message: bytes) -> bytes:
"""
Sign the given string with the cryptographic key specified at
creation time.
Args:
message: string to sign
Returns:
signature text
"""
# signature = key.sign(message,
# padding.PSS(
# mgf=padding.MGF1(hashes.SHA256()),
# salt_length=padding.PSS.MAX_LENGTH
# ),
# hashes.SHA256())
signature = self.key.sign(message, padding.PKCS1v15(), hashes.SHA256())
return base64.b64encode(signature)
# noinspection PyUnusedLocal,PyMethodMayBeStatic
def on_connect(self, client, userdata, flags, rc):
logger.info("Connected with result code " + str(rc))
self._subscribe("merchant_order_request/+")
self._subscribe("merchant_order_cancel/+")
for session, value in self.tx_storage:
self._subscribe("payment_requests/{}/+".format(session))
self._subscribe("payments/{}".format(session))
self.mqtt_client.publish("certificate", self.certificate, retain=True)
def _subscribe(self, topic):
self.mqtt_client.subscribe(topic)
logger.info("Subscribed to %r", topic)
@Dispatcher.method_topic("merchant_order_cancel/+")
def on_merchant_order_cancel(self, session_id, payload):
logger.info("Request for canceling order with session_id %r", session_id)
self.invalidate(session_id, "Canceled by Merchant")
@Dispatcher.method_topic("merchant_order_request/+")
def on_merchant_order_request(self, application_id, payload):
logger.info("Processing merchant_order message")
p = MerchantOrderRequestMessage.from_json(payload)
ack: AckMessage
# This is a manta request
if p.crypto_currency is None or p.crypto_currency == "":
self.mqtt_client.subscribe("payment_requests/{}/+".format(p.session_id))
self.mqtt_client.subscribe("payments/{}".format(p.session_id))
ack = AckMessage(
status=Status.NEW,
url="manta://{}{}/{}".format(
self.host,
":" + str(self.port) if self.port != 1883 else "",
p.session_id,
),
txid=str(self.txid),
)
self.ack(p.session_id, ack)
self.tx_storage.create(self.txid, p.session_id, application_id, p, ack)
self.txid = self.txid + 1
else:
destinations = self.get_destinations(application_id, p)
d = destinations[0]
ack = AckMessage(
txid=str(self.txid),
status=Status.NEW,
url=generate_crypto_legacy_url(
d.crypto_currency, d.destination_address, Decimal(d.amount)
),
)
self.ack(p.session_id, ack)
self.tx_storage.create(self.txid, p.session_id, application_id, p, ack),
self.txid = self.txid + 1
if callable(self.on_processed_order):
self.on_processed_order(ack.txid, p, ack)
# noinspection PyUnusedLocal
@Dispatcher.method_topic("payment_requests/+/+")
def on_get_payment_request(
self, session_id: str, crypto_currency: str, payload: str
):
logger.info("Processing payment request message")
state: TransactionState = self.tx_storage.get_state_for_session(session_id)
state.wallet_request = crypto_currency
request = MerchantOrderRequestMessage(
fiat_currency=state.order.fiat_currency,
amount=state.order.amount,
session_id=session_id,
crypto_currency=None if crypto_currency == "all" else crypto_currency,
)
application = state.application
envelope = self.generate_payment_request(application, request)
state.payment_request = envelope.unpack()
logger.info("Publishing {}".format(envelope))
self.mqtt_client.publish(
"payment_requests/{}".format(session_id), envelope.to_json()
)
if callable(self.on_processed_get_payment):
assert state.ack is not None
self.on_processed_get_payment(
state.ack.txid, crypto_currency, envelope.unpack()
)
@Dispatcher.method_topic("payments/+")
def on_payment(self, session_id: str, payload: str):
if self.tx_storage.session_exists(session_id):
payment_message = PaymentMessage.from_json(payload)
state = self.tx_storage.get_state_for_session(session_id)
# check if crypto is one of the supported
payment_request = state.payment_request
assert payment_request is not None
if payment_message.crypto_currency.upper() not in [
x.upper() for x in payment_request.supported_cryptos
]:
return
new_ack = attr.evolve(
state.ack,
status=Status.PENDING,
transaction_hash=payment_message.transaction_hash,
transaction_currency=payment_message.crypto_currency,
url=None,
)
assert new_ack is not None
state.payment_message = payment_message
state.ack = new_ack
self.ack(session_id, new_ack)
if callable(self.on_processed_payment):
self.on_processed_payment(state.ack.txid, payment_message, new_ack)
# noinspection PyUnusedLocal
def on_message(self, client: mqtt.Client, userdata, msg):
logger.info("New Message {} on {}".format(msg.payload, msg.topic))
try:
self.dispatcher.dispatch(msg.topic, payload=msg.payload)
except Exception as e:
logger.error(e)
traceback.print_exc()
[docs] def ack(self, session_id: str, ack: AckMessage):
"""
Publish the given :class:`~.messages.AckMessage`.
Args:
session_id: id of the session where to send the messages
"""
logger.info("Publishing ack for {} as {}".format(session_id, ack.status.value))
self.mqtt_client.publish("acks/{}".format(session_id), ack.to_json())
[docs] def confirming(self, session_id: str):
"""
Change the status of the session with the given ``session_id`` to
:attr:`~.messages.Status.CONFIRMING` and publish the
:class:`~.messages.AckMessage`.
Args:
session_id: session to change
"""
if self.tx_storage.session_exists(session_id):
state = self.tx_storage.get_state_for_session(session_id)
new_ack = attr.evolve(state.ack, status=Status.CONFIRMING)
assert new_ack is not None
state.ack = new_ack
self.ack(session_id, new_ack)
[docs] def confirm(
self,
session_id: str,
transaction_hash: Optional[str] = None,
transaction_currency: Optional[str] = None,
):
"""
Change the status of the session with the given ``session_id`` to
:attr:`~.messages.Status.PAID` and publish the
:class:`~.messages.AckMessage`.
Args:
session_id: session to change
"""
if self.tx_storage.session_exists(session_id):
state = self.tx_storage.get_state_for_session(session_id)
new_ack = attr.evolve(state.ack, status=Status.PAID)
assert new_ack is not None
new_ack.url = None
if transaction_hash:
new_ack.transaction_hash = transaction_hash
if transaction_currency:
new_ack.transaction_currency = transaction_currency
state.ack = new_ack
self.ack(session_id, new_ack)
if callable(self.on_processed_confirmation):
self.on_processed_confirmation(state.ack.txid, new_ack)
[docs] def invalidate(self, session_id: str, reason: str = ""):
"""
Change the status of the session with the given ``session_id`` to
:attr:`~.messages.Status.INVALID` and publish the
:class:`~.messages.AckMessage`.
Args:
session_id: session to change
reason: reason for INVALID status (ex. 'Timeout')
"""
if self.tx_storage.session_exists(session_id):
state = self.tx_storage.get_state_for_session(session_id)
new_ack = attr.evolve(state.ack, status=Status.INVALID, memo=reason)
assert new_ack is not None
state.ack = new_ack
self.ack(session_id, new_ack)
[docs] def generate_payment_request(
self, device: str, merchant_request: MerchantOrderRequestMessage
) -> PaymentRequestEnvelope:
"""
Create a :class:`~.messages.PaymentRequestEnvelope` containing the
payment informations.
Args:
device: :term:`application_id` of the :term:`POS`
merchant_request: object containing payment infos
Returns:
an envelope containing a :class:`~.message.PaymentRequestMessage`
"""
merchant = self.get_merchant(device)
destinations = self.get_destinations(device, merchant_request)
supported_cryptos = self.get_supported_cryptos(device, merchant_request)
message = PaymentRequestMessage(
merchant=merchant,
amount=merchant_request.amount,
fiat_currency=merchant_request.fiat_currency,
destinations=destinations,
supported_cryptos=supported_cryptos,
)
json_message = message.to_json()
signature = self.sign(json_message.encode("utf-8")).decode("utf-8")
payment_request_envelope = PaymentRequestEnvelope(
message=json_message, signature=signature
)
return payment_request_envelope