import json
from loguru import logger
from spade.behaviour import State, FSMBehaviour
from simfleet.customer import CustomerStrategyBehaviour
from simfleet.fleetmanager import FleetManagerStrategyBehaviour
from simfleet.helpers import PathRequestException, distance_in_meters
from simfleet.protocol import (
REQUEST_PERFORMATIVE,
ACCEPT_PERFORMATIVE,
REQUEST_PROTOCOL,
INFORM_PERFORMATIVE,
CANCEL_PERFORMATIVE,
PROPOSE_PERFORMATIVE,
QUERY_PROTOCOL,
REFUSE_PERFORMATIVE,
)
from simfleet.transport import TransportStrategyBehaviour
from simfleet.utils import (
TRANSPORT_WAITING,
TRANSPORT_WAITING_FOR_APPROVAL,
TRANSPORT_MOVING_TO_CUSTOMER,
TRANSPORT_NEEDS_CHARGING,
TRANSPORT_MOVING_TO_STATION,
TRANSPORT_IN_STATION_PLACE,
TRANSPORT_CHARGING,
TRANSPORT_CHARGED,
CUSTOMER_WAITING,
CUSTOMER_ASSIGNED,
)
################################################################
# #
# FleetManager Strategy #
# #
################################################################
[docs]class DelegateRequestBehaviour(FleetManagerStrategyBehaviour):
"""
The default strategy for the FleetManager agent. By default it delegates all requests to all transports.
"""
[docs] async def run(self):
if not self.agent.registration:
await self.send_registration()
msg = await self.receive(timeout=5)
logger.debug("Manager received message: {}".format(msg))
if msg:
for transport in self.get_transport_agents().values():
msg.to = str(transport["jid"])
logger.debug(
"Manager sent request to transport {}".format(transport["name"])
)
await self.send(msg)
################################################################
# #
# Transport Strategy #
# #
################################################################
[docs]class TransportWaitingState(TransportStrategyBehaviour, State):
[docs] async def on_start(self):
await super().on_start()
self.agent.status = TRANSPORT_WAITING
logger.debug("{} in Transport Waiting State".format(self.agent.jid))
[docs] async def run(self):
msg = await self.receive(timeout=60)
if not msg:
self.set_next_state(TRANSPORT_WAITING)
return
logger.debug("Transport {} received: {}".format(self.agent.jid, msg.body))
content = json.loads(msg.body)
performative = msg.get_metadata("performative")
if performative == REQUEST_PERFORMATIVE:
if not self.has_enough_autonomy(content["origin"], content["dest"]):
await self.cancel_proposal(content["customer_id"])
self.set_next_state(TRANSPORT_NEEDS_CHARGING)
return
else:
await self.send_proposal(content["customer_id"], {})
self.set_next_state(TRANSPORT_WAITING_FOR_APPROVAL)
return
else:
self.set_next_state(TRANSPORT_WAITING)
return
[docs]class TransportNeedsChargingState(TransportStrategyBehaviour, State):
[docs] async def on_start(self):
await super().on_start()
self.agent.status = TRANSPORT_NEEDS_CHARGING
logger.debug("{} in Transport Needs Charging State".format(self.agent.jid))
[docs] async def run(self):
if (
self.agent.stations is None
or len(self.agent.stations) < 1
and not self.get(name="stations_requested")
):
logger.info("Transport {} looking for a station.".format(self.agent.name))
self.set(name="stations_requested", value=True)
await self.send_get_stations()
msg = await self.receive(timeout=600)
if not msg:
self.set_next_state(TRANSPORT_NEEDS_CHARGING)
return
logger.debug("Transport received message: {}".format(msg))
try:
content = json.loads(msg.body)
except TypeError:
content = {}
performative = msg.get_metadata("performative")
protocol = msg.get_metadata("protocol")
if protocol == QUERY_PROTOCOL:
if performative == INFORM_PERFORMATIVE:
self.agent.stations = content
logger.info(
"Transport {} got list of current stations: {}".format(
self.agent.name, len(list(self.agent.stations.keys()))
)
)
elif performative == CANCEL_PERFORMATIVE:
logger.info(
"Transport {} got a cancellation of request for stations information.".format(
self.agent.name
)
)
self.set(name="stations_requested", value=False)
self.set_next_state(TRANSPORT_NEEDS_CHARGING)
return
else:
self.set_next_state(TRANSPORT_NEEDS_CHARGING)
return
station_positions = []
for key in self.agent.stations.keys():
dic = self.agent.stations.get(key)
station_positions.append((dic["jid"], dic["position"]))
closest_station = min(
station_positions,
key=lambda x: distance_in_meters(x[1], self.agent.get_position()),
)
logger.debug("Closest station {}".format(closest_station))
station = closest_station[0]
self.agent.current_station_dest = (
station,
self.agent.stations[station]["position"],
)
logger.info(
"Transport {} selected station {}.".format(self.agent.name, station)
)
try:
station, position = self.agent.current_station_dest
await self.go_to_the_station(station, position)
self.set_next_state(TRANSPORT_MOVING_TO_STATION)
return
except PathRequestException:
logger.error(
"Transport {} could not get a path to station {}. Cancelling...".format(
self.agent.name, station
)
)
await self.cancel_proposal(station)
self.set_next_state(TRANSPORT_WAITING)
return
except Exception as e:
logger.error(
"Unexpected error in transport {}: {}".format(self.agent.name, e)
)
self.set_next_state(TRANSPORT_WAITING)
return
[docs]class TransportMovingToStationState(TransportStrategyBehaviour, State):
[docs] async def on_start(self):
await super().on_start()
self.agent.status = TRANSPORT_MOVING_TO_STATION
logger.debug("{} in Transport Moving to Station".format(self.agent.jid))
[docs] async def run(self):
if self.agent.get("in_station_place"):
logger.warning(
"Transport {} already in station place".format(self.agent.jid)
)
await self.agent.request_access_station()
return self.set_next_state(TRANSPORT_IN_STATION_PLACE)
self.agent.transport_in_station_place_event.clear() # new
self.agent.watch_value(
"in_station_place", self.agent.transport_in_station_place_callback
)
await self.agent.transport_in_station_place_event.wait()
await self.agent.request_access_station() # new
return self.set_next_state(TRANSPORT_IN_STATION_PLACE)
[docs]class TransportInStationState(TransportStrategyBehaviour, State):
# car arrives to the station and waits in queue until receiving confirmation
[docs] async def on_start(self):
await super().on_start()
logger.debug("{} in Transport In Station Place State".format(self.agent.jid))
self.agent.status = TRANSPORT_IN_STATION_PLACE
[docs] async def run(self):
msg = await self.receive(timeout=60)
if not msg:
self.set_next_state(TRANSPORT_IN_STATION_PLACE)
return
content = json.loads(msg.body)
performative = msg.get_metadata("performative")
if performative == ACCEPT_PERFORMATIVE:
if content.get("station_id") is not None:
logger.debug(
"Transport {} received a message with ACCEPT_PERFORMATIVE from {}".format(
self.agent.name, content["station_id"]
)
)
await self.charge_allowed()
self.set_next_state(TRANSPORT_CHARGING)
return
else:
# if the message I receive is not an ACCEPT, I keep waiting in the queue
self.set_next_state(TRANSPORT_IN_STATION_PLACE)
return
[docs]class TransportChargingState(TransportStrategyBehaviour, State):
# car charges in a station
[docs] async def on_start(self):
await super().on_start()
logger.debug("{} in Transport Charging State".format(self.agent.jid))
[docs] async def run(self):
# await "transport_charged" message
msg = await self.receive(timeout=60)
if not msg:
self.set_next_state(TRANSPORT_CHARGING)
return
content = json.loads(msg.body)
protocol = msg.get_metadata("protocol")
performative = msg.get_metadata("performative")
if protocol == REQUEST_PROTOCOL and performative == INFORM_PERFORMATIVE:
if content["status"] == TRANSPORT_CHARGED:
self.agent.transport_charged()
await self.agent.drop_station()
# canviar per un event?
self.set_next_state(TRANSPORT_WAITING)
return
else:
self.set_next_state(TRANSPORT_CHARGING)
return
[docs]class TransportWaitingForApprovalState(TransportStrategyBehaviour, State):
[docs] async def on_start(self):
await super().on_start()
self.agent.status = TRANSPORT_WAITING_FOR_APPROVAL
logger.debug(
"{} in Transport Waiting For Approval State".format(self.agent.jid)
)
[docs] async def run(self):
msg = await self.receive(timeout=60)
if not msg:
self.set_next_state(TRANSPORT_WAITING_FOR_APPROVAL)
return
content = json.loads(msg.body)
performative = msg.get_metadata("performative")
if performative == ACCEPT_PERFORMATIVE:
try:
logger.debug(
"Transport {} got accept from {}".format(
self.agent.name, content["customer_id"]
)
)
# new version
self.agent.status = TRANSPORT_MOVING_TO_CUSTOMER
if not self.check_and_decrease_autonomy(
content["origin"], content["dest"]
):
await self.cancel_proposal(content["customer_id"])
self.set_next_state(TRANSPORT_NEEDS_CHARGING)
return
else:
await self.pick_up_customer(
content["customer_id"], content["origin"], content["dest"]
)
self.set_next_state(TRANSPORT_MOVING_TO_CUSTOMER)
return
except PathRequestException:
logger.error(
"Transport {} could not get a path to customer {}. Cancelling...".format(
self.agent.name, content["customer_id"]
)
)
await self.cancel_proposal(content["customer_id"])
self.set_next_state(TRANSPORT_WAITING)
return
except Exception as e:
logger.error(
"Unexpected error in transport {}: {}".format(self.agent.name, e)
)
await self.cancel_proposal(content["customer_id"])
self.set_next_state(TRANSPORT_WAITING)
return
elif performative == REFUSE_PERFORMATIVE:
logger.debug(
"Transport {} got refusal from customer/station".format(self.agent.name)
)
self.set_next_state(TRANSPORT_WAITING)
return
else:
self.set_next_state(TRANSPORT_WAITING_FOR_APPROVAL)
return
[docs]class TransportMovingToCustomerState(TransportStrategyBehaviour, State):
[docs] async def on_start(self):
await super().on_start()
self.agent.status = TRANSPORT_MOVING_TO_CUSTOMER
logger.debug("{} in Transport Moving To Customer State".format(self.agent.jid))
[docs] async def run(self):
# Reset internal flag to False. coroutines calling
# wait() will block until set() is called
self.agent.customer_in_transport_event.clear()
# Registers an observer callback to be run when the "customer_in_transport" is changed
self.agent.watch_value(
"customer_in_transport", self.agent.customer_in_transport_callback
)
# block behaviour until another coroutine calls set()
await self.agent.customer_in_transport_event.wait()
return self.set_next_state(TRANSPORT_WAITING)
[docs]class FSMTransportStrategyBehaviour(FSMBehaviour):
[docs] def setup(self):
# Create states
self.add_state(TRANSPORT_WAITING, TransportWaitingState(), initial=True)
self.add_state(TRANSPORT_NEEDS_CHARGING, TransportNeedsChargingState())
self.add_state(
TRANSPORT_WAITING_FOR_APPROVAL, TransportWaitingForApprovalState()
)
self.add_state(TRANSPORT_MOVING_TO_CUSTOMER, TransportMovingToCustomerState())
self.add_state(TRANSPORT_MOVING_TO_STATION, TransportMovingToStationState())
self.add_state(TRANSPORT_IN_STATION_PLACE, TransportInStationState())
self.add_state(TRANSPORT_CHARGING, TransportChargingState())
# Create transitions
self.add_transition(
TRANSPORT_WAITING, TRANSPORT_WAITING
) # waiting for messages
self.add_transition(
TRANSPORT_WAITING, TRANSPORT_WAITING_FOR_APPROVAL
) # accepted by customer
self.add_transition(
TRANSPORT_WAITING, TRANSPORT_NEEDS_CHARGING
) # not enough charge
self.add_transition(
TRANSPORT_WAITING_FOR_APPROVAL, TRANSPORT_WAITING_FOR_APPROVAL
) # waiting for approval message
self.add_transition(
TRANSPORT_WAITING_FOR_APPROVAL, TRANSPORT_WAITING
) # transport refused
self.add_transition(
TRANSPORT_WAITING_FOR_APPROVAL, TRANSPORT_MOVING_TO_CUSTOMER
) # going to pick up customer
self.add_transition(
TRANSPORT_NEEDS_CHARGING, TRANSPORT_NEEDS_CHARGING
) # waiting for station list
self.add_transition(
TRANSPORT_NEEDS_CHARGING, TRANSPORT_MOVING_TO_STATION
) # going to station
self.add_transition(
TRANSPORT_NEEDS_CHARGING, TRANSPORT_WAITING
) # exception in go_to_the_station(station, position)
self.add_transition(
TRANSPORT_MOVING_TO_STATION, TRANSPORT_IN_STATION_PLACE
) # arrived to station
self.add_transition(
TRANSPORT_IN_STATION_PLACE, TRANSPORT_IN_STATION_PLACE
) # waiting in station queue
self.add_transition(
TRANSPORT_IN_STATION_PLACE, TRANSPORT_CHARGING
) # begin charging
self.add_transition(
TRANSPORT_CHARGING, TRANSPORT_CHARGING
) # waiting to finish charging
self.add_transition(TRANSPORT_CHARGING, TRANSPORT_WAITING) # restart strategy
self.add_transition(TRANSPORT_MOVING_TO_CUSTOMER, TRANSPORT_MOVING_TO_CUSTOMER)
self.add_transition(
TRANSPORT_MOVING_TO_CUSTOMER, TRANSPORT_WAITING
) # picked up customer or arrived to destination ??
################################################################
# #
# Customer Strategy #
# #
################################################################
[docs]class AcceptFirstRequestBehaviour(CustomerStrategyBehaviour):
"""
The default strategy for the Customer agent. By default it accepts the first proposal it receives.
"""
[docs] async def run(self):
if self.agent.fleetmanagers is None:
await self.send_get_managers(self.agent.fleet_type)
msg = await self.receive(timeout=300)
if msg:
protocol = msg.get_metadata("protocol")
if protocol == QUERY_PROTOCOL:
performative = msg.get_metadata("performative")
if performative == INFORM_PERFORMATIVE:
self.agent.fleetmanagers = json.loads(msg.body)
logger.info(
"{} got fleet managers {}".format(
self.agent.name, self.agent.fleetmanagers
)
)
elif performative == CANCEL_PERFORMATIVE:
logger.info(
"{} got cancellation of request for {} information".format(
self.agent.name, self.agent.type_service
)
)
return
if self.agent.status == CUSTOMER_WAITING:
await self.send_request(content={})
msg = await self.receive(timeout=5)
if msg:
performative = msg.get_metadata("performative")
transport_id = msg.sender
if performative == PROPOSE_PERFORMATIVE:
if self.agent.status == CUSTOMER_WAITING:
logger.debug(
"Customer {} received proposal from transport {}".format(
self.agent.name, transport_id
)
)
await self.accept_transport(transport_id)
self.agent.status = CUSTOMER_ASSIGNED
else:
await self.refuse_transport(transport_id)
elif performative == CANCEL_PERFORMATIVE:
if self.agent.transport_assigned == str(transport_id):
logger.warning(
"Customer {} received a CANCEL from Transport {}.".format(
self.agent.name, transport_id
)
)
self.agent.status = CUSTOMER_WAITING