from octopus.transport.basic import serial, tcp from octopus.manufacturer import vapourtec, knauer, thalesnano, vici, mt from octopus.data.manipulation import Max, Min from octopus.sequence.control import StateMonitor from octopus.sequence.util import Looping, Dependent from octopus.image import source, tracker from octopus.notifier import sms from octopus.runtime import * title("Two-step Hydration and Hydrogenation") id("hydrog-twostep") r = vapourtec.R2R4(tcp("192.168.15.156", 9002)) k = knauer.K120(serial("/dev/ttyUSB0", baudrate = 9600)) h = thalesnano.HCube(serial("/dev/ttyUSB1", baudrate = 9600)) v = vici.MultiValve(serial("/dev/ttyUSB2", baudrate = 9600)) t = tracker.SingleBlobTracker(source.cv_webcam(0)) ir = mt.ICIR(tcp("192.168.15.3", 8124), stream_names = ["product"]) # # Experiment Parameters # ir_start_trigger = 200 ir_low_trigger = 170 min_height_trigger = 70 start_height_trigger = 100 max_height_trigger = 360 knauer_flow_rate = 100 vapourtec_flow_rate = 100 column_dead_volume = 3000 hcube_dead_volume = 3000 connecting_tube_vol = 1000 sms_phone_no = "447941123456" # # Logic for SMS notification # smsnotifier = sms.ClockworkSMS("enter-api-key-here") def sms_notify (msg): msg = "2 step hydrog experiment: %s" % msg smsnotifier.notify(sms_phone_no, msg) # # Logic to discard output when parameters are not within acceptable levels. # class StateFlip (Looping, Dependent): interval = 0.5 def __init__ (self, pass_fn = None, fail_fn = None): Looping.__init__(self) # Depending on the implementation of Python, # __builtins__ can be a module or its dict. try: b_set = __builtins__.set except AttributeError: b_set = __builtins__["set"] self.tests = b_set() self.pass_fn = pass_fn self.fail_fn = fail_fn self._steps = [] self._triggered = False def add (self, test): self.tests.add(test) def remove (self, test): self.tests.discard(test) def _iterate (self): if (not self._triggered) and (not all(self.tests)): try: step = self.fail_fn() except TypeError: pass step.reset() step.run() self._steps.append(step) self._triggered = True elif (not self._triggered) and (not all(self.tests)): try: step = self.pass_fn() except TypeError: pass step.reset() step.run() self._steps.append(step) self._triggered = False def _cancel (self): from twisted.internet import defer Looping._cancel(self) d = [] for step in self._steps: try: d.append(step.cancel()) except: pass return defer.getherResults(d) # # State Monitor Definition # output_valve_to_waste = set(v.position, 1) output_valve_to_collect = set(v.position, 2) intermediate_valve_to_waste = set(r.output, "waste") intermediate_valve_to_collect = set(r.output, "collect") h_cube_monitor = StateFlip( pass_fn = sequence( output_valve_to_collect, log("H-Cube parameters OK"), ), fail_fn = sequence( output_valve_to_waste, log("H-Cube parameters not OK"), ) ) h_cube_monitor.add(Min(h.column_temperature, hcube_dead_volume / knauer_flow_rate * 60) >= 95) vapourtec_monitor = StateFlip( pass_fn = sequence( intermediate_valve_to_collect, log("Vapourtec parameters OK"), ), fail_fn = sequence( intermediate_valve_to_waste, log("Vapourtec parameters not OK"), ) ) vapourtec_monitor.add(Min(r.heater3.temp, column_dead_volume / vapourtec_flow_rate * 60) > 95) vapourtec_monitor.add(Max(r.pump2.airlock, column_dead_volume / vapourtec_flow_rate * 60) < 10) vapourtec_monitor.add(ir.product > ir_low_trigger) system_monitor = StateMonitor() system_monitor.add(h.system_pressure >= 4) system_monitor.add(r.pump2.airlock < 10) system_monitor.step = sequence( log("Loss of pressure in system"), call(sms_notify, "Loss of pressure in system"), wait("5m") ) # # Experiment control sequence # hydrog = sequence( wait_until(r.pump2.input == "solvent"), log("Washing column"), cancel(vapourtec_monitor), wait((column_dead_volume / vapourtec_flow_rate * 60) + (connecting_tube_vol / vapourtec_flow_rate * 60)), parallel( sequence( set(r.output, "waste"), wait("1m"), set(r.power, "off") ), sequence( log("Using up remaining intermediate"), wait_until(t.height < min_height_trigger), cancel(h_cube_monitor), log("End of hydrogenation"), call(h.stop_release_hydrogen), set(k.power, "off") ) ) ) hydrog.dependents.add(h_cube_monitor) hydrog.dependents.add(vapourtec_monitor) hydrog.dependents.add(system_monitor) rxn = sequence( log("Starting up..."), set(r.heater3.target, 100), set(r.pump2.target, 100), set(h.hydrogen_mode, "full"), set(h.column_temperature_target, 100), set(r.power, "on"), wait_until(r.heater3.temp > 95), parallel( sequence( log("Injecting reagent"), set(r.pump2.input, "reagent"), wait("2h"), log("Stop injecting reagent"), set(r.pump2.input, "solvent"), ), sequence( log("Waiting for conversion to intermediate"), wait_until(ir.product > ir_start_trigger), log("Collecting intermediate"), set(r.output, "collect"), wait_until(t.height > start_height_trigger), log("Starting knauer pump"), set(k.rate, knauer_flow_rate), set(k.power, "on"), wait("30s"), log("Starting hydrogenation"), call(h.start_hydrogenation), log("Waiting for temperature"), wait_until(h.column_temperature >= 99), log("Waiting for stability"), wait_until(h.message == "Stable."), hydrog, ) ) ) run(rxn)