feat: support multiple SMS backends

pull/1/head
Luca 8 months ago
parent 568f1bf9e8
commit 567b4cdf83

@ -8,7 +8,12 @@ ENVIRONMENT=production
#CELERY_RESULT_BACKEND=
#FALLBACK_DEACTIVATE_INTERVAL=
#MESSAGE_RECEIVE_INTERVAL=
#MESSAGE_FETCH_INTERVAL=
#MESSAGE_SEND_INTERVAL=
#REMINDER_SEND_INTERVAL=
#SHIFT_IMPORT_INTERVAL=
#SMS_INBOUND_BACKEND=
#SMS_OUTBOUND_BACKEND=
#SMS_SETTINGS=
#SMS_WEBHOOK_SECRET=

@ -10,6 +10,7 @@ click==8.1.2
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.2.0
clicksend-client==5.0.78
Deprecated==1.2.13
Django==5.0.4
django-dynamic-preferences==1.16.0

@ -9,6 +9,8 @@ from django.utils import timezone
from dynamic_preferences.registries import global_preferences_registry
from phonenumber_field.modelfields import PhoneNumberField
from shiftregister.messaging import message
global_preferences = global_preferences_registry.manager()
@ -185,6 +187,15 @@ class Message(models.Model):
def __str__(self):
return f"{self.to.name}({self.created_at}): {self.text}"
def as_outbound(self):
return message.Message(
self.pk,
recipient=self.to.phone,
text=self.text,
type=message.MessageType.OUTBOUND,
created_at=self.created_at,
)
def gen_token():
return secrets.token_urlsafe(

@ -1,57 +0,0 @@
from datetime import timezone
import requests
from django.conf import settings
BASE_URL = "https://api.sipgate.com/v2"
class SMS:
def __init__(self, item):
self.content = item["smsContent"]
self.created_at = item["created"]
self.id = item["id"]
self.sender = item["source"]
def list_incoming_sms(from_dt=None):
if not settings.SIPGATE_INCOMING_TOKEN_ID:
raise RuntimeError("required setting SIPGATE_INCOMING_TOKEN_ID is not set")
if not settings.SIPGATE_INCOMING_TOKEN:
raise RuntimeError("required setting SIPGATE_INCOMING_TOKEN is not set")
limit = 10
params = {
"directions": "INCOMING",
"limit": limit,
"types": "SMS",
}
if from_dt is not None:
params["from"] = (
from_dt.astimezone(timezone.utc)
.isoformat(timespec="seconds")
.replace("+00:00", "Z")
)
items = []
offset = 0
total = 10
while offset < total:
r = requests.get(
f"{BASE_URL}/history",
auth=requests.auth.HTTPBasicAuth(
settings.SIPGATE_INCOMING_TOKEN_ID, settings.SIPGATE_INCOMING_TOKEN
),
params=params | {"offset": offset},
)
r.raise_for_status()
data = r.json()
items += data["items"]
offset += limit
total = data["totalCount"]
return list(map(lambda item: SMS(item), items))

@ -1,32 +0,0 @@
import requests
from django.conf import settings
from phonenumber_field.phonenumber import PhoneNumber
BASE_URL = "https://api.sipgate.com/v2"
def send(recipient, message):
if not settings.SIPGATE_TOKEN_ID:
raise RuntimeError("required setting SIPGATE_TOKEN_ID is not set")
if not settings.SIPGATE_TOKEN:
raise RuntimeError("required setting SIPGATE_TOKEN is not set")
if not settings.SIPGATE_SMS_EXTENSION:
raise RuntimeError("required setting SIPGATE_SMS_EXTENSION is not set")
if not PhoneNumber.from_string(recipient).is_valid():
raise ValueError("invalid phone number")
r = requests.post(
f"{BASE_URL}/sessions/sms",
auth=requests.auth.HTTPBasicAuth(
settings.SIPGATE_TOKEN_ID, settings.SIPGATE_TOKEN
),
json={
"smsId": settings.SIPGATE_SMS_EXTENSION,
"recipient": recipient,
"message": message,
},
)
r.raise_for_status()

@ -1,3 +1,5 @@
import logging
import sentry_sdk
from celery import shared_task
from django.conf import settings
@ -5,44 +7,36 @@ from django.db import transaction
from django.utils import timezone
from dynamic_preferences.registries import global_preferences_registry
from shiftregister.messaging.outbound import send
from .models import Message, ShiftRegistration
from .sipgate.sms import send as send_sms
global_preferences = global_preferences_registry.manager()
logger = logging.getLogger(__name__)
def send(msg):
if not (
settings.SIPGATE_SMS_EXTENSION
and settings.SIPGATE_TOKEN
and settings.SIPGATE_TOKEN_ID
):
print(f"would send message to {msg.to.phone}\n---\n{msg.text}")
return
send_sms(str(msg.to.phone), msg.text)
# cron task to send normal messages(reminders,changes) in batches
# cron task to send normal messages(reminders, changes) in batches
@shared_task
def send_messages():
if not global_preferences["helper__send_sms"]:
print("sms disabled, not sending")
logger.info("sms disabled, not sending")
return
msgs = Message.objects.select_for_update().filter(sent_at__isnull=True)[
: global_preferences["helper__sms_rate"] * 2
]
with transaction.atomic():
for msg in msgs:
if msg.sent_at:
continue
try:
send(msg)
msg.sent_at = timezone.now()
msg.save()
except Exception as e:
sentry_sdk.capture_exception(e)
sent_msg_ids = []
try:
for msg in send(msg.as_outbound() for msg in msgs if not msg.sent_at):
sent_msg_ids.append(msg.key)
except Exception as e:
sentry_sdk.capture_exception(e)
Message.objects.select_for_update().filter(id__in=sent_msg_ids).update(
sent_at=timezone.now()
)
# singlemessage so registration links arrive faster.
@ -59,15 +53,17 @@ def send_message(msgid, is_retry=False):
msg = Message.objects.select_for_update().get(pk=msgid)
if msg.sent_at:
return
send(msg)
send(msg.as_outbound())
msg.sent_at = timezone.now()
msg.save()
except Message.DoesNotExist:
if not is_retry:
print("message not found, retrying")
logger.warning("message not found, retrying")
send_message.apply_async((msgid, True), countdown=0.2)
else:
print(f"message {msgid} not found in retry, giving up")
logger.error(f"message {msgid} not found in retry, giving up")
@shared_task
@ -81,6 +77,7 @@ def send_reminders():
for reg in regs:
if reg.reminder_sent:
continue
try:
reg.send_reminder()
except Exception as e:

@ -0,0 +1 @@
from .message import Message, MessageType

@ -0,0 +1,6 @@
from django.apps import AppConfig
class AppConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "shiftregister.messaging"

@ -0,0 +1,17 @@
from abc import ABC, abstractmethod
class Receiver(ABC):
@abstractmethod
def fetch(self):
raise NotImplementedError
@abstractmethod
def handle(self, **kwargs):
raise NotImplementedError
class Sender(ABC):
@abstractmethod
def send(self, messages):
raise NotImplementedError

@ -0,0 +1,102 @@
import json
from datetime import datetime, timezone
from clicksend_client import ApiClient
from clicksend_client import Configuration as BaseConfiguration
from clicksend_client import SMSApi, SmsMessage, SmsMessageCollection
from clicksend_client.rest import ApiException
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from ..exceptions import OutboundMessageError
from ..message import Message, MessageType
from .abc import Receiver as BaseReceiver
from .abc import Sender as BaseSender
__all__ = ("Receiver", "Sender")
MAX_BATCH_SIZE = 1000 # see https://developers.clicksend.com/docs/rest/v3/#how-many-messages-can-i-send
class Configuration(BaseConfiguration):
def __init__(self):
super().__init__()
self.username = settings.SMS_SETTINGS["clicksend_username"]
self.password = settings.SMS_SETTINGS["clicksend_password"]
try:
settings.SMS_SETTINGS["clicksend_sender_id"]
except KeyError:
raise ImproperlyConfigured(
"'clicksend_sender_id' must be set in SMS_SETTINGS for ClickSend backend"
)
client = ApiClient(Configuration())
class Receiver(BaseReceiver):
fetch = None
def handle(self, timestamp="", body="", message_id="", **kwargs):
try:
timestamp = int(timestamp)
except ValueError:
raise ValueError("invalid timestamp")
if not message_id:
raise ValueError("empty message id")
yield Message(
message_id,
sender=kwargs["from"],
text=body,
type=MessageType.INBOUND,
created_at=datetime.fromtimestamp(timestamp, timezone.utc),
)
class Sender(BaseSender):
def send(self, messages):
messages = messages[:MAX_BATCH_SIZE]
try:
response = (
SMSApi(client)
.sms_send_post(
SmsMessageCollection(
messages=[
SmsMessage(
**{
"from": settings.SMS_SETTINGS[
"clicksend_sender_id"
],
"body": message.text,
"to": message.recipient,
"source": "shiftregister",
"custom_string": message.key,
}
)
for message in messages
]
)
)
.data
)
except ApiException as e:
if e.body:
response = e.body
else:
raise OutboundMessageError(f"{e.status} {e.reason}")
response = json.loads(response)
for message in response.get("messages", []):
if message["status"] == "SUCCESS":
yield Message(
message["custom_string"],
recipient=message["to"],
sender=message["from"],
text=message["body"],
)

@ -0,0 +1,47 @@
import logging
from pprint import pformat
from uuid import uuid4
from ..message import Message, MessageType
from .abc import Receiver as BaseReceiver
from .abc import Sender as BaseSender
logger = logging.getLogger(__name__)
def make_dummy_message():
return Message(
uuid4(),
sender="+4915228817386",
text="Test Message Please Ignore",
type=MessageType.INBOUND,
)
class Receiver(BaseReceiver):
def fetch(self):
yield make_dummy_message()
handle = None
class Sender(BaseSender):
def send(self, messages):
for message in messages:
logger.info(f"would send sms\nto: {message.recipient}\n\n{message.text}")
yield message
class WebhookReceiver(BaseReceiver):
fetch = None
def handle(self, key="", sender="", text="", **kwargs):
if not key:
raise ValueError("empty message key")
if not sender:
raise ValueError("message has no sender")
logging.getLogger("django.server").info(
f"received sms via webhook\nkey: {key}\nfrom: {sender}\nadditional fields: {pformat(kwargs)}\n\n{text}"
)
yield Message(key, sender=sender, text=text, type=MessageType.INBOUND)

@ -0,0 +1,81 @@
from datetime import datetime, timezone
import requests
from django.conf import settings
from requests.auth import HTTPBasicAuth
from ..message import Message, MessageType
from .abc import Receiver as BaseReceiver
from .abc import Sender as BaseSender
__all__ = ("Receiver", "Sender")
BASE_URL = "https://api.sipgate.com/v2"
auth = HTTPBasicAuth(
settings.SMS_SETTINGS.get("sipgate_token_id", settings.SIPGATE_TOKEN_ID),
settings.SMS_SETTINGS.get("sipgate_token", settings.SIPGATE_TOKEN),
)
class Receiver(BaseReceiver):
def __init__(self):
self.from_dt = None
def fetch(self):
limit = 10
params = {
"directions": "INCOMING",
"limit": limit,
"types": "SMS",
}
if self.from_dt is not None:
params["from"] = (
from_dt.astimezone(timezone.utc)
.isoformat(timespec="seconds")
.replace("+00:00", "Z")
)
offset = 0
total = 10
while offset < total:
r = requests.get(
f"{BASE_URL}/history", auth=auth, params=params | {"offset": offset}
)
r.raise_for_status()
data = r.json()
for item in data["items"]:
created_at = datetime.fromisoformat(item["created"])
self.from_dt = max(self.from_dt, created_at)
yield Message(
item["id"],
sender=item["source"],
text=item["smsContent"],
type=MessageType.INBOUND,
created_at=created_at,
)
offset += limit
total = data["totalCount"]
handle = None
class Sender(BaseSender):
def send(self, messages):
for message in messages:
r = requests.post(
f"{BASE_URL}/sessions/sms",
auth=auth,
json={
"smsId": settings.SMS_SETTINGS.get(
"sipgate_sms_extension", settings.SIPGATE_SMS_EXTENSION
),
"recipient": message.recipient,
"message": message.text,
},
)
r.raise_for_status()

@ -0,0 +1,6 @@
class OutboundMessageError(Exception):
def __init__(self, description):
self.description = description
def __str__(self):
return self.description

@ -0,0 +1,29 @@
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from .backends.abc import Receiver
from .utils import import_class
__all__ = ("receiver",)
def resolve_backend():
receiver_cls = settings.SMS_INBOUND_BACKEND
if isinstance(receiver_cls, str):
receiver_cls = import_class(receiver_cls, "Receiver")
if not issubclass(receiver_cls, Receiver):
raise ImproperlyConfigured(
"SMS_INBOUND_BACKEND must be a subclass of shiftregister.messaging.backends.abc.Receiver"
)
receiver = receiver_cls()
if receiver.handle is not None and settings.SMS_WEBHOOK_SECRET is None:
raise ImproperlyConfigured(
"the specified SMS_INBOUND_BACKEND requires SMS_WEBHOOK_SECRET to be set"
)
return receiver
receiver = resolve_backend()

@ -0,0 +1,55 @@
from datetime import datetime
from enum import Enum, auto
from django.utils import timezone
from phonenumber_field.phonenumber import PhoneNumber
class MessageType(Enum):
INBOUND = auto()
OUTBOUND = auto()
class Message:
def __init__(
self,
key,
recipient="",
sender="",
text="",
type=None,
created_at=timezone.now(),
):
key = str(key)
recipient = str(recipient)
sender = str(sender)
text = str(text)
if not isinstance(type, MessageType):
raise TypeError("message type must be of type MessageType")
if not isinstance(created_at, datetime):
raise TypeError("message created_at must be of type datetime")
if (
type == MessageType.OUTBOUND
and not PhoneNumber.from_string(recipient).is_valid()
):
raise ValueError(
f"invalid recipient phone number for outbound message: {recipient}"
)
self.key = key
self.recipient = recipient
self.sender = sender
self.text = text
self.created_at = created_at
def __eq__(self, other):
if other is None:
return False
elif isinstance(other, Message):
other = other.key
else:
other = str(other)
return self.key == other

@ -0,0 +1,40 @@
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from . import Message
from .backends.abc import Sender
from .exceptions import OutboundMessageError
from .utils import import_class
__all__ = ("send",)
def resolve_backend():
sender_cls = settings.SMS_OUTBOUND_BACKEND
if isinstance(sender_cls, str):
sender_cls = import_class(sender_cls, "Sender")
if not issubclass(sender_cls, Sender):
raise ImproperlyConfigured(
"SMS_OUTBOUND_BACKEND must be a subclass of shiftregister.messaging.backends.abc.Sender"
)
return sender_cls()
sender = resolve_backend()
def send(messages):
if isinstance(messages, Message):
messages = [messages]
else:
messages = list(messages)
sent_messages = 0
for message in sender.send(messages):
sent_messages += 1
yield message
if sent_messages == 0 and len(messages) > 0:
raise OutboundMessageError("no messages have been sent")

@ -0,0 +1,3 @@
from django.dispatch import Signal
incoming_message = Signal()

@ -0,0 +1,12 @@
from celery import shared_task
from .inbound import receiver
from .signals import incoming_message
@shared_task
def fetch_messages():
if receiver.fetch is None:
return
incoming_message.send(receiver, messages=receiver.fetch())

@ -0,0 +1,8 @@
from django.urls import path
from . import views
app_name = "messaging"
urlpatterns = [
path("inbound", views.handle_inbound, name="handle_inbound"),
]

@ -0,0 +1,12 @@
from importlib import import_module
def import_class(path, default_class):
try:
module = import_module(path)
cls = default_class
except ModuleNotFoundError:
path, cls = path.rsplit(".", maxsplit=1)
module = import_module(path)
return getattr(module, cls)

@ -0,0 +1,56 @@
import json
from hashlib import sha256
from hmac import compare_digest
import sentry_sdk
from django.conf import settings
from django.http import (
HttpResponse,
HttpResponseBadRequest,
HttpResponseNotFound,
HttpResponseServerError,
)
from .inbound import receiver
from .signals import incoming_message
def handle_inbound(request):
if receiver.handle is None:
return HttpResponseNotFound()
kwargs = request.GET.dict()
try:
secret = kwargs.pop("secret")
if not compare_digest(
sha256(settings.SMS_WEBHOOK_SECRET.encode("utf-8")).digest(),
sha256(secret.encode("utf-8")).digest(),
):
return HttpResponseNotFound()
except KeyError:
return HttpResponseNotFound()
kwargs |= request.POST.dict()
if request.content_type == "application/json":
try:
body = json.loads(request.read())
except json.JSONDecodeError:
return HttpResponseBadRequest()
if not isinstance(body, dict):
return HttpResponseBadRequest()
kwargs |= body
try:
incoming_message.send(receiver, messages=receiver.handle(**kwargs))
except (IndexError, KeyError, ValueError):
return HttpResponseBadRequest()
except Exception as e:
sentry_sdk.capture_exception(e)
return HttpResponseServerError()
return HttpResponse()

@ -67,6 +67,7 @@ LOCAL_APPS = [
"shiftregister.fallback",
"shiftregister.feedback",
"shiftregister.importer",
"shiftregister.messaging",
"shiftregister.metrics",
"shiftregister.pages",
"shiftregister.signage",
@ -181,14 +182,14 @@ CELERY_BEAT_SCHEDULE = {
"task": "shiftregister.app.tasks.send_reminders",
"schedule": env.float("REMINDER_SEND_INTERVAL", default=300.0), # seconds
},
"receive-messages-every-300-seconds": {
"task": "shiftregister.team.tasks.receive_messages",
"schedule": env.float("MESSAGE_RECEIVE_INTERVAL", default=300.0), # seconds
},
"deactivate-fallbacks-every-300-seconds": {
"task": "shiftregister.fallback.tasks.deactivate_fallbacks",
"schedule": env.float("FALLBACK_DEACTIVATE_INTERVAL", default=300.0), # seconds
},
"fetch-messages-every-300-seconds": {
"task": "shiftregister.messaging.tasks.fetch_messages",
"schedule": env.float("MESSAGE_FETCH_INTERVAL", default=300.0), # seconds
},
}
CELERY_BEAT_SCHEDULE_FILENAME = str(BASE_DIR / "storage" / "celerybeat-schedule")
@ -207,6 +208,8 @@ MESSAGE_TAGS = {
messages.ERROR: "danger",
}
# Legacy sipgate settings
SIPGATE_SMS_EXTENSION = env("SIPGATE_SMS_EXTENSION", default=None)
SIPGATE_TOKEN_ID = env("SIPGATE_TOKEN_ID", default=None)
@ -216,3 +219,24 @@ SIPGATE_TOKEN = env("SIPGATE_TOKEN", default=None)
SIPGATE_INCOMING_TOKEN_ID = env("SIPGATE_INCOMING_TOKEN_ID", default=None)
SIPGATE_INCOMING_TOKEN = env("SIPGATE_INCOMING_TOKEN", default=None)
# New messaging settings
SMS_INBOUND_BACKEND = ".".join(
(
"shiftregister",
"messaging",
"backends",
env("SMS_INBOUND_BACKEND", default="dummy"),
)
)
SMS_OUTBOUND_BACKEND = ".".join(
(
"shiftregister",
"messaging",
"backends",
env("SMS_OUTBOUND_BACKEND", default="dummy"),
)
)
SMS_SETTINGS = env.dict("SMS_SETTINGS", default={})
SMS_WEBHOOK_SECRET = env("SMS_WEBHOOK_SECRET", default=None)

@ -0,0 +1,18 @@
# Generated by Django 5.0.4 on 2024-05-09 21:44
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("team", "0004_roomviewtoken"),
]
operations = [
migrations.AlterField(
model_name="incomingmessage",
name="id",
field=models.CharField(max_length=50, primary_key=True, serialize=False),
),
]

@ -13,7 +13,7 @@ class IncomingMessage(models.Model):
get_latest_by = "created_at"
indexes = (models.Index(fields=("sender",)),)
id = models.BigIntegerField(primary_key=True)
id = models.CharField(max_length=50, primary_key=True)
sender = PhoneNumberField()
content = models.TextField()
created_at = models.DateTimeField()

@ -1,8 +1,10 @@
import sentry_sdk
from django.dispatch import receiver
from django.shortcuts import reverse
from dynamic_preferences.registries import global_preferences_registry
from shiftregister.core.signals import populate_nav
from shiftregister.messaging.signals import incoming_message
from .models import IncomingMessage
@ -40,3 +42,19 @@ def populate_team_nav(sender, **kwargs):
)
return nav_items
@receiver(incoming_message, dispatch_uid="team_incoming_message")
def incoming_message(sender, messages=[], **kwargs):
for message in messages:
try:
IncomingMessage.objects.get_or_create(
id=message.key,
defaults={
"content": message.text,
"created_at": message.created_at,
"sender": message.sender,
},
)
except Exception as e:
sentry_sdk.capture_exception(e)

@ -1,31 +0,0 @@
import sentry_sdk
from celery import shared_task
from django.conf import settings
from shiftregister.app.sipgate.history import list_incoming_sms
from .models import IncomingMessage
@shared_task
def receive_messages():
if not settings.SIPGATE_INCOMING_TOKEN or not settings.SIPGATE_INCOMING_TOKEN_ID:
return
try:
from_dt = IncomingMessage.objects.latest().created_at
except IncomingMessage.DoesNotExist:
from_dt = None
try:
for sms in reversed(list_incoming_sms(from_dt)):
IncomingMessage.objects.get_or_create(
id=sms.id,
defaults={
"content": sms.content,
"created_at": sms.created_at,
"sender": sms.sender,
},
)
except Exception as e:
sentry_sdk.capture_exception(e)

@ -17,7 +17,7 @@ urlpatterns = [
path("mark_as_failed/<int:pk>", views.mark_as_failed, name="mark_as_failed"),
path("remove_helper/<int:pk>", views.delete_shiftregistration, name="unregister"),
path("incoming/", views.incoming_messages, name="incoming_messages"),
path("incoming/<int:pk>", views.incoming_message, name="incoming_message"),
path("incoming/mark_as_read/<int:pk>", views.mark_as_read, name="mark_as_read"),
path("incoming/<slug:pk>", views.incoming_message, name="incoming_message"),
path("incoming/mark_as_read/<slug:pk>", views.mark_as_read, name="mark_as_read"),
path("list/<slug:token>", views.room_view_token, name="room_view_token"),
]

@ -25,5 +25,6 @@ urlpatterns = [
path("team/", include("shiftregister.team.urls")),
path("team/", include("shiftregister.fallback.urls")),
path("dashboard/", include("shiftregister.signage.urls")),
path("messages/", include("shiftregister.messaging.urls")),
path("admin/", admin.site.urls),
]

Loading…
Cancel
Save