TeSSLa Petri-Net Library

module PetriNetMonitoring

Module for runtime monitoring of Petri nets.

The module provides three kinds of monitors:

  • TokenCntMonitor: Checks the number of tokens in a place and detects underflows and overflows.
  • PlaceTimeMonitor: Checks how long tokens have remained in a given place.
  • TokenTrackingMonitor: Checks how long tokens take to travel from one place of the net to another.

Functions for modelling the Petri net to be monitored can be found in the module PetriNetStructure.

Imports

PlaceTimeMonitor

Type of a PlaceTimeMonitor, which checks how long tokens have remained in a given place.

Offers the following functions:

  • tokenTs(place): Provides a stream containing a List of timestamps for a specific place. Each timestamp is the time at which a token currently in the place was added.
  • avgTime(place): Provides a stream containing the average timestamp of the tokens in the place.
  • maxTime(place): Provides a stream containing the smallest timestamp of the tokens in the place, i.e. the arrival time of the token that has been in the place the longest.

Usage

import PetriNetStructure
import PetriNetMonitoring

in t0: Events[Int]

def petriNet =
  addPlace("p0", 5, Some(10), true,
  addPlace("p1", 6, Some(8), false,
  addPlace("p2", 0, None, true,
  addTransition("t0", Set.add(Set.empty, ("p0", 1)), Set.add(Set.add(Set.empty, ("p1", 1)), ("p2", 2)), t0,
  emptyNet))))

def placeTimeMonitor = toPlaceTimeMonitor(petriNet)

out placeTimeMonitor.maxTime("p0")
out placeTimeMonitor.avgTime("p0")

out time(period(10)) - placeTimeMonitor.maxTime("p0") as maxTimeP0

The monitor assumes valid inputs for the firing of transitions.
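
The last output in the usage above subtracts the smallest arrival timestamp from the current time, which gives the longest time any token has spent in the place. The following is a minimal model of that arithmetic in plain Python, not TeSSLa and not library code. The helper names are hypothetical, and returning None for an empty place is an assumption of this sketch; the library's behaviour for empty places is not documented here.

```python
from typing import List, Optional


def oldest_arrival(token_ts: List[int]) -> Optional[int]:
    """Smallest arrival timestamp, i.e. the value maxTime is described to report."""
    return min(token_ts) if token_ts else None


def average_arrival(token_ts: List[int]) -> Optional[float]:
    """Average arrival timestamp, i.e. the value avgTime is described to report."""
    return sum(token_ts) / len(token_ts) if token_ts else None


def longest_dwell(now: int, token_ts: List[int]) -> Optional[int]:
    """Time the oldest token has spent in the place: now minus oldest arrival."""
    oldest = oldest_arrival(token_ts)
    return None if oldest is None else now - oldest


# Three tokens arrived at times 5, 12 and 31; the place is inspected at time 40.
arrivals = [5, 12, 31]
dwell = longest_dwell(40, arrivals)
```

In this model the token that arrived at time 5 determines the result, so a small timestamp corresponds to a long dwell time.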

TimeInterval

Type of a time interval whose lower and upper borders are both optional. It encodes the minimal and maximal timestamp at which a token passed a specific transition in the net. For details see: TokenTrackingMonitor
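
As an illustration of how two such intervals could be combined, here is a minimal Python sketch (not TeSSLa). The library's own mergeIntervals is used in the source listing of toTokenTrackingMonitor, but its definition is not shown on this page, so the behaviour below is an assumption: a missing border counts as "no information", lower borders are combined with min and upper borders with max. The name merge_intervals is hypothetical.

```python
from typing import Callable, Optional, Tuple

# An interval with optional lower and upper border, mirroring TimeInterval.
TimeInterval = Tuple[Optional[int], Optional[int]]


def _pick(x: Optional[int], y: Optional[int],
          choose: Callable[[int, int], int]) -> Optional[int]:
    """Combine two optional borders; a missing border yields the other one."""
    if x is None:
        return y
    if y is None:
        return x
    return choose(x, y)


def merge_intervals(a: TimeInterval, b: TimeInterval) -> TimeInterval:
    """Smallest interval covering both inputs (assumed semantics)."""
    return (_pick(a[0], b[0], min), _pick(a[1], b[1], max))


merged = merge_intervals((3, 7), (5, 12))
```

Under these assumptions (None, None) acts as a neutral element, which matches its use as the start value of the folds in the source listing.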

TokenCntMonitor

Type of a TokenCntMonitor, which checks the number of tokens in a place and detects underflows and overflows.

Offers the following functions:

  • tokenCnt(place): Provides a stream containing the current number of tokens in a place.
  • underflow(place): Provides a stream indicating an underflow in a place, i.e. the token count dropping below zero when tokens are consumed.
  • overflow(place): Provides a stream indicating an overflow in a place, i.e. the token count exceeding the maximum of the place, if one is set.

Usage

import PetriNetStructure
import PetriNetMonitoring

in t0: Events[Int]

def petriNet =
  addPlace("p0", 5, Some(10), true,
  addPlace("p1", 6, Some(8), false,
  addPlace("p2", 0, None, true,
  addTransition("t0", Set.add(Set.empty, ("p0", 1)), Set.add(Set.add(Set.empty, ("p1", 1)), ("p2", 2)), t0,
  emptyNet))))

def tokenCntMonitor = toTokenCntMonitor(petriNet)

out tokenCntMonitor.tokenCnt("p0")
out tokenCntMonitor.overflow("p0")

This kind of monitor does not require dynamic data structures.
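
The counting scheme can be modelled with one integer per place. The following Python sketch (not TeSSLa) mirrors the order of operations in the source listing of toTokenCntMonitor: consumed tokens are subtracted first and checked for underflow, then produced tokens are added and checked against the optional maximum. The names step and run are hypothetical, and the caller is assumed to pass the already weighted numbers of consumed and produced tokens per event.

```python
from typing import List, Optional, Tuple


def step(count: int, consumed: int, produced: int,
         max_tokens: Optional[int]) -> Tuple[int, bool, bool]:
    """Apply one firing event to a place; returns (count, underflow, overflow)."""
    after_take = count - consumed
    underflow = after_take < 0            # checked before tokens are added
    new_count = after_take + produced
    overflow = max_tokens is not None and new_count > max_tokens
    return new_count, underflow, overflow


def run(init: int, max_tokens: Optional[int],
        firings: List[Tuple[int, int]]) -> List[Tuple[int, bool, bool]]:
    """Replay a list of (consumed, produced) events starting from init tokens."""
    count = init
    trace = []
    for consumed, produced in firings:
        count, under, over = step(count, consumed, produced, max_tokens)
        trace.append((count, under, over))
    return trace


# A place with 5 initial tokens and a maximum of 10, losing 2 tokens per event.
trace_p0 = run(5, 10, [(2, 0), (2, 0), (2, 0)])
```

In this run the third event drives the count to -1, so only the last entry of trace_p0 reports an underflow.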

TokenTrackingMonitor

Type of a TokenTrackingMonitor, which checks how long tokens take to travel from one place of the net to another.

The general idea behind this monitor is that every token stores the timestamp at which it was generated by a specific transition in the net. All other transitions produce tokens that are merged from the tokens they consume and therefore carry a range of possible timestamps, determined by the timestamps of the consumed tokens. This makes it possible to track the timing of token flows through the net, which is especially useful for linear nets.
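
The idea can be sketched with a small Python model (not TeSSLa). It is a simplification under stated assumptions: transitions fire one at a time, tokens are taken from the front of a place and appended at the back, and a missing interval border means "no information". The names merge and fire and the place names are hypothetical and not part of the library.

```python
from typing import Dict, List, Optional, Tuple

TimeInterval = Tuple[Optional[int], Optional[int]]
UNKNOWN: TimeInterval = (None, None)


def merge(a: TimeInterval, b: TimeInterval) -> TimeInterval:
    """Interval covering both inputs; None borders carry no information."""
    lows = [x for x in (a[0], b[0]) if x is not None]
    highs = [x for x in (a[1], b[1]) if x is not None]
    return (min(lows) if lows else None, max(highs) if highs else None)


def fire(places: Dict[str, List[TimeInterval]], consume: Dict[str, int],
         produce: Dict[str, int], ts: int, stamps: bool) -> None:
    """Fire one transition at time ts, updating places in place."""
    merged = UNKNOWN
    for place, n in consume.items():
        taken, places[place] = places[place][:n], places[place][n:]
        for interval in taken:
            merged = merge(merged, interval)
    # The tracked transition stamps fresh tokens; all others pass on the merge.
    token = (ts, ts) if stamps else merged
    for place, n in produce.items():
        places[place] = places[place] + [token] * n


# Linear net: src --t0--> a --t1--> b, where t0 is the tracked transition.
places = {"src": [UNKNOWN, UNKNOWN], "a": [], "b": []}
fire(places, {"src": 1}, {"a": 1}, ts=10, stamps=True)
fire(places, {"src": 1}, {"a": 1}, ts=25, stamps=True)
fire(places, {"a": 2}, {"b": 1}, ts=40, stamps=False)
```

After the three firings, place b holds a single token whose interval spans both stamps, so at time 40 the token is known to be between 15 and 30 time units old.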

Offers the following functions:

  • tokenTimes(place): Provides a stream containing a List of TimeIntervals for a specific place. Each TimeInterval represents the range of timestamps a token received when passing the tracked transition in the net.
  • minTokenTime(place): Provides a stream containing the minimal timestamp of the TimeIntervals in the corresponding place.
  • maxTokenTime(place): Provides a stream containing the maximal timestamp of the TimeIntervals in the corresponding place.

Usage

import PetriNetStructure
import PetriNetMonitoring

in t0: Events[Int]

def petriNet =
  addPlace("p0", 5, Some(10), true,
  addPlace("p1", 6, Some(8), false,
  addPlace("p2", 0, None, true,
  addTransition("t0", Set.add(Set.empty, ("p0", 1)), Set.add(Set.add(Set.empty, ("p1", 1)), ("p2", 2)), t0,
  emptyNet))))

def tokenTrackingMonitor = toTokenTrackingMonitor(petriNet, "t0")

out tokenTrackingMonitor.minTokenTime("p0")
out tokenTrackingMonitor.maxTokenTime("p0")

out time(period(10)) - tokenTrackingMonitor.maxTokenTime("p0") as maxTimeP0

The monitor assumes valid inputs for the firing of transitions.

toPlaceTimeMonitor(net: PetriNet): {avgTime: (Place) => Events[Float], maxTime: (Place) => Events[Int], tokenTs: (Place) => Events[List[Int]]}

Generates a PlaceTimeMonitor for a given Petri net.

For a usage example see: PlaceTimeMonitor

	def toPlaceTimeMonitor(net: PetriNet): {tokenTs : (Place) => Events[List[Int]], avgTime : (Place) => Events[Float], maxTime : (Place) => Events[Int]} = {
			def m : Map[Place, Events[List[Int]]] =	Map.fold(
				net.places,
				Map.empty[Place, Events[List[Int]]],
				(mon, placeName, placeAtt) => {

					def init = PetriNetUtils.addTimes(List.empty[Int], placeAtt.init, 0)

					def trigger = Map.fold(net.transitions, nil[Int],
						(str, t, te) => {
							Set.fold(
								te.conn,
								str,
								(c, e) => static if (e._1 == placeName) then merge(c, te.trig) else c
							)
						}
					)
					
					def addStreamNum: Events[Int] = Map.fold(net.transitions, const(0, trigger),
						(str, t, te) => {
							Set.fold(
								te.conn,
								str,
								(c, e) => {
									def pl = e._1
									def w = e._2
									static if (w > 0 && pl == placeName) then {
										PetriNetUtils.asyncAdd(c, w * te.trig)
									} else {
										c
									}
								}
							)
						}
					)
					def addStream: Events[List[Int]] = PetriNetUtils.addTimes(default(nil, List.empty[Int]), addStreamNum, time(trigger))

					def subStream: Events[Int] = Map.fold(net.transitions, const(0, trigger),
						(str, t, te) => {
							Set.fold(
								te.conn,
								str,
								(c, e) => {
									def pl = e._1
									def w = e._2
									static if (w < 0 && pl == placeName) then {
										PetriNetUtils.asyncAdd(c, -w * te.trig)
									} else {
										c
									}
								}
							)
						}
					)

					def lastPlaceStream: Events[List[Int]] = default(last(placeStream2, trigger), init)
					def placeStream: Events[List[Int]] = PetriNetUtils.removeNFront(lastPlaceStream, subStream)
					def placeStreamApp: Events[List[Int]] = static if placeAtt.fifo then 
							PetriNetUtils.appendList(placeStream, addStream)
						else
							PetriNetUtils.prependList(placeStream, addStream)
					def placeStream2: Events[List[Int]] = default(placeStreamApp, init)

					Map.add(mon, placeName, placeStream2)
				})

			def tokenTs : (Place) => Events[List[Int]] = (p: Place) => Map.get(m, p)
			def avgTime : (Place) => Events[Float] = (p: Place) => PetriNetUtils.listAvg(Map.get(m, p))
			def maxTime : (Place) => Events[Int] = (p: Place) => PetriNetUtils.listMin(Map.get(m, p))

			{
				tokenTs = tokenTs, avgTime = avgTime, maxTime = maxTime
			}
	}
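
The per-place update in the listing above can be summarised in a small Python model (not TeSSLa). It assumes, based only on their names and use in the listing, that removeNFront drops tokens from the front of the list, that addTimes creates one entry per added token carrying the current time, and that the fifo attribute selects between appending and prepending. The function update_place is hypothetical.

```python
from typing import List


def update_place(tokens: List[int], removed: int, added: int,
                 now: int, fifo: bool) -> List[int]:
    """One update of a place's timestamp list at time now."""
    remaining = tokens[removed:]          # consumed tokens leave from the front
    fresh = [now] * added                 # new tokens are stamped with now
    # FIFO places queue new tokens at the back, other places at the front.
    return remaining + fresh if fifo else fresh + remaining


# Three initial tokens (timestamp 0); at time 7 one token leaves, two arrive.
fifo_place = update_place([0, 0, 0], removed=1, added=2, now=7, fifo=True)
lifo_place = update_place([0, 0, 0], removed=1, added=2, now=7, fifo=False)
```

Because removal always happens at the front, a place without the fifo attribute hands out its newest tokens first in this model, while a fifo place hands out its oldest.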

toTokenCntMonitor(net: PetriNet): {overflow: (Place) => Events[Bool], tokenCnt: (Place) => Events[Int], underflow: (Place) => Events[Bool]}

Generates a TokenCntMonitor for a given Petri net.

For a usage example see: TokenCntMonitor

	def toTokenCntMonitor(net: PetriNet): {tokenCnt : (Place) => Events[Int], underflow : (Place) => Events[Bool], overflow : (Place) => Events[Bool]} = {
		def m : Map[Place, {tokenCnt : Events[Int], underflow: Events[Bool], overflow: Events[Bool]}] = Map.fold(
			net.places,
			Map.empty[Place, {tokenCnt : Events[Int], underflow: Events[Bool], overflow: Events[Bool]}],
			(mon, placeName, placeAtt) => {

				def addStream: Events[Int] = Map.fold(net.transitions, nil[Int],
					(str, t, te) => {
						Set.fold(
							te.conn,
							str,
							(c, e) => {
								def pl = e._1
								def w = e._2
								static if (w > 0 && pl == placeName) then {
									PetriNetUtils.asyncAdd(c, w * te.trig)
								} else {
									c
								}
							}
						)
					}
				)

				def subStream: Events[Int] = Map.fold(net.transitions, nil[Int],
					(str, t, te) => {
						Set.fold(
							te.conn,
							str,
							(c, e) => {
								def pl = e._1
								def w = e._2
								static if (w < 0 && pl == placeName) then {
									PetriNetUtils.asyncAdd(c, w * te.trig)
								} else {
									c
								}
							}
						)
					}
				)

				def placeStream: Events[Int] = PetriNetUtils.asyncAdd(default(last(placeStream2, merge(subStream, addStream)), placeAtt.init), subStream)
				def placeStream2: Events[Int] = default(PetriNetUtils.asyncAdd(placeStream, addStream), placeAtt.init)
				def underflow: Events[Bool] = placeStream < 0
				def overflow: Events[Bool] = isSome(default(nil, placeAtt.max)) && (placeStream2 > getSome(placeAtt.max))

				Map.add(mon, placeName, {tokenCnt = placeStream2, underflow = underflow, overflow = overflow})
			}
		)
		
		{
			tokenCnt = (p: Place) => Map.get(m, p).tokenCnt,
			underflow = (p: Place) => Map.get(m, p).underflow,
			overflow = (p: Place) => Map.get(m, p).overflow
		}
	}

toTokenTrackingMonitor(net: PetriNet, stopFrom: Transition)

Generates a TokenTrackingMonitor for a given Petri net and a transition (stopFrom) for which the timestamps of token creation are recorded.

For a usage example see: TokenTrackingMonitor

	def toTokenTrackingMonitor(net: PetriNet, stopFrom: Transition) = {

		liftable def compute(map: Map[Place, List[TimeInterval]], firings: Map[Transition, Int], ts: Int) = {

		 	def placesGive: Map[Place, Int] = Map.fold(net.places, Map.empty, (m, pn, pa) => {
		 		def sum = Map.fold(net.transitions, 0, (s, tn, ta) => {
		 					if (Map.get(firings, tn) > 0) then {
		 						Set.fold(ta.conn, s, (s, e) => {
		 							def n = e._1
		 							def w = e._2
		 							if n == pn && w < 0 then s - w * Map.get(firings, tn) else s
		 						})
		 					} else {
		 						s
		 					}
		 				})
		 		Map.add(m, pn, sum)
		 	})

		 	def placesGiveToken : Map[Place, (List[TimeInterval], List[TimeInterval])] = Map.fold(net.places, Map.empty, (m, pn, pa) => {
		 		Map.add(m, pn, PetriNetUtils.removeNFrontColl(Map.get(map, pn), Map.get(placesGive, pn)))
		 	})

		 	def placesGiveInterval: Map[Place, TimeInterval] = Map.fold(placesGiveToken, Map.empty, (m, pn, lsts) => {
		 		Map.add(m, pn, List.fold(lsts._2, (None, None), (c, i) => mergeIntervals(c,i)))
		 	})

		 	def transitionsInterval: Map[Transition, TimeInterval] = Map.fold(firings, Map.empty, (m, tn, tf) => {
		 		def int : TimeInterval = Set.fold(Map.get(net.transitions, tn).conn, (None, None), (i, e) => {
		 			def p = e._1
		 			def w = e._2
		 			if w < 0 then mergeIntervals(i, Map.get(placesGiveInterval, p)) else i
		 		})
		 		def int2 : TimeInterval = if tn == stopFrom then (Some(ts), Some(ts)) else int
		 		Map.add(m, tn, int2)
		 	})

		 	def placesTake: Map[Place, (Int, TimeInterval)] = Map.fold(net.places, Map.empty, (m, pn, pa) => {
		 		def v: (Int, TimeInterval) = Map.fold(net.transitions, (0, (None, None)), (v, tn, ta) => {
		 					if (Map.get(firings, tn) > 0) then {
		 						def fr = Set.fold(ta.conn, v, (v, e) => {
		 							def n = e._1
		 							def w = e._2
		 							if n == pn && w > 0 then 
		 								(v._1 + w * Map.get(firings, tn), mergeIntervals(v._2, Map.get(transitionsInterval, tn))) 
		 							else 
		 								v
		 						})
		 						fr
		 					} else {
		 						v
		 					}
		 				})
		 		Map.add(m, pn, v)
		 	})

		 	def mapAfterTake: Map[Place, List[TimeInterval]] = Map.fold(placesGiveToken, Map.empty, (m, n, e) => Map.add(m, n, e._1))

		 	def mapAfterGive: Map[Place, List[TimeInterval]] = Map.fold(net.places, Map.empty, (m, pn, pa) => {
		 		def v = Map.get(placesTake, pn)
		 		def l = PetriNetUtils.addTimes(List.empty, v._1, v._2)
		 		def ntl = if (pa.fifo) then {
		 			PetriNetUtils.appendList(Map.get(mapAfterTake, pn), l)
		 		} else {
		 			PetriNetUtils.prependList(Map.get(mapAfterTake, pn), l)
		 		}
		 		Map.add(m, pn, ntl)
		 	})

		mapAfterGive
		}

		def init : Map[Place, List[TimeInterval]] = Map.fold(net.places, Map.empty[Place, List[TimeInterval]], (c, pn, patt) => {
				Map.add(c, pn, PetriNetUtils.addTimes(List.empty[TimeInterval], patt.init, (None, None)))
		})

		def trigger = Map.fold(net.transitions, nil[Int], (str, t, te) => Set.fold(te.conn, str, (c, e) => merge(c, te.trig)))

		def transitionFire: Events[Map[Transition, Int]] = Map.fold(net.transitions, const(Map.empty, trigger),
			(c, tn, t) => lift(c, t.trig, (co, eo) => {
				def m = getSome(co)
				if isNone(eo) then 
					Some(Map.add(m, tn, 0))
				else
					Some(Map.add(m, tn, getSome(eo)))
			}
		))

		def currTime = time(transitionFire)
		def lastPlaceMap : Events[Map[Place, List[TimeInterval]]] = default(last(placeMaps, transitionFire), init)
		def placeMaps = default(compute(lastPlaceMap, transitionFire, currTime), init)
		def placeMap : (Place) => Events[List[TimeInterval]] = (p: Place) => Map.get(placeMaps, p)

		liftable def minTokenTime(lst: List[TimeInterval]): Option[Int] = {
			List.fold(lst, None, (c, e) => if !isSome(e._1) || (isSome(c) && getSome(c) < getSome(e._1)) then c else e._1)
		}

		liftable def maxTokenTime(lst: List[TimeInterval]): Option[Int] = {
			List.fold(lst, None, (c, e) => if !isSome(e._2) || (isSome(c) && getSome(c) > getSome(e._2)) then c else e._2)
		}

		{
			tokenTimes = placeMap,
			minTokenTime = (p: Place) => minTokenTime(placeMap(p)),
			maxTokenTime = (p: Place) => maxTokenTime(placeMap(p))
		}
	}

Imported from PetriNetStructure

PetriNet, Place, PlaceAtt, Transition, addPlace, addTransition, emptyNet