For Advent of Code 2019 Day 7, we end up building a feedback loop for the IntCode Computers. I probably over-engineered it again, but it’s multi-threaded, and the inputs and outputs are asynchronous. Working with the IntCode Computers is probably my favorite activity. I got to employ RxScala and Observables to chain the inputs and outputs.
Part 1: Chained IntCode Computers
The puzzle requires you to set up multiple IntCode computers, chaining the output of each one into the input of the next. You also have to first send in the configuration parameters to set them up. I didn’t use Observables for the first part, and I don’t have that code anymore. It simply took the single output value, added it to a stack, and shipped it to the next computer. Mutable stacks are how I was handling input and output for part 1. Very simple, and all my IntCode operators worked great!
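Since the original part 1 code is gone, here is a minimal sketch of the same chaining idea, not a reconstruction of it. It assumes each amplifier can be treated as a function from a phase setting and an input signal to a single output; the `Amp` type, `ChainSketch`, and `chain` are hypothetical names, and a fold stands in for the mutable stacks.

```scala
object ChainSketch {
  // Hypothetical stand-in for one amplifier run: (phase setting, input signal) => output.
  // A real version would run the IntCode program with those two inputs.
  type Amp = (Int, Int) => Int

  // Start with signal 0, then ship each amp's output to the next amp in line.
  def chain(amp: Amp, phases: List[Int]): Int =
    phases.foldLeft(0)((signal, phase) => amp(phase, signal))
}
```

With a made-up amp such as `(phase, signal) => signal * 10 + phase`, calling `ChainSketch.chain` with phases `List(4, 3, 2, 1, 0)` passes 0, 4, 43, 432 and 4321 down the line and returns 43210.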
Part 2: Feedback Loop!
This is where the fun part begins. The problem description now feeds the output of Amp E back into Amp A.
          O-------O  O-------O  O-------O  O-------O  O-------O
    0 -+->| Amp A |->| Amp B |->| Amp C |->| Amp D |->| Amp E |-.
       |  O-------O  O-------O  O-------O  O-------O  O-------O |
       |                                                        |
       '--------------------------------------------------------+
                                                                |
                                                                v
                                                         (to thrusters)
I chose to set up each Amp on its own thread and use RxScala Observables to connect them to each other. Inside each Amp I convert the input Observable to a blocking iterator, so an Amp really does block its thread while it waits for input. In this case that’s okay. I didn’t redesign the entire IntCode engine to be a chain of Observables, but I probably could. Maybe if there is another IntCode problem that builds on this, I’ll refactor it to be a chain of operations. Here’s my updated IntCode class.
    package is.kow.adventofcode._2019.opcodes

    import com.typesafe.scalalogging.LazyLogging
    import rx.lang.scala.Observable
    import rx.lang.scala.subjects.PublishSubject

    import scala.annotation.tailrec
    import scala.concurrent.ExecutionContext

    class IntCode(data: List[Int]) extends LazyLogging {

      // Every value written by the output opcode is published here.
      // Note: a PublishSubject does not replay, so a value emitted before a
      // downstream subscribes is not delivered to that subscriber.
      private val publishSubject = PublishSubject[Int]()
      private val outputOpCode = new OutputOpCode(publishSubject)

      def execute(input: Observable[Int])(implicit executionContext: ExecutionContext): Observable[Int] = {
        logger.debug("EXECUTE")
        val END_OP = EndOpCode()
        val OP_CODES = List(
          new JumpIfFalseOpCode(),
          new JumpIfTrueOpCode(),
          new LessThanOpCode(),
          new EqualsOpCode(),
          new AddOpCode(),
          new MultOpCode(),
          END_OP,
          outputOpCode,
          // The input Observable becomes a blocking iterator: next() waits for a value.
          new InputOpCode(input.doOnNext(i => logger.info(s"NEXTINPUT: $i")).toBlocking.toIterable.iterator)
        )

        // Run one instruction at a time until the end opcode is reached.
        @tailrec
        def process(completedOps: List[OpCode], acc: List[Int], opLocation: Int = 0): List[Int] = {
          val currentOp = acc.drop(opLocation).head
          if (END_OP.matches(currentOp)) {
            acc
          } else {
            val maybeOp = OP_CODES.find(_.matches(currentOp))
            val operated = maybeOp.map { operation =>
              operation.operate(opLocation, acc)
            } getOrElse {
              logger.error(s"Unknown Opcode: $currentOp @ $opLocation: $acc")
              throw new RuntimeException(s"Invalid Operation: $currentOp @ $opLocation")
            }
            val op = maybeOp.get
            process(op :: completedOps, operated.data, operated.opPointer)
          }
        }

        // Run the program on the provided ExecutionContext so the caller is not blocked.
        executionContext.execute(() => {
          try {
            logger.debug("IN EXECUTION CONTEXT -- STARTING")
            process(List.empty, data)
            publishSubject.onCompleted()
            logger.debug("ALL DONE, SENT COMPLETE")
          } catch {
            case e: Exception =>
              logger.error("some kind of failure!", e)
              publishSubject.onError(e)
          }
        })

        publishSubject
      }
    }
The significant changes here are that it now uses Observables, and that execute runs the program on a provided ExecutionContext. Output is published through a PublishSubject that the class creates and returns as an Observable. I have to do a lot of manual chaining, so it’s possible I did the glue wrong.
Puzzle Logic
    import java.util.concurrent.Executors

    import rx.lang.scala.Subscription
    import rx.lang.scala.subjects.ReplaySubject

    import scala.collection.mutable
    import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

    def thrusterPower(phaseSettings: List[Int], opcodes: List[Int]): Int = {
      //amp a, b, c, d, e
      val ampA = new IntCode(opcodes)
      val ampB = new IntCode(opcodes)
      val ampC = new IntCode(opcodes)
      val ampD = new IntCode(opcodes)
      val ampE = new IntCode(opcodes)

      implicit val executionContext: ExecutionContextExecutorService =
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(5))

      // Each input is a ReplaySubject seeded with the amp's phase setting.
      // (The original called doOnNext on each subject and discarded the result,
      // which has no effect; input logging already happens in IntCode.execute.)
      val inputA = ReplaySubject[Int]()
      inputA.onNext(phaseSettings(0))
      inputA.onNext(0) //Initial input signal

      val inputB = ReplaySubject[Int]()
      inputB.onNext(phaseSettings(1))

      val inputC = ReplaySubject[Int]()
      inputC.onNext(phaseSettings(2))

      val inputD = ReplaySubject[Int]()
      inputD.onNext(phaseSettings(3))

      val inputE = ReplaySubject[Int]()
      inputE.onNext(phaseSettings(4))

      // Wire each amp's output into the next amp's input.
      val subs = new mutable.Stack[Subscription]()
      subs.push(ampA.execute(inputA).doOnNext(n => logger.debug(s"outputA: ${n}")).subscribe(out => inputB.onNext(out)))
      subs.push(ampB.execute(inputB).doOnNext(n => logger.debug(s"outputB: ${n}")).subscribe(out => inputC.onNext(out)))
      subs.push(ampC.execute(inputC).doOnNext(n => logger.debug(s"outputC: ${n}")).subscribe(out => inputD.onNext(out)))
      subs.push(ampD.execute(inputD).doOnNext(n => logger.debug(s"outputD: ${n}")).subscribe(out => inputE.onNext(out)))

      val ampEOutput = ampE.execute(inputE).doOnNext(n => logger.debug(s"outputE: ${n}"))
      // Feedback: E's output goes back into A.
      subs.push(ampEOutput.subscribe(out => inputA.onNext(out)))

      //Last output signal from E is the final answer
      val finalOutput = ampEOutput.toBlocking.last

      //Complete all inputs
      inputA.onCompleted()
      inputB.onCompleted()
      inputC.onCompleted()
      inputD.onCompleted()
      inputE.onCompleted()

      //Unsubscribe all
      subs.foreach(sub => sub.unsubscribe())

      executionContext.shutdown()
      finalOutput
    }
The above is mostly wiring between the Amps, followed by a small bit of cleanup: complete all the inputs, unsubscribe everything, shut down the execution context, and return the final value. I should probably pass the execution context in from a higher level, because I call this once for each permutation of the phase settings, which means I’m creating and destroying a thread pool every time.
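A rough sketch of what hoisting the pool could look like: create one thread pool, try every ordering of the phase settings, and shut the pool down once at the end. `PhaseSearch`, `maxSignal`, and the `run` parameter are hypothetical; `run` stands in for a variant of thrusterPower that accepts the ExecutionContext instead of building its own.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

object PhaseSearch {
  // Try every permutation of `phases` against `run`, sharing a single thread pool.
  // `run` is an assumed stand-in for a thrusterPower that takes the context as an argument.
  def maxSignal(phases: List[Int])(run: (List[Int], ExecutionContext) => Int): Int = {
    val pool: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(5))
    try phases.permutations.map(p => run(p, pool)).max
    finally pool.shutdown() // one shutdown for the whole search
  }
}
```

For five phase settings that is 120 permutations, so the pool is built once instead of 120 times.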
Input/Output Observables
    class InputOpCode(val input: Iterator[Int]) extends OpCode(3, 1) {
      override def operate(offset: Int, data: List[Int]): OperationResult = {
        data.drop(offset) match {
          case opParams :: p1 :: xs =>
            logger.info("BEFORE TAKE BLOCKING")
            val value: Int = input.next() //This will cause it to block, which is fine, need a timeout, or a kill
            logger.info(s"INPUT VALUE: ${value}")
            logOperation(offset, "INPUT", codeFormat(opParams), p1, value)
            // Store the value at address p1 and move past this instruction.
            OperationResult(data.updated(p1, value), offset + 1 + parameterSize)
        }
      }
    }

    //Can have multiple outputs...
    class OutputOpCode(output: Subject[Int]) extends OpCode(4, 1) {
      override def operate(offset: Int, data: List[Int]): OperationResult = {
        data.drop(offset) match {
          case opParams :: p1 :: xs =>
            val fOpParams = codeFormat(opParams)
            // Position mode reads the value at address p1; otherwise p1 is the value itself.
            val operand1 = if (fOpParams(0) == POSITION) data(p1) else p1
            logOperation(offset, "OUTPUT", codeFormat(opParams), operand1)
            output.onNext(operand1)
            OperationResult(data, offset + 1 + parameterSize)
        }
      }
    }
Here are the Input and Output opcodes. Input takes an Iterator, which I’m distilling from the input Observable. The instructions call for the IntCode computer to block when it doesn’t have input yet, so this was the easiest way to do it. The Output opcode just calls onNext on a Subject. Super easy. This Subject and the input Observable are wired up in the IntCode computer itself.
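To illustrate the block-until-input behavior on its own, here is a small sketch that swaps in plain `java.util.concurrent.LinkedBlockingQueue`s for the Observables and a toy amp (add the phase to each input) for the IntCode program. It is not the code from this post; `BlockingLoopSketch`, `loop`, and the fixed `rounds` count are assumptions made for the example.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object BlockingLoopSketch {
  // Runs toy amps in a ring. Each amp reads its phase, then `rounds` times
  // blocks on take() for an input and forwards (input + phase) to the next amp.
  def loop(phases: List[Int], rounds: Int): Int = {
    val n = phases.size
    val queues = Vector.fill(n)(new LinkedBlockingQueue[Int]())
    // Seed every queue with its phase before any thread starts, so it is read first.
    phases.zip(queues).foreach { case (phase, q) => q.put(phase) }
    queues.head.put(0) // initial input signal for the first amp

    val pool = Executors.newFixedThreadPool(n) // one thread per amp, since they wait on each other
    for (i <- 0 until n) {
      val in = queues(i)
      val out = queues((i + 1) % n) // the last amp feeds back into the first
      pool.execute(new Runnable {
        def run(): Unit = {
          val phase = in.take()
          for (_ <- 1 to rounds) out.put(in.take() + phase)
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    queues.head.take() // the last amp's final output is left in the first amp's queue
  }
}
```

With phases `List(1, 2, 3, 4, 5)` and two rounds, the signal grows by 15 on each trip around the ring, so `loop` returns 30.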
This puzzle was immensely fun. I am really enjoying evolving this IntCode Computer, and I hope I get to do more with it as I continue doing Advent of Code 2019!