python-mesa:Agent Based Model 简单教程
教程链接:https://mesa.readthedocs.io/en/latest/tutorials/intro_tutorial.html
Mesa是用于构建,分析和可视化基于代理的模型(Agent Based Model)的模块化框架。
安装:
pip install mesa
讯享网
示例1:随机发红包游戏
假设有一个群,里面有十个人,游戏规则是这样的:
每个人初始有一元钱,游戏开始后,每轮每人随机选定群里的一个人(可以是自己)发一个1元红包。如果钱数为0,本轮就不用发。
游戏进行10轮,试着用代码描述这样的过程。
过程有三步:设置模型,添加调度器,进行仿真。
仿真主要有Agent和Model两类,一般在Agent中用面向对象的方式定义不同Agent中的竞争合作关系;而Model类用来管理若干个Agent,
本文完整代码链接:https://gitee.com/hzy/python_code_collection/tree/master/%E7%AE%97%E6%B3%95%E4%B8%8E%E5%BB%BA%E6%A8%A1/%E4%BB%BF%E7%9C%9F/mesa
讯享网from mesa import Agent, Model from mesa.time import RandomActivation class MoneyAgent(Agent): "带有固定初始财富的代理人Agent。" def __init__(self, id, model): super().__init__(id, model) # 自身的id和归属的模型。 self.wealth = 1 def step(self): if (self.wealth == 0): return otherAgent = self.random.choice(self.model.schedule.agents) # 在模型的多个Agent中,随机选出一个Agent otherAgent.wealth += 1 # 给他一块钱的红包。相当于是蒙特卡洛仿真一下。当然,过程结束之后谁得到多少钱是随机的。 self.wealth -= 1
class MoneyModel(Model): # 管理代理人的模型。 def __init__(self, N): self.agentNum = N self.schedule = RandomActivation(self) # 创建Agents for i in range(self.agentNum): a = MoneyAgent(i, self) self.schedule.add(a) def step(self): self.schedule.step()
最后调用matplotlib进行绘图。绘出的图像是每位代理人的财富(纵轴)与人数(横轴)关系的直方图。
讯享网import matplotlib.pyplot as plt agent_wealth = [a.wealth for a in model.schedule.agents] plt.hist(agent_wealth)

讯享网
如果将这样的游戏反复进行一百次,每次游戏进行十轮。将每次游戏后每个人的钱数看做观察的样本,那么应该会产生100次×10人=1000个样本。这一千个样本中,金钱是怎样分布的?
我们不妨按照程序编一下,这部分完整代码见链接。
本文完整代码链接:https://gitee.com/hzy/python_code_collection/tree/master/%E7%AE%97%E6%B3%95%E4%B8%8E%E5%BB%BA%E6%A8%A1/%E4%BB%BF%E7%9C%9F/mesa

上面就是一千个样本频数分布。可见大概40%的人亏到0了
示例2:基于MESA的Web可视化模块的排队论模型(原创)
mesa库与netlogo类似,在内部的基础上,还有一个创建仿真界面的功能。这个仿真界面前端使用web技术,后端是基于python的tornado开发的,可以说是适应了Web技术大行其道的潮流吧。
当然这种技术也有缺点。缺点之一就是python和javascript的速度都不怎么样,比起基于java的netlogo还是力有不逮;其次,网络通信的速度也一般,最高仿真速率只有20fps,也就是说一秒钟刷新20次。
不多说了,先上代码吧。
from mesa import Agent from mesa import Model from mesa.datacollection import DataCollector from mesa.space import Grid from mesa.time import RandomActivation import numpy as np from mesa.visualization.modules import CanvasGrid, ChartModule, PieChartModule, TextElement from mesa.visualization.ModularVisualization import ModularServer from mesa.visualization.UserParam import UserSettableParameter import mesa print(mesa.__file__) # 可以修改服务器函数中的一些功能. class StatusElement(TextElement): ''' 显示当前服务的状态。. ''' def __init__(self): pass def render(self, model): return "目前已经服务的人数: " + str(model.count_type(model, 'served')) class QueueModel(Model): """ 一个简单的排队论模型。 """ def __init__(self, height=100, width=100, meanServiceTime=10, mtba=10): # mtba的意思是平均到达的时间间隔 # Initialize model parameters self.height = height self.width = width self.mtba = mtba self.meanServiceTime = meanServiceTime self.customerNum = 0 self.interval = 0 # Set up model objects self.schedule = RandomActivation(self) self.grid = Grid(width, height, torus=False) self.datacollector = DataCollector( {
"serving": lambda m: self.count_type(m, "serving"), "served": lambda m: self.count_type(m, "served"), "waiting": lambda m: self.count_type(m, "waiting")}) self.running = True self.datacollector.collect(self) def generateCustomer(self): c = Customer(self.customerNum, self, meanServiceTime=self.meanServiceTime) self.schedule.add(c) self.grid.place_agent(c, (0, 0)) self.customerNum += 1 def step(self): #for i in range(10): self.schedule.step() self.datacollector.collect(self) if (self.interval <= 0): # 如果产生顾客的间隔归零 self.generateCustomer() # 就产生一个顾客 self.interval = np.random.poisson(lam=self.mtba) else: self.interval -= 1 @staticmethod def count_type(model, customer_condition): """ Helper method to count trees in a given condition in a given model. """ count = 0 for customer in model.schedule.agents: if customer.condition == customer_condition: count += 1 return count class Customer(Agent): def __init__(self, pos, model, meanServiceTime): # mtba是顾客到达的平均时间间隔(暂时是服从简单的random分布) super().__init__(pos, model) self.model = model self.pos = pos self.condition = "waiting" # "serving","waiting","served" self.serviceTime = np.random.poisson(lam=meanServiceTime) # 这一步是服务时间,可以做成随机生成的! def step(self): """ If the tree is on fire, spread it to fine trees nearby. """ if self.condition == "waiting": if (self.checkNextAgent() == False): # 如果下个单元格没有顾客占用的话 self.move(dx=1, dy=0) if self.pos[0] == 18: self.condition = "serving" elif self.condition == "serving": self.serviceTime -= 1 if (self.serviceTime <= 0): self.condition = "served" self.model.grid.remove_agent(self) def checkNextAgent(self): # 检测下一个单元格是否有顾客占用。 absoPos = self.convertAbsoPos(1, 0) nextAgent = self.model.grid.get_cell_list_contents([absoPos]) if nextAgent == []: return False else: return True def convertAbsoPos(self, dx: int, dy: int): # 输入相对这个Agent的位置返回绝对位置 x, y = self.pos[0] + dx, self.pos[1] + dy width = self.model.grid.width if (x >= width): x -= width elif x < 0: x += width if (y >= width): y -= width elif y < 0: y += width return (x, y) def move(self, dx: int, dy: int): new_position = (self.pos[0] + dx, self.pos[1] + dy) # 向右为正,向上为正。 self.model.grid.move_agent(self, new_position) def get_pos(self): return self.pos COLORS = {
"serving": "#00AA00", "waiting": "#", "served": "#000000"} def serviceTablePotrayal(customer): if customer is None: return portrayal = {
"Shape": "rect", "w": 1, "h": 1, "Filled": "true", "Layer": 0} (x, y) = customer.get_pos() portrayal["x"] = x portrayal["y"] = y portrayal["Color"] = COLORS[customer.condition] return portrayal height = 1 width = 20 canvas_element = CanvasGrid(serviceTablePotrayal, width, height, width * 20, height * 20) customer_chart = ChartModule([{
"Label": label, "Color": color} for (label, color) in COLORS.items()]) statusElement = StatusElement() pie_chart = PieChartModule([{
"Label": label, "Color": color} for (label, color) in COLORS.items()]) model_params = {
"height": height, "width": width, "mtba": UserSettableParameter("slider", "顾客到达的平均时间(MTBA)", 10, 1, 20, 1), "meanServiceTime": UserSettableParameter("slider", "处理每位顾客要求所需的时间", 10, 1, 20, 1) } server = ModularServer(QueueModel, [canvas_element, customer_chart, statusElement], "Forest Fire", model_params) server.launch() # --------------------------------以下被注释掉的内容是不启动图形化界面也可以运行的。特点是仿真速度非常快。 # # model= QueueModel(meanServiceTime=5,mtba=4.5) # time = 0 # while(1): # time+=1 # model.step() # print(time,"served: " + str(model.count_type(model,'served')),"waiting: " + str(model.count_type(model,'waiting')))
数据收集器
然后就是datacollector——这个数据收集器是做啥的呢?当然是收集当前的局部变量用的!定义顾客有三种状态,serving:正在被服务,served已经接受过服务,waiting:正在排队。就这三种状态。而datacollector的目的就是统计每种状态对应的人数:排队的,已经服务完了的,还有正在服务的。
然后写一下count_type方法,这个方法用于统计处于各种条件下的顾客人数。
排队本身的问题
在模型方面,写一下Model中的generateCustomer这个方法,用于生成顾客。生成顾客时,同时生成一个服从泊松分布的随机数self.interval。每次step的时候,随机数减一;减到零的时候,生成下一个顾客。
每个step中,若前方无人,则顾客向服务台运动一个单位。

着色与控件
定义网格画布控件:
讯享网def serviceTablePotrayal(tree): if tree is None: return portrayal = {
"Shape": "rect", "w": 1, "h": 1, "Filled": "true", "Layer": 0} (x, y) = tree.get_pos() portrayal["x"] = x portrayal["y"] = y portrayal["Color"] = COLORS[tree.condition] return portrayal height = 1 width = 20 canvas_element = CanvasGrid(serviceTablePotrayal, width, height, width * 20, height * 20)
然后再分析一下画布的宽高数据。输入的width和height是有多少个格子的意思,而再往后的两个参数我输入的分别是width20和height20,这是啥意思呢?这个的意思是每个方格的宽度和高度,都是20像素!所以整个画布占用的像素点就是400*20。
插入图表
教程中提到过插入图表,于是就插入吧。具体插入方法由于我暂时也不太理解原理,所以在这里不讲,大概是在代码的160行左右插入了两个图表。删除图表之后其实也是可以运行的。
运行效果与速度优化
于是开始点击运行,效果如下:

这样自然可以运行,但是美中不足的就是最高帧率才20fps——和netlogo比起来岂不是要慢的螺旋升天了?不过不要急,办法还是有的。
python和java在速度上的差距或许很难弥补回来,但是在网络通信方面还是可以优化的。
为什么要限制在20fps?我读了一下源码,大概意思是这样的:Web页面的js脚本每隔一段时间就向服务端发送一个请求,这个间隔时间就是fps的倒数。想象一下一秒钟发20个请求就已经很了不起了吧——如果一秒钟几百几千个请求,岂不是要上天?
没事没事!有办法!
原理:当服务端接收到前端的请求后,驱动Model(目前的Model是QueueModel)的step方法执行一次。
那好办,只要是重写一个函数,让step多执行几次不就是了?
原来的step
def step(self): #for i in range(10): self.schedule.step() self.datacollector.collect(self) if (self.interval <= 0): # 如果产生顾客的间隔归零 self.generateCustomer() # 就产生一个顾客 self.interval = np.random.poisson(lam=self.mtba) else: self.interval -= 1
改成下面这样:
讯享网 def step(self): for i in range(10): self.baseStep() def baseStep(self): #for i in range(10): self.schedule.step() self.datacollector.collect(self) if (self.interval <= 0): # 如果产生顾客的间隔归零 self.generateCustomer() # 就产生一个顾客 self.interval = np.random.poisson(lam=self.mtba) else: self.interval -= 1
这样一看,岂不是加速了十倍?不错不错!
加速效果:(由于执行时服务人数已经较多,所以看上去图线变化并不明显。但是如果看一下“已经服务的人数”这一栏中数字的变化,就能看出十分明显的差别!!)

当然要注意,加速的倍数不要太大,否则运算一次step的时间会长于两次请求之间的间隔,直接影响就是卡顿。我的电脑cpu是i5-6200u,当每秒刷新率为3fps时实测最大的加速倍数约为150~200倍,此时差不多能保证模型的流畅性。如果刷新率取最高200fps,恐怕还需要再降低一些。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/40619.html