Skip to content

Commit 76f72b3

Browse files
authored
Merge pull request #361 from fabric-testbed/leaks
Network Service provisioning failures - interface already in use due to allocation algorithm deficiencies
2 parents 473f396 + 1e67698 commit 76f72b3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+402
-7796
lines changed

Dockerfile-auth

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
FROM python:3.11.0
22
MAINTAINER Komal Thareja<[email protected]>
33

4-
ARG HANDLERS_VER=1.6.1
4+
ARG HANDLERS_VER=1.6.2
55

66
RUN mkdir -p /usr/src/app
77
WORKDIR /usr/src/app
@@ -25,7 +25,6 @@ RUN mkdir -p "/etc/fabric/actor/config"
2525
RUN mkdir -p "/var/log/actor"
2626
RUN cp /usr/local/lib/python3.11/site-packages/fabric_mb/message_bus/schema/*.avsc /etc/fabric/message_bus/schema
2727
RUN pip3 install fabric-am-handlers==${HANDLERS_VER}
28-
RUN pip3 install fabric-message-bus==1.6.1
2928
RUN sh /usr/src/app/install.sh
3029

3130
ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]

fabric_cf/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
__version__ = "1.6.2"
1+
__version__ = "1.6.1"
22
__VERSION__ = __version__

fabric_cf/actor/boot/configuration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ def get_kafka_key_schema_location(self) -> str or None:
460460
return self.global_config.runtime.get(Constants.PROPERTY_CONF_KAFKA_KEY_SCHEMA, None)
461461

462462
def get_kafka_consumer_auto_commit_interval(self) -> int:
463-
return int(self.global_config.runtime.get(Constants.PROPERTY_CONF_KAFKA_AUTO_COMMIT_INTERVAL), 5)
463+
return int(self.global_config.runtime.get(Constants.PROPERTY_CONF_KAFKA_AUTO_COMMIT_INTERVAL, 5))
464464

465465
def get_kafka_consumer_commit_batch_size(self) -> int:
466466
return int(self.global_config.runtime.get(Constants.PROPERTY_CONF_KAFKA_BATCH_SIZE, 1))

fabric_cf/actor/core/apis/abc_actor_mixin.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,14 +554,15 @@ def fail_delegation(self, *, did: str, message: str):
554554
"""
555555

556556
@abstractmethod
557-
def close_by_rid(self, *, rid: ID):
557+
def close_by_rid(self, *, rid: ID, force: bool = False):
558558
"""
559559
Closes the reservation. Note: the reservation must have already been registered with the actor.
560560
This method may involve either a client or a server side action or both. When called on a broker,
561561
this method will only close the broker reservation.
562562
563563
Args:
564564
rid: reservation id
565+
force: force close
565566
Raises:
566567
Exception in case of error
567568
"""

fabric_cf/actor/core/apis/abc_database.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
from abc import abstractmethod, ABC
2929
from datetime import datetime
30-
from typing import TYPE_CHECKING, List, Union
30+
from typing import TYPE_CHECKING, List, Union, Tuple, Dict
3131

3232
from fabric_cf.actor.core.apis.abc_delegation import ABCDelegation
3333
from fabric_cf.actor.core.kernel.slice import SliceTypes
@@ -170,6 +170,17 @@ def get_reservations(self, *, slice_id: ID = None, graph_node_id: str = None, pr
170170
@throws Exception in case of error
171171
"""
172172

173+
@abstractmethod
174+
def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str], component: str = None,
175+
bdf: str = None) -> Dict[str, List[str]]:
176+
"""
177+
Retrieves the components.
178+
179+
@return list of components
180+
181+
@throws Exception in case of error
182+
"""
183+
173184
@abstractmethod
174185
def get_client_reservations(self, *, slice_id: ID = None) -> List[ABCReservationMixin]:
175186
"""

fabric_cf/actor/core/apis/abc_reservation_mixin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ def can_renew(self) -> bool:
282282
"""
283283

284284
@abstractmethod
285-
def close(self):
285+
def close(self, force: bool = False):
286286
"""
287287
Closes the reservation. Locked with the kernel lock.
288288
"""
@@ -359,7 +359,7 @@ def probe_pending(self):
359359
"""
360360

361361
@abstractmethod
362-
def reserve(self, *, policy: ABCPolicy):
362+
def reserve(self, *, policy: ABCPolicy) -> bool:
363363
"""
364364
Reserve resources: ticket() initiate or request, or redeem()
365365
request. New reservation.

fabric_cf/actor/core/core/actor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ def fail_delegation(self, *, did: str, message: str):
154154
def close_delegation(self, *, did: str):
155155
self.wrapper.close_delegation(did=did)
156156

157-
def close_by_rid(self, *, rid: ID):
158-
self.wrapper.close(rid=rid)
157+
def close_by_rid(self, *, rid: ID, force: bool = False):
158+
self.wrapper.close(rid=rid, force=force)
159159

160160
def close(self, *, reservation: ABCReservationMixin):
161161
if reservation is not None:

fabric_cf/actor/core/delegation/broker_delegation.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ def accept_delegation_update(self, *, incoming: ABCDelegation, update_data: Upda
218218
self.delegation_update_satisfies(incoming=incoming, update_data=update_data)
219219
self.absorb_delegation_update(incoming=incoming, update_data=update_data)
220220
except Exception as e:
221+
if incoming.get_graph() is not None:
222+
incoming.get_graph().delete_graph()
221223
success = False
222224
update_data.error(message=str(e))
223225
self.logger.error(traceback.format_exc())

fabric_cf/actor/core/kernel/authority_reservation.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,20 @@ def prepare(self, *, callback: ABCCallbackProxy, logger):
139139

140140
self.state = ReservationStates.Ticketed
141141

142-
def reserve(self, *, policy: ABCPolicy):
142+
def reserve(self, *, policy: ABCPolicy) -> bool:
143143
self.nothing_pending()
144144
self.incoming_request()
145145
if self.is_active():
146-
self.error(err="reservation already holds a lease")
146+
#self.error(err="reservation already holds a lease")
147+
self.logger.warning(f"Reservation: {self.get_reservation_id()} already holds a lease")
148+
return False
147149

148150
self.policy = policy
149151
self.approved = False
150152
self.bid_pending = True
151153
self.pending_recover = False
152154
self.map_and_update(extend=False)
155+
return True
153156

154157
def service_reserve(self):
155158
try:
@@ -210,7 +213,7 @@ def service_modify_lease(self):
210213
self.logger.error("authority failed servicing modifylease e: {}".format(e))
211214
self.fail_notify(message=str(e))
212215

213-
def close(self):
216+
def close(self, force: bool = False):
214217
self.logger.debug("Processing close for #{}".format(self.rid))
215218
self.transition(prefix="external close", state=self.state, pending=ReservationPendingStates.Closing)
216219

@@ -418,8 +421,12 @@ def probe_pending(self):
418421

419422
elif self.pending_state == ReservationPendingStates.Closing:
420423
if self.resources is None or self.resources.is_closed():
421-
self.transition(prefix="close complete", state=ReservationStates.Closed,
422-
pending=ReservationPendingStates.None_)
424+
if self.update_data.failed:
425+
self.transition(prefix="close complete failed", state=ReservationStates.CloseFail,
426+
pending=ReservationPendingStates.None_)
427+
else:
428+
self.transition(prefix="close complete", state=ReservationStates.Closed,
429+
pending=ReservationPendingStates.None_)
423430
self.pending_recover = False
424431
self.generate_update()
425432

@@ -515,7 +522,10 @@ def reap(self):
515522
released = self.resources.collect_released()
516523
if released is not None:
517524
if not released.get_notices().is_empty():
518-
self.update_data.post(event=released.get_notices().get_notice())
525+
notice = released.get_notices().get_notice()
526+
self.update_data.post(event=notice)
527+
if "Exception" in notice:
528+
self.update_data.error(message=notice)
519529
self.policy.release(resources=released)
520530
except Exception as e:
521531
self.logger.error("exception in authority reap e: {}".format(e))

fabric_cf/actor/core/kernel/broker_reservation.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ def prepare(self, *, callback: ABCCallbackProxy, logger):
221221

222222
self.set_dirty()
223223

224-
def reserve(self, *, policy: ABCPolicy):
224+
def reserve(self, *, policy: ABCPolicy) -> bool:
225225
# These handlers may need to be slightly more sophisticated, since a
226226
# client may bid multiple times on a ticket as part of an auction
227227
# protocol: so we may receive a reserve or extend when there is already
@@ -232,12 +232,13 @@ def reserve(self, *, policy: ABCPolicy):
232232
self.pending_state != ReservationPendingStates.Ticketing:
233233
# We do not want to fail the reservation simply log a warning and exit from reserve
234234
self.logger.warning("Duplicate ticket request")
235-
return
235+
return False
236236

237237
self.policy = policy
238238
self.approved = False
239239
self.bid_pending = True
240240
self.map_and_update(ticketed=False)
241+
return True
241242

242243
def service_reserve(self):
243244
# resources is null initially. It becomes non-null once the
@@ -276,13 +277,13 @@ def service_extend_ticket(self):
276277
pending=ReservationPendingStates.None_)
277278
self.generate_update()
278279

279-
def close(self):
280+
def close(self, force: bool = False):
280281
send_notification = False
281282
if self.state == ReservationStates.Nascent or self.pending_state != ReservationPendingStates.None_:
282283
self.logger.warning("Closing a reservation in progress")
283284
send_notification = True
284285

285-
if self.state != ReservationStates.Closed:
286+
if self.state not in [ReservationStates.Closed, ReservationStates.CloseFail] or force:
286287
if self.pending_state == ReservationPendingStates.Priming or \
287288
(self.pending_state == ReservationPendingStates.Ticketing and not self.bid_pending):
288289
# Close in Priming is a special case: when processing the close
@@ -296,7 +297,11 @@ def close(self):
296297
self.logger.debug("closing reservation #{} while in Priming".format(self.rid))
297298
self.closed_in_priming = True
298299

299-
self.transition(prefix="closed", state=ReservationStates.Closed, pending=ReservationPendingStates.None_)
300+
if not self.update_data.is_failed():
301+
self.transition(prefix="closed", state=ReservationStates.Closed, pending=ReservationPendingStates.None_)
302+
else:
303+
self.transition(prefix="closed-failed", state=ReservationStates.CloseFail,
304+
pending=ReservationPendingStates.None_)
300305
self.policy.closed(reservation=self)
301306

302307
if send_notification:
@@ -536,7 +541,16 @@ def set_authority(self, *, authority: ABCAuthorityProxy):
536541
def update_lease(self, *, incoming: ABCReservationMixin, update_data):
537542
self.logger.info(f"Received Update Lease: {incoming} at Broker")
538543
# TODO add any processing if needed
539-
self.logger.info(f"Do Nothing!")
544+
if incoming.get_resources() and incoming.get_resources().get_sliver() and incoming.get_resources().get_sliver().get_reservation_info():
545+
incoming_state = incoming.get_resources().get_sliver().get_reservation_info().reservation_state
546+
else:
547+
incoming_state = None
548+
self.logger.info(f"Update Lease from authority in state: {self.get_state()} "
549+
f"Incoming: {incoming_state}|{incoming.get_notices()} update_data: {update_data}!")
550+
if incoming_state and incoming_state == str(ReservationStates.CloseFail):
551+
self.update_data.absorb(other=update_data)
552+
self.logger.info("Closing a reservation which failed to delete at the authority")
553+
self.close()
540554

541555
def handle_failed_rpc(self, *, failed: FailedRPC):
542556
if failed.get_request_type() == RPCRequestType.UpdateTicket and \

0 commit comments

Comments
 (0)