from octopus.transport.basic import serial, tcp
from octopus.manufacturer import vapourtec, knauer, thalesnano, vici, mt
from octopus.data.manipulation import Max, Min
from octopus.sequence.control import StateMonitor
from octopus.sequence.util import Looping, Dependent
from octopus.image import source, tracker
from octopus.runtime import *

title("Zirconia Hydrolysis")
id("zirconia-hydrolysis")

r = vapourtec.R2R4(tcp("192.168.15.156", 9002))
ir = mt.ICIR(tcp("192.168.15.3", 8124), stream_names=["product"])

#
# Experiment Parameters
#

ir_start_trigger = variable(100, "start", "start")
ir_low_trigger = variable(80, "low", "low")
vapourtec_flow_rate = 200
column_dead_volume = 5000
sms_phone_no = "447941123456"

# Window passed to Min/Max below: one column volume at the configured flow
# rate, multiplied by 60.
# NOTE(review): presumably this converts minutes to seconds - confirm units.
column_volume_period = column_dead_volume / vapourtec_flow_rate * 60

#
# Logic for SMS notification
#

# NOTE(review): `sms` is not imported explicitly in this file; presumably it is
# provided by the `octopus.runtime` star-import - confirm.
smsnotifier = sms.ClockworkSMS("enter-api-key-here")


def sms_notify(msg):
    """Send `msg` by SMS to `sms_phone_no`, prefixed with the experiment name."""
    # NOTE(review): the prefix does not match this script's title
    # ("Zirconia Hydrolysis"); it looks copied from another experiment.
    msg = "2 step hydrog experiment: %s" % msg
    smsnotifier.notify(sms_phone_no, msg)


#
# Logic to discard output when parameters are not within acceptable levels.
#

class StateFlip(Looping, Dependent):
    """Monitor that runs a step whenever a set of tests flips state.

    StateFlip observes a set of expressions. If one of these tests fails,
    then fail_fn is used to obtain a step which is run. When all of the
    expressions pass again, pass_fn is used. This sequence runs in a loop.

    pass_fn / fail_fn may each be:
      * None       - nothing is run on that transition;
      * a callable - called with no arguments; its return value (a step, or
                     None) is run;
      * a step     - any non-callable object is treated as the step itself
                     and is reset and run on every matching transition.
    """

    # Used by the Looping base class.
    # NOTE(review): presumably seconds between iterations - confirm.
    interval = 0.5

    def __init__(self, pass_fn=None, fail_fn=None):
        Dependent.__init__(self)
        Looping.__init__(self)

        # The name `set` is used in this file as a sequence step, so the
        # builtin set type is looked up explicitly.
        # Depending on the implementation of Python,
        # __builtins__ can be a module or its dict.
        try:
            b_set = __builtins__.set
        except AttributeError:
            b_set = __builtins__["set"]

        self.tests = b_set()
        self.pass_fn = pass_fn
        self.fail_fn = fail_fn
        self._step = None   # step started by the most recent transition
        self._ok = False    # result of the tests at the last transition

    def add(self, test):
        """Add an expression to the set of observed tests."""
        self.tests.add(test)

    def remove(self, test):
        """Stop observing `test`; does nothing if it was never added."""
        self.tests.discard(test)

    @staticmethod
    def _resolve_step(fn):
        """Return the step to run for a transition, or None for no action."""
        if fn is None:
            return None

        if not callable(fn):
            # A step object was supplied directly rather than a factory.
            # Previously this case raised TypeError on the call, which was
            # swallowed, so the step was silently never run.
            return fn

        try:
            return fn()
        except TypeError:
            # Best-effort, as before: a factory that cannot be called with
            # no arguments results in no action.
            return None

    def _release_step(self):
        """Cancel the current step (if any) and detach this monitor's log.

        Both actions are attempted independently so that a failure to
        cancel (for example, because the step has already finished) does not
        leave the log handler attached.

        Returns whatever the step's cancel() returned, or None.
        """
        step = self._step
        if step is None:
            return None

        result = None

        try:
            result = step.cancel()
        except Exception:
            pass

        try:
            step.log -= self.log
        except Exception:
            pass

        return result

    def _iterate(self):
        """Evaluate the tests; on a pass/fail transition run the matching step."""
        passing = all(self.tests)

        if passing == self._ok:
            # No transition since the last iteration.
            return

        step = self._resolve_step(self.pass_fn if passing else self.fail_fn)
        self._ok = passing

        if step is None:
            return

        # Replace the step started by the previous transition.
        self._release_step()

        step.log += self.log
        step.reset()
        step.run()
        self._step = step

    def _cancel(self):
        """Stop looping and cancel the step that is currently active.

        Returns the result of the active step's cancel(), or None when there
        is no active step or it could not be cancelled.
        """
        Looping._cancel(self)

        # The previous implementation referenced an undefined name here, so
        # the active step was never actually cancelled.
        return self._release_step()


#
# State Monitor Definition
#

collect_monitor = StateFlip(
    pass_fn=sequence(
        set(r.output, "collect"),
        log("Vapourtec parameters OK"),
    ),
    fail_fn=sequence(
        set(r.output, "waste"),
        log("Vapourtec parameters not OK"),
    )
)

# Collection should resume as soon as IR response is OK.
collect_monitor.add(ir.product > ir_low_trigger)

# Temperature and pressure must have been OK for one column volume for
# collection to be allowed.
collect_monitor.add(Min(r.heater3.temp, column_volume_period) > 95)
collect_monitor.add(Max(r.pump2.airlock, column_volume_period) < 10000)

# The Vapourtec R2 publishes an "airlock" parameter which goes very high
# if there is a drop in pressure - usually this indicates an air bubble
# in the pump.
vapourtec_monitor = StateMonitor()
vapourtec_monitor.add(r.pump2.airlock < 10000)
vapourtec_monitor.step = sequence(
    log("Airlock problem in R2"),
    call(sms_notify, "Airlock problem in R2"),
    wait("5m")  # Send SMS max once every 5 min!
)

#
# Experiment control sequence
#

long_wait = wait("8h")

monitored_sequence = sequence(
    log("Injecting reagent"),
    set(r.pump2.input, "reagent"),
    long_wait,
    log("Stop injecting reagent"),
    set(r.pump2.input, "solvent")
)

# Both monitors are attached as dependents of the injection sequence.
monitored_sequence.dependents.add(vapourtec_monitor)
monitored_sequence.dependents.add(collect_monitor)

rxn = sequence(
    log("Starting up..."),
    set(r.pressure_limit, 25000),
    set(r.heater3.target, 100),
    set(r.pump2.target, vapourtec_flow_rate),
    set(r.power, "on"),
    wait_until(r.heater3.temp > 95),
    wait_until(ir.product > ir_start_trigger),
    set(r.output, "collect"),
    monitored_sequence,
    wait_until(ir.product < ir_low_trigger),
    wait("20m"),
    set(r.power, "off"),
    log("Complete")
)

run(rxn)