= EPICS講習会で使用しているテスト用の simulator (simFG) = KEKで何度か開催されている[https://cerldev.kek.jp/trac/EpicsUsersJP/#EPICS%E5%85%A5%E9%96%80%E8%AC%9B%E7%BF%92%E4%BC%9A%E8%B3%87%E6%96%99%E3%81%AA%E3%81%A9 EPICS講習会]のstream deviceで使用してる simFG のソースコードです。 ご自由にお使いください。 {{{ #!/usr/bin/python import socketserver import string import sys import time import signal import random import math import traceback import re HOST = '' PORT = 9999 class EchoHandler(socketserver.BaseRequestHandler): def handle(self): self.t0 = time.time() print(time.ctime(self.t0), " Connected from ", self.client_address) # constant self.freq = 0.2 # frequency self.amp = 5.0 # ammplitude self.offset = 0.0 # offset # inner command functions def _getIDN(): return "Simulator" def _getVolt(): return self.creRetStr() def _getAmp(): return '%f' % self.amp def _getFreq(): return '%f' % self.freq def _getOffset(): return '%f' % self.offset def _getStat(): return '%f,%f,%f' %(self.amp, self.freq, self.offset) def _getInfo(): return 'AMP:%f,FREQ:%f,OFFSET:%f' %(self.amp, self.freq, self.offset) def _setAmp(buf): val = self.checkarg(buf) if val: self.amp = val def _setFreq(buf): val = self.checkarg(buf) if val: self.freq = val def _setOffset(buf): val = self.checkarg(buf) if val: self.offset = val # commands dictionary cmnds = { 'QUIT' :{'set':None, 'qry':None}, '*IDN' :{'set':None, 'qry':_getIDN}, 'VOLT' :{'set':None, 'qry':_getVolt}, 'AMP' :{'set':_setAmp, 'qry':_getAmp}, 'FREQ' :{'set':_setFreq, 'qry':_getFreq}, 'OFFSET':{'set':_setOffset, 'qry':_getOffset}, 'STAT' :{'set':None, 'qry':_getStat}, 'INFO' :{'set':None, 'qry':_getInfo} } flag = True fid = self.request.makefile() while flag: receivedData = fid.readline() if not receivedData: break try: # buf = string.strip(receivedData) buf = receivedData.strip() # sparate command string # comd = string.upper(re.split('[ ?]', buf)[0]) comd = re.split('[ ?]', buf)[0].upper() if len(comd) >=3: rtn = None # if cmnds.has_key(comd): if comd in cmnds.keys(): cmndDic = cmnds[comd] # "QUIT" is special command if comd == "QUIT": flag = False else: if buf[len(comd)] == '?': if cmndDic['qry'] != None: rtn = cmndDic['qry']() + '\r\n' rtn = rtn.encode() else: if cmndDic['set'] != None: cmndDic['set'](buf) if rtn != None: self.request.send(rtn) except: traceback.print_exc() pass fid.close() self.request.close() print(time.ctime(time.time()), " Disconnected (from ", self.client_address, ")") def checkarg(self, buf): try: c,v = buf.strip().split() val = float(v) except: #traceback.print_exc() val = None return val def creRetStr(self): #random.seed() t = time.time() - self.t0 val = self.amp * math.sin(2*math.pi*self.freq*t) + random.random()*1.0 + self.offset #buf = "A=%8.4f, B=%8.4f, C=%8.4f"%(val,val,val2*2.345) buf = "%8.4f"%val return buf # end of class EchoHandler def signal_handler(signal, frame): sys.exit(0) # __ main __ if __name__ == '__main__': signal.signal(signal.SIGINT, signal_handler) socketserver.ForkingTCPServer.allow_reuse_address = True srv = socketserver.ForkingTCPServer((HOST,PORT), EchoHandler) print("StreamDevice Simulator started.") srv.serve_forever() }}}