It is done. Finally, an over a year long project has come to an end. The project started as an attempt to make a tool to analyse climbing (boulder) videos, and point out differences between them. The result? A fully asynchronous cloud based automatic analysis tool: https://www.climbalyser.com.

Damn, I don’t think I have feature creeped myself so hard in my whole life. At first attempt I tried to implement it in Clojure. However, I ran into memory problems (processing 60 Hz full HD videos is not straight forward!). Outcome of that little experiment was a “lexical reference counting” (slightly buggy) library 1 2. After that failure I just rewrote everything in Python and went on with my life. I also found a bug in the RANSAC implementation in OpenCV, which has probably been lying around since day one 3. But it was fun, and I learned a great deal from it, not just practical computer vision stuff, but also many parts of the cloud infrastructure. I’ll briefly explain some of the things used to build the tool.

As I started to look into the problem, it quickly became apparent that the videos needed to be stitched, view compensated and synchronized in time. First, each video is stitched separately using homography estimation between the feature rich walls. It’s a quick and dirty way of doing it, but it’s a fairly simple and linear method. The tricky part was getting it working with moving objects in the scene (the climber). I came up with a quite interesting, purely geometrical background estimation algorithm, maybe I’ll describe it in a separate post some day. After stitching each video separately they are piped into pose estimation. Finally, the videos and detected skeletons are synchronized in time using dynamic time warping based on matching the detected human bounding box centers, and merged into the final result. Here is “the trailer” for the project (made by my only beta tester and genius girlfriend).

The whole project runs in Google Cloud. The front end server runs serverlessly on “Google Run”, and all processing is done with jobs. The jobs are split into several tasks in order to process the videos in parallel as fast as possible, and everything is tied together using workflows. The videos are stored in buckets, and a trigger is setup on the incoming data bucket to execute the workflow, and ends by sending an email to the supplied address given in the front end. There is no database, all information is kept in the workflow and in the file names (given globally unique identifiers) in the buckets. A cloud cron job schedules cleanup of all files older than a week, once per day.

The workflow starts by extracting all the frames, which is a surprisingly expensive step to do on cloud compute hardware, so I optimized that by just doing it once and then save the frames as a tarball in a bucket. That might sound like a bad optimization, a tarball is obviously much bigger than a well compressed video, but networking is just so much cheaper and faster in the cloud than compute is, so it works. Subsequent steps can then download the tarball, extract and run on a shard of the pre-extracted data.

Since the workflow is dispatched by a trigger on the incoming data to a bucket, there is no built in control over execution order of different jobs. As we started to stress test the system it quickly failed due to the limitation on number of concurrent jobs in Google Cloud. Since tasks are assigned a slot in what essentially looks like random order, there is no guarantee on processing time for a workflow. I solved it by using a cloud queue together with a step to redispatch the workflow in case there are too many concurrent executions running. Yes, that is a recursive cloud call. At least this makes the workflows run in the order they are received by the system, a pretty good property to maintain IMO.

At the bottom of this page is the main workflow I ended up with (and this is the reason behind my YAPL rant), and partly the reason why it took such a long time to finish the project. Worst of all was that I had to maintain the cloud workflow with this YAML () “script”, AND a local workflow written in bash. It was just extremely difficult to NOT introduce a bug when going from local to cloud. And on top of that, the debugging experience, oh my, I’m not sure what the cloud people are thinking about, but debugging a workflow, print debugging YAPL

main:
  params: [event]
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - workflow_name: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_ID")}
          - workflow_execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
          - queue_dispatch_workflow_name: queue-dispatch-workflow
          - event_bucket: ${event.data.bucket}
          - event_file: ${event.data.name}
          - job_location: europe-north1
          - job_namespace: ${"namespaces/" + project_id + "/jobs/"}
          - detection_group_count: "5"
          - detection_shard_count: "2"
          - max_num_concurrent_executions: 2
    - get_executions:
        call: http.get
        args:
          url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + job_location + "/workflows/" + workflow_name + "/executions"}
          query:
            filter: 'state="ACTIVE"'
          auth:
            type: OAuth2
        result: listExecutionsResponse
    - print_executions:
        call: sys.log
        args:
            data: ${"Number of active workflow executions = " +  len(listExecutionsResponse.body.executions)}
    - check_concurrent_executions:
        switch:
          - condition: ${len(listExecutionsResponse.body.executions) > max_num_concurrent_executions}
            next: wait_and_redispatch
          - condition: true
            next: check_input_file
    - wait_and_redispatch:
        call: sys.sleep
        args:
          seconds: 10
        next: redispatch_workflow
    - redispatch_workflow:
        call: http.post
        args:
          url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + job_location + "/workflows/" + queue_dispatch_workflow_name + "/executions"}
          body:
            argument: ${json.encode_to_string(event)}
          auth:
            type: OAuth2
        next: end
    - check_input_file:
        switch:
          - condition: ${event_bucket == "climbalyser-events"}
            next: register_workflow_execution
          - condition: true
            next: end
    - register_workflow_execution:
        call: googleapis.run.v1.namespaces.jobs.run
        args:
          name: ${job_namespace + "register-workflow-execution"}
          location: ${job_location}
          body:
            overrides:
              containerOverrides:
                args:
                  - --event-file
                  - ${event_file}
                  - --project-id
                  - ${project_id}
                  - --workflow-name
                  - ${workflow_name}
                  - --workflow-execution-id
                  - ${workflow_execution_id}
                  - --location
                  - ${job_location}
    - unpack_video:
        call: googleapis.run.v1.namespaces.jobs.run
        args:
          name: ${job_namespace + "unpack-video"}
          location: ${job_location}
          body:
            overrides:
              containerOverrides:
                args:
                  - ${event_file}
                  - climbalyzer-input
                  - climbalyzer-input
    - analyse:
        parallel:
          branches:
            - stitching_branch:
                steps:
                  - stitch:
                      call: googleapis.run.v1.namespaces.jobs.run
                      args:
                        name: ${job_namespace + "stitch-v2"}
                        location: ${job_location}
                        connector_params:
                          timeout: 7200
                        body:
                          overrides:
                            timeoutSeconds: 7200
                            containerOverrides:
                              args:
                                - ${event_file}
                                - climbalyzer-input
                                - climbalyser-stitches
                                - --verbose
                      result: stitch_result
            - detection_branch:
                steps:
                  - detect:
                      call: googleapis.run.v1.namespaces.jobs.run
                      args:
                        name: ${job_namespace + "detect"}
                        location: ${job_location}
                        connector_params:
                          timeout: 7200
                        body:
                          overrides:
                            timeoutSeconds: 7200
                            containerOverrides:
                              args:
                                - ${event_file}
                                - climbalyzer-input
                                - climbalyser-detections
                                - --shards
                                - ${detection_shard_count}
                      result: detection_result
    - align:
        call: googleapis.run.v1.namespaces.jobs.run
        args:
          name: ${job_namespace + "align"}
          location: ${job_location}
          connector_params:
            timeout: 3600
          body:
            overrides:
              timeoutSeconds: 3600
              containerOverrides:
                args:
                  - ${event_file}
                  - climbalyser-stitches
                  - climbalyser-detections
                  - ${detection_group_count}
                  - climbalyzer-output
        result: job_execution
    - mail:
        call: googleapis.run.v1.namespaces.jobs.run
        args:
          name: ${job_namespace + "mail"}
          location: ${job_location}
          body:
            overrides:
              containerOverrides:
                args:
                  - climbalyzer-output
                  - ${event_bucket}
                  - ${event_file}
        result: job_execution
    - finish:
        return: ${job_execution}