Module for runtime monitoring of Petri nets.
The module provides three kinds of monitors: PlaceTimeMonitor, TokenCntMonitor and TokenTrackingMonitor.
Functions for modelling the Petri net to be monitored can be found in the module PetriNetStructure.
PlaceTimeMonitor
Type for a PlaceTimeMonitor, which checks how long tokens have remained in a certain place.
Offers the following functions: tokenTs, avgTime and maxTime (see toPlaceTimeMonitor).
Usage
import PetriNetStructure
import PetriNetMonitoring
in: t0: Events[Int]
def petriNet =
addPlace("p0", 5, Some(10), true,
addPlace("p1", 6, Some(8), false,
addPlace("p2", 0, None, true,
addTransition("t0", Set.add(Set.empty,("p0", 1)), Set.add(Set.add(Set.empty,("p1", 1)),("p2", 2)), t0,
emptyNet))))
def tokenCntMonitor = toPlaceTimeMonitor(petriNet)
out tokenCntMonitor.maxTime("p0")
out tokenCntMonitor.avgTime("p0")
out time(period(10)) - tokenCntMonitor.maxTime("p0") as maxTimeP0
The monitor assumes valid inputs for the firing of transitions.
TimeInterval
Type for a time interval whose borders are optional. Used to encode the minimal and maximal timestamp at which a token passed a specific transition in the net. For details see: TokenTrackingMonitor
TokenCntMonitor
Type for a TokenCntMonitor to check the number of tokens in a place and to detect under and overflows.
Offers the following functions: tokenCnt, underflow and overflow (see toTokenCntMonitor).
Usage
import PetriNetStructure
import PetriNetMonitoring
in: t0: Events[Int]
def petriNet =
addPlace("p0", 5, Some(10), true,
addPlace("p1", 6, Some(8), false,
addPlace("p2", 0, None, true,
addTransition("t0", Set.add(Set.empty,("p0", 1)), Set.add(Set.add(Set.empty,("p1", 1)),("p2", 2)), t0,
emptyNet))))
def tokenCntMonitor = toTokenCntMonitor(petriNet)
out tokenCntMonitor.tokenCnt("p0")
out tokenCntMonitor.overflow("p0")
This kind of monitor does not require dynamic data structures.
TokenTrackingMonitor
Type for a TokenTrackingMonitor, which checks the time tokens need to travel from one place in the network to another.
The general idea behind this monitor is that all tokens store the timestamp at which they were generated by a specific transition in the network. The other transitions then produce tokens that are "melted" together from the incoming tokens and contain a range of possible timestamps, determined by the timestamps of the consumed tokens. With this kind of monitor it is possible to track the time of token flows through the network, especially in the case of linear networks.
Offers the following functions: tokenTimes, minTokenTime and maxTokenTime (see toTokenTrackingMonitor).
Usage
import PetriNetStructure
import PetriNetMonitoring
in: t0: Events[Int]
def petriNet =
addPlace("p0", 5, Some(10), true,
addPlace("p1", 6, Some(8), false,
addPlace("p2", 0, None, true,
addTransition("t0", Set.add(Set.empty,("p0", 1)), Set.add(Set.add(Set.empty,("p1", 1)),("p2", 2)), t0,
emptyNet))))
def tokenCntMonitor = toTokenTrackingMonitor(petriNet, "t0")
out tokenCntMonitor.minTokenTime("p0")
out tokenCntMonitor.maxTokenTime("p0")
out time(period(10)) - tokenCntMonitor.maxTokenTime("p0") as maxTimeP0
The monitor assumes valid inputs for the firing of transitions.
toPlaceTimeMonitor(net: PetriNet): {avgTime: ( Place) => Events[Float], maxTime: ( Place) => Events[Int], tokenTs: ( Place) => Events[List[Int]]}
Generates a PlaceTimeMonitor for a given Petri net
For a usage example see: PlaceTimeMonitor
def toPlaceTimeMonitor(net: PetriNet): {tokenTs : (Place) => Events[List[Int]], avgTime : (Place) => Events[Float], maxTime : (Place) => Events[Int]} = {
  # Builds a PlaceTimeMonitor for `net`: for every place a stream with the list
  # of timestamps of the tokens stored in it, plus accessors derived from it.
  def timesByPlace : Map[Place, Events[List[Int]]] = Map.fold(
    net.places,
    Map.empty[Place, Events[List[Int]]],
    (acc, place, att) => {
      # Start value of the place, built from att.init and the timestamp 0
      # (presumably att.init entries with timestamp 0 - see PetriNetUtils.addTimes).
      def startTimes = PetriNetUtils.addTimes(List.empty[Int], att.init, 0)

      # Has an event whenever a transition with an edge to this place has one.
      def fired = Map.fold(net.transitions, nil[Int],
        (merged, tName, tAtt) => Set.fold(
          tAtt.conn,
          merged,
          (s, edge) => static if (edge._1 == place) then merge(s, tAtt.trig) else s
        )
      )

      # Per step: weight * trigger value, accumulated over all edges with a
      # positive weight that point to this place.
      def gained: Events[Int] = Map.fold(net.transitions, const(0, fired),
        (total, tName, tAtt) => Set.fold(
          tAtt.conn,
          total,
          (s, edge) => {
            def weight = edge._2
            static if (weight > 0 && edge._1 == place) then {
              PetriNetUtils.asyncAdd(s, weight * tAtt.trig)
            } else {
              s
            }
          }
        )
      )

      # Entries for the tokens gained in this step, stamped with the step's time.
      def gainedTimes: Events[List[Int]] = PetriNetUtils.addTimes(default(nil, List.empty[Int]), gained, time(fired))

      # Same accumulation for edges with a negative weight; the sign is flipped
      # so that the result is a non-negative amount to remove.
      def lost: Events[Int] = Map.fold(net.transitions, const(0, fired),
        (total, tName, tAtt) => Set.fold(
          tAtt.conn,
          total,
          (s, edge) => {
            def weight = edge._2
            static if (weight < 0 && edge._1 == place) then {
              PetriNetUtils.asyncAdd(s, -weight * tAtt.trig)
            } else {
              s
            }
          }
        )
      )

      # Recursive state: previous list -> remove `lost` from the front -> insert
      # the new entries (appended for fifo places, prepended otherwise).
      def previous: Events[List[Int]] = default(last(current, fired), startTimes)
      def afterRemoval: Events[List[Int]] = PetriNetUtils.removeNFront(previous, lost)
      def afterInsert: Events[List[Int]] = static if att.fifo then
        PetriNetUtils.appendList(afterRemoval, gainedTimes)
      else
        PetriNetUtils.prependList(afterRemoval, gainedTimes)
      def current: Events[List[Int]] = default(afterInsert, startTimes)

      Map.add(acc, place, current)
    })

  {
    tokenTs = (p: Place) => Map.get(timesByPlace, p),
    avgTime = (p: Place) => PetriNetUtils.listAvg(Map.get(timesByPlace, p)),
    # NOTE(review): maxTime is computed with listMin; presumably the smallest
    # stored timestamp belongs to the token that has stayed longest - confirm.
    maxTime = (p: Place) => PetriNetUtils.listMin(Map.get(timesByPlace, p))
  }
}
toTokenCntMonitor(net: PetriNet): {overflow: ( Place) => Events[Bool], tokenCnt: ( Place) => Events[Int], underflow: ( Place) => Events[Bool]}
Generates a TokenCntMonitor for a given Petri net
For a usage example see: TokenCntMonitor
def toTokenCntMonitor(net: PetriNet): {tokenCnt : (Place) => Events[Int], underflow : (Place) => Events[Bool], overflow : (Place) => Events[Bool]} = {
  # Builds a TokenCntMonitor for `net`: for every place a record with its token
  # count stream and the two derived flag streams.
  def perPlace : Map[Place, {tokenCnt : Events[Int], underflow: Events[Bool], overflow: Events[Bool]}] = Map.fold(
    net.places,
    Map.empty[Place, {tokenCnt : Events[Int], underflow: Events[Bool], overflow: Events[Bool]}],
    (acc, place, att) => {
      # weight * trigger value, accumulated over all edges with a positive
      # weight that point to this place.
      def gained: Events[Int] = Map.fold(net.transitions, nil[Int],
        (total, tName, tAtt) => Set.fold(
          tAtt.conn,
          total,
          (s, edge) => {
            def weight = edge._2
            static if (weight > 0 && edge._1 == place) then {
              PetriNetUtils.asyncAdd(s, weight * tAtt.trig)
            } else {
              s
            }
          }
        )
      )

      # Same accumulation for edges with a negative weight. The sign is kept,
      # so this stream carries non-positive amounts.
      def lost: Events[Int] = Map.fold(net.transitions, nil[Int],
        (total, tName, tAtt) => Set.fold(
          tAtt.conn,
          total,
          (s, edge) => {
            def weight = edge._2
            static if (weight < 0 && edge._1 == place) then {
              PetriNetUtils.asyncAdd(s, weight * tAtt.trig)
            } else {
              s
            }
          }
        )
      )

      # Recursive state: previous count plus `lost` gives the intermediate
      # count; adding `gained` to that gives the new count.
      def afterRemoval: Events[Int] = PetriNetUtils.asyncAdd(default(last(count, merge(lost,gained)), att.init), lost)
      def count: Events[Int] = default(PetriNetUtils.asyncAdd(afterRemoval, gained), att.init)

      # Underflow is judged on the intermediate count, overflow on the final one.
      def tooFew: Events[Bool] = afterRemoval < 0
      def tooMany: Events[Bool] = isSome(default(nil, att.max)) && (count > getSome(att.max))

      Map.add(acc, place, {tokenCnt = count, underflow = tooFew, overflow = tooMany})
    }
  )

  def tokenCnt : (Place) => Events[Int] = (p: Place) => Map.get(perPlace, p).tokenCnt
  def underflow : (Place) => Events[Bool] = (p: Place) => Map.get(perPlace, p).underflow
  def overflow : (Place) => Events[Bool] = (p: Place) => Map.get(perPlace, p).overflow
  {
    tokenCnt = tokenCnt, underflow = underflow, overflow = overflow
  }
}
toTokenTrackingMonitor(net: PetriNet, stopFrom: Transition)
Generates a TokenTrackingMonitor for a given Petri net and a transition (stopFrom) for which the timestamps of token creation are recorded.
For a usage example see: TokenTrackingMonitor
def toTokenTrackingMonitor(net: PetriNet, stopFrom: Transition) = {
  # Builds a TokenTrackingMonitor for `net`. Every token is represented by a
  # TimeInterval; tokens produced by the transition `stopFrom` get the interval
  # (ts, ts) of the firing step, all other transitions pass on the merged
  # intervals of the tokens they consumed.

  # Computes the token lists of all places after one firing step.
  #   map:     token intervals per place before the step
  #   firings: value per transition for this step (0 if it has no event, see transitionFire)
  #   ts:      timestamp of the step
  liftable def compute(map: Map[Place, List[TimeInterval]], firings: Map[Transition, Int], ts: Int) = {
    # Per place: number of tokens handed out, i.e. the sum of -w * firings over
    # all edges with negative weight w of transitions with a positive firing value.
    def placesGive: Map[Place, Int] = Map.fold(net.places, Map.empty, (m, pn, pa) => {
      def sum = Map.fold(net.transitions, 0, (s, tn, ta) => {
        if (Map.get(firings, tn) > 0) then {
          Set.fold(ta.conn, s, (s, e) => {
            def n = e._1
            def w = e._2
            if n == pn && w < 0 then s - w * Map.get(firings, tn) else s
          })
        } else {
          s
        }
      })
      Map.add(m, pn, sum)
    })
    # Per place: result pair of removeNFrontColl. Below, _1 is used as the list
    # that stays in the place and _2 as the tokens that were taken.
    def placesGiveToken : Map[Place, (List[TimeInterval], List[TimeInterval])] = Map.fold(net.places, Map.empty, (m, pn, pa) => {
      Map.add(m, pn, PetriNetUtils.removeNFrontColl(Map.get(map, pn), Map.get(placesGive, pn)))
    })
    # Per place: all taken tokens folded into one interval with mergeIntervals.
    def placesGiveInterval: Map[Place, TimeInterval] = Map.fold(placesGiveToken, Map.empty, (m, pn, lsts) => {
      Map.add(m, pn, List.fold(lsts._2, (None, None), (c, i) => mergeIntervals(c,i)))
    })
    # Per transition: interval attached to the tokens it produces. For stopFrom
    # this is the current timestamp, otherwise the merge over all places it
    # is connected to with a negative weight.
    def transitionsInterval: Map[Transition, TimeInterval] = Map.fold(firings, Map.empty, (m, tn, tf) => {
      def int : TimeInterval = Set.fold(Map.get(net.transitions, tn).conn, (None, None), (i, e) => {
        def p = e._1
        def w = e._2
        if w < 0 then mergeIntervals(i, Map.get(placesGiveInterval, p)) else i
      })
      def int2 : TimeInterval = if tn == stopFrom then (Some(ts), Some(ts)) else int
      Map.add(m, tn, int2)
    })
    # Per place: number of tokens received (w * firings over edges with positive
    # weight w) and the merged interval of the transitions delivering them.
    def placesTake: Map[Place, (Int, TimeInterval)] = Map.fold(net.places, Map.empty, (m, pn, pa) => {
      def v: (Int, TimeInterval) = Map.fold(net.transitions, (0, (None, None)), (v, tn, ta) => {
        if (Map.get(firings, tn) > 0) then {
          def fr = Set.fold(ta.conn, v, (v, e) => {
            def n = e._1
            def w = e._2
            if n == pn && w > 0 then
              (v._1 + w * Map.get(firings, tn), mergeIntervals(v._2, Map.get(transitionsInterval, tn)))
            else
              v
          })
          fr
        } else {
          v
        }
      })
      Map.add(m, pn, v)
    })
    # Token lists once the consumed tokens are gone.
    def mapAfterTake: Map[Place, List[TimeInterval]] = Map.fold(placesGiveToken, Map.empty, (m, n, e) => Map.add(m, n, e._1))
    # Token lists once the produced tokens are inserted: appended for fifo
    # places, prepended otherwise.
    def mapAfterGive: Map[Place, List[TimeInterval]] = Map.fold(net.places, Map.empty, (m, pn, pa) => {
      def v = Map.get(placesTake, pn)
      def l = PetriNetUtils.addTimes(List.empty, v._1, v._2)
      def ntl = if (pa.fifo) then {
        PetriNetUtils.appendList(Map.get(mapAfterTake, pn), l)
      } else {
        PetriNetUtils.prependList(Map.get(mapAfterTake, pn), l)
      }
      Map.add(m, pn, ntl)
    })
    mapAfterGive
  }

  # Initial marking: the tokens of every place carry the interval (None, None).
  def init : Map[Place, List[TimeInterval]] = Map.fold(net.places, Map.empty[Place, List[TimeInterval]], (c, pn, patt) => {
    Map.add(c, pn, PetriNetUtils.addTimes(List.empty[TimeInterval], patt.init, (None, None)))
  })

  # Has an event whenever any transition of the net has one. Every trigger is
  # merged exactly once, independent of the transition's connections: the lift
  # in transitionFire reacts to every t.trig and unwraps the accumulator with
  # getSome, so the accumulator must have an event at each of those timestamps,
  # also for a transition whose connection set is empty.
  def trigger = Map.fold(net.transitions, nil[Int], (str, t, te) => merge(str, te.trig))

  # Per step: map from each transition to its trigger value, or 0 if the
  # transition has no event at this timestamp.
  def transitionFire: Events[Map[Transition, Int]] = Map.fold(net.transitions, const(Map.empty, trigger),
    (c, tn, t) => lift(c, t.trig, (co, eo) => {
      def m = getSome(co)
      if isNone(eo) then
        Some(Map.add(m, tn, 0))
      else
        Some(Map.add(m, tn, getSome(eo)))
    }
  ))
  def currTime = time(transitionFire)

  # Recursive state: the previous place map is fed into compute at every step.
  def lastPlaceMap : Events[Map[Place, List[TimeInterval]]] = default(last(placeMaps, transitionFire), init)
  def placeMaps = default(compute(lastPlaceMap, transitionFire, currTime), init)
  def placeMap : (Place) => Events[List[TimeInterval]] = (p: Place) => Map.get(placeMaps, p)

  # Smallest defined lower border (_1) among the intervals, None if there is none.
  liftable def minTokenTime(lst: List[TimeInterval]): Option[Int] = {
    List.fold(lst, None, (c, e) => if !isSome(e._1) || (isSome(c) && getSome(c) < getSome(e._1)) then c else e._1)
  }
  # Largest defined upper border (_2) among the intervals, None if there is none.
  liftable def maxTokenTime(lst: List[TimeInterval]): Option[Int] = {
    List.fold(lst, None, (c, e) => if !isSome(e._2) || (isSome(c) && getSome(c) > getSome(e._2)) then c else e._2)
  }
  {
    tokenTimes = placeMap,
    minTokenTime = (p: Place) => minTokenTime(placeMap(p)),
    maxTokenTime = (p: Place) => maxTokenTime(placeMap(p))
  }
}
PetriNet, Place, PlaceAtt, Transition, addPlace, addTransition, emptyNet