Source code for simfleet.station

import datetime
import json
import time
from asyncio import CancelledError

from loguru import logger
from spade.agent import Agent
from spade.behaviour import TimeoutBehaviour
from spade.message import Message
from spade.template import Template

from .helpers import random_position
from .protocol import (
    REQUEST_PROTOCOL,
    REGISTER_PROTOCOL,
    ACCEPT_PERFORMATIVE,
    REFUSE_PERFORMATIVE,
    REQUEST_PERFORMATIVE,
    TRAVEL_PROTOCOL,
    CANCEL_PERFORMATIVE,
    INFORM_PERFORMATIVE,
)
from .utils import (
    StrategyBehaviour,
    CyclicBehaviour,
    FREE_STATION,
    BUSY_STATION,
    TRANSPORT_MOVING_TO_STATION,
    TRANSPORT_IN_STATION_PLACE,
    TRANSPORT_CHARGED,
)


[docs]class StationAgent(Agent): def __init__(self, agentjid, password): super().__init__(jid=agentjid, password=password) self.agent_id = None self.icon = None self.strategy = None self.running_strategy = False self.directory_id = None self.registration = False self.station_name = None self.station_type = None self.current_pos = None self.available_places = None self.status = None self.power = None self.stopped = False self.ready = False # waiting waiting_list self.waiting_list = list() # statistics self.charged_transports = 0 self.queue_length = 0 self.max_queue_length = 0 self.transports_in_queue_time = None self.empty_queue_time = None self.total_busy_time = None # total time with some transport waiting in queue
[docs] def is_ready(self): return self.ready
[docs] async def setup(self): self.total_busy_time = 0.0 logger.info("Station agent {} running".format(self.name)) self.set_type("station") self.set_status() try: template = Template() template.set_metadata("protocol", REGISTER_PROTOCOL) register_behaviour = RegistrationBehaviour() self.add_behaviour(register_behaviour, template) while not self.has_behaviour(register_behaviour): logger.warning( "Station {} could not create RegisterBehaviour. Retrying...".format( self.agent_id ) ) self.add_behaviour(register_behaviour, template) except Exception as e: logger.error( "EXCEPTION creating RegisterBehaviour in Station {}: {}".format( self.agent_id, e ) ) try: template = Template() template.set_metadata("protocol", TRAVEL_PROTOCOL) travel_behaviour = TravelBehaviour() self.add_behaviour(travel_behaviour, template) while not self.has_behaviour(travel_behaviour): logger.warning( "Customer {} could not create TravelBehaviour. Retrying...".format( self.agent_id ) ) self.add_behaviour(travel_behaviour, template) except Exception as e: logger.error( "EXCEPTION creating TravelBehaviour in Station {}: {}".format( self.agent_id, e ) ) self.ready = True
[docs] async def send(self, msg): if not msg.sender: msg.sender = str(self.jid) logger.debug(f"Adding agent's jid as sender to message: {msg}") aioxmpp_msg = msg.prepare() await self.client.send(aioxmpp_msg) msg.sent = True self.traces.append(msg, category=str(self))
[docs] def set_id(self, agent_id): """ Sets the agent identifier Args: agent_id (str): The new Agent Id """ self.agent_id = agent_id
[docs] def set_icon(self, icon): self.icon = icon
[docs] def run_strategy(self): """ Sets the strategy for the transport agent. """ if not self.running_strategy: template = Template() template.set_metadata("protocol", REQUEST_PROTOCOL) self.add_behaviour(self.strategy(), template) self.running_strategy = True
[docs] def set_registration(self, status): """ Sets the status of registration Args: status (boolean): True if the transport agent has registered or False if not """ self.registration = status
[docs] def set_directory(self, directory_id): """ Sets the directory JID address Args: directory_id (str): the DirectoryAgent jid """ self.directory_id = directory_id
[docs] def set_type(self, station_type): self.station_type = station_type
[docs] def set_position(self, coords=None): """ Sets the position of the station. If no position is provided it is located in a random position. Args: coords (list): a list coordinates (longitude and latitude) """ if coords: self.current_pos = coords else: self.current_pos = random_position() logger.debug( "Station {} position is {}".format(self.agent_id, self.current_pos) )
[docs] def get_position(self): """ Returns the current position of the station. Returns: list: the coordinates of the current position of the customer (lon, lat) """ return self.current_pos
[docs] def set_status(self, state=FREE_STATION): self.status = state
[docs] def get_status(self): return self.status
[docs] def set_available_places(self, places): self.available_places = places
[docs] def get_available_places(self): return self.available_places
[docs] def set_power(self, charge): self.power = charge
[docs] def get_power(self): return self.power
[docs] def to_json(self): """ Serializes the main information of a station agent to a JSON format. It includes the id of the agent, its current position, the destination coordinates of the agent, the current status, the transport that it has assigned (if any) and its waiting time. Returns: dict: a JSON doc with the main information of the station. Example:: { "id": "cphillips", "position": [ 39.461327, -0.361839 ], "status": True, "places": 10, "power": 10 } """ return { "id": self.agent_id, "position": self.current_pos, "status": self.status, "places": self.available_places, "power": self.power, "icon": self.icon, }
[docs] async def assigning_place(self): """ Set a space in the charging station for the transport that has been accepted, when the available spaces are zero, the status will change to BUSY_STATION """ p = self.get_available_places() if p - 1 <= 0: self.set_status(BUSY_STATION) self.set_available_places(p - 1) logger.info( "Station {} assigned place. Available places are now {}.".format( self.name, self.get_available_places() ) )
[docs] async def deassigning_place(self): """ Leave a space of the charging station, when the station has free spaces, the status will change to FREE_STATION """ if self.waiting_list: transport_id = self.waiting_list.pop(0) # time statistics update if len(self.waiting_list) == 0: self.empty_queue_time = time.time() self.total_busy_time += ( self.empty_queue_time - self.transports_in_queue_time ) logger.debug( "Station {} has a place to charge transport {}".format( self.agent_id, transport_id ) ) # confirm EXPLICITLY to transport it can start charging reply = Message() reply.to = str(transport_id) reply.set_metadata("protocol", REQUEST_PROTOCOL) reply.set_metadata("performative", ACCEPT_PERFORMATIVE) content = {"station_id": self.agent_id} reply.body = json.dumps(content) await self.send(reply) # await send_confirmation_to_transport(transport_id) else: p = self.get_available_places() if p + 1: self.set_status(FREE_STATION) self.set_available_places(p + 1)
[docs] async def charging_transport(self, need, transport_id): total_time = need / self.get_power() now = datetime.datetime.now() start_at = now + datetime.timedelta(seconds=total_time) logger.info( "Station {} started charging transport {} for {} seconds. From {} to {}.".format( self.name, transport_id, total_time, now, start_at ) ) # charged transports update self.charged_transports += 1 charge_behaviour = ChargeBehaviour(start_at=start_at, transport_id=transport_id) self.add_behaviour(charge_behaviour)
[docs]class ChargeBehaviour(TimeoutBehaviour): def __init__(self, start_at, transport_id): self.transport_id = transport_id super().__init__(start_at)
[docs] async def charging_complete(self): """ Send a message to the transport agent that the vehicle load has been completed """ reply = Message() reply.to = str(self.transport_id) reply.set_metadata("protocol", REQUEST_PROTOCOL) reply.set_metadata("performative", INFORM_PERFORMATIVE) content = {"status": TRANSPORT_CHARGED} reply.body = json.dumps(content) await self.send(reply)
[docs] async def run(self): logger.debug("Station {} finished charging.".format(self.agent.name)) self.set("current_station", None) await self.agent.deassigning_place() await self.charging_complete()
[docs]class RegistrationBehaviour(CyclicBehaviour):
[docs] async def on_start(self): logger.debug("Strategy {} started in directory".format(type(self).__name__))
[docs] def set_registration(self, decision): self.agent.registration = decision
[docs] async def send_registration(self): """ Send a ``spade.message.Message`` with a proposal to directory to register. """ logger.info( "Station {} sent proposal to register to directory {}".format( self.agent.name, self.agent.directory_id ) ) content = { "jid": str(self.agent.jid), "type": self.agent.station_type, "status": self.agent.status, "position": self.agent.get_position(), "charge": self.agent.power, } msg = Message() msg.to = str(self.agent.directory_id) msg.set_metadata("protocol", REGISTER_PROTOCOL) msg.set_metadata("performative", REQUEST_PERFORMATIVE) msg.body = json.dumps(content) await self.send(msg)
[docs] async def run(self): try: if not self.agent.registration: await self.send_registration() msg = await self.receive(timeout=10) if msg: performative = msg.get_metadata("performative") if performative == ACCEPT_PERFORMATIVE: self.set_registration(True) logger.debug("Registration in the directory") except CancelledError: logger.debug("Cancelling async tasks...") except Exception as e: logger.error( "EXCEPTION in RegisterBehaviour of Station {}: {}".format( self.agent.name, e ) )
[docs]class TravelBehaviour(CyclicBehaviour): """ This is the internal behaviour that manages the inform of the station. It is triggered when the transport informs the station that it is going to the customer's position until the customer is droppped in its destination. """
[docs] async def on_start(self): logger.debug("Station {} started TravelBehavior.".format(self.agent.name))
[docs] async def run(self): try: msg = await self.receive(timeout=5) if not msg: return content = json.loads(msg.body) transport_id = msg.sender logger.debug("Station {} informed of: {}".format(self.agent.name, content)) if "status" in content: status = content["status"] if status == TRANSPORT_MOVING_TO_STATION: logger.info( "Transport {} coming to station {}.".format( transport_id, self.agent.name ) ) elif status == TRANSPORT_IN_STATION_PLACE: logger.info( "Station {} is going to start charging transport {}".format( self.agent.name, transport_id ) ) await self.agent.charging_transport(content["need"], transport_id) except CancelledError: logger.debug("Cancelling async tasks...") except Exception as e: logger.error( "EXCEPTION in Travel Behaviour of Station {}: {}".format( self.agent.name, e ) )
[docs]class StationStrategyBehaviour(StrategyBehaviour): """ Class from which to inherit to create a station strategy. You must overload the :func:`run` method Helper functions: * :func:`get_transport_agents` """
[docs] async def on_start(self): logger.debug("Strategy {} started in station".format(type(self).__name__))
[docs] async def accept_transport(self, transport_id): """ Sends a ``spade.message.Message`` to a transport to accept a travel proposal for charge. It uses the REQUEST_PROTOCOL and the ACCEPT_PERFORMATIVE. Args: transport_id (str): The Agent JID of the transport """ reply = Message() reply.to = str(transport_id) reply.set_metadata("protocol", REQUEST_PROTOCOL) reply.set_metadata("performative", INFORM_PERFORMATIVE) content = {"station_id": str(self.agent.jid), "dest": self.agent.current_pos} reply.body = json.dumps(content) await self.send(reply) logger.debug( "Station {} accepted proposal for charge from transport {}".format( self.agent.name, transport_id ) )
[docs] async def refuse_transport(self, transport_id): """ Sends an ``spade.message.Message`` to a transport to refuse a travel proposal for charge. It uses the REQUEST_PROTOCOL and the REFUSE_PERFORMATIVE. Args: transport_id (str): The Agent JID of the transport """ reply = Message() reply.to = str(transport_id) reply.set_metadata("protocol", REQUEST_PROTOCOL) reply.set_metadata("performative", REFUSE_PERFORMATIVE) content = {} reply.body = json.dumps(content) await self.send(reply) logger.debug( "Station {} refused proposal for charge from transport {}".format( self.agent.name, transport_id ) )
[docs] async def run(self): msg = await self.receive(timeout=5) if msg: performative = msg.get_metadata("performative") transport_id = msg.sender if performative == CANCEL_PERFORMATIVE: logger.warning( "Station {} received a CANCEL from Transport {}.".format( self.agent.name, transport_id ) ) await self.agent.deassigning_place() elif ( performative == ACCEPT_PERFORMATIVE ): # comes from send_confirmation_travel if self.agent.get_status() == FREE_STATION: logger.info( "Station {} has a place to charge transport {}".format( self.agent.name, transport_id ) ) # confirm EXPLICITLY to transport it can start charging reply = Message() reply.to = str(transport_id) reply.set_metadata("protocol", REQUEST_PROTOCOL) reply.set_metadata("performative", ACCEPT_PERFORMATIVE) content = {"station_id": self.agent.name} reply.body = json.dumps(content) await self.send(reply) await self.agent.assigning_place() # self.agent.assigning_place() else: # self.agent.get_status() == BUSY_STATION # time statistics update if len(self.agent.waiting_list) == 0: self.agent.transports_in_queue_time = time.time() # transport waits in a waiting_list until it is available to charge self.agent.waiting_list.append(str(transport_id)) # list length statistics update self.agent.queue_length = len(self.agent.waiting_list) if self.agent.queue_length > self.agent.max_queue_length: self.agent.max_queue_length = self.agent.queue_length logger.info( "{} is waiting at {}, whose waiting list is {}".format( transport_id, self.agent.name, self.agent.waiting_list ) )