Scripting a Python batch processor with distribution and progress observing
Disclaimer: The code in this post is written on version 10.2.101000.0 of Simplygon. If you encounter this post at a later stage, some of the API calls might have changed. However, the core concepts should still remain valid.
Introduction
A crucial part of an automatic pipeline is the ability to distribute processes. With distribution you can complete large batches of assets much faster than if you run them on a single machine. Distribution is a first-class citizen of the Simplygon API, and in this post we will explore how to build a simple distribution over Simplygon Grid using Python.
Simplygon Grid
Simplygon Grid is a simple distribution solution that allows you to run several processing agents across a local network. When a machine starts a batch job the individual processes will be distributed to the agents to speed up the process. You can check out this quick overview video to learn more about it.
Batch processor
There is a batch processor tool in the Simplygon SDK. It allows you to easily distribute processes across Simplygon Grid without writing any code. If you're only using settings files exported from one of our integrations together with a supported asset format, that will cover your needs. However, if you want more control over the processing, it might be a good idea to build your own solution. We will create a skeleton batch processor in this post.
Our script
The goal is to create a script that can take a folder with assets, start processing each of them and show the progress for all the processes.
Simplygon process progress
Simplygon processes fire off progress events at regular intervals. By attaching an observer to a process you will know how far along it is. In our script we will add this little class to handle that.
class SimplygonProcessObserver( Simplygon.Observer ):
    def create_progress_ui(self, ui, name):
        # Tells the UI to add a progress bar for this process
        self.__process_name = name
        ui.add_progress_bar(self.__process_name)
        self.__ui = ui

    def OnProgress( self, subject: Simplygon.spObject, progress: float ):
        # Called from Simplygon when the process is updated
        self.__ui.update_progress(self.__process_name, progress)
        return True
Due to the nature of Simplygon's Python API, you cannot have an __init__ function in the class. It will cause an error when the observer is passed into the core API.
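One way to keep creation and setup together without an __init__ function is a small factory function that creates the observer and then calls its setup method. The following is only a sketch: `Observer` is a stand-in base class and `RecordingUI` and `make_observer` are hypothetical names, used so that the example runs without Simplygon installed. In the real script the base class would be `Simplygon.Observer`.

```python
class Observer:
    """Stand-in for Simplygon.Observer so this sketch runs without Simplygon."""

    def OnProgress(self, subject, progress):
        return True


class RecordingUI:
    """Hypothetical UI that only records the values it is given."""

    def __init__(self):
        self.bars = {}

    def add_progress_bar(self, name):
        self.bars[name] = 0.0

    def update_progress(self, name, progress):
        self.bars[name] = progress


class ProcessObserver(Observer):
    # No __init__ here: state is attached afterwards by create_progress_ui.
    def create_progress_ui(self, ui, name):
        self._process_name = name
        self._ui = ui
        ui.add_progress_bar(name)

    def OnProgress(self, subject, progress):
        self._ui.update_progress(self._process_name, progress)
        return True  # mirrors the observer in this post, which returns True


def make_observer(ui, name):
    """Create an observer and run its setup in a single call."""
    observer = ProcessObserver()
    observer.create_progress_ui(ui, name)
    return observer


ui = RecordingUI()
observer = make_observer(ui, "rock")
observer.OnProgress(None, 42.0)
print(ui.bars)
```

The factory keeps callers from forgetting the setup call, which would otherwise only surface as an AttributeError on the first progress event.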
We want the observer to notify a UI when progress is made, so let's add that.
Progress UI
We will use this super simple UI to handle progress.
We will not go into detail about the UI, as there isn't anything Simplygon specific in it. You will find the code in the script at the end of this post.
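The one idea in the UI worth calling out is how progress crosses threads: the worker threads only put updates on a queue, and a single consumer applies them. Here is a minimal sketch of that idea without any widgets. `fake_process` and `drain` are hypothetical names standing in for a Simplygon process and the UI's event handler.

```python
import queue
import threading


def fake_process(name, updates, steps=4):
    """Hypothetical worker: reports progress from 0 to 100 through the queue."""
    for step in range(steps + 1):
        updates.put({"name": name, "progress": 100.0 * step / steps})


def drain(updates, state):
    """Apply all queued updates; meant to be called from a single thread."""
    while True:
        try:
            update = updates.get_nowait()
        except queue.Empty:
            return
        state[update["name"]] = update["progress"]


updates = queue.Queue()
workers = [
    threading.Thread(target=fake_process, args=(name, updates))
    for name in ("rock", "tree")
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

state = {}
drain(updates, state)
print(state)
```

In the full script the queue is the same, and the generated Tk event plays the role of telling the UI thread that there is one more item to take off the queue.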
Processing function
For our example we will use a simple reduction pipeline. This is where you can add functionality to expand the capabilities of the script.
def reduce_asset(sg: Simplygon.ISimplygon, input_path: str, output_path: str, process_observer: Simplygon.Observer):
    reduction_pipeline = sg.CreateReductionPipeline()
    reduction_pipeline.AddObserver(process_observer)
    reduction_settings = reduction_pipeline.GetReductionSettings()
    reduction_settings.SetReductionTargetTriangleRatio(0.1)
    reduction_pipeline.RunSceneFromFile(input_path, output_path, Simplygon.EPipelineRunMode_RunDistributedUsingSimplygonGrid)
In the function we're using AddObserver to add the process observer that will receive progress events.
Running the pipeline with the flag Simplygon.EPipelineRunMode_RunDistributedUsingSimplygonGrid will instruct Simplygon to try to distribute the process over the Grid, if it is up and running. If the Grid isn't available, the fallback is to run the process locally.
Batching a folder
Now we just need a function that starts a process for each asset in a folder. There is nothing Simplygon specific in this function either. Nevertheless, here it is:
def process_assets(sg: Simplygon.ISimplygon, progress_ui: ProgressUI, input_dir: str, output_dir: str):
    output_dir = os.path.join(os.path.abspath(os.getcwd()), output_dir)
    os.chdir(input_dir)
    for input_path in glob.glob("*.fbx"):
        asset_path_without_ext = os.path.splitext(input_path.replace('\\', '/'))[0]
        # Create the output path
        output_path = os.path.join(output_dir, asset_path_without_ext+"_LOD.fbx")
        # Create an observer and progress UI elements
        process_observer = SimplygonProcessObserver()
        process_observer.create_progress_ui(progress_ui, asset_path_without_ext)
        # Start the worker thread running the Simplygon process
        process_thread = threading.Thread(target = reduce_asset, args = (sg, input_path, output_path, process_observer))
        process_thread.start()
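The function above changes the working directory with os.chdir so that the glob pattern and the relative input paths resolve. If you would rather leave the working directory untouched, one alternative is to build absolute paths up front. This is only a sketch of that idea: `plan_outputs` is a hypothetical helper, not part of the script in this post, and it runs on a temporary folder with empty dummy files.

```python
import tempfile
from pathlib import Path


def plan_outputs(input_dir, output_dir, suffix="_LOD"):
    """Return (input_path, output_path) pairs for every .fbx file in input_dir."""
    input_root = Path(input_dir).resolve()
    output_root = Path(output_dir).resolve()
    pairs = []
    for input_path in sorted(input_root.glob("*.fbx")):
        output_path = output_root / f"{input_path.stem}{suffix}.fbx"
        pairs.append((input_path, output_path))
    return pairs


# Demonstrate on a temporary folder with two dummy assets and one other file.
with tempfile.TemporaryDirectory() as root:
    input_dir = Path(root) / "input"
    input_dir.mkdir()
    for file_name in ("rock.fbx", "tree.fbx", "notes.txt"):
        (input_dir / file_name).touch()

    pairs = plan_outputs(input_dir, Path(root) / "output")
    names = [(src.name, dst.name) for src, dst in pairs]
    print(names)
```

Each pair could then be converted with str() and handed to reduce_asset, with the file stem used as the progress bar name.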
That wraps everything up. We have created a simple skeleton for a batch processor that we can extend to meet our needs.
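One possible extension is to cap how many processes are in flight and to collect failures, since the skeleton starts one thread per asset and never joins them. Below is a minimal sketch using the standard library's thread pool. `fake_reduce` is a hypothetical stand-in for reduce_asset, and the right value for `max_workers` is an assumption that would depend on your Grid setup.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_reduce(input_path, output_path):
    """Hypothetical stand-in for reduce_asset; returns the output path."""
    return output_path


def run_batch(jobs, worker, max_workers=4):
    """Run worker(input, output) for each job, at most max_workers at a time."""
    results, failures = [], []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(worker, src, dst): src for src, dst in jobs}
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as error:  # keep going if one asset fails
                failures.append((futures[future], error))
    return sorted(results), failures


jobs = [("rock.fbx", "rock_LOD.fbx"), ("tree.fbx", "tree_LOD.fbx")]
results, failures = run_batch(jobs, fake_reduce, max_workers=2)
print(results, failures)
```

Because run_batch blocks until every job has finished, in a script with a UI it would need to run on its own thread so the UI main loop stays responsive.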
The Script
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from simplygon10 import simplygon_loader
from simplygon10 import Simplygon
from functools import partial
import threading
import glob,os
from tkinter import *
from tkinter.ttk import *
import queue
# Super simple UI with event handling that can show the status
# of Simplygon processes running in separate threads.
class ProgressUI:
    progress_event_name = "<<progress_update>>"

    def __init__(self, name: str):
        # Set up the UI and the event handler
        self.__ui_root = Tk()
        self.__ui_root.title(name)
        self.__progress_index = 0
        self.__event_queue = queue.Queue()
        self.__progress_bars = {}
        handler = partial(self.__on_progress)
        self.__ui_root.bind(ProgressUI.progress_event_name, handler)

    def add_progress_bar(self, name: str):
        # Add the UI elements for this process
        label = Label(self.__ui_root, text=name)
        label.grid(row = self.__progress_index, column=0, pady=10, padx=10)
        bar = Progressbar(self.__ui_root, orient = HORIZONTAL, length = 100, mode = "determinate")
        bar.grid(row = self.__progress_index, column=1, pady=10, padx=10)
        self.__progress_bars[name] = bar
        self.__progress_index += 1

    def run(self):
        # Add a close button that stays disabled while processing
        self.__close_button = Button(self.__ui_root, text="Processing...", command=partial(self.quit))
        self.__close_button.grid(row = self.__progress_index, columnspan=2, pady=10, padx=10)
        self.__close_button["state"] = "disabled"
        # Start the UI main loop
        self.__ui_root.mainloop()

    def done(self):
        # Enable the close button once all processes have completed
        self.__close_button["state"] = "enabled"
        self.__close_button["text"] = "Close"

    def quit(self):
        # Closes the UI
        self.__ui_root.destroy()

    def is_done(self):
        # Returns true if all processes have been completed
        for process_name in self.__progress_bars:
            progress_bar = self.__progress_bars[process_name]
            if progress_bar["value"] < 100:
                return False
        return True

    def update_progress(self, process_name: str, progress: float):
        # Fires an event to update the progress bar associated with the process name
        self.__event_queue.put({"name":process_name, "progress":progress})
        self.__ui_root.event_generate(ProgressUI.progress_event_name, when='tail')
        # Check if we're ready to close the UI
        if self.is_done():
            self.done()

    def __on_progress(self, event):
        # Receives the progress update events
        update_event = self.__event_queue.get()
        process_name = update_event["name"]
        progress_bar = self.__progress_bars[process_name]
        progress = update_event["progress"]
        progress_bar["value"] = progress
# Observer class that receives updates from the Simplygon processes and dispatches
# events to the UI. There cannot be an __init__ function, or the observer won't pass
# properly through to Simplygon.
class SimplygonProcessObserver( Simplygon.Observer ):
    def create_progress_ui(self, ui, name):
        # Tells the UI to add a progress bar for this process
        self.__process_name = name
        ui.add_progress_bar(self.__process_name)
        self.__ui = ui

    def OnProgress( self, subject: Simplygon.spObject, progress: float ):
        # Called from Simplygon when the process is updated
        self.__ui.update_progress(self.__process_name, progress)
        return True
def reduce_asset(sg: Simplygon.ISimplygon, input_path: str, output_path: str, process_observer: Simplygon.Observer):
    # Simple function that reduces the incoming asset to 10% of its original triangle count and
    # outputs it to the specified location. This is where you would do the bulk of the processing work.
    reduction_pipeline = sg.CreateReductionPipeline()
    reduction_pipeline.AddObserver(process_observer)
    reduction_settings = reduction_pipeline.GetReductionSettings()
    reduction_settings.SetReductionTargetTriangleRatio(0.1)
    # If Grid is up and running, the processes will be distributed onto the network.
    reduction_pipeline.RunSceneFromFile(input_path, output_path, Simplygon.EPipelineRunMode_RunDistributedUsingSimplygonGrid)
def process_assets(sg: Simplygon.ISimplygon, progress_ui: ProgressUI, input_dir: str, output_dir: str):
    # Make output directory absolute
    output_dir = os.path.join(os.path.abspath(os.getcwd()), output_dir)
    os.chdir(input_dir)
    for input_path in glob.glob("*.fbx"):
        # Get the file name
        asset_path_without_ext = os.path.splitext(input_path.replace('\\', '/'))[0]
        # Create the output path
        output_path = os.path.join(output_dir, asset_path_without_ext+"_LOD.fbx")
        # Create an observer and progress UI elements
        process_observer = SimplygonProcessObserver()
        process_observer.create_progress_ui(progress_ui, asset_path_without_ext)
        # Start the worker thread running the Simplygon process
        process_thread = threading.Thread(target = reduce_asset, args = (sg, input_path, output_path, process_observer))
        process_thread.start()
def main():
    sg = simplygon_loader.init_simplygon()
    print(sg.GetVersion())
    progress_ui = ProgressUI("Simplygon Process Monitor")
    process_assets(sg, progress_ui, "input", "output")
    progress_ui.run()
    del sg

if __name__== "__main__":
    main()