Язык программирования Python

Разработка REST API с помощью Twisted Klein в Python

Klein – это очень маленький веб-фреймворк, построенный на базе Twisted, асинхронной сетевой платформы, о которой мы уже много говорили, и Werkzeug, библиотеки для разработки приложений WSGI.

Поскольку фреймворк Flask был разработан поверх последнего, разработка приложений на Klein будет довольно знакома тем, кто уже имеет опыт работы с ним.

Будучи асинхронным фреймворком, он идеально подходит для разработки веб-сервисов, поскольку может обрабатывать несколько запросов одновременно, в отличие от WSGI-фреймворков ─ таких как Django, web2py, Pyramid.

В этой статье мы разработаем небольшой REST API, который позволит отправлять электронные письма через SMTP, а затем проверять их статус.

Это улучшит, например, пользовательский опыт в веб-приложении, которое требует отправки электронной почты, поскольку вам не придется напрямую взаимодействовать с SMTP-сервером, а через HTTP с нашим API, который будет отвечать значительно быстрее.

Установка

Прежде чем мы начнем, давайте установим Klein с помощью pip:

pip install klein

Эта команда установит все зависимости, включая Twisted и Werkzeug.

Первые шаги

Что ж, начнем с определения базовой структуры приложения Klein, которое будет работать на локальном адресе и порту 7001.

#!/usr/bin/env python # -*- coding: utf-8 -*- from klein import Klein class EmailService: app = Klein() if __name__ == '__main__': emailservice = EmailService() emailservice.app.run('localhost', 7001)

Далее мы откроем маршрут /email, определив метод EmailService.email(), который будет принимать две операции: POST, чтобы отправить письмо, и GET, который вернет список отправленных сообщений.

@app.route('/email', methods=['GET', 'POST']) def email(self, request): if request.method == b'POST': return 'Отправить письмо.' elif request.method == b'GET': return 'Список отправленных сообщений.'
Code language: PHP (php)

Как вы видите, вторым аргументом каждого метода всегда является request, который содержит информацию о HTTP-запросе.

Возвращаемое значение функции будет отправлено пользователю в качестве ответа.

Давайте запустим наш небольшой код и проведем несколько тестов, используя библиотеку Requests (если она у вас не установлена, просто запустите pip install requests).

>>> import requests >>> url = 'https://localhost:7001/email' >>> r = requests.get(url) >>> r.text 'Список отправленных сообщений.' >>> r = requests.post(url) >>> r.text 'Отправить письмо.'
Code language: JavaScript (javascript)

Отлично, наше приложение правильно отвечает на запросы.

Сначала мы разработаем операцию POST. Мы сделаем его требующим параметры to, subject и message, которые будут указывать получателя письма, тему и сообщение.

Мы можем получить доступ к параметрам запроса через словарь request.args.

if request.method == b'POST': print(request.args[b'to']) print(request.args[b'subject']) print(request.args[b'message'])
Code language: PHP (php)

Если мы проведем следующий тест:

>>> params = {'to': 'mail@site.ru', 'subject': 'Тест', ... 'message': 'Привет, мир!'} >>> requests.post(url, data=params)
Code language: JavaScript (javascript)

Мы увидим в консоли, как выводятся значения параметров.

2021-05-26 15:02:48-0300 [-] [b'maile@site.ru'] 2021-05-26 15:02:48-0300 [-] [b'Тест'] 2021-05-26 15:02:48-0300 [-] [b'\xc2\xa1Привет, мир!']
Code language: CSS (css)

Обратите внимание, что параметры всегда являются списками, поэтому мы должны получить доступ к их первому элементу, который является экземпляром типа bytes (обратите внимание на b перед строками).

Но сначала я хочу проверить, что все параметры присутствуют.

if request.method == b'POST': # Поиск недостающих параметров. for param in (b'to', b'message', b'subject'): if param not in request.args: return "Отсутствует параметр: '{}'.".format( param.decode('utf-8'))
Code language: PHP (php)
>>> r = requests.post(url) >>> r.text "Отсутствует параметр: 'to'."
Code language: JavaScript (javascript)

Однако REST API обычно работают с форматами JSON и XML в своих ответах, а не с обычным текстом или HTML-кодом.

Мы остановим свой выбор на JSON, который используется наиболее часто.

Таким образом, ответ нашего приложения будет выглядеть примерно так:

{'succeed': true, 'status': 200, ...}
Code language: JavaScript (javascript)

То есть, он всегда будет иметь значения succeed и status, которые будут указывать на успешность выполнения операции и код возврата HTTP (200 означает успешный запрос).

Остальные возвращаемые значения будут зависеть от операции.

Что ж, давайте создадим метод, который позаботится о создании возвращаемого значения в формате JSON и установит соответствующий HTTP-заголовок.

def response(self, request, succeed=True, status=200, **kwargs): """ Создание тела ответа в виде JSON и установка соответствующего заголовка content-type. """ request.setHeader('Content-Type', 'application/json') request.setResponseCode(status) return json.dumps( {'succeed': succeed, 'status': status, **kwargs})
Code language: PHP (php)

Не забудем импортировать модуль json в начале файла.

import json
Code language: JavaScript (javascript)

В конце давайте изменим возвращаемое значение в случае, если какой-либо параметр отсутствует в операции POST.

if param not in request.args: return self.response( request, succeed=False, status=400, reason="Отсутствует параметр: '{}'.".format( param.decode('utf-8')) )
Code language: PHP (php)
>>> r = requests.post(url) >>> r.json() {'succeed': False, 'status': 400, 'reason': "Отсутствует параметр: 'to'."}
Code language: PHP (php)

В данном случае мы возвращаем код ошибки 400, который указывает на неправильно сформированный запрос от клиента.

Обработка ошибок

Есть два случая, когда Klein автоматически возвращает две HTTP-ошибки.

Один – при попытке доступа к несуществующему пути (404); другой – при выполнении неопределенной операции на существующем пути (405), например, PUT или DELETE на /email.

Мы увидим, что ответ по умолчанию – это HTML-код.

>>> r = requests.put(url) >>> r <Response [405]> >>> r.text '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n<title>405 Method Not Allowed</title>\n<h1>Method Not Allowed</h1>\n<p>The method is not allowed for t he requested URL.</p>\n'
Code language: HTML, XML (xml)

Наш API должен уметь обрабатывать эти ошибки и возвращать вместо них ответ в формате JSON в соответствии с указанным нами форматом.

Для этого мы импортируем соответствующие исключения.

from werkzeug.exceptions import NotFound, MethodNotAllowed
Code language: JavaScript (javascript)

Затем мы определяем методы, которые будут обрабатывать эти исключения.

@app.handle_errors(NotFound) def notFoundHandler(self, request, failure): """ Вызывается при появлении сообщения 404 not found. """ return self.response( request, succeed=False, status=404, reason='Not found.') @app.handle_errors(MethodNotAllowed) def methodNotAllowedHandler(self, request, failure): """ Вызывается, когда возникает ошибка 405. """ return self.response( request, succeed=False, status=405, reason="Method not allowed.")
Code language: PHP (php)

Давайте убедимся, что все работает правильно:

>>> r = requests.put(url) >>> r.json() {'succeed': False, 'status': 405, 'reason': 'Method not allowed.'} >>> r = requests.get('https://localhost:7001/no-existe') >>> r.json() {'succeed': False, 'status': 404, 'reason': 'Not found.'}
Code language: PHP (php)

База данных

Теперь давайте посмотрим, как хранить в базе данных электронные письма, которые должна отправить наш сервис.

Мы будем использовать механизм базы данных SQLite с помощью стандартного модуля sqlite3, а также модуль adbapi от Twisted.

Последний позволяет нам получить доступ к синхронному API ─ SQLite ─ через асинхронную обертку.

Перед этим мы создадим сценарий initdb.py, который будет отвечать за инициализацию базы данных, то есть за создание файла и соответствующей таблицы.

#!/usr/bin/env python # -*- coding: utf-8 -*- import sqlite3 conn = sqlite3.connect('emailservice.db') cursor = conn.cursor() cursor.execute( 'CREATE TABLE emails (id INTEGER PRIMARY KEY, recipient TEXT, sent INTEGER);' ) conn.commit() conn.close()
Code language: PHP (php)

Запустим его один раз, чтобы сгенерировать файл emailservice.db.

Теперь, возвращаясь к нашему веб-API, первым шагом будет импорт класса adbapi.ConnectionPool.

from twisted.enterprise.adbapi import ConnectionPool
Code language: JavaScript (javascript)

Затем мы определим связь как атрибут.

class EmailService: app = Klein() conn = ConnectionPool('sqlite3', 'emailservice.db')

Мы хотим вставлять строку в нашу таблицу каждый раз, когда POST-запрос делается к /email, чтобы это работало как запись. Тогда ее статус может быть запрошен. Давайте создадим следующий метод для выполнения первого.

def insertEmail(self, transaction, to): """ Эта функция будет вызываться Twisted в потоке, поэтому мы можем безопасно использовать блокирующие функции, такие как execute(). Возвращаемое значение передается в основной поток, где была вызвана функция runInteraction(). """ transaction.execute( 'INSERT INTO emails (recipient, sent) VALUES (?, ?);', (to, False) ) return transaction.lastrowid
Code language: PHP (php)

Первый аргумент transaction эквивалентен обычному (стандарт DB-API) курсору для выполнения запросов (предоставляется Twisted), второй указывает на получателя сообщения.

После выполнения запроса мы возвращаем уникальный идентификатор, присвоенный SQLite.

Внутри нашего метода email(), после проверки параметров, мы добавим следующее.

to = request.args[b'to'][0].decode('utf-8') lastID = yield self.conn.runInteraction(self.insertEmail, to) return self.response(request, emailID=lastID)
Code language: PHP (php)

Это заставит Twisted выполнять наш запрос в отдельном потоке, чтобы не блокировать наш API.

Использование yield аналогично стандартному модулю asyncio: Twisted будет обрабатывать другие события в ожидании перезапуска запроса.

Нам нужно применить декоратор inlineCallbacks. Давайте сначала импортируем его:

from twisted.internet.defer import inlineCallbacks
Code language: JavaScript (javascript)

Затем мы применяем его, оставляя нашу функцию в таком виде:

@app.route('/email', methods=['GET', 'POST']) @inlineCallbacks def email(self, request): if request.method == b'POST': # Поиск отсутствующих параметров. for param in (b'to', b'message', b'subject'): if param not in request.args: return self.response( request, succeed=False, status=400, reason="Отсутствующий параметр: '{}'.".format( param.decode('utf-8')) ) to = request.args[b'to'][0].decode('utf-8') lastID = yield self.conn.runInteraction(self.insertEmail, to) return self.response(request, emailID=lastID) elif request.method == b'GET': return 'Список отправленных сообщений..'
Code language: PHP (php)

Проверяем, что все работает.

>>> params = { ... 'to': 'mail@site.ru', ... 'subject': 'Тест', ... 'message': 'Привет, мир!' ... } >>> r = requests.post(url, data=params) >>> r.json() {'succeed': True, 'status': 200, 'emailID': 1}
Code language: PHP (php)

Это только половина работы.

Остается обратное действие: возврат списка всех писем или конкретного письма через GET-запрос.

Для этого мы должны переопределить структуру нашего метода, чтобы опционально принимать ID сообщения.

@app.route('/email', methods=['GET', 'POST'], defaults={'emailID': None}) @app.route('/email/<int:emailID>', methods=['GET']) @inlineCallbacks def email(self, request, emailID):
Code language: PHP (php)

Теперь давайте определим логику, которая будет получать информацию из базы данных и возвращать ее пользователю в формате JSON.

elif request.method == b'GET': if emailID is None: # Получение полного списка писем. emails = yield self.query( 'SELECT id, recipient, sent FROM emails;') else: # Получение указанного электронного адреса. emails = yield self.query( 'SELECT id, recipient, sent FROM emails WHERE id=?', (emailID,) ) if not emails: raise NotFound emails = emails_to_json(emails) if emailID is not None: emails = emails[0] response = self.response(request, emails=emails) return response
Code language: PHP (php)

Здесь мы сделали ссылку на два объекта, которые мы еще не определили, а именно на метод query(), который выполняет запрос:

def query(self, *args): """Запуск запроса с использованием текущего соединения.""" return self.conn.runQuery(*args)
Code language: PHP (php)

И функция emails_to_json() (вне класса), которая преобразует результат, полученный из SQLite, в JSON:

def emails_to_json(rows): def row_to_json(row): return json.dumps( {'id': row[0], 'to': row[1], 'sent': bool(row[2])}) return tuple(map(row_to_json, rows))
Code language: JavaScript (javascript)

Далее соответствующие тесты:

>>> r = requests.get(url) >>> r.json() {'succeed': True, 'status': 200, 'emails': ['{"id": 1, "to": "mail@site.ru", "sent": false}']} >>> r = requests.get(url + '/1') >>> r.json() {'succeed': True, 'status': 200, 'emails': '{"id": 1, "to": "mail@site.ru", "sent": false}'}
Code language: PHP (php)

Отправка электронных писем

Давайте начнем с импорта того, что нам нужно: функции sendmail() из Twisted и стандартного класса MIMEText для создания тела письма.

from email.mime.text import MIMEText # [...] from twisted.mail.smtp import sendmail
Code language: CSS (css)

Затем изменим метод email() так, чтобы при получении POST-запроса он отправлял сообщение после того, как сохранит его в базе данных.

if request.method == b'POST': # [...] lastID = yield self.conn.runInteraction(self.insertEmail, to) message = MIMEText( request.args[b'message'][0].decode("utf-8")) message['Subject'] = \ request.args[b'subject'][0].decode("utf-8") message['From'] = USER message['To'] = to d = sendmail( EMAIL_SERVER, USER, [to], message, username=USER, password=PASSWORD ) d.addCallback(self.emailSent, lastID) return self.response(request, emailID=lastID)
Code language: PHP (php)

Мы должны создать константы EMAIL_SERVER, USER и PASSWORD с соответствующими данными SMTP-сервера.

Определим событие emailSent(), которое будет вызываться, когда Twisted успешно отправит сообщение.

def emailSent(self, result, emailID): emails_sent = result[0] if emails_sent == 0: return self.query('UPDATE emails SET sent=1 WHERE id=?', (emailID,))
Code language: PHP (php)

С этими правками наш API уже будет эффективно отправлять электронные письма.

# Отправление письма. >>> r = requests.post(url, data=params) >>> r.json() {'succeed': True, 'status': 200, 'emailID': 3} # Проверка состояния. >>> r = requests.get(url + '/3') >>> r.json() {'succeed': True, 'status': 200, 'emails': '{"id": 3, "to": "mail@site.ru", "sent": true}'}
Code language: PHP (php)

Кэширование

Включим небольшую систему кэширования в памяти, используя словарь.

class EmailService: app = Klein() cache = {} conn = ConnectionPool('sqlite3', 'emailservice.db')

Это предотвратит многократное выполнение одного и того же запроса при получении списка сообщений или какого-либо конкретного сообщения.

elif request.method == b'GET': # Прежде чем обращаться к базе данных, ищем кэшированный результат. key = 'emails' if emailID is None else 'email-{}'.format(emailID) try: return self.cache[key] except KeyError: pass if emailID is None: # Получение полного списка писем. emails = yield self.query( 'SELECT id, recipient, sent FROM emails;') else: # Получение определенного электронного адреса. emails = yield self.query( 'SELECT id, recipient, sent FROM emails WHERE id=?', (emailID,) ) if not emails: raise NotFound emails = emails_to_json(emails) if emailID is not None: emails = emails[0] response = self.response(request, emails=emails) self.cache[key] = response return response
Code language: PHP (php)

Полный код

#!/usr/bin/env python # -*- coding: utf-8 -*- from email.mime.text import MIMEText import json from klein import Klein from twisted.enterprise.adbapi import ConnectionPool from twisted.internet.defer import inlineCallbacks from twisted.mail.smtp import sendmail from werkzeug.exceptions import NotFound, MethodNotAllowed EMAIL_SERVER = '' USER = '' PASSWORD = '' def emails_to_json(rows): def row_to_json(row): return json.dumps( {'id': row[0], 'to': row[1], 'sent': bool(row[2])}) return tuple(map(row_to_json, rows)) class EmailService: app = Klein() cache = {} conn = ConnectionPool('sqlite3', 'emailservice.db') def query(self, *args): """Запуск запроса с использованием текущего соединения.""" return self.conn.runQuery(*args) def response(self, request, succeed=True, status=200, **kwargs): """ Создание тела ответа в виде JSON и установка соответствующего типа содержимого заголовок. """ request.setHeader('Content-Type', 'application/json') request.setResponseCode(status) return json.dumps( {'succeed': succeed, 'status': status, **kwargs}) def emailSent(self, result, emailID) emails_sent = result[0] if emails_sent == 0: return self.query('UPDATE emails SET sent=1 WHERE id=?', (emailID,)) def insertEmail(self, transaction, to): """ Эта функция будет вызываться Twisted в потоке, поэтому мы можем безопасно использовать блокирующие функции, такие как execute(). Возвращаемое значение передается в основной поток, где была вызвана функция runInteraction(). """ transaction.execute( 'INSERT INTO emails (recipient, sent) VALUES (?, ?);', (to, False) ) return transaction.lastrowid @app.handle_errors(NotFound) def notFoundHandler(self, request, failure): """ Вызывается при появлении сообщения 404 not found. """ return self.response( request, succeed=False, status=404, reason='Not found.') @app.handle_errors(MethodNotAllowed) def methodNotAllowedHandler(self, request, failure): """ Вызывается, когда возникает ошибка 405. """ return self.response( request, succeed=False, status=405, reason="Method not allowed.") @app.route('/email', methods=['GET', 'POST'], defaults={'emailID': None}) @app.route('/email/<int:emailID>', methods=['GET']) @inlineCallbacks def email(self, request, emailID): if request.method == b'POST': # Поиск отутствующих параметров for param in (b'to', b'message', b'subject'): if param not in request.args: return self.response( request, succeed=False, status=400, reason="Отсутствующий параметр: '{}'.".format( param.decode('utf-8')) ) to = request.args[b'to'][0].decode('utf-8') # Выполнение запроса insert и возврат вставленного ID. lastID = yield self.conn.runInteraction(self.insertEmail, to) message = MIMEText( request.args[b'message'][0].decode("utf-8")) message['Subject'] = \ request.args[b'subject'][0].decode("utf-8") message['From'] = USER message['To'] = to d = sendmail( EMAIL_SERVER, USER, [to], message, username=USER, password=PASSWORD ) d.addCallback(self.emailSent, lastID) return self.response(request, emailID=lastID) elif request.method == b'GET': # Поиск кэшированного результата перед обращением к базе данных. key = 'emails' if emailID is None else 'email-{}'.format(emailID) try: return self.cache[key] except KeyError: pass if emailID is None: # Получение полного списка писем. emails = yield self.query( 'SELECT id, recipient, sent FROM emails;') else: # Получение определенного электронного адреса. emails = yield self.query( 'SELECT id, recipient, sent FROM emails WHERE id=?', (emailID,) ) if not emails: raise NotFound emails = emails_to_json(emails) if emailID is not None: emails = emails[0] response = self.response(request, emails=emails) self.cache[key] = response return response if __name__ == '__main__': emailservice = EmailService() emailservice.app.run('localhost', 7001)
Code language: HTML, XML (xml)

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *