Source code for simfleet.utils

import asyncio
import json
import os
import socket
import sys
import time
import uuid
from abc import ABCMeta
from importlib import import_module

import aiohttp
from loguru import logger
from spade.behaviour import CyclicBehaviour, OneShotBehaviour
from spade.message import Message
from spade.template import Template

from .helpers import distance_in_meters, kmh_to_ms

TRANSPORT_WAITING = "TRANSPORT_WAITING"
TRANSPORT_MOVING_TO_CUSTOMER = "TRANSPORT_MOVING_TO_CUSTOMER"
TRANSPORT_IN_CUSTOMER_PLACE = "TRANSPORT_IN_CUSTOMER_PLACE"
TRANSPORT_MOVING_TO_DESTINATION = "TRANSPORT_MOVING_TO_DESTINATION"
TRANSPORT_WAITING_FOR_APPROVAL = "TRANSPORT_WAITING_FOR_APPROVAL"

TRANSPORT_MOVING_TO_STATION = "TRANSPORT_MOVING_TO_STATION"
TRANSPORT_IN_STATION_PLACE = "TRANSPORT_IN_STATION_PLACE"
TRANSPORT_WAITING_FOR_STATION_APPROVAL = "TRANSPORT_WAITING_FOR_STATION_APPROVAL"
TRANSPORT_NEEDS_CHARGING = "TRANSPORT_NEEDS_CHARGING"
TRANSPORT_CHARGING = "TRANSPORT_LOADING"
TRANSPORT_CHARGED = "TRANSPORT_LOADED"

FREE_STATION = "FREE_STATION"
BUSY_STATION = "BUSY_STATION"

CUSTOMER_WAITING = "CUSTOMER_WAITING"
CUSTOMER_IN_TRANSPORT = "CUSTOMER_IN_TRANSPORT"
CUSTOMER_IN_DEST = "CUSTOMER_IN_DEST"
CUSTOMER_LOCATION = "CUSTOMER_LOCATION"
CUSTOMER_ASSIGNED = "CUSTOMER_ASSIGNED"


[docs]def status_to_str(status_code): """ Translates an int status code to a string that represents the status Args: status_code (int): the code of the status Returns: str: the string that represents the status """ statuses = { 10: "TRANSPORT_WAITING", 11: "TRANSPORT_MOVING_TO_CUSTOMER", 12: "TRANSPORT_IN_CUSTOMER_PLACE", 13: "TRANSPORT_MOVING_TO_DESTINATION", 14: "TRANSPORT_WAITING_FOR_APPROVAL", 15: "TRANSPORT_MOVING_TO_STATION", 16: "TRANSPORT_IN_STATION_PLACE", 17: "TRANSPORT_WAITING_FOR_STATION_APPROVAL", 18: "TRANSPORT_LOADING", 19: "TRANSPORT_LOADED", 20: "CUSTOMER_WAITING", 21: "CUSTOMER_IN_TRANSPORT", 22: "CUSTOMER_IN_DESTINATION", 23: "CUSTOMER_LOCATION", 24: "CUSTOMER_ASSIGNED", 30: "FREE_STATION", 31: "BUSY_STATION", } if status_code in statuses: return statuses[status_code] return status_code
[docs]class StrategyBehaviour(CyclicBehaviour, metaclass=ABCMeta): """ The behaviour that all parent strategies must inherit from. It complies with the Strategy Pattern. """ pass
[docs]class RequestRouteBehaviour(OneShotBehaviour): """ A one-shot behaviour that is executed to request for a new route to the route agent. """ def __init__(self, msg: Message, origin: list, destination: list, route_host: str): """ Behaviour to request a route to a route agent Args: msg (Message): the message to be sent origin (list): origin of the route destination (list): destination of the route route_host (str): name of the route host server """ self.origin = origin self.destination = destination self._msg = msg self.route_host = route_host self.result = {"path": None, "distance": None, "duration": None} super().__init__()
[docs] async def run(self): try: response_time = time.time() path, distance, duration = await request_route_to_server( self.origin, self.destination, self.route_host ) response_time = time.time() - response_time if path is None: logger.error( "There was an unknown error requesting the route. Response time={}".format( response_time ) ) self.exit_code = {"type": "error"} self.kill() return logger.debug("Got route in response time={}".format(response_time)) reply_content = { "path": path, "distance": distance, "duration": duration, "type": "success", } self.kill(json.loads(json.dumps(reply_content))) except Exception as e: response_time = time.time() - response_time logger.error( "Exception requesting route, response time={}, error: {} ".format( response_time, e ) )
[docs]async def request_path(agent, origin, destination, route_host): """ Sends a message to the RouteAgent to request a path Args: agent: the agent who is requesting the path origin (list): a list with the origin coordinates [longitude, latitude] destination (list): a list with the target coordinates [longitude, latitude] route_host (str): name of the route host server Returns: list, float, float: a list of points (longitude and latitude) representing the path, the distance of the path in meters, a estimation of the duration of the path Examples: >>> path, distance, duration = request_path(agent, origin=[0,0], destination=[1,1]) >>> print(path) [[0,0], [0,1], [1,1]] >>> print(distance) 2.0 >>> print(duration) 3.24 """ if origin[0] == destination[0] and origin[1] == destination[1]: return [[origin[1], origin[0]]], 0, 0 msg = Message() msg.thread = str(uuid.uuid4()).replace("-", "") template = Template() template.thread = msg.thread behav = RequestRouteBehaviour(msg, origin, destination, route_host) agent.add_behaviour(behav, template) while not behav.is_killed(): await asyncio.sleep(0.01) if ( behav.exit_code is {} or "type" in behav.exit_code and behav.exit_code["type"] == "error" ): return None, None, None else: return ( behav.exit_code["path"], behav.exit_code["distance"], behav.exit_code["duration"], )
[docs]def unused_port(hostname): """Return a port that is unused on the current host.""" s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((hostname, 0)) port = s.getsockname()[1] s.close() return port
[docs]def chunk_path(path, speed_in_kmh): """ Splits the path into smaller chunks taking into account the speed. Args: path (list): the original path. A list of points (lon, lat) speed_in_kmh (float): the speed in km per hour at which the path is being traveled. Returns: list: a new path equivalent (to the first one), that has at least the same number of points. """ meters_per_second = kmh_to_ms(speed_in_kmh) length = len(path) chunked_lat_lngs = [] for i in range(1, length): _cur = path[i - 1] _next = path[i] if _cur == _next: continue distance = distance_in_meters(_cur, _next) factor = meters_per_second / distance if distance else 0 diff_lat = factor * (_next[0] - _cur[0]) diff_lng = factor * (_next[1] - _cur[1]) if distance > meters_per_second: while distance > meters_per_second: _cur = [_cur[0] + diff_lat, _cur[1] + diff_lng] distance = distance_in_meters(_cur, _next) chunked_lat_lngs.append(_cur) else: chunked_lat_lngs.append(_cur) chunked_lat_lngs.append(path[length - 1]) return chunked_lat_lngs
[docs]def load_class(class_path): """ Tricky method that imports a class form a string. Args: class_path (str): the path where the class to be imported is. Returns: class: the class imported and ready to be instantiated. """ sys.path.append(os.getcwd()) module_path, class_name = class_path.rsplit(".", 1) mod = import_module(module_path) return getattr(mod, class_name)
[docs]def avg(array): """ Makes the average of an array without Nones. Args: array (list): a list of floats and Nones Returns: float: the average of the list without the Nones. """ array_wo_nones = list(filter(None, array)) return ( (sum(array_wo_nones, 0.0) / len(array_wo_nones)) if len(array_wo_nones) > 0 else 0.0 )
[docs]async def request_route_to_server( origin, destination, route_host="http://router.project-osrm.org/" ): """ Queries the OSRM for a path. Args: origin (list): origin coordinate (longitude, latitude) destination (list): target coordinate (longitude, latitude) route_host (string): route to host server of OSRM service Returns: list, float, float = the path, the distance of the path and the estimated duration """ try: url = ( route_host + "route/v1/car/{src1},{src2};{dest1},{dest2}?geometries=geojson&overview=full" ) src1, src2, dest1, dest2 = origin[1], origin[0], destination[1], destination[0] url = url.format(src1=src1, src2=src2, dest1=dest1, dest2=dest2) async with aiohttp.ClientSession() as session: async with session.get(url) as response: result = await response.json() path = result["routes"][0]["geometry"]["coordinates"] path = [[point[1], point[0]] for point in path] duration = result["routes"][0]["duration"] distance = result["routes"][0]["distance"] if path[-1] != destination: path.append(destination) return path, distance, duration except Exception as e: return None, None, None