|     IEEE Xplore Digital Library     |     IEEE Standards     |     IEEE Spectrum     |     More Sites

Verified Commit 2947426f authored by Emi Simpson's avatar Emi Simpson
Browse files

Made coordinator connection async

parent 07e98927
Pipeline #239 passed with stages
in 1 minute and 54 seconds
......@@ -29,6 +29,7 @@ from mystic.coordination import CoordinatorConnection
from os import path, getcwd
from elasticsearch.client import Elasticsearch
import asyncio
import pymysql
from pymysql.connections import Connection
from pymysql.cursors import Cursor
......@@ -68,10 +69,10 @@ def get_database() -> Connection[Cursor]:
g.database = db
return db
def connect_coordinator() -> CoordinatorConnection:
async def connect_coordinator() -> CoordinatorConnection:
coord: Optional[CoordinatorConnection] = g.get('coordinator', None)
if coord is None:
coord = CoordinatorConnection.connect(current_app.config['COORDINATOR_SOCKET'])
coord = await CoordinatorConnection.connect(current_app.config['COORDINATOR_SOCKET'])
g.coord = coord
return coord
......@@ -166,7 +167,7 @@ def create_app(test_config: Optional[Dict[str, Any]] = None) -> Flask:
def set_up_error_flagging() -> None:
db_config: Dict[str, str] = current_app.config['DATABASE']
coordinator = connect_coordinator()
coordinator: str = current_app.config['COORDINATOR_SOCKET']
projects_json: str = current_app.config['PROJECTS_FILE']
def run_daemon() -> None:
db = pymysql.connect(
from asyncio.streams import StreamReader, StreamWriter
from pymysql.cursors import Cursor
from mystic.sources import Source
from typing import Any, Callable, Generator, List, NamedTuple, Optional, TypeVar, TypedDict, Union
from typing import Any, AsyncGenerator, Callable, List, NamedTuple, Optional, Tuple, TypeVar, TypedDict, Union
from mystic.database import Project
from socket import socket, AF_UNIX, SOCK_STREAM
from threading import BoundedSemaphore
from asyncio import BoundedSemaphore, open_unix_connection
import msgpack
class CoordinatorConnection:
......@@ -17,14 +17,14 @@ class CoordinatorConnection:
Should not be instantiated with the constructor. Instead, use :func:`connect` to
aquire an instance.
connection: socket
connection: Tuple[StreamReader, StreamWriter]
msg_count: int
receive_lock: BoundedSemaphore
unpacker: msgpack.Unpacker
inbox: List['RecievedMessage']
def connect(socket_addr: str) -> 'CoordinatorConnection':
async def connect(socket_addr: str) -> 'CoordinatorConnection':
Connect to an address, and return the new connection
......@@ -34,8 +34,7 @@ class CoordinatorConnection:
The passed address should be a filesystem path to an existing unix socket.
sock = socket(AF_UNIX, SOCK_STREAM)
sock = await open_unix_connection(socket_addr)
conn = CoordinatorConnection()
conn.connection = sock
conn.msg_count = 0
......@@ -44,7 +43,7 @@ class CoordinatorConnection:
conn.inbox = list()
return conn
def post_job(
async def post_job(
project: Project,
desc: str,
......@@ -78,9 +77,10 @@ class CoordinatorConnection:
msg_packed: Optional[bytes] = msgpack.packb(msg)
assert msg_packed is not None
self.msg_count += 1
await self.connection[1].drain()
def rescan_source(
async def rescan_source(
source: Source,
priority: int = 20,
......@@ -107,9 +107,9 @@ class CoordinatorConnection:
for source in source.expand_source()
self.post_job(project, desc, priority, backends)
await self.post_job(project, desc, priority, backends)
def scan_project(
async def scan_project(
project: Project,
c: Cursor,
......@@ -134,9 +134,9 @@ class CoordinatorConnection:
for metasource in project.get_data_sources(c)
for source in metasource.expand_source()
self.post_job(project, desc, priority, backends)
await self.post_job(project, desc, priority, backends)
def jobs_for_project(
async def jobs_for_project(
project: Project,
) -> List['JobDict']:
......@@ -160,19 +160,18 @@ class CoordinatorConnection:
msg_packed: Optional[bytes] = msgpack.packb(msg)
assert msg_packed is not None
self.msg_count += 1
await self.connection[1].drain()
def matching_response(msg: 'RecievedMessage') -> Optional[Response]:
if isinstance(msg, Response):
if msg.msg_id == self.msg_count - 1:
return msg
return None
response = next(self._recieve_specific_messages(matching_response))
assert response is not None, "Connection to coordinator closed unexpectedly"
response = await self._recieve_specific_messages(matching_response).__anext__()
return response.response
def subscribe_to_notification(self, method_name: str) -> Generator[List[Any], None, None]:
def subscribe_to_notification(self, method_name: str) -> AsyncGenerator[List[Any], None]:
Produces a generator returning any notification type messages to the given method
......@@ -190,13 +189,14 @@ class CoordinatorConnection:
return msg
return None
return (
for n
async for n
in self._recieve_specific_messages(is_matching_notification)
def _receive_single_message(self, already_has_lock: bool = False) -> Optional['RecievedMessage']:
async def _receive_single_message(self, already_has_lock: bool = False) -> Optional['RecievedMessage']:
Blocks until a single message is read from the connection
......@@ -214,9 +214,9 @@ class CoordinatorConnection:
Returns `None` if the stream closes before a full object is read.
if not already_has_lock:
await self.receive_lock.acquire()
while True:
buf: bytes = self.connection.recv(1024**2)
buf: bytes = await self.connection[0].read(1024*8)
if not buf:
if not already_has_lock:
......@@ -233,10 +233,10 @@ class CoordinatorConnection:
raise AssertionError(f"Invalid msgtype recieved from coordinator: {resp[0]}")
M = TypeVar('M', 'Notification', 'Response')
def _recieve_specific_messages(
async def _recieve_specific_messages(
check: Callable[['RecievedMessage'], Optional[M]]
) -> Generator[M, None, None]:
) -> AsyncGenerator[M, None]:
Repeatedly call :func:`_recieve_single_message` until a matching message is found.
......@@ -254,7 +254,7 @@ class CoordinatorConnection:
non-exhaustively call the generator in order to recieve only a single message.
while True:
await self.receive_lock.acquire()
# While we were waiting for the lock, something might've been recieved
for i in range(len(self.inbox)):
......@@ -265,12 +265,12 @@ class CoordinatorConnection:
yield msg
msg = self._receive_single_message(already_has_lock = True)
msg = await self._receive_single_message(already_has_lock = True)
# If the message is none, the connection closed
if msg is None:
return None
msgcheck = check(msg)
if msgcheck is not None:
import asyncio
import logging
from typing import NoReturn
from pymysql.connections import Connection
......@@ -5,9 +6,10 @@ from .coordination import CoordinatorConnection
from . import database, config
def flag_errors_daemon(connection: CoordinatorConnection, conn: Connection, projects_json: str) -> NoReturn:
async def async_flag_errors_daemon(coordinator_socket: str, conn: Connection, projects_json: str) -> NoReturn:
c = conn.cursor()
for notification in connection.subscribe_to_notification('repository_error'):
connection = await CoordinatorConnection.connect(coordinator_socket)
async for notification in connection.subscribe_to_notification('repository_error'):
assert len(notification) == 2, "Incorrect number of arguments passed to 'repository_error' by coordinator"
backend = notification[0]
repo = notification[1]
......@@ -19,3 +21,7 @@ def flag_errors_daemon(connection: CoordinatorConnection, conn: Connection, proj
config.save_projects_json(c, path = projects_json)
raise AssertionError('Connection to coordinator closed, no longer recieving error notifications')
def flag_errors_daemon(coordinator_socket: str, conn: Connection, projects_json: str) -> NoReturn:, conn, projects_json))
raise AssertionError('Unreachable')
......@@ -12,6 +12,7 @@ receiving an invalid request, or handing processing off to another module.
This module also includes methods like `get_user` and `get_project`, which can be used to
determine the active user/project.
import asyncio
from mystic.coordination import JobDict
from import QueryMachine
from import DASHBOARDS, Dashboard
......@@ -24,7 +25,7 @@ from mystic.sources import SOURCE_PROCESSORS
from flask import request, session, g
from mystic.database import MalformedId, Project, User
from mystic.config import connect_coordinator, service_provider, save_projects_json
from typing import Callable, Dict, List, Optional, cast
from typing import Any, Callable, Coroutine, Dict, List, Optional, cast
class ErrorTuple(Exception):
......@@ -165,7 +166,9 @@ def subaction_add_source(cursor: Cursor) -> None:
if not project.is_draft(cursor):
async def send_scan() -> None:
await (await connect_coordinator()).rescan_source(source)
def subaction_add_owner(cursor: Cursor) -> None:
......@@ -292,7 +295,9 @@ def subaction_edit_properties(cursor: Cursor) -> None:
project.finalize_draft(cursor, name, desc)
connect_coordinator().scan_project(project, cursor)
async def send_scan() -> None:
await (await connect_coordinator()).scan_project(project, cursor)
needs_update = False
if 'name' in request.form:
......@@ -396,5 +401,7 @@ def get_analytics(c: Cursor, es: Elasticsearch, project: Project) -> List[Dashbo
return cast(List[Dashboard], dashboards)
def get_pending_jobs(p: Project) -> List[JobDict]:
coordinator = connect_coordinator()
return coordinator.jobs_for_project(p)
async def get_jobs() -> List[JobDict]:
coordinator = await connect_coordinator()
return await coordinator.jobs_for_project(p)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment