import os, traceback, ctypes, csv, re, platform
import tkinter as tk
from tkinter import ttk
from tkinter import filedialog
from multiprocessing import Process, Value, Queue, set_start_method
from tkinter import ttk
import matplotlib
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import numpy as np
import psutil
import GPUtil
from collections import deque
import tracemalloc
try:
    # Ask Windows for DPI awareness before any Tk window is created.
    # 1 is the same value the previous `True` argument converted to.
    ctypes.windll.shcore.SetProcessDpiAwareness(1)
except (AttributeError, OSError):
    # AttributeError: `ctypes.windll` does not exist outside Windows.
    # OSError: shcore.dll could not be loaded on this Windows installation.
    pass
from .gui_elements import spacrProgressBar, spacrButton, spacrFrame, spacrDropdownMenu , spacrSlider, set_dark_style
# Define global variables
[docs]
# Shared run state for the GUI. "run_thread" holds the worker Process created
# by start_process() (None when idle); "stop_requested" starts as False and is
# replaced by a multiprocessing Value flag when a run is started.
thread_control = {"run_thread": None, "stop_requested": False}
[docs]
def toggle_settings(button_scrollable_frame):
    """Create the category dropdown that shows or hides groups of settings.

    The dropdown is placed in ``button_scrollable_frame.scrollable_frame`` and
    lists only the categories that have at least one setting present in the
    module-level ``vars_dict``. Choosing a category flips the grid visibility
    of that category's widgets. After the dropdown is built, ``vars_dict`` is
    passed through ``hide_all_settings`` and rebound to the result.

    Args:
        button_scrollable_frame: Object exposing a ``scrollable_frame``
            attribute used as the dropdown's parent.

    Raises:
        ValueError: If the module-level ``vars_dict`` is ``None``.
    """
    global vars_dict
    from .settings import categories
    from .gui_utils import hide_all_settings

    if vars_dict is None:
        raise ValueError("vars_dict is not initialized.")

    # Categories whose settings are currently toggled on.
    shown_categories = set()

    def _flip_visibility(setting_names):
        # Each vars_dict entry unpacks to (label, widget, <unused>, frame).
        for name in setting_names:
            if name not in vars_dict:
                continue
            label, widget, _, frame = vars_dict[name]
            # A non-empty grid_info() means the widget is currently gridded.
            if widget.grid_info():
                for element in (label, widget, frame):
                    element.grid_remove()
            else:
                for element in (label, widget, frame):
                    element.grid()

    def _on_category_select(choice):
        # The placeholder entry is not a real category.
        if choice == "Select Category":
            return
        if choice in categories:
            _flip_visibility(categories[choice])
            if choice in shown_categories:
                shown_categories.remove(choice)
            else:
                shown_categories.add(choice)
            category_dropdown.update_styles(shown_categories)
        # Reset the dropdown text back to the placeholder.
        category_var.set("Select Category")

    category_var = tk.StringVar()
    selectable = [
        name
        for name, members in categories.items()
        if any(member in vars_dict for member in members)
    ]
    category_dropdown = spacrDropdownMenu(
        button_scrollable_frame.scrollable_frame,
        category_var,
        selectable,
        command=_on_category_select,
    )
    category_dropdown.grid(row=0, column=4, sticky="ew", pady=2, padx=2)

    vars_dict = hide_all_settings(vars_dict, categories)
[docs]
def process_fig_queue():
    """Move pending figures from ``fig_queue`` into ``figures`` and reschedule.

    Every figure waiting in the module-level ``fig_queue`` is standardized with
    ``standardize_figure`` and appended to the ``figures`` deque. The deque is
    capped at 100 entries; figures dropped from the front are closed through
    pyplot. The slider maximum is updated after each addition, and the first
    figure is displayed if nothing has been shown yet (``figure_index == -1``).

    Whatever happens, the function re-arms itself with ``canvas_widget.after``
    and records the callback id in ``parent_frame.after_tasks``.
    """
    global canvas, fig_queue, canvas_widget, parent_frame, uppdate_frequency, figures, figure_index, index_control
    from .gui_elements import standardize_figure

    # Upper bound on the number of figures kept in the history deque.
    max_figures = 100

    try:
        # `import matplotlib` alone does not guarantee that the pyplot
        # submodule is loaded, so import it explicitly before using close().
        import matplotlib.pyplot as plt

        while not fig_queue.empty():
            fig = fig_queue.get_nowait()
            if fig is None:
                print("Warning: Retrieved a None figure from fig_queue.")
                continue

            # Standardize the figure appearance before adding it
            fig = standardize_figure(fig)
            figures.append(fig)

            # Discard (and close) the oldest figures beyond the cap
            while len(figures) > max_figures:
                old_fig = figures.popleft()
                plt.close(old_fig)

            # Update slider maximum
            index_control.set_to(len(figures) - 1)

            # If no figure has been displayed yet, show the first one
            if figure_index == -1:
                figure_index = 0
                display_figure(figures[figure_index])
                index_control.set(figure_index)
    except Exception as e:
        print("Exception in process_fig_queue:", e)
        traceback.print_exc()
    finally:
        # Schedule process_fig_queue() to run again
        after_id = canvas_widget.after(uppdate_frequency, process_fig_queue)
        parent_frame.after_tasks.append(after_id)
[docs]
def setup_plot_section(vertical_container, settings_type):
    """Build the plot pane and initialise the module-level figure state.

    A frame is added to ``vertical_container`` and an empty Matplotlib figure
    is created for it. For ``settings_type == 'map_barcodes'`` a media file is
    shown in the frame and the function returns right after creating the
    canvas (no slider is built). Otherwise the canvas is gridded, a
    ``spacrSlider`` is added below it, the dark style is applied, and the
    initial figure becomes entry 0 of ``figures``.

    Args:
        vertical_container: Container exposing ``add(child, stretch=...)``.
        settings_type: Name of the active settings mode.

    Returns:
        Tuple ``(canvas, canvas_widget)``.
    """
    global canvas, canvas_widget, figures, figure_index, index_control
    from .gui_utils import display_media_in_plot_frame

    palette = set_dark_style(ttk.Style())
    bg = palette['bg_color']
    fg = palette['fg_color']

    # Figure history and the index of the figure being shown (-1 = none yet)
    figures = deque()
    figure_index = -1

    # Frame hosting the plot section
    plot_frame = tk.Frame(vertical_container)
    plot_frame.configure(bg=bg)
    vertical_container.add(plot_frame, stretch="always")

    # Remove any children the frame may already have
    for child in plot_frame.winfo_children():
        child.destroy()

    # Blank placeholder figure
    figure = Figure(figsize=(30, 4), dpi=100)
    axes = figure.add_subplot(111)
    axes.plot([], [])
    axes.axis('off')

    def _attach_canvas():
        # Create the Tk-backed canvas for `figure` inside plot_frame.
        new_canvas = FigureCanvasTkAgg(figure, master=plot_frame)
        new_canvas.get_tk_widget().configure(cursor='arrow', highlightthickness=0)
        return new_canvas, new_canvas.get_tk_widget()

    if settings_type == 'map_barcodes':
        icons_dir = os.path.join(os.path.dirname(__file__), 'resources', 'icons')
        media_path = os.path.join(icons_dir, 'dna_matrix.mp4')
        # The media is placed first; the canvas is created afterwards.
        display_media_in_plot_frame(media_path, plot_frame)
        canvas, canvas_widget = _attach_canvas()
        return canvas, canvas_widget

    canvas, canvas_widget = _attach_canvas()
    canvas_widget.grid(row=0, column=0, sticky="nsew")
    plot_frame.grid_rowconfigure(0, weight=1)
    plot_frame.grid_columnconfigure(0, weight=1)
    canvas.draw()
    canvas.figure = figure
    figure.patch.set_facecolor(bg)
    axes.set_facecolor(bg)

    # Slider row underneath the canvas
    control_frame = tk.Frame(plot_frame, height=15*2, bg=bg)
    control_frame.grid(row=1, column=0, sticky="ew", padx=10, pady=5)
    control_frame.grid_propagate(False)
    index_control = spacrSlider(control_frame, from_=0, to=0, value=0, thickness=2, knob_radius=10,
                                position="center", show_index=True, command=update_figure)
    index_control.grid(row=0, column=0, sticky="ew")
    control_frame.grid_columnconfigure(0, weight=1)

    _ = set_dark_style(
        ttk.Style(vertical_container),
        containers=[plot_frame],
        widgets=[canvas_widget, index_control],
    )

    # Register the placeholder as the first figure and sync the slider
    figures.append(figure)
    figure_index = 0
    display_figure(figures[figure_index])
    index_control.set_to(len(figures) - 1)  # only one figure, so the maximum is 0
    index_control.set(figure_index)
    return canvas, canvas_widget
[docs]
def set_globals(thread_control_var, q_var, console_output_var, parent_frame_var, vars_dict_var, canvas_var, canvas_widget_var, scrollable_frame_var, fig_queue_var, progress_bar_var, usage_bars_var):
    """Bind the shared GUI objects to this module's global names.

    Each ``*_var`` argument is stored under the module-level name obtained by
    dropping the ``_var`` suffix (``q_var`` -> ``q`` and so on). The figure
    history globals (``figures``, ``figure_index``, ``index_control``) are not
    touched here.
    """
    globals().update(
        thread_control=thread_control_var,
        q=q_var,
        console_output=console_output_var,
        parent_frame=parent_frame_var,
        vars_dict=vars_dict_var,
        canvas=canvas_var,
        canvas_widget=canvas_widget_var,
        scrollable_frame=scrollable_frame_var,
        fig_queue=fig_queue_var,
        progress_bar=progress_bar_var,
        usage_bars=usage_bars_var,
    )
[docs]
def import_settings(settings_type='mask'):
    """Load setting values from a user-chosen CSV file into the settings form.

    A file dialog asks for a CSV with ``Key`` and ``Value`` columns. The
    defaults for ``settings_type`` are converted for the GUI, every default
    whose key appears in the CSV gets the CSV value, and the form fields are
    regenerated in the module-level ``scrollable_frame``. Nothing happens when
    the dialog is cancelled.

    Args:
        settings_type: Name of the settings mode whose defaults are used.

    Raises:
        ValueError: If ``settings_type`` is not one of the handled names.
    """
    global vars_dict, scrollable_frame, button_scrollable_frame
    from .gui_utils import convert_settings_dict_for_gui, hide_all_settings
    from .settings import generate_fields, set_default_settings_preprocess_generate_masks, get_measure_crop_settings, set_default_train_test_model
    from .settings import set_default_generate_barecode_mapping, set_default_umap_image_settings, get_analyze_recruitment_default_settings
    from .settings import get_default_generate_activation_map_settings, get_analyze_plaque_settings

    def read_settings_from_csv(csv_file_path):
        # Later rows win when a key is repeated.
        with open(csv_file_path, newline='') as csvfile:
            return {row['Key']: row['Value'] for row in csv.DictReader(csvfile)}

    def update_settings_from_csv(variables, csv_settings):
        # Work on a copy so the caller's mapping is left untouched.
        merged = variables.copy()
        for key in csv_settings:
            if key not in merged:
                continue
            # Keep the variable type and options; only the value is replaced.
            var_type, options, _ = merged[key]
            merged[key] = (var_type, options, csv_settings[key])
        return merged

    csv_file_path = filedialog.askopenfilename(filetypes=[("CSV files", "*.csv")])
    if not csv_file_path:
        # Dialog cancelled
        return

    csv_settings = read_settings_from_csv(csv_file_path)

    # Settings mode -> function producing its default settings dict
    default_factories = {
        'mask': set_default_settings_preprocess_generate_masks,
        'measure': get_measure_crop_settings,
        'classify': set_default_train_test_model,
        'sequencing': set_default_generate_barecode_mapping,
        'umap': set_default_umap_image_settings,
        'recruitment': get_analyze_recruitment_default_settings,
        'activation': get_default_generate_activation_map_settings,
        'analyze_plaques': get_analyze_plaque_settings,
    }
    if settings_type in default_factories:
        settings = default_factories[settings_type](settings={})
    elif settings_type == 'convert':
        settings = {}
    else:
        raise ValueError(f"Invalid settings type: {settings_type}")

    variables = convert_settings_dict_for_gui(settings)
    new_settings = update_settings_from_csv(variables, csv_settings)
    vars_dict = generate_fields(new_settings, scrollable_frame)
    vars_dict = hide_all_settings(vars_dict, categories=None)
[docs]
def setup_settings_panel(vertical_container, settings_type='mask'):
    """Build the settings pane and populate it with default fields.

    A horizontal ``PanedWindow`` holding a fixed-width frame is added to
    ``vertical_container``; a ``spacrFrame`` inside it receives one field per
    default setting of ``settings_type``. The resulting frame and field
    mapping are also stored in the module-level ``scrollable_frame`` and
    ``vars_dict``.

    Args:
        vertical_container: Container exposing ``add(child, stretch=...)``.
        settings_type: Name of the settings mode whose defaults are shown.

    Returns:
        Tuple ``(scrollable_frame, vars_dict)``.

    Raises:
        ValueError: If ``settings_type`` is not one of the handled names.
    """
    global vars_dict, scrollable_frame
    from .settings import get_identify_masks_finetune_default_settings, set_default_analyze_screen, set_default_settings_preprocess_generate_masks
    from .settings import get_measure_crop_settings, deep_spacr_defaults, set_default_generate_barecode_mapping, set_default_umap_image_settings
    from .settings import get_map_barcodes_default_settings, get_analyze_recruitment_default_settings, get_check_cellpose_models_default_settings, get_analyze_plaque_settings
    from .settings import generate_fields, get_perform_regression_default_settings, get_train_cellpose_default_settings, get_default_generate_activation_map_settings
    from .gui_utils import convert_settings_dict_for_gui
    from .gui_elements import set_element_size

    size_dict = set_element_size()
    settings_width = size_dict['settings_width']

    # PanedWindow wrapping the settings panel
    settings_paned_window = tk.PanedWindow(vertical_container, orient=tk.HORIZONTAL, width=size_dict['settings_width'])
    vertical_container.add(settings_paned_window, stretch="always")

    settings_frame = tk.Frame(settings_paned_window, width=settings_width)
    settings_frame.pack_propagate(False)  # keep the configured width regardless of children
    settings_paned_window.add(settings_frame)

    scrollable_frame = spacrFrame(settings_frame)
    scrollable_frame.grid(row=1, column=0, sticky="nsew")
    settings_frame.grid_rowconfigure(1, weight=1)
    settings_frame.grid_columnconfigure(0, weight=1)

    # Settings mode -> function producing its default settings dict
    default_factories = {
        'mask': set_default_settings_preprocess_generate_masks,
        'measure': get_measure_crop_settings,
        'classify': deep_spacr_defaults,
        'umap': set_default_umap_image_settings,
        'train_cellpose': get_train_cellpose_default_settings,
        'ml_analyze': set_default_analyze_screen,
        'cellpose_masks': get_identify_masks_finetune_default_settings,
        'cellpose_all': get_check_cellpose_models_default_settings,
        'map_barcodes': set_default_generate_barecode_mapping,
        'regression': get_perform_regression_default_settings,
        'recruitment': get_analyze_recruitment_default_settings,
        'activation': get_default_generate_activation_map_settings,
        'analyze_plaques': get_analyze_plaque_settings,
    }
    if settings_type in default_factories:
        settings = default_factories[settings_type](settings={})
    elif settings_type == 'convert':
        settings = {'src':'path to images'}
    else:
        raise ValueError(f"Invalid settings type: {settings_type}")

    variables = convert_settings_dict_for_gui(settings)
    vars_dict = generate_fields(variables, scrollable_frame)

    _ = set_dark_style(
        ttk.Style(vertical_container),
        containers=[settings_frame],
        widgets=[scrollable_frame],
    )

    print("Settings panel setup complete")
    return scrollable_frame, vars_dict
[docs]
def setup_console(vertical_container):
    """Create the console pane: a thin top border above a ``tk.Text`` widget.

    The text widget is stored in the module-level ``console_output``. Moving
    the pointer into the text widget recolours the border with the style's
    ``active_color``; leaving restores ``bg_color``.

    Args:
        vertical_container: Container exposing ``add(child, stretch=...)``.

    Returns:
        Tuple ``(console_output, console_frame)``.
    """
    global console_output
    from .gui_elements import set_dark_style

    # Apply the dark style and keep the returned style values
    theme = set_dark_style(ttk.Style())

    # Frame holding the whole console section
    console_frame = tk.Frame(vertical_container, bg=theme['bg_color'])
    vertical_container.add(console_frame, stretch="always")

    # Strip along the top edge that is recoloured on hover
    top_border = tk.Frame(console_frame, height=5, bg=theme['bg_color'])
    top_border.grid(row=0, column=0, sticky="ew", pady=(0, 2))

    family = theme['font_family']
    font_size = theme['font_size']
    font_loader = theme['font_loader']

    # Text widget that receives the console messages
    console_output = tk.Text(
        console_frame,
        bg=theme['bg_color'],
        fg=theme['fg_color'],
        font=font_loader.get_font(size=font_size),
        bd=0,
        highlightthickness=0,
    )
    console_output.grid(row=1, column=0, sticky="nsew")

    # Let the text widget take all remaining space
    console_frame.grid_rowconfigure(1, weight=1)
    console_frame.grid_columnconfigure(0, weight=1)

    def _recolour_border(color_key):
        # The colour is looked up when the event fires, not when bound.
        def _handler(event):
            top_border.config(bg=theme[color_key])
        return _handler

    console_output.bind("<Enter>", _recolour_border('active_color'))
    console_output.bind("<Leave>", _recolour_border('bg_color'))

    return console_output, console_frame
[docs]
def setup_usage_panel(horizontal_container, btn_col, uppdate_frequency):
    """Build the resource-usage panel and start its periodic refresh.

    Adds a frame to ``horizontal_container`` containing one progress bar for
    RAM, one each for VRAM and GPU load when ``GPUtil`` reports a GPU, and one
    per logical CPU core. Bars are laid out in columns of five rows. The bars
    are refreshed every ``uppdate_frequency`` milliseconds via ``after``.

    CPU bars are tracked in their own list, so per-core values line up with
    their bars whether or not the VRAM/GPU bars exist.

    Args:
        horizontal_container: Container exposing ``add(child)``.
        btn_col: Unused; kept for interface compatibility.
        uppdate_frequency: Refresh interval in milliseconds.

    Returns:
        Tuple ``(usage_scrollable_frame, usage_bars, usg_col)`` where
        ``usage_bars`` lists every gridded bar in creation order and
        ``usg_col`` is always 1.
    """
    global usage_bars
    from .gui_elements import set_dark_style, set_element_size

    usg_col = 1

    def update_usage(ram_bar, vram_bar, gpu_bar, cpu_bars, parent_frame):
        # RAM usage
        ram_bar['value'] = psutil.virtual_memory().percent

        # GPU and VRAM usage (first GPU only)
        gpus = GPUtil.getGPUs()
        if gpus:
            gpu = gpus[0]
            vram_bar['value'] = gpu.memoryUtil * 100
            gpu_bar['value'] = gpu.load * 100

        # Per-core CPU usage, paired with the CPU bars only
        for bar, usage in zip(cpu_bars, psutil.cpu_percent(percpu=True)):
            bar['value'] = usage

        # Run again after `uppdate_frequency` milliseconds
        parent_frame.after(uppdate_frequency, update_usage, ram_bar, vram_bar, gpu_bar, cpu_bars, parent_frame)

    size_dict = set_element_size()
    usage_panel_height = size_dict['panel_height']
    usage_frame = tk.Frame(horizontal_container, height=usage_panel_height)
    horizontal_container.add(usage_frame)

    usage_frame.grid_rowconfigure(0, weight=0)
    usage_frame.grid_rowconfigure(1, weight=1)
    usage_frame.grid_columnconfigure(0, weight=1)
    usage_frame.grid_columnconfigure(1, weight=1)

    usage_scrollable_frame = spacrFrame(usage_frame, scrollbar=False)
    usage_scrollable_frame.grid(row=1, column=0, sticky="nsew", columnspan=2)
    bar_parent = usage_scrollable_frame.scrollable_frame
    widgets = [bar_parent]

    all_bars = []   # every gridded bar, in creation order (returned to the caller)
    cpu_bars = []   # only the per-core bars, used by update_usage
    usage_bars = all_bars
    max_elements_per_column = 5
    row = 0
    col = 0

    # RAM, VRAM and GPU bars stay None until they are successfully created
    ram_bar, vram_bar, gpu_bar = None, None, None

    # Configure the style for the label
    style = ttk.Style()
    style_out = set_dark_style(style)
    font_loader = style_out['font_loader']
    font_size = style_out['font_size'] - 2
    style.configure("usage.TLabel", font=font_loader.get_font(size=font_size), foreground=style_out['fg_color'])

    def _new_bar():
        # A bare progress bar that is not placed in the grid yet.
        return spacrProgressBar(bar_parent, orient='horizontal', mode='determinate', length=size_dict['bar_size'], label=False)

    def _add_labelled_bar(text, grid_row, grid_col, pady):
        # Grid a text label with a progress bar to its right and register both.
        label = tk.Label(bar_parent, text=text, anchor='w', font=font_loader.get_font(size=font_size), bg=style_out['bg_color'], fg=style_out['fg_color'])
        label.grid(row=grid_row, column=2 * grid_col, pady=pady, padx=5, sticky='w')
        bar = _new_bar()
        bar.grid(row=grid_row, column=2 * grid_col + 1, pady=pady, padx=5, sticky='ew')
        widgets.append(label)
        widgets.append(bar)
        all_bars.append(bar)
        return bar

    # RAM bar
    try:
        ram_bar = _add_labelled_bar("RAM", row, col, pady=5)
        row += 1
    except Exception as e:
        print(f"Could not add RAM usage bar: {e}")

    # VRAM and GPU bars, only when a GPU is reported
    try:
        if GPUtil.getGPUs():
            vram_bar = _add_labelled_bar("VRAM", row, col, pady=5)
            row += 1
            gpu_bar = _add_labelled_bar("GPU", row, col, pady=5)
            row += 1
    except Exception as e:
        print(f"Could not add VRAM or GPU usage bars: {e}")

    # One bar per logical CPU core
    try:
        cpu_cores = psutil.cpu_count(logical=True)
        for core in range(cpu_cores):
            # Start a new column once the current one holds five rows
            if row > 0 and row % max_elements_per_column == 0:
                col += 1
                row = 0
            cpu_bars.append(_add_labelled_bar(f"C{core+1}", row, col, pady=2))
            row += 1
    except Exception as e:
        print(f"Could not add CPU core usage bars: {e}")

    style = ttk.Style(horizontal_container)
    _ = set_dark_style(style, containers=[usage_frame], widgets=widgets)

    # update_usage always writes to these three bars; fall back to ungridded
    # placeholders when the real bar could not be created.
    if ram_bar is None:
        ram_bar = _new_bar()
    if vram_bar is None:
        vram_bar = _new_bar()
    if gpu_bar is None:
        gpu_bar = _new_bar()

    update_usage(ram_bar, vram_bar, gpu_bar, cpu_bars, usage_frame)
    return usage_scrollable_frame, usage_bars, usg_col
[docs]
def initiate_abort():
    """Terminate the running worker, if any, and reset ``thread_control``.

    When ``thread_control["run_thread"]`` is set, it is terminated and
    "Processes aborted." is put on the module-level queue ``q``; any exception
    raised while doing so is reported on ``q`` instead. In all cases
    ``thread_control`` is rebound to a fresh idle-state dict.
    """
    global thread_control, q, parent_frame

    worker = thread_control.get("run_thread")
    if worker is not None:
        try:
            worker.terminate()
            thread_control["run_thread"] = None
            q.put("Processes aborted.")
        except Exception as e:
            q.put(f"Error aborting process: {e}")

    thread_control = {"run_thread": None, "stop_requested": False}
[docs]
def check_src_folders_files(settings, settings_type, q):
    """Validate the ``src`` path(s) in ``settings`` before a run is started.

    ``src`` may be a single path or a list of paths. Problems are reported as
    ``"Error: ..."`` messages on ``q``.

    Checks performed:
      * every path must exist;
      * for ``settings_type == 'mask'`` with ``consolidate`` set, each path
        must yield a non-empty ``generate_image_path_map``;
      * for ``'mask'`` otherwise, each path must contain image files or one
        of the sub-folders ``1``, ``stack`` or ``norm_channel_stack``;
      * for ``'measure'``, the ``merged`` folder of each path must contain
        ``.npy`` files.

    Other settings types only get the existence check.

    Args:
        settings: Settings dict; ``settings['src']`` is replaced by its
            normalized form when present.
        settings_type: Name of the active settings mode.
        q: Queue-like object with ``put`` that receives error messages.

    Returns:
        ``True`` if the run should be stopped, ``False`` otherwise (including
        when ``src`` is absent or ``None``).
    """
    default_extensions = (".png", ".jpg", ".jpeg", ".bmp", ".gif", ".tiff", ".tif",
                          ".webp", ".npy", ".npz", ".nd2", ".czi", ".lif")

    def _folder_has_images(folder_path, image_extensions=default_extensions):
        """Check if a folder contains any image files."""
        # A missing folder (or a plain file) simply has no images; this
        # avoids an uncaught OSError from os.listdir.
        if not os.path.isdir(folder_path):
            return False
        suffixes = tuple(image_extensions)
        return any(name.lower().endswith(suffixes) for name in os.listdir(folder_path))

    def _has_folder(parent_folder, sub_folder="measure"):
        """Check if a specific sub-folder exists inside the given folder."""
        return os.path.isdir(os.path.join(parent_folder, sub_folder))

    # Skip validation when 'src' is missing, before anything touches the key.
    if settings.get("src") is None:
        return False

    from .utils import normalize_src_path, generate_image_path_map

    settings['src'] = normalize_src_path(settings['src'])
    src_value = settings.get("src")
    if src_value is None:
        return False

    # Convert single string src to a list for uniform handling
    if isinstance(src_value, str):
        src_list = [src_value]
    elif isinstance(src_value, list):
        src_list = src_value
    else:
        return True

    # Identify missing paths
    missing_paths = {i: path for i, path in enumerate(src_list) if not os.path.exists(path)}
    if missing_paths:
        q.put(f'Error: The following paths are missing: {missing_paths}')
        return True

    request_stop = False
    for path in src_list:
        # Settings types without a dedicated check pass by default.
        conditions = [True]

        if settings_type == 'mask':
            if settings.get('consolidate'):
                if len(generate_image_path_map(path)) == 0:
                    q.put(f"Error: Missing subfolders with images for: {path}")
                    return True
                # This path is fine; keep validating the remaining ones.
                continue

            pictures_continue = _folder_has_images(path)
            folder_chan_continue = _has_folder(path, "1")
            folder_stack_continue = _has_folder(path, "stack")
            folder_npz_continue = _has_folder(path, "norm_channel_stack")

            if not pictures_continue:
                if not any([folder_chan_continue, folder_stack_continue, folder_npz_continue]):
                    # None of the three exists here, so report each of them.
                    q.put(f"Error: Missing channel folder in folder: {path}")
                    q.put(f"Error: Missing stack folder in folder: {path}")
                    q.put(f"Error: Missing norm_channel_stack folder in folder: {path}")
                else:
                    q.put(f"Error: No images in folder: {path}")

            conditions = [pictures_continue, folder_chan_continue, folder_stack_continue, folder_npz_continue]

        if settings_type == 'measure':
            if not os.path.basename(path) == 'merged':
                path = os.path.join(path, "merged")
            conditions = [_folder_has_images(path, image_extensions={".npy"})]

        # Evaluated per path so that one bad entry in a list is not masked
        # by a later good one.
        if not any(conditions):
            q.put(f"Error: The following path(s) is missing images or folders: {path}")
            request_stop = True

    return request_stop
[docs]
def start_process(q=None, fig_queue=None, settings_type='mask'):
    """Validate the current settings and launch the worker process.

    Settings are read from the module-level ``vars_dict`` through
    ``check_settings`` and the source folders are checked with
    ``check_src_folders_files``; the function returns without starting
    anything if either reports a problem. A worker already recorded in
    ``thread_control`` is aborted first. The new ``Process`` runs
    ``run_function_gui`` and is stored in ``thread_control["run_thread"]``.

    Args:
        q: Queue for console messages; a new ``Queue`` is created when None.
        fig_queue: Queue for figures; a new ``Queue`` is created when None.
        settings_type: Name of the settings mode to run.
    """
    global thread_control, vars_dict, parent_frame
    from .settings import check_settings, expected_types
    from .gui_utils import run_function_gui, set_cpu_affinity, initialize_cuda, display_gif_in_plot_frame, print_widget_structure

    q = Queue() if q is None else q
    fig_queue = Queue() if fig_queue is None else fig_queue

    try:
        settings, errors = check_settings(vars_dict, expected_types, q)
        if len(errors) > 0:
            return
        if check_src_folders_files(settings, settings_type, q):
            return
    except ValueError as e:
        q.put(f"Error: {e}")
        return

    # Only one worker at a time: stop the previous one first
    if isinstance(thread_control, dict) and thread_control.get("run_thread") is not None:
        initiate_abort()

    stop_requested = Value('i', 0)
    thread_control["stop_requested"] = stop_requested

    # Initialize CUDA in the main process
    initialize_cuda()

    process_args = (settings_type, settings, q, fig_queue, stop_requested)

    supported_types = {
        'mask', 'umap', 'measure', 'simulation', 'sequencing', 'classify', 'analyze_plaques',
        'cellpose_dataset', 'train_cellpose', 'ml_analyze', 'cellpose_masks', 'cellpose_all',
        'map_barcodes', 'regression', 'recruitment', 'cellpose_compare', 'vision_scores',
        'vision_dataset', 'convert',
    }
    if settings_type not in supported_types:
        q.put(f"Error: Unknown settings type '{settings_type}'")
        return

    # Start the process
    process = Process(target=run_function_gui, args=process_args)
    process.start()

    # Set CPU affinity if necessary
    set_cpu_affinity(process)

    # Keep a handle so the run can be aborted later
    thread_control["run_thread"] = process
[docs]
def process_console_queue():
    """Drain the message queue ``q`` into the console widget and reschedule.

    Messages have ANSI escape sequences stripped and are routed by prefix:

      * ``"Error:"``    -> inserted with the red ``error`` tag;
      * ``"Warning:"``  -> inserted with the orange ``warning`` tag;
      * ``"Progress:"`` -> not printed; parsed as
        ``Progress: <current>/<total>, operation_type: <name>,<info>`` and
        used to update the module-level ``progress_bar``;
      * anything else   -> inserted with the ``normal`` tag.

    Progress is tracked on function attributes: ``completed_tasks`` collects
    the reported ``<current>`` values and ``current_maximum`` the last seen
    total; the bar's value is the number of distinct values collected.

    The function always re-arms itself with ``console_output.after`` and
    records the callback id in ``parent_frame.after_tasks``, even when
    handling a message raises, so one bad message cannot stop the console.
    """
    global q, console_output, parent_frame, progress_bar, process_console_queue

    # Initialize function attributes on first use
    if not hasattr(process_console_queue, "completed_tasks"):
        process_console_queue.completed_tasks = []
    if not hasattr(process_console_queue, "current_maximum"):
        process_console_queue.current_maximum = None

    ansi_escape_pattern = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
    progress_pattern = re.compile(r'Progress: (\d+)/(\d+), operation_type: ([\w\s]*),(.*)')
    spacing = 5

    def _handle_progress(clean_message):
        # Update the progress bar from one "Progress:" message.
        match = progress_pattern.search(clean_message)
        if not match:
            return
        current_progress = int(match.group(1))
        total_progress = int(match.group(2))
        operation_type = match.group(3).strip()
        additional_info = match.group(4).strip()  # everything after operation_type

        # A different total means a new operation started: restart counting
        if process_console_queue.current_maximum != total_progress:
            process_console_queue.current_maximum = total_progress
            process_console_queue.completed_tasks = []

        process_console_queue.completed_tasks.append(current_progress)
        # Repeated reports of the same step count once
        unique_progress_count = len(set(process_console_queue.completed_tasks))

        if progress_bar:
            progress_bar['maximum'] = total_progress
            progress_bar['value'] = unique_progress_count

            # Store operation type and additional info
            if operation_type:
                progress_bar.operation_type = operation_type
                progress_bar.additional_info = additional_info

            # Update the progress label
            if progress_bar.progress_label:
                progress_bar.update_label()

        # Clear completed tasks when progress is complete
        if unique_progress_count >= total_progress:
            process_console_queue.completed_tasks.clear()

    try:
        # Tag styles for the different message types
        console_output.tag_configure("error", foreground="red", spacing3=spacing)
        console_output.tag_configure("warning", foreground="orange", spacing3=spacing)
        console_output.tag_configure("normal", foreground="white", spacing3=spacing)

        while not q.empty():
            message = q.get_nowait()
            # Coerce to str so a non-string payload cannot break the regex
            clean_message = ansi_escape_pattern.sub('', str(message))

            if clean_message.startswith("Error:"):
                console_output.insert(tk.END, clean_message + "\n", "error")
            elif clean_message.startswith("Warning:"):
                console_output.insert(tk.END, clean_message + "\n", "warning")
            elif clean_message.startswith("Progress:"):
                try:
                    _handle_progress(clean_message)
                except Exception as e:
                    print(f"Error parsing progress message: {e}")
            else:
                console_output.insert(tk.END, clean_message + "\n", "normal")

            # Keep the newest output visible
            console_output.see(tk.END)
    except Exception as e:
        print("Exception in process_console_queue:", e)
        traceback.print_exc()
    finally:
        # Schedule process_console_queue() to run again
        after_id = console_output.after(uppdate_frequency, process_console_queue)
        parent_frame.after_tasks.append(after_id)
[docs]
def process_console_queue_v2():
    """Alternate console poller that stops at the first error message.

    Messages from ``q`` have ANSI escape sequences stripped. An ``"Error:"``
    message is inserted with the ``error`` tag and the function returns
    immediately without rescheduling. ``"Progress:"`` messages update the
    module-level ``progress_bar``; all other messages are inserted untagged.

    Progress state is kept on the attributes of ``process_console_queue``
    (not of this function), and it is ``process_console_queue`` that gets
    scheduled for the next poll.
    """
    global q, console_output, parent_frame, progress_bar, process_console_queue

    # Progress state lives on process_console_queue's function attributes
    tracker = process_console_queue
    if not hasattr(tracker, "completed_tasks"):
        tracker.completed_tasks = []
    if not hasattr(tracker, "current_maximum"):
        tracker.current_maximum = None

    ansi_escape_pattern = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')

    while not q.empty():
        raw_message = q.get_nowait()
        text = ansi_escape_pattern.sub('', raw_message)

        # An error ends polling: show it and leave without rescheduling
        if text.startswith("Error:"):
            console_output.insert(tk.END, text + "\n", "error")
            console_output.see(tk.END)
            print("Run aborted due to error:", text)
            return

        if not text.startswith("Progress:"):
            # Plain message: print it and scroll to the end
            console_output.insert(tk.END, text + "\n")
            console_output.see(tk.END)
            continue

        try:
            match = re.search(r'Progress: (\d+)/(\d+), operation_type: ([\w\s]*),(.*)', text)
            if match:
                done, total = int(match.group(1)), int(match.group(2))
                operation_type = match.group(3).strip()
                additional_info = match.group(4).strip()

                # A different total means a new operation: restart counting
                if tracker.current_maximum != total:
                    tracker.current_maximum = total
                    tracker.completed_tasks = []

                tracker.completed_tasks.append(done)
                # Number of distinct progress values reported so far
                distinct_done = len(np.unique(tracker.completed_tasks))

                if progress_bar:
                    progress_bar['maximum'] = total
                    progress_bar['value'] = distinct_done

                    if operation_type:
                        progress_bar.operation_type = operation_type
                        progress_bar.additional_info = additional_info

                    if progress_bar.progress_label:
                        progress_bar.update_label()

                # Reset the tracker once the operation is complete
                if distinct_done >= total:
                    tracker.completed_tasks.clear()
        except Exception as e:
            print(f"Error parsing progress message: {e}")

    # No error seen: schedule the next poll
    after_id = console_output.after(uppdate_frequency, process_console_queue)
    parent_frame.after_tasks.append(after_id)
[docs]
def main_thread_update_function(root, q, fig_queue, canvas_widget):
    """
    Drain the message queue and re-arm this function on the Tk event loop.

    Every message currently available on ``q`` is removed and discarded.
    Whether or not draining succeeds, the function schedules itself to run
    again on ``root`` after the module-level ``uppdate_frequency`` delay.

    Args:
        root: Object exposing an ``after(delay, callback)`` method.
        q: Queue-like object providing ``empty()`` and ``get_nowait()``.
        fig_queue: Not used here; only forwarded to the next invocation.
        canvas_widget: Not used here; only forwarded to the next invocation.
    """
    def _run_again():
        # Forward exactly the same arguments on the next tick.
        main_thread_update_function(root, q, fig_queue, canvas_widget)

    try:
        while True:
            if q.empty():
                break
            # The retrieved message is intentionally dropped.
            q.get_nowait()
    except Exception as e:
        print(f"Error updating GUI canvas: {e}")
    finally:
        # Always reschedule, even if draining the queue failed.
        root.after(uppdate_frequency, _run_again)
[docs]
def _cancel_after_tasks(widget):
    """Cancel every callback id stored in ``widget.after_tasks`` and reset the list."""
    pending_ids = getattr(widget, 'after_tasks', None)
    if not pending_ids:
        return
    for after_id in pending_ids:
        try:
            widget.after_cancel(after_id)
        except Exception as e:
            # Best effort: one bad id must not prevent cancelling the rest.
            print(f"Error cancelling after task: {e}")
    widget.after_tasks = []


def _drain_queue(pending):
    """Discard every item currently readable from ``pending`` without blocking."""
    from queue import Empty
    try:
        while not pending.empty():
            pending.get_nowait()
    except Empty:
        # ``empty()`` reported an item that was no longer available.
        pass


def cleanup_previous_instance():
    """
    Cleans up resources from the previous application instance.

    Operates on the module-level globals: pending ``after`` callbacks recorded
    in ``after_tasks`` are cancelled, the children of ``parent_frame`` are
    destroyed, both queues are drained and dropped, and the plotting state
    (``usage_bars``, ``figures``, ``figure_index``, ``canvas``) is reset.

    The ``after`` callbacks are cancelled *before* ``parent_frame`` is cleared;
    clearing it first would make the cancellation unreachable and leave the
    old callbacks scheduled.
    """
    global parent_frame, usage_bars, figures, figure_index, thread_control, canvas, q, fig_queue

    if parent_frame is not None:
        # 1. Cancel all pending `after` tasks while the frame is still valid.
        #    Ids are recorded on the toplevel window and, by the console
        #    polling loop, on the parent frame itself, so both are checked.
        parent_window = parent_frame.winfo_toplevel()
        _cancel_after_tasks(parent_window)
        if parent_frame is not parent_window:
            _cancel_after_tasks(parent_frame)

        # 2. Destroy all widgets in the parent frame
        for widget in parent_frame.winfo_children():
            try:
                widget.destroy()
            except Exception as e:
                print(f"Error destroying widget: {e}")
        parent_frame.update_idletasks()
        parent_frame = None

    # 3. Clear global queues
    if q is not None:
        _drain_queue(q)
        q = None
    if fig_queue is not None:
        _drain_queue(fig_queue)
        fig_queue = None

    # 4. Signal the global thread control
    if thread_control is not None:
        # NOTE(review): thread_control is initialised with the key
        # 'stop_requested', not 'stop' -- confirm which key consumers read.
        thread_control['stop'] = True

    # 5. Reset usage bars, figures, and indices
    usage_bars = []
    figures = deque()
    figure_index = -1

    # 6. Clear canvas or other visualizations
    if canvas is not None:
        try:
            if hasattr(canvas, 'figure'):  # Treated as a FigureCanvasTkAgg
                canvas.figure.clear()  # Clear the Matplotlib figure
                canvas.get_tk_widget().destroy()  # Destroy the Tkinter widget
            else:
                # Assume it's a standard Tkinter Canvas
                canvas.delete("all")
        except Exception as e:
            print(f"Error clearing canvas: {e}")
        canvas = None

    print("Previous instance cleaned up successfully.")
[docs]
def initiate_root(parent, settings_type='mask'):
    """
    Initializes the root window and sets up the GUI components based on the specified settings type.

    Any previous instance is cleaned up first. For ``'annotate'`` and
    ``'make_masks'`` the dedicated sub-application is started inside the
    horizontal container; for every other settings type the settings panel,
    plot section, console and button section are built and the queue polling
    loops are started.

    Args:
        parent (tkinter.Tk or tkinter.Toplevel): The parent window for the GUI.
        settings_type (str, optional): The type of settings to be displayed in the GUI. Defaults to 'mask'.

    Returns:
        tuple: A tuple containing the parent frame and the dictionary of variables used in the GUI.
            For ``'annotate'`` and ``'make_masks'`` the dictionary is not rebuilt here;
            the current value of the module-level ``vars_dict`` is returned.
    """
    global q, fig_queue, thread_control, parent_frame, scrollable_frame, button_frame, vars_dict, canvas, canvas_widget, button_scrollable_frame, progress_bar, uppdate_frequency, figures, figure_index, index_control, usage_bars

    # Clean up any previous instance
    cleanup_previous_instance()

    from .gui_utils import setup_frame
    from .gui_elements import create_menu_bar
    from .settings import descriptions

    uppdate_frequency = 500
    # os.cpu_count() returns None when the count cannot be determined;
    # fall back to 1 so the comparison below cannot raise TypeError.
    num_cores = os.cpu_count() or 1

    # NOTE: the chatbot integration is disabled. API credentials must never be
    # embedded in source code; load them from the environment if it is re-enabled.

    # Start tracemalloc and initialize global variables
    tracemalloc.start()
    set_start_method('spawn', force=True)
    print("Initializing root with settings_type:", settings_type)

    # Initialize global variables
    figures = deque()
    figure_index = -1
    parent_frame = parent

    # Resolve the toplevel window that owns the scheduled `after` callbacks.
    if not isinstance(parent_frame, (tk.Tk, tk.Toplevel)):
        parent_window = parent_frame.winfo_toplevel()
    else:
        parent_window = parent_frame
    parent_window.update_idletasks()
    if not hasattr(parent_window, 'after_tasks'):
        parent_window.after_tasks = []

    q = Queue()
    fig_queue = Queue()
    parent_frame, vertical_container, horizontal_container, settings_container = setup_frame(parent_frame)

    if settings_type == 'annotate':
        from .app_annotate import initiate_annotation_app
        initiate_annotation_app(horizontal_container)
    elif settings_type == 'make_masks':
        from .app_make_masks import initiate_make_mask_app
        initiate_make_mask_app(horizontal_container)
    else:
        scrollable_frame, vars_dict = setup_settings_panel(settings_container, settings_type)
        print('setup_settings_panel')
        canvas, canvas_widget = setup_plot_section(vertical_container, settings_type)
        console_output, _ = setup_console(vertical_container)
        button_scrollable_frame, btn_col = setup_button_section(horizontal_container, settings_type)

        # The usage panel is only built on machines reporting more than 12 cores.
        if num_cores > 12:
            _, usage_bars, btn_col = setup_usage_panel(horizontal_container, btn_col, uppdate_frequency)
        else:
            usage_bars = []

        set_globals(thread_control, q, console_output, parent_frame, vars_dict, canvas, canvas_widget, scrollable_frame, fig_queue, progress_bar, usage_bars)
        description_text = descriptions.get(settings_type, "No description available for this module.")

        # Seed the console with a header and the module description.
        q.put("Console")
        q.put(" ")
        q.put(description_text)

        process_console_queue()
        process_fig_queue()
        create_menu_bar(parent)

        # Record the callback id so it can be cancelled during cleanup.
        after_id = parent_window.after(uppdate_frequency, lambda: main_thread_update_function(parent_window, q, fig_queue, canvas_widget))
        parent_window.after_tasks.append(after_id)

    print("Root initialization complete")
    return parent_frame, vars_dict