wiki:epics/streamdevice/simFG

EPICS講習会で使用しているテスト用の simulator (simFG)

KEKで何度か開催されているEPICS講習会のstream deviceで使用している simFG のソースコードです。

ご自由にお使いください。

#!/usr/bin/python
import socketserver
import string
import sys
import time
import signal
import random
import math
import traceback
import re

# Address the server binds to (passed to ForkingTCPServer below); the empty
# string makes the socket listen on all local interfaces.
HOST = ''
# TCP port the simulator listens on.
PORT = 9999

class EchoHandler(socketserver.BaseRequestHandler):
  """Per-connection handler implementing the simFG text protocol.

  The client sends one command per line.  A command name is at least three
  characters long and is matched case-insensitively:

    ``NAME?``        query; the reply is sent back terminated by CR LF
    ``NAME value``   set the parameter to the floating point ``value``
    ``QUIT``         close the connection

  Supported names are ``*IDN``, ``VOLT``, ``AMP``, ``FREQ``, ``OFFSET``,
  ``STAT`` and ``INFO``.  Unknown or malformed commands are ignored without
  a reply.  The waveform parameters live on the handler instance, so every
  connection starts from the same defaults.
  """

  def handle(self):
    """Serve one client connection until EOF or a ``QUIT`` command."""
    self.t0 = time.time()
    print(time.ctime(self.t0), "  Connected from ", self.client_address)

    # Waveform parameters, reset to these defaults for every connection.
    self.freq = 0.2   # frequency
    self.amp  = 5.0   # amplitude
    self.offset = 0.0 # offset

    # inner command functions
    def _getIDN():
      return "Simulator"

    def _getVolt():
      return self.creRetStr()

    def _getAmp():
      return '%f' % self.amp

    def _getFreq():
      return '%f' % self.freq

    def _getOffset():
      return '%f' % self.offset

    def _getStat():
      return '%f,%f,%f' % (self.amp, self.freq, self.offset)

    def _getInfo():
      return 'AMP:%f,FREQ:%f,OFFSET:%f' % (self.amp, self.freq, self.offset)

    # The setters compare against None explicitly: a plain truthiness test
    # would reject a perfectly valid value of 0 (e.g. "OFFSET 0").
    def _setAmp(buf):
      val = self.checkarg(buf)
      if val is not None:
        self.amp = val

    def _setFreq(buf):
      val = self.checkarg(buf)
      if val is not None:
        self.freq = val

    def _setOffset(buf):
      val = self.checkarg(buf)
      if val is not None:
        self.offset = val

    # commands dictionary: name -> optional setter / query callables
    cmnds = {
      'QUIT'  :{'set':None,             'qry':None},
      '*IDN'  :{'set':None,             'qry':_getIDN},
      'VOLT'  :{'set':None,             'qry':_getVolt},
      'AMP'   :{'set':_setAmp,          'qry':_getAmp},
      'FREQ'  :{'set':_setFreq,         'qry':_getFreq},
      'OFFSET':{'set':_setOffset,       'qry':_getOffset},
      'STAT'  :{'set':None,             'qry':_getStat},
      'INFO'  :{'set':None,             'qry':_getInfo},
    }

    fid = self.request.makefile()
    try:
      while True:
        receivedData = fid.readline()
        if not receivedData:
          # EOF: the peer closed the connection.
          break
        try:
          buf = receivedData.strip()
          # Separate the command name from a trailing '?' or argument.
          token = re.split('[ ?]', buf)[0]
          comd = token.upper()
          if len(comd) < 3:
            continue
          cmndDic = cmnds.get(comd)
          if cmndDic is None:
            continue
          # "QUIT" is a special command
          if comd == "QUIT":
            break
          # Slice instead of index so that a bare command with nothing
          # after the name (e.g. "STAT") does not raise IndexError.
          if buf[len(token):len(token) + 1] == '?':
            query = cmndDic['qry']
            if query is not None:
              # sendall: plain send() may transmit only part of the reply.
              self.request.sendall((query() + '\r\n').encode())
          else:
            setter = cmndDic['set']
            if setter is not None:
              setter(buf)
        except Exception:
          # Best effort: report the problem and keep serving this client.
          # SystemExit / KeyboardInterrupt are deliberately not caught.
          traceback.print_exc()
    finally:
      # Release the connection even if reading from the socket failed.
      fid.close()
      self.request.close()
      print(time.ctime(time.time()), "  Disconnected (from ", self.client_address, ")")

  def checkarg(self, buf):
    """Parse the numeric argument of a ``NAME value`` command line.

    Returns the value as a float, or None when ``buf`` does not consist of
    exactly two whitespace-separated fields or the second field is not a
    valid float.
    """
    try:
      _, v = buf.strip().split()
      val = float(v)
    except (AttributeError, TypeError, ValueError):
      val = None
    return val

  def creRetStr(self):
    """Return the simulated output voltage formatted as ``%8.4f``.

    The value is a sine wave of amplitude ``self.amp`` and frequency
    ``self.freq`` evaluated at the time elapsed since ``self.t0``, plus
    uniform noise in [0, 1) and ``self.offset``.
    """
    t = time.time() - self.t0
    val = self.amp * math.sin(2*math.pi*self.freq*t) + random.random()*1.0 + self.offset
    buf = "%8.4f" % val
    return buf

# end of class EchoHandler

def signal_handler(signal, frame):
  """Signal callback: terminate the process with exit status 0."""
  # Raising SystemExit directly is exactly what sys.exit(0) does.
  raise SystemExit(0)

# __ main __
if __name__ == '__main__':
  # Install the SIGINT handler defined above before starting the server.
  signal.signal(signal.SIGINT, signal_handler)

  # One forked child process per client connection; allow_reuse_address is
  # set on the server class before the instance is created.
  server_class = socketserver.ForkingTCPServer
  server_class.allow_reuse_address = True
  srv = server_class((HOST, PORT), EchoHandler)

  print("StreamDevice Simulator started.")
  srv.serve_forever()
Last modified 3 weeks ago Last modified on 02/25/25 09:01:51
Note: See TracWiki for help on using the wiki.