EPICS講習会で使用しているテスト用の simulator (simFG)
KEKで何度か開催されているEPICS講習会のStreamDevice実習で使用している simFG のソースコードです。
ご自由にお使いください。
#!/usr/bin/python
import socketserver
import string
import sys
import time
import signal
import random
import math
import traceback
import re
# Host part of the (HOST, PORT) address handed to the TCP server below.
# NOTE(review): an empty host string presumably means "all local interfaces"
# (Python socket convention) -- confirm if the bind address matters.
HOST = ''
# TCP port part of the (HOST, PORT) address handed to the TCP server below.
PORT = 9999
class EchoHandler(socketserver.BaseRequestHandler):
    """Per-connection handler implementing the simFG text protocol.

    Every received line is one command.  The command name is the text up to
    the first space or ``?`` and is matched case-insensitively:

    * ``NAME?``        -- query; the answer is sent back terminated by CRLF.
    * ``NAME <value>`` -- set ``NAME`` to the float ``<value>``; no reply.
    * ``QUIT``         -- close the connection.

    Known names are ``*IDN``, ``VOLT``, ``AMP``, ``FREQ``, ``OFFSET``,
    ``STAT`` and ``INFO``; only ``AMP``, ``FREQ`` and ``OFFSET`` can be set.
    Unknown commands and malformed arguments are ignored without a reply.
    """

    def handle(self):
        """Serve one client until it sends ``QUIT`` or closes the connection."""
        self.t0 = time.time()
        print(time.ctime(self.t0), " Connected from ", self.client_address)

        # Waveform parameters; every connection starts from these defaults.
        self.freq = 0.2    # frequency
        self.amp = 5.0     # amplitude
        self.offset = 0.0  # offset

        # --- query helpers: each returns the reply text without terminator ---
        def _getIDN():
            return "Simulator"

        def _getVolt():
            return self.creRetStr()

        def _getAmp():
            return '%f' % self.amp

        def _getFreq():
            return '%f' % self.freq

        def _getOffset():
            return '%f' % self.offset

        def _getStat():
            return '%f,%f,%f' % (self.amp, self.freq, self.offset)

        def _getInfo():
            return 'AMP:%f,FREQ:%f,OFFSET:%f' % (self.amp, self.freq, self.offset)

        # --- set helpers ---
        def _makeSetter(attr):
            """Return a function that parses a command line and stores its value."""
            def _set(buf):
                val = self.checkarg(buf)
                # Compare against None explicitly: 0.0 is a valid setting and
                # must not be discarded just because it is falsy.
                if val is not None:
                    setattr(self, attr, val)
            return _set

        # Command table: 'set' receives the whole stripped command line,
        # 'qry' takes no argument and returns the reply string.
        cmnds = {
            'QUIT':   {'set': None, 'qry': None},
            '*IDN':   {'set': None, 'qry': _getIDN},
            'VOLT':   {'set': None, 'qry': _getVolt},
            'AMP':    {'set': _makeSetter('amp'), 'qry': _getAmp},
            'FREQ':   {'set': _makeSetter('freq'), 'qry': _getFreq},
            'OFFSET': {'set': _makeSetter('offset'), 'qry': _getOffset},
            'STAT':   {'set': None, 'qry': _getStat},
            'INFO':   {'set': None, 'qry': _getInfo},
        }

        try:
            # errors='replace' keeps one undecodable byte from the client from
            # aborting the whole connection with a UnicodeDecodeError.
            with self.request.makefile('r', errors='replace') as fid:
                while True:
                    receivedData = fid.readline()
                    if not receivedData:
                        break  # peer closed the connection

                    buf = receivedData.strip()
                    # Separate the command name from a trailing '?' or argument.
                    comd = re.split('[ ?]', buf)[0].upper()
                    cmndDic = cmnds.get(comd)
                    if cmndDic is None:
                        continue  # unknown (or empty) command: ignore
                    # "QUIT" is a special command: it ends the session.
                    if comd == "QUIT":
                        break

                    try:
                        # startswith() with an offset is safe even when nothing
                        # follows the command name (indexing would raise).
                        if buf.startswith('?', len(comd)):
                            query = cmndDic['qry']
                            if query is not None:
                                reply = query() + '\r\n'
                                # sendall() retries until the whole reply is out.
                                self.request.sendall(reply.encode())
                        elif cmndDic['set'] is not None:
                            cmndDic['set'](buf)
                    except Exception:
                        # Best effort: report the problem and keep serving.
                        traceback.print_exc()
        finally:
            self.request.close()
            print(time.ctime(time.time()), " Disconnected (from ", self.client_address, ")")

    def checkarg(self, buf):
        """Parse the numeric argument of a ``NAME <value>`` command line.

        Returns the value as a float, or None when the line does not consist
        of exactly two whitespace-separated fields or the second field is not
        a valid float literal.
        """
        try:
            _name, text = buf.strip().split()
            return float(text)
        except ValueError:
            # Raised both by a wrong field count and by a bad float literal.
            return None

    def creRetStr(self):
        """Return the simulated output voltage formatted with ``%8.4f``.

        The value is ``amp * sin(2*pi*freq*t) + noise + offset`` where ``t`` is
        the number of seconds since the connection was opened and ``noise`` is
        a fresh ``random.random()`` sample.
        """
        t = time.time() - self.t0
        val = self.amp * math.sin(2 * math.pi * self.freq * t) + random.random() * 1.0 + self.offset
        return "%8.4f" % val
# end of class EchoHandler
def signal_handler(signal, frame):
    """Signal callback that terminates the process with exit status 0.

    Both arguments (signal number and current stack frame) are ignored.
    """
    raise SystemExit(0)
# Script entry point: install the SIGINT handler and run the TCP server.
if __name__ == '__main__':
    # Route Ctrl-C (SIGINT) to signal_handler.
    signal.signal(signal.SIGINT, signal_handler)

    # The reuse flag is set on the server class itself, before the instance
    # is created, so it is in effect when the listening socket is set up.
    server_class = socketserver.ForkingTCPServer
    server_class.allow_reuse_address = True
    srv = server_class((HOST, PORT), EchoHandler)

    print("StreamDevice Simulator started.")
    # Blocks here, dispatching each incoming connection to EchoHandler.
    srv.serve_forever()
Last modified
8 months ago
Last modified on 02/25/25 09:01:51
Note:
See TracWiki
for help on using the wiki.
