
# NOTE: stray "Python" code-fence label removed — as a bare statement it raised NameError on import.
import tkinter as tk
from tkinter import messagebox, scrolledtext, ttk, filedialog, simpledialog # Added simpledialog
import subprocess
import threading
import os
import platform
import shlex
# import webbrowser # Currently unused, uncomment if needed later
# import json # Currently unused, uncomment if needed later
import re
from datetime import datetime
import logging
# Setup basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class OllamaModelManager:
def __init__(self, root):
    """Build the Ollama Model Manager UI inside the given Tk root window.

    Layout, top to bottom: title bar, sortable model list (Treeview with
    scrollbars), command-output log, action buttons, status bar. Ends by
    triggering the initial asynchronous model refresh.

    Args:
        root: The tk.Tk (or Toplevel) instance to populate.
    """
    self.root = root
    self.root.title("Ollama Model Manager")
    self.root.geometry("850x650") # Increased size slightly
    self.root.minsize(700, 500)
    # Configure style
    self.style = ttk.Style()
    # Use theme that works well across platforms if possible
    try:
        if platform.system() == "Windows":
            self.style.theme_use('vista') # or 'xpnative'
        elif platform.system() == "Darwin":
            self.style.theme_use('aqua') # 'aqua' might require specific Tk versions
        else:
            self.style.theme_use('clam') # A decent default
    except tk.TclError:
        logging.warning("Default theme not found, using fallback.")
        # Fallback theme if specific ones aren't available
        available_themes = self.style.theme_names()
        if 'clam' in available_themes:
            self.style.theme_use('clam')
        elif available_themes:
            # Use the first available theme as a last resort
            try:
                self.style.theme_use(available_themes[0])
            except tk.TclError:
                logging.error("No ttk themes seem to be available. UI might look basic.")
    # Define colors (adjust if needed based on theme)
    # Getting theme-appropriate colors can be complex, using defaults for now
    bg_color = self.style.lookup('TFrame', 'background') # Get theme background
    # NOTE(review): the three button_* colors below are currently unused —
    # the custom TButton styling that consumed them is commented out.
    button_bg = "#0078d7" # Example color, may not match theme well
    button_fg = "white" # Example color
    button_active_bg = "#005fa3" # Example color
    self.style.configure("TFrame", background=bg_color)
    # Use default button styling initially, override may cause inconsistencies
    # self.style.configure("TButton", padding=6, relief=tk.FLAT,
    # background=button_bg, foreground=button_fg,
    # font=("Segoe UI", 9))
    # self.style.map("TButton",
    # background=[("active", button_active_bg)],
    # relief=[('pressed', tk.SUNKEN), ('!pressed', tk.FLAT)])
    self.style.configure("TLabel", background=bg_color, font=("Segoe UI", 10))
    self.style.configure("Treeview", font=("Segoe UI", 9), rowheight=25) # Adjust font/rowheight as needed
    self.style.configure("Treeview.Heading", font=("Segoe UI", 10, "bold"))
    self.style.configure("TLabelframe", background=bg_color, font=("Segoe UI", 10))
    self.style.configure("TLabelframe.Label", background=bg_color, font=("Segoe UI", 10, "bold")) # Ensure label uses theme bg
    # Setup main container with padding
    self.main_frame = ttk.Frame(root, padding="10")
    self.main_frame.pack(fill=tk.BOTH, expand=True)
    # Title and description
    title_frame = ttk.Frame(self.main_frame)
    title_frame.pack(fill=tk.X, pady=(0, 10))
    title_label = ttk.Label(title_frame, text="Ollama Model Manager", font=("Segoe UI", 16, "bold"))
    title_label.pack(side=tk.LEFT, pady=5)
    # Create a frame for the model list section (header + treeview container)
    content_frame = ttk.Frame(self.main_frame)
    content_frame.pack(fill=tk.BOTH, expand=True) # This frame holds list_header and tree_container_frame
    # --- Model list header ---
    list_header = ttk.Frame(content_frame)
    list_header.pack(fill=tk.X) # Packed first in content_frame
    list_title = ttk.Label(list_header, text="Available Models", font=("Segoe UI", 12, "bold"))
    list_title.pack(side=tk.LEFT, pady=5)
    # --- Container for Treeview and Scrollbars ---
    # This frame uses grid internally, but is packed into content_frame
    tree_container_frame = ttk.Frame(content_frame)
    tree_container_frame.pack(fill=tk.BOTH, expand=True, pady=(5, 0)) # Packed second in content_frame
    # Create Treeview and Scrollbars inside tree_container_frame
    columns = ("Name", "ID", "Size", "Modified")
    self.model_tree = ttk.Treeview(tree_container_frame, columns=columns, show="headings", selectmode="browse")
    # Configure column headings; clicking a heading sorts by that column.
    self.model_tree.heading("Name", text="Name", anchor=tk.W, command=lambda: self.sort_treeview("Name"))
    self.model_tree.heading("ID", text="ID", anchor=tk.W) # No sorting on ID typically needed
    self.model_tree.heading("Size", text="Size", anchor=tk.W, command=lambda: self.sort_treeview("Size"))
    self.model_tree.heading("Modified", text="Modified", anchor=tk.W, command=lambda: self.sort_treeview("Modified"))
    # Configure column widths
    self.model_tree.column("Name", width=250, stretch=tk.YES)
    self.model_tree.column("ID", width=150, stretch=tk.YES)
    self.model_tree.column("Size", width=100, stretch=tk.NO)
    self.model_tree.column("Modified", width=150, stretch=tk.NO)
    # Scrollbars (parent is tree_container_frame)
    tree_scroll_y = ttk.Scrollbar(tree_container_frame, orient="vertical", command=self.model_tree.yview)
    tree_scroll_x = ttk.Scrollbar(tree_container_frame, orient="horizontal", command=self.model_tree.xview)
    self.model_tree.configure(yscrollcommand=tree_scroll_y.set, xscrollcommand=tree_scroll_x.set)
    # Grid layout *inside* tree_container_frame
    self.model_tree.grid(row=0, column=0, sticky='nsew')
    tree_scroll_y.grid(row=0, column=1, sticky='ns')
    tree_scroll_x.grid(row=1, column=0, columnspan=2, sticky='ew') # Span under tree and Y scroll
    # Configure resizing behavior *for* tree_container_frame's grid
    tree_container_frame.grid_rowconfigure(0, weight=1) # Treeview row expands vertically
    tree_container_frame.grid_columnconfigure(0, weight=1) # Treeview column expands horizontally
    # Bind events (remain the same)
    self.model_tree.bind("<Double-1>", self.on_model_double_click)
    self.model_tree.bind("<<TreeviewSelect>>", self.on_model_select)
    # --- Command output section ---
    output_frame = ttk.LabelFrame(self.main_frame, text="Command Output / Log")
    output_frame.pack(fill=tk.X, expand=False, pady=10) # Packed in main_frame
    log_font = ("Consolas", 9) if platform.system() == "Windows" else ("Monaco", 10) # Monospaced font
    self.output_box = scrolledtext.ScrolledText(output_frame, width=50, height=8, wrap=tk.WORD, font=log_font)
    self.output_box.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    self.output_box.configure(state=tk.DISABLED) # Read-only
    # --- Action buttons frame ---
    button_frame = ttk.Frame(self.main_frame)
    button_frame.pack(fill=tk.X, pady=(5, 0)) # Packed in main_frame
    # Configure button frame columns to distribute space
    button_frame.columnconfigure(0, weight=1) # Left-aligned buttons will push right ones
    button_frame.columnconfigure(1, weight=0) # Right-aligned buttons stay right
    # Left side buttons (general actions)
    left_buttons = ttk.Frame(button_frame)
    left_buttons.grid(row=0, column=0, sticky='w')
    self.refresh_button = ttk.Button(left_buttons, text="Refresh Models", command=self.refresh_models)
    self.refresh_button.pack(side=tk.LEFT, padx=(0, 5))
    # Add Import Button
    self.import_button = ttk.Button(left_buttons, text="Import/Pull Model", command=self.import_model)
    self.import_button.pack(side=tk.LEFT, padx=5)
    # Right side buttons (model-specific actions)
    right_buttons = ttk.Frame(button_frame)
    right_buttons.grid(row=0, column=1, sticky='e')
    # Buttons in this list are enabled/disabled together by
    # toggle_model_buttons() depending on whether a model is selected.
    self.model_actions_buttons = []
    # Order: Run, Copy, Show, Delete
    self.run_button = ttk.Button(right_buttons, text="Run Model", command=self.run_model)
    self.run_button.pack(side=tk.LEFT, padx=5)
    self.model_actions_buttons.append(self.run_button)
    # Changed "Export" to "Copy" as 'ollama export' is deprecated
    self.copy_button = ttk.Button(right_buttons, text="Copy Model", command=self.copy_model)
    self.copy_button.pack(side=tk.LEFT, padx=5)
    self.model_actions_buttons.append(self.copy_button)
    self.show_in_explorer_button = ttk.Button(right_buttons, text="Show Models Dir", command=self.show_models_directory)
    self.show_in_explorer_button.pack(side=tk.LEFT, padx=5)
    # This button doesn't depend on selection, so not added to model_actions_buttons list for disabling
    self.delete_button = ttk.Button(right_buttons, text="Delete Model", command=self.on_delete)
    self.delete_button.pack(side=tk.LEFT, padx=(5, 0)) # No padding on the right
    self.model_actions_buttons.append(self.delete_button)
    # Status bar
    self.status_var = tk.StringVar()
    self.status_var.set("Ready")
    status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W, padding=(5, 2))
    status_bar.pack(side=tk.BOTTOM, fill=tk.X)
    # Initialize model actions buttons as disabled
    self.toggle_model_buttons(False)
    # Sorting information
    self.sort_column = "Name" # Default sort
    self.sort_reverse = {col: False for col in columns}  # per-column sort direction
    # Initially load models
    self.selected_model_name = None # Store the name (e.g., llama3:latest)
    self.model_data = {} # Store details keyed by model name
    self.refresh_models()
# --- Methods (Log, Get Dir, Format Size, Parse Size, Sort, Toggle Buttons, Refresh, Update List, etc.) ---
# (Include all methods from the previous version here, unchanged unless noted)
def log_output(self, message, level="INFO", exc_info=False):
    """Append a timestamped message to the output box and the Python log.

    Safe to call from worker threads: the text-widget update is scheduled
    onto the Tk main loop via root.after().

    Args:
        message: Text to log; coerced to str if needed.
        level: Logging level name, e.g. "INFO", "DEBUG", "ERROR".
        exc_info: Forwarded to logging.log so callers can attach the
            current traceback. Several callers pass exc_info=True; the
            previous signature rejected it with a TypeError.
    """
    if not isinstance(message, str):
        message = str(message)
    timestamp = datetime.now().strftime("%H:%M:%S")
    formatted_message = f"[{timestamp}] {level}: {message.strip()}\n"

    def update_log():
        # Temporarily unlock the read-only widget, insert, scroll, re-lock.
        self.output_box.configure(state=tk.NORMAL)
        self.output_box.insert(tk.END, formatted_message)
        self.output_box.see(tk.END)
        self.output_box.configure(state=tk.DISABLED)

    self.root.after(0, update_log)  # marshal the UI update onto the main thread
    # Unknown level names fall back to INFO rather than raising.
    logging.log(getattr(logging, level.upper(), logging.INFO),
                message.strip(), exc_info=exc_info)
def get_ollama_models_base_directory(self):
    """Return the Ollama 'models' directory for the current platform.

    Probes a per-platform list of likely install locations and returns the
    first base directory containing a models/manifests subtree; if none is
    found, falls back to ~/.ollama/models.
    """
    home_dir = os.path.expanduser("~")
    system = platform.system()
    candidates = []
    if system == "Windows":
        # Newer installers use %LOCALAPPDATA%\Ollama; older or manual
        # setups use %USERPROFILE%\.ollama.
        for env_var, subdir in (("LOCALAPPDATA", "Ollama"), ("USERPROFILE", ".ollama")):
            base = os.getenv(env_var)
            if base:
                candidates.append(os.path.join(base, subdir))
    elif system == "Darwin":  # macOS
        candidates.append(os.path.join(home_dir, ".ollama"))
    else:  # Linux and others
        candidates.extend([
            os.path.join(home_dir, ".ollama"),              # user install
            "/usr/share/ollama/.ollama",                    # system install (distro-dependent)
            "/var/lib/ollama/.ollama",                      # another possible system location
            os.path.join(home_dir, ".local/share/ollama"),  # Flatpak/Snap? (check ollama docs)
        ])
    for candidate in candidates:
        if not candidate or not os.path.isdir(candidate):
            continue
        # A real install has a models/manifests tree under the base dir.
        if os.path.isdir(os.path.join(candidate, "models", "manifests")):
            models_dir = os.path.join(candidate, "models")
            logging.info(f"Found Ollama models directory at: {models_dir}")
            return models_dir
    # Nothing matched — fall back to the conventional default location.
    default_path = os.path.join(home_dir, ".ollama", "models")
    logging.warning(f"Could not auto-detect Ollama models directory from likely paths. Falling back to default: {default_path}")
    return default_path
def format_size(self, size_bytes):
    """Render a byte count as a human-readable string (binary units).

    Returns "N/A" for non-numeric or negative input and "0 B" for zero;
    otherwise e.g. 1536 -> "1.5 KB". Trailing zeros are trimmed.
    """
    if not isinstance(size_bytes, (int, float)) or size_bytes < 0:
        return "N/A"
    if size_bytes == 0:
        return "0 B"
    units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    value = float(size_bytes)
    idx = 0
    # Divide by 1024 until the value fits the unit (or we run out of units).
    while value >= 1024 and idx < len(units) - 1:
        value /= 1024.0
        idx += 1
    text = f"{value:.2f}"
    # Trim trailing zeros (and a dangling decimal point) from e.g. "1.50".
    if "." in text:
        text = text.rstrip("0").rstrip(".")
    return f"{text} {units[idx]}"
def parse_size_string(self, size_str):
    """Convert a human-readable size string (e.g. '4.7 GB') to bytes.

    Uses binary (1024-based) multipliers, matching format_size. Accepts
    an optional trailing 'B' ('4.7G' == '4.7 GB'); now also handles the
    E/Z/Y units that format_size can emit (previously these returned 0).

    Returns:
        int byte count, or 0 if the string cannot be parsed.
    """
    size_str = (size_str or "").strip().upper()
    match = re.match(r'([\d.]+)\s*([KMGTPEZY]?B?)', size_str)  # Allow optional 'B'
    if not match:
        logging.warning(f"Could not parse size string: '{size_str}'")
        return 0  # Cannot parse
    number_str, unit = match.groups()
    try:
        number = float(number_str)
    except ValueError:
        logging.warning(f"Invalid number in size string: '{number_str}'")
        return 0  # Invalid number
    unit = unit.replace("B", "")  # 'GB' -> 'G'; bare 'B' -> '' (plain bytes)
    powers = {"": 1, "K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4,
              "P": 1024**5, "E": 1024**6, "Z": 1024**7, "Y": 1024**8}
    if unit not in powers:
        logging.warning(f"Unrecognized size unit: '{unit}' in '{size_str}'")
        return 0  # Safer than guessing
    return int(number * powers[unit])
def sort_treeview(self, column):
    """Sort the model treeview by the given column.

    Repeated clicks toggle ascending/descending; heading arrows are
    updated to show the active column and direction. 'Size' sorts
    numerically via parse_size_string; 'Modified' sorts by an approximate
    age in seconds derived from the relative "... ago" text; all other
    columns sort case-insensitively as text.

    Fix: the age table previously lacked the singular 'second', so
    "1 second ago" sorted as unparseable (last).
    """
    if column not in self.model_tree["columns"]:
        logging.error(f"Attempted to sort by non-existent column: {column}")
        return
    try:
        reverse = not self.sort_reverse.get(column, False)  # Toggle direction
        self.sort_reverse[column] = reverse
        self.sort_column = column
        # Collect (cell value, item id) pairs for every row.
        items = [(self.model_tree.set(item_id, column), item_id)
                 for item_id in self.model_tree.get_children('')]
        if column == "Size":
            # Convert the human-readable size to bytes for numeric sorting.
            items.sort(key=lambda x: self.parse_size_string(x[0]), reverse=reverse)
        elif column == "Modified":
            # "Modified" is relative text ("2 weeks ago"); map each entry
            # to an approximate age in seconds. Unparseable entries sort
            # last. For exact ordering, real timestamps would be needed
            # (e.g. from 'ollama show').
            def sort_modified_key(mod_str):
                mod_str = (mod_str or "").lower()
                multipliers = {'now': 0, 'second': 1, 'seconds': 1,
                               'minute': 60, 'minutes': 60,
                               'hour': 3600, 'hours': 3600,
                               'day': 86400, 'days': 86400,
                               'week': 604800, 'weeks': 604800,
                               'month': 2628000, 'months': 2628000,
                               'year': 31536000, 'years': 31536000}
                parts = mod_str.split()
                if len(parts) < 2 or parts[-1] != 'ago':
                    # Handles "now" and fallback for unparsed formats.
                    return multipliers.get(mod_str, float('inf'))
                try:
                    return int(parts[0]) * multipliers.get(parts[1], float('inf'))
                except (ValueError, IndexError):
                    return float('inf')  # Sort unparsed last
            items.sort(key=lambda x: sort_modified_key(x[0]), reverse=reverse)
        else:
            # Name/ID: case-insensitive text sort; tolerate None/empty.
            items.sort(key=lambda x: str(x[0] or "").lower(), reverse=reverse)
        # Reorder rows in the treeview to match the sorted order.
        for index, (_value, item_id) in enumerate(items):
            self.model_tree.move(item_id, '', index)
        # Update heading arrows: strip any old arrow, mark the active column.
        arrow = " ▼" if reverse else " ▲"
        for col in self.model_tree["columns"]:
            current_heading = self.model_tree.heading(col, "text")
            base_heading = current_heading.replace(" ▲", "").replace(" ▼", "")
            self.model_tree.heading(col, text=base_heading + (arrow if col == column else ""))
    except tk.TclError as e:
        # Catch potential Tcl errors during Treeview manipulation
        logging.error(f"Error during treeview sort/update for column '{column}': {e}")
        messagebox.showerror("UI Error", f"An error occurred while sorting the list:\n{e}")
    except Exception as e:
        logging.error(f"Unexpected error during treeview sort for column '{column}': {e}", exc_info=True)
        messagebox.showerror("Sort Error", f"An unexpected error occurred during sorting:\n{e}")
def toggle_model_buttons(self, enabled=True):
"""Enable or disable model action buttons based on selection state"""
state = tk.NORMAL if enabled else tk.DISABLED
for button in self.model_actions_buttons:
try:
button.configure(state=state)
except tk.TclError as e:
# Button might have been destroyed if window closed during operation
logging.warning(f"Could not configure button state (possibly destroyed): {e}")
def refresh_models(self):
    """Kick off an asynchronous refresh of the Ollama model list."""
    self.status_var.set("Refreshing model list...")
    self.log_output("Starting model list refresh...")
    # Drop the visual selection immediately. self.selected_model_name is
    # deliberately kept so the selection can be restored after rebuild.
    selection = self.model_tree.selection()
    if selection:
        try:
            self.model_tree.selection_remove(selection)
        except tk.TclError as e:
            # The selected item may already be gone.
            logging.warning(f"Error removing selection during refresh: {e}")
    self.toggle_model_buttons(False)
    # Do the 'ollama list' call off the UI thread.
    threading.Thread(target=self._refresh_models_thread, daemon=True).start()
def _refresh_models_thread(self):
    """Background worker: run 'ollama list', parse its table, update the UI.

    Runs off the main thread; every widget update is marshalled back onto
    the Tk main loop via self.root.after(...). Parsing slices each data
    line by the column positions found in the header row.

    NOTE(review): relies on self.run_command (defined elsewhere in this
    file) returning the command's text output, or None on failure.
    """
    try:
        # Get the raw output from ollama list
        list_output = self.run_command(["ollama", "list"])
        if list_output is None: # Means command failed critically (e.g., not found)
            # Error already logged by run_command
            self.root.after(0, lambda: self.status_var.set("Error running 'ollama list'. Is it installed and in PATH?"))
            return
        # Use DEBUG level for raw output as it can be verbose
        self.log_output(f"Raw 'ollama list' output:\n{list_output}", "DEBUG")
        # --- Robust Parsing of 'ollama list' Output ---
        models = {}      # model name -> details dict
        tree_items = []  # rows for the treeview, in output order
        lines = list_output.strip().split("\n")
        if len(lines) <= 1:
            # Header only (or nothing): no models installed.
            self.log_output("No models found or empty list output.")
            # Ensure UI updates happen on main thread
            self.root.after(0, lambda: self.update_model_list(models, tree_items))
            self.root.after(0, lambda: self.status_var.set("No models found. Use 'ollama pull' or 'Import'."))
            return
        header = lines[0]
        # Find column start indices (handle potential variations in spacing)
        try:
            # Use regex to find headers, allowing flexible spacing. Case-insensitive.
            name_match = re.search(r'NAME\s+', header, re.IGNORECASE)
            id_match = re.search(r'ID\s+', header, re.IGNORECASE)
            size_match = re.search(r'SIZE\s+', header, re.IGNORECASE)
            modified_match = re.search(r'MODIFIED\s*', header, re.IGNORECASE) # MODIFIED might be at the end
            if not (name_match and id_match and size_match and modified_match):
                raise ValueError("Could not parse header columns accurately (NAME, ID, SIZE, MODIFIED).")
            # Calculate end positions to avoid overlap if spacing is tight
            name_start = name_match.start()
            id_start = id_match.start()
            size_start = size_match.start()
            modified_start = modified_match.start()
            # Estimate end points based on start of next column
            name_end = id_start
            id_end = size_start
            size_end = modified_start
            self.log_output(f"Header parsed: NAME@[{name_start}:{name_end}], ID@[{id_start}:{id_end}], SIZE@[{size_start}:{size_end}], MODIFIED@[{modified_start}:]", "DEBUG")
        except Exception as e:
            self.log_output(f"Error parsing header: '{header}'. Cannot process list. Error: {e}", "ERROR")
            # Ensure UI updates happen on main thread
            self.root.after(0, lambda: self.status_var.set("Error: Could not parse 'ollama list' header."))
            self.root.after(0, lambda: self.update_model_list({}, [])) # Clear list on header parse error
            return
        # Process each data line, slicing by the header's column positions.
        for i, line in enumerate(lines[1:], start=1): # Skip header
            line = line.rstrip() # Use rstrip to keep leading spaces if any
            if not line:
                continue
            # Extract data based on column boundaries
            try:
                name = line[name_start:name_end].strip()
                model_id = line[id_start:id_end].strip()
                size = line[size_start:size_end].strip()
                modified = line[modified_start:].strip() # Takes the rest of the line
                if not name or not model_id: # Basic validation
                    self.log_output(f"Skipping malformed line {i+1}: '{line}' (missing name or id)", "WARNING")
                    continue
                # Use the full name (e.g., llama3:latest) as the primary key
                model_key = name
                models[model_key] = {
                    "name": name,
                    "id": model_id,
                    "size": size,
                    "modified": modified,
                    # Add other details if needed later, e.g., from 'ollama show'
                }
                # Pass model_key (name) as the tag for the tree item
                tree_items.append((name, model_id, size, modified, model_key))
                # Log parsed models at DEBUG level to avoid flooding INFO logs
                self.log_output(f"Parsed Model: Name='{name}', ID='{model_id}', Size='{size}', Mod='{modified}'", "DEBUG")
            except IndexError as e:
                self.log_output(f"Skipping line {i+1} due to parsing error (IndexError likely short line): '{line}'. Error: {e}", "WARNING")
            except Exception as parse_err:
                # NOTE(review): this call passes exc_info=True — confirm that
                # log_output's signature accepts an exc_info argument.
                self.log_output(f"Error parsing line {i+1}: '{line}'. Error: {parse_err}", "ERROR", exc_info=True)
        # Update the UI with the results on the main thread
        self.root.after(0, lambda: self.update_model_list(models, tree_items))
    except Exception as e:
        # Catch errors during the thread execution itself (e.g., run_command failure if not handled)
        error_msg = f"Critical error in refresh thread: {str(e)}"
        self.log_output(error_msg, "ERROR", exc_info=True)
        # Ensure UI updates happen on main thread
        self.root.after(0, lambda: self.status_var.set("Error refreshing models. See log."))
        # Optionally clear the list on major error
        # self.root.after(0, lambda: self.update_model_list({}, []))
def update_model_list(self, models, tree_items):
    """Rebuild the treeview from a fresh model listing.

    Must run on the Tk main thread (callers schedule it via root.after).
    Re-applies the active sort and restores the previous selection (by
    model name) when that model still exists.

    Args:
        models: dict keyed by full model name -> details dict
            (name/id/size/modified).
        tree_items: list of (name, model_id, size, modified, model_key)
            tuples, one per row, in display order.
    """
    try:
        # Preserve selection if possible (find item by model_key/name)
        # Selected name is stored in self.selected_model_name
        # We will find the new Treeview item ID for this name after inserting
        # Clear existing items safely
        if self.root.winfo_exists(): # Check if window still exists
            for item in self.model_tree.get_children():
                self.model_tree.delete(item)
        else:
            logging.warning("Window closed during model list update. Aborting.")
            return
        self.model_data = models
        new_selection_target_id = None
        for (name, model_id, size, modified, model_key) in tree_items:
            # Insert item and associate the model_key (name) as a tag
            item_tree_id = self.model_tree.insert("", tk.END, values=(name, model_id, size, modified), tags=(model_key,))
            # Check if this is the item that was previously selected
            if model_key == self.selected_model_name:
                new_selection_target_id = item_tree_id
        count = len(models)
        if count:
            self.status_var.set(f"Found {count} models")
        else:
            self.status_var.set("No models found. Use 'ollama pull' or 'Import'.")
        # Re-apply sorting based on stored sort column and direction.
        # Trick: pre-flip the direction flag because sort_treeview toggles
        # it back, leaving the user's chosen direction unchanged.
        if self.sort_column and self.sort_column in self.model_tree["columns"]:
            # Temporarily reverse the sort direction flag so sort_treeview toggles it back
            self.sort_reverse[self.sort_column] = not self.sort_reverse.get(self.sort_column, False)
            self.sort_treeview(self.sort_column) # This will re-sort and update arrows
        # Restore selection if the item still exists
        if new_selection_target_id:
            try:
                if self.root.winfo_exists() and self.model_tree.exists(new_selection_target_id):
                    self.model_tree.selection_set(new_selection_target_id)
                    self.model_tree.focus(new_selection_target_id) # Also set focus
                    self.model_tree.see(new_selection_target_id) # Ensure it's visible
                    # Ensure the selection triggers the button state update
                    self.on_model_select(None) # Pass None as event is optional here
                else:
                    # Item or window disappeared
                    self.selected_model_name = None
                    self.toggle_model_buttons(False)
            except tk.TclError as e:
                # Item might have disappeared between clear and insert or other UI issue
                logging.warning(f"Error restoring selection for '{self.selected_model_name}' (ID: {new_selection_target_id}): {e}")
                self.selected_model_name = None
                self.toggle_model_buttons(False)
        else:
            # Previously selected model is gone or no selection existed
            self.selected_model_name = None
            self.toggle_model_buttons(False)
    except tk.TclError as e:
        # Catch potential errors during bulk Treeview updates if window closed etc.
        logging.error(f"Error updating model list UI: {e}")
    except Exception as e:
        logging.error(f"Unexpected error updating model list: {e}", exc_info=True)
        self.status_var.set("Error updating list display. See log.")
def on_model_select(self, event):
    """Track treeview selection and enable/disable model action buttons."""
    selection = self.model_tree.selection()
    if not selection:
        # Selection was cleared; only touch state if something was selected.
        if self.selected_model_name:
            self.selected_model_name = None
            self.toggle_model_buttons(False)
            self.status_var.set("No model selected")
        return
    item_id = selection[0]
    tags = self.model_tree.item(item_id, "tags")
    if not tags:
        # Every row is inserted with its model name as a tag; a missing
        # tag means the tree state is inconsistent.
        self.log_output(f"Selected item {item_id} has no tags!", "WARNING")
        if self.selected_model_name:
            self.selected_model_name = None
            self.toggle_model_buttons(False)
        return
    model_key = tags[0]  # full model name, e.g. "llama3:latest"
    if self.selected_model_name == model_key:
        return  # selection unchanged — nothing to do
    self.selected_model_name = model_key
    if model_key in self.model_data:
        self.toggle_model_buttons(True)
        self.status_var.set(f"Selected: {self.selected_model_name}")
    else:
        # Data mismatch - should not happen if refresh/update is correct
        self.log_output(f"Selected model '{self.selected_model_name}' not found in internal data!", "ERROR")
        self.selected_model_name = None
        self.toggle_model_buttons(False)
def on_model_double_click(self, event):
    """Run the model whose row was double-clicked in the treeview."""
    item_id = self.model_tree.identify_row(event.y)
    if not item_id:
        return  # double-click landed outside any row
    # Double-click does not always fire <<TreeviewSelect>> first, so make
    # sure the clicked row is actually the current selection.
    current = self.model_tree.selection()
    if not current or current[0] != item_id:
        try:
            if not self.model_tree.exists(item_id):
                return  # row vanished (e.g. mid-refresh)
            self.model_tree.selection_set(item_id)
            self.on_model_select(None)  # sync internal selection state
        except tk.TclError:
            return  # selection failed (e.g. during refresh)
    if self.selected_model_name:
        self.run_model()
def on_delete(self):
    """Delete the selected model after user confirmation."""
    if not self.selected_model_name:
        messagebox.showwarning("No Selection", "Please select a model to delete.")
        return
    confirmed = messagebox.askyesno(
        "Confirm Delete",
        f"Are you sure you want to delete model '{self.selected_model_name}'?\n\n"
        "This action cannot be undone.",
        icon='warning',
        parent=self.root)
    if not confirmed:
        return
    # Snapshot the name — the selection may change while the worker runs.
    model_to_delete = self.selected_model_name
    self.status_var.set(f"Deleting model {model_to_delete}...")
    self.log_output(f"Attempting to delete model: {model_to_delete}")
    self.toggle_model_buttons(False)  # lock the action buttons meanwhile
    threading.Thread(target=self._delete_model_thread, args=(model_to_delete,), daemon=True).start()
def _delete_model_thread(self, model_name):
    """Background worker: run 'ollama rm <model_name>' and report the result.

    Success/failure is inferred heuristically from the command's text
    output ('ollama rm' prints very little on success); all UI updates are
    scheduled back onto the Tk main thread via root.after.
    """
    # Use 'ollama rm' which is the standard command now
    result = self.run_command(["ollama", "rm", model_name])
    # Schedule UI updates back on the main thread
    def handle_delete_result():
        if result is not None:
            self.log_output(f"Delete command output for '{model_name}':\n{result}")
            # Simple check for success (ollama rm output is minimal on success)
            # A more robust check might involve trying 'ollama show' afterwards.
            if "deleted" in result.lower() and "error" not in result.lower():
                messagebox.showinfo("Delete Successful", f"Model '{model_name}' was deleted.")
                self.status_var.set(f"Model '{model_name}' deleted")
                # Refresh the list after successful deletion
                self.refresh_models() # This will re-enable buttons if needed
            elif "no such file or directory" in result.lower() or "not found" in result.lower():
                messagebox.showwarning("Delete Warning", f"Model '{model_name}' may have already been deleted or was not found.")
                self.status_var.set(f"Model '{model_name}' not found for deletion")
                self.refresh_models() # Refresh anyway
            else:
                # Show error message based on output
                error_detail = result.splitlines()[-1] if result.strip() else "Unknown error."
                messagebox.showerror("Delete Failed", f"Failed to delete model '{model_name}'.\n\nError: {error_detail}\n\nCheck the log for more details.")
                self.status_var.set(f"Failed to delete '{model_name}'")
                # Re-enable buttons if a model is still selected
                self.toggle_model_buttons(bool(self.selected_model_name))
        else:
            # run_command already logged the error (e.g., command not found)
            messagebox.showerror("Delete Error", f"Could not execute delete command for '{model_name}'. Is 'ollama' installed and in PATH? See log.")
            self.status_var.set(f"Error executing delete for '{model_name}'")
            # Re-enable buttons if a model is still selected
            self.toggle_model_buttons(bool(self.selected_model_name))
    self.root.after(0, handle_delete_result)
def show_models_directory(self):
    """Open the platform's file explorer at the Ollama models directory."""
    models_dir = self.get_ollama_models_base_directory()
    system = platform.system()
    if not models_dir:
        messagebox.showerror("Directory Not Found",
                             "Could not determine the Ollama models directory path.")
        self.log_output(f"Failed to determine models directory path.", "ERROR")
        return
    if not os.path.isdir(models_dir):
        messagebox.showwarning("Directory Not Found",
                               f"Ollama models directory does not exist or is not accessible.\n\nExpected path: {models_dir}\n\nEnsure Ollama is installed and has stored models.")
        self.log_output(f"Models directory path found but does not exist or is not a directory: {models_dir}", "WARNING")
        return
    try:
        self.log_output(f"Opening directory: {models_dir}")
        if system == "Windows":
            # os.startfile handles path quoting/associations natively.
            os.startfile(models_dir)
        elif system == "Darwin":  # macOS
            subprocess.run(["open", models_dir], check=True)
        else:  # Linux and others
            subprocess.run(["xdg-open", models_dir], check=True)
        self.status_var.set(f"Opened models directory")
    except FileNotFoundError:
        # 'open'/'xdg-open' missing, or no startfile association is set up.
        cmd = "startfile" if system == "Windows" else "open" if system == "Darwin" else "xdg-open"
        messagebox.showerror("Error", f"Could not find the command ('{cmd}') or required association to open the file explorer for your system ('{system}').")
        self.log_output(f"Command '{cmd}' not found or failed to open explorer on {system}", "ERROR")
    except subprocess.CalledProcessError as e:
        # The opener ran but exited non-zero (check=True raised).
        messagebox.showerror("Error", f"The command to open the file explorer failed:\n{e}")
        self.log_output(f"Command failed to open explorer: {e}", "ERROR")
    except Exception as e:
        # Other failures (permissions, etc.).
        messagebox.showerror("Error", f"Failed to open file explorer for '{models_dir}':\n{str(e)}")
        self.log_output(f"Failed to open explorer: {e}", "ERROR")
def copy_model(self):
    """Create a copy of the currently selected model via 'ollama cp'.

    Prompts the user for a new name/tag, validates it, then hands the
    actual copy off to a background thread so the UI stays responsive.
    """
    original = self.selected_model_name
    if not original:
        messagebox.showwarning("No Selection", "Please select a model to copy.")
        return

    # Ask for the destination name/tag of the copy.
    target = simpledialog.askstring(
        "Copy Model As",
        f"Enter a new name/tag for the copy of:\n'{original}'\n\n(e.g., my-llama:v2 or just my-llama)",
        parent=self.root)
    target = target.strip() if target else ""
    if not target:
        self.status_var.set("Model copy cancelled.")
        return

    # Restrict names to the characters Ollama accepts in model references.
    if re.match(r"^[a-zA-Z0-9-_./:]+$", target) is None:
        messagebox.showerror("Invalid Name", "Model name contains invalid characters.")
        return
    if target == original:
        messagebox.showerror("Error", "New name cannot be the same as the original model name.")
        return

    self.status_var.set(f"Copying model {original} to {target}...")
    self.log_output(f"Attempting to copy model '{original}' to '{target}'...")
    self.toggle_model_buttons(False)  # lock the buttons while the copy runs
    worker = threading.Thread(target=self._copy_model_thread,
                              args=(original, target), daemon=True)
    worker.start()
def _copy_model_thread(self, source_model, new_name):
    """Worker thread: run 'ollama cp' and report the outcome on the UI thread."""
    result = self.run_command(["ollama", "cp", source_model, new_name])

    def report():
        # Runs on the Tk main thread (scheduled via root.after).
        if result is None:
            # run_command already logged why execution failed.
            messagebox.showerror("Copy Error", f"Could not execute copy command for '{source_model}'. Is 'ollama' installed and in PATH? See log.")
            self.status_var.set(f"Error executing copy for '{source_model}'")
            self.toggle_model_buttons(bool(self.selected_model_name))
            return

        self.log_output(f"Copy command output for '{source_model}' -> '{new_name}':\n{result}", "DEBUG")
        lowered = result.lower()
        if "error" in lowered or "failed" in lowered:
            error_detail = result.splitlines()[-1] if result.strip() else "Unknown error."
            messagebox.showerror("Copy Failed", f"Failed to copy model '{source_model}' to '{new_name}'.\n\nError: {error_detail}\n\nCheck the log for more details.")
            self.status_var.set(f"Failed to copy '{source_model}'")
            # Re-enable buttons if a model is still selected.
            self.toggle_model_buttons(bool(self.selected_model_name))
        else:
            # 'ollama cp' prints almost nothing on success, so the absence
            # of an error marker is treated as success.
            messagebox.showinfo("Copy Successful", f"Model '{source_model}' copied to '{new_name}'.")
            self.status_var.set(f"Model copied to '{new_name}'")
            self.refresh_models()  # refresh shows the copy and re-enables buttons

    self.root.after(0, report)
def import_model(self):
    """Ask the user how to import a model: registry pull vs. local Modelfile."""
    answer = messagebox.askquestion(
        "Import Model",
        "How do you want to import a model?\n\n"
        "► YES = Pull from Ollama Registry (e.g., 'llama3')\n\n"
        "► NO = Create from local Modelfile\n",
        type=messagebox.YESNO, parent=self.root)
    if answer == 'yes':
        self.pull_model_dialog()
    elif answer == 'no':
        self.create_model_dialog()
    # Any other outcome (dialog dismissed/escaped) is a deliberate no-op.
def pull_model_dialog(self):
    """Prompt for a registry model name and start a background 'ollama pull'."""
    name = simpledialog.askstring(
        "Pull Model from Registry",
        "Enter the name of the model to pull:\n(e.g., 'llama3', 'mistral:7b', 'codellama:13b-instruct')",
        parent=self.root)
    name = name.strip() if name else ""
    if not name:
        self.status_var.set("Pull cancelled.")
        return

    # Restrict names to the characters Ollama accepts in model references.
    if re.match(r"^[a-zA-Z0-9-_./:]+$", name) is None:
        messagebox.showerror("Invalid Name", "Model name contains invalid characters.")
        return

    self.status_var.set(f"Pulling model {name}...")
    self.log_output(f"Attempting to pull model: {name}")
    # Block re-entrancy while the pull runs; the worker re-enables these.
    self.import_button.configure(state=tk.DISABLED)
    self.refresh_button.configure(state=tk.DISABLED)
    threading.Thread(target=self._pull_model_thread, args=(name,), daemon=True).start()
def _pull_model_thread(self, model_name):
    """Background thread for 'ollama pull' with progress logging.

    Streams the command's combined stdout/stderr into the log window,
    pushes rough progress text into the status bar, then reports the
    final result on the Tk main thread via root.after().
    """
    process = None  # so 'finally' can tell whether Popen succeeded
    try:
        self.log_output(f"Executing: ollama pull {model_name}")
        # Popen (not run) so output can be streamed as it arrives.
        process = subprocess.Popen(
            ["ollama", "pull", model_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # combine stdout and stderr
            text=True,
            encoding='utf-8',
            errors='replace',
            bufsize=1,  # line buffered
        )
        # FIX: the previous loop called process.stdout.peek(1) and compared
        # the result to b''. Text-mode pipes are io.TextIOWrapper objects,
        # which have no peek() method, so that check raised AttributeError
        # whenever the process finished before the read. Iterating the
        # stream reads line-by-line until EOF, which happens exactly when
        # the child process closes its end of the pipe.
        for line in process.stdout:
            # Tk is not thread-safe: schedule all UI work on the main thread.
            self.root.after(0, lambda l=line: self.log_output(f"[pull] {l.strip()}"))
            # Best-effort progress parsing for the status bar.
            cleaned_line = line.strip()
            if "%" in cleaned_line and ("pulling" in cleaned_line or "downloading" in cleaned_line):
                status_text = f"Pulling {model_name}: {cleaned_line}"
                # Keep the status bar text short.
                if len(status_text) > 100:
                    status_text = status_text[:97] + "..."
                self.root.after(0, lambda s=status_text: self.status_var.set(s))
            elif "verifying" in cleaned_line.lower():
                self.root.after(0, lambda: self.status_var.set(f"Verifying {model_name}..."))
            elif "writing" in cleaned_line.lower():
                self.root.after(0, lambda: self.status_var.set(f"Writing {model_name} manifest..."))

        # EOF reached: wait for the process and collect the exit code.
        process.wait()
        return_code = process.returncode

        def handle_pull_result():
            # Re-enable buttons first.
            self.import_button.configure(state=tk.NORMAL)
            self.refresh_button.configure(state=tk.NORMAL)
            if return_code == 0:
                self.log_output(f"Pull command for '{model_name}' completed successfully.", "INFO")
                messagebox.showinfo("Pull Successful", f"Model '{model_name}' pulled successfully.")
                self.status_var.set(f"Model '{model_name}' pulled")
                self.refresh_models()  # refresh list to show the new model
            else:
                self.log_output(f"Pull command for '{model_name}' failed with return code {return_code}.", "ERROR")
                messagebox.showerror("Pull Failed", f"Failed to pull model '{model_name}'.\nReturn code: {return_code}\n\nCheck the log for more details.")
                self.status_var.set(f"Failed to pull '{model_name}' (code: {return_code})")

        self.root.after(0, handle_pull_result)
    except FileNotFoundError:
        msg = "Error: 'ollama' command not found. Is Ollama installed and in your PATH?"
        self.log_output(msg, "ERROR")

        def handle_fnf_error():
            self.import_button.configure(state=tk.NORMAL)  # re-enable
            self.refresh_button.configure(state=tk.NORMAL)
            messagebox.showerror("Command Not Found", msg)
            self.status_var.set("Error: ollama not found")

        self.root.after(0, handle_fnf_error)
    except Exception as e:
        error_msg = f"An unexpected error occurred during pull: {str(e)}"
        self.log_output(error_msg, "ERROR", exc_info=True)

        def handle_pull_exception():
            self.import_button.configure(state=tk.NORMAL)  # re-enable
            self.refresh_button.configure(state=tk.NORMAL)
            messagebox.showerror("Pull Error", error_msg)
            self.status_var.set(f"Error pulling '{model_name}'")

        self.root.after(0, handle_pull_exception)
    finally:
        # Close our end of the pipe if Popen succeeded
        # (stderr was merged into stdout, so there is nothing else to close).
        if process and process.stdout:
            process.stdout.close()
def create_model_dialog(self):
    """Pick a Modelfile, ask for a model name, and start 'ollama create'."""
    chosen_path = filedialog.askopenfilename(
        title="Select Modelfile to Create From",
        filetypes=[("Modelfile", "Modelfile*"), ("Text files", "*.txt"), ("All files", "*.*")],
        parent=self.root)
    if not chosen_path:
        self.status_var.set("Create from Modelfile cancelled.")
        return

    # Seed the name prompt with a slug derived from the file name.
    base_name = os.path.basename(chosen_path)
    default_name = os.path.splitext(base_name)[0].lower().replace(" ", "-") + ":latest"
    name = simpledialog.askstring(
        "Assign Model Name",
        f"Enter the name for the model being created from:\n'{base_name}'\n\n(e.g., 'my-custom-model:latest' or just 'my-model')",
        initialvalue=default_name,
        parent=self.root)
    name = name.strip() if name else ""
    if not name:
        self.status_var.set("Create cancelled (no model name provided).")
        return

    # Restrict names to the characters Ollama accepts in model references.
    if re.match(r"^[a-zA-Z0-9-_./:]+$", name) is None:
        messagebox.showerror("Invalid Name", "Model name contains invalid characters.")
        return

    self.status_var.set(f"Creating model {name} from {os.path.basename(chosen_path)}...")
    self.log_output(f"Attempting to create model '{name}' from Modelfile: {chosen_path}")
    # Block re-entrancy while the create runs; the worker re-enables these.
    self.import_button.configure(state=tk.DISABLED)
    self.refresh_button.configure(state=tk.DISABLED)
    threading.Thread(target=self._create_model_thread, args=(name, chosen_path), daemon=True).start()
def _create_model_thread(self, model_name, modelfile_path):
    """Background thread for 'ollama create' with progress.

    Streams the command's combined stdout/stderr into the log window,
    updates the status bar, then reports the final result on the Tk
    main thread via root.after().
    """
    process = None  # so 'finally' can tell whether Popen succeeded
    try:
        self.log_output(f"Executing: ollama create {shlex.quote(model_name)} -f {shlex.quote(modelfile_path)}")
        # Popen (not run) so output can be streamed as it arrives.
        process = subprocess.Popen(
            ["ollama", "create", model_name, "-f", modelfile_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # combine output
            text=True,
            encoding='utf-8',
            errors='replace',
            bufsize=1  # line buffered
        )
        # FIX: the previous loop called process.stdout.peek(1), which
        # text-mode pipes (io.TextIOWrapper) do not provide, raising
        # AttributeError whenever the process finished before the read.
        # Iterating the stream reads line-by-line until EOF, which happens
        # exactly when the child process closes its end of the pipe.
        for line in process.stdout:
            # Tk is not thread-safe: schedule all UI work on the main thread.
            self.root.after(0, lambda l=line: self.log_output(f"[create] {l.strip()}"))
            # Best-effort progress text for the status bar.
            cleaned_line = line.strip().lower()
            status_msg = f"Creating {model_name}..."
            if "transferring" in cleaned_line:
                status_msg = f"Creating {model_name}: Transferring layers..."
            elif "success" in cleaned_line:
                status_msg = f"Creating {model_name}: Success!"
            # Keep the status bar text short.
            if len(status_msg) > 80:
                status_msg = status_msg[:77] + "..."
            self.root.after(0, lambda s=status_msg: self.status_var.set(s))

        # EOF reached: wait for the process and collect the exit code.
        process.wait()
        return_code = process.returncode

        def handle_create_result():
            self.import_button.configure(state=tk.NORMAL)  # re-enable
            self.refresh_button.configure(state=tk.NORMAL)
            if return_code == 0:
                self.log_output(f"Create command for '{model_name}' completed successfully.", "INFO")
                messagebox.showinfo("Create Successful", f"Model '{model_name}' created successfully.")
                self.status_var.set(f"Model '{model_name}' created")
                self.refresh_models()  # refresh list to show the new model
            else:
                self.log_output(f"Create command for '{model_name}' failed with return code {return_code}.", "ERROR")
                messagebox.showerror("Create Failed", f"Failed to create model '{model_name}'.\nReturn code: {return_code}\n\nCheck the log for details (e.g., Modelfile errors).")
                self.status_var.set(f"Failed to create '{model_name}' (code: {return_code})")

        self.root.after(0, handle_create_result)
    except FileNotFoundError:
        msg = "Error: 'ollama' command not found. Is Ollama installed and in your PATH?"
        self.log_output(msg, "ERROR")

        def handle_fnf_error():
            self.import_button.configure(state=tk.NORMAL)
            self.refresh_button.configure(state=tk.NORMAL)
            messagebox.showerror("Command Not Found", msg)
            self.status_var.set("Error: ollama not found")

        self.root.after(0, handle_fnf_error)
    except Exception as e:
        error_msg = f"An unexpected error occurred during create: {str(e)}"
        self.log_output(error_msg, "ERROR", exc_info=True)

        def handle_create_exception():
            self.import_button.configure(state=tk.NORMAL)
            self.refresh_button.configure(state=tk.NORMAL)
            messagebox.showerror("Create Error", error_msg)
            self.status_var.set(f"Error creating '{model_name}'")

        self.root.after(0, handle_create_exception)
    finally:
        # Close our end of the pipe if Popen succeeded
        # (stderr was merged into stdout, so there is nothing else to close).
        if process and process.stdout:
            process.stdout.close()
def run_model(self):
    """Run the selected model interactively in a new terminal window.

    Windows: new console via CREATE_NEW_CONSOLE. macOS: Terminal.app via
    osascript. Linux: probes a list of known terminal emulators.
    """
    if not self.selected_model_name:
        messagebox.showwarning("No Selection", "Please select a model to run.", parent=self.root)
        return
    model_to_run = self.selected_model_name
    self.status_var.set(f"Attempting to run model {model_to_run} in new terminal...")
    self.log_output(f"Launching interactive session for: {model_to_run}")
    try:
        system = platform.system()
        # Command to run ollama interactively.
        cmd_args = ["ollama", "run", model_to_run]
        if system == "Windows":
            # New console window; inherits the environment. A window title
            # cannot be set directly with CREATE_NEW_CONSOLE.
            subprocess.Popen(cmd_args, creationflags=subprocess.CREATE_NEW_CONSOLE)
        elif system == "Darwin":  # macOS
            quoted_cmd = " ".join(shlex.quote(arg) for arg in cmd_args)
            # FIX: the previous single statement
            #   tell application "Terminal" to activate do script "..."
            # is not valid AppleScript ('activate' and 'do script' are two
            # separate commands). Run them as two -e statements, and escape
            # backslashes and double quotes so the shell command survives
            # embedding in an AppleScript string literal.
            as_cmd = quoted_cmd.replace("\\", "\\\\").replace('"', '\\"')
            subprocess.Popen([
                'osascript',
                '-e', 'tell application "Terminal" to activate',
                '-e', f'tell application "Terminal" to do script "{as_cmd}"',
            ])
        else:  # Linux and others: probe for a known terminal emulator
            terminals = {
                # Terminal: command template ({cmd} = quoted command string,
                # {model} = model name for window titles).
                "gnome-terminal": ["gnome-terminal", "--", "bash", "-c", "{cmd}; exec bash"],
                "konsole": ["konsole", "-e", "bash", "-c", "{cmd}; exec bash"],
                "xfce4-terminal": ["xfce4-terminal", "-e", "bash -c '{cmd}; exec bash'"],
                "lxterminal": ["lxterminal", "-e", "bash", "-c", "{cmd}; exec bash"],
                "mate-terminal": ["mate-terminal", "--", "bash", "-c", "{cmd}; exec bash"],
                "xterm": ["xterm", "-T", "Ollama Chat: {model}", "-e", "bash", "-c", "{cmd}; exec bash"]
            }
            found_terminal = False
            quoted_cmd = " ".join(shlex.quote(arg) for arg in cmd_args)
            for term, term_cmd_tpl in terminals.items():
                try:
                    # Check if the terminal executable exists in PATH.
                    if subprocess.call(['which', term], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) == 0:
                        # Substitute the command/model into the template.
                        term_cmd = [part.format(cmd=quoted_cmd, model=model_to_run)
                                    for part in term_cmd_tpl]
                        self.log_output(f"Trying to launch with {term}: {' '.join(shlex.quote(c) for c in term_cmd)}", "DEBUG")
                        subprocess.Popen(term_cmd)
                        found_terminal = True
                        self.log_output(f"Launched ollama run in {term}")
                        break  # stop after successfully launching one
                except FileNotFoundError:
                    self.log_output(f"Terminal '{term}' not found in PATH.", "DEBUG")
                    continue  # try the next terminal
                except Exception as term_err:
                    self.log_output(f"Error trying to launch with {term}: {term_err}", "WARNING")
                    # keep trying other terminals
            if not found_terminal:
                messagebox.showerror("Terminal Error", "Could not find a known terminal emulator (tried gnome-terminal, konsole, xfce4-terminal, etc.) to run the model interactively.", parent=self.root)
                self.status_var.set(f"Could not find terminal to run {model_to_run}")
                return
        self.status_var.set(f"Launched '{model_to_run}' in new terminal")
    except Exception as e:
        messagebox.showerror("Error", f"Failed to launch Ollama interactive session:\n{str(e)}", parent=self.root)
        self.log_output(f"Failed to run model {model_to_run}: {e}", "ERROR", exc_info=True)
        self.status_var.set(f"Error launching {model_to_run}")
# --- run_command: Centralized command execution ---
def run_command(self, command_args):
    """Runs a command given as a list and returns its output as text, logging errors.

    Args:
        command_args: command and arguments as a list (no shell is used).

    Returns:
        On success, the command's stdout text ("" if it produced none).
        When the command runs but fails, a string starting with
        "Error"/"OS Error"/"Unexpected Error" describing the failure
        (callers do substring checks on this).
        None when the executable itself could not be found or args were empty.
    """
    if not command_args:
        self.log_output("run_command called with empty args", "ERROR")
        return None
    command_str = ' '.join(shlex.quote(str(arg)) for arg in command_args)  # Ensure args are strings for quote
    # Show only base command in status bar for brevity
    base_cmd_name = os.path.basename(command_args[0])
    # Use root.after to ensure status update is on main thread
    self.root.after(0, lambda: self.status_var.set(f"Running: {base_cmd_name}..."))
    self.log_output(f"Executing: {command_str}", "DEBUG")  # Use DEBUG level for execution log
    try:
        # Check if ollama is the command and log its location for debugging
        if base_cmd_name == "ollama" or base_cmd_name == "ollama.exe":
            try:
                where_cmd = "where" if platform.system() == "Windows" else "which"
                # Run synchronously as it's quick and needed for context
                location_result = subprocess.run([where_cmd, "ollama"], capture_output=True, text=True, check=False, encoding='utf-8', errors='replace', timeout=5)
                if location_result.returncode == 0 and location_result.stdout:
                    found_path = location_result.stdout.strip().splitlines()[0]  # Take first line if multiple found
                    self.log_output(f"Using ollama found at: {found_path}", "DEBUG")
                else:
                    self.log_output("Could not determine ollama location via 'where'/'which'. Is it in PATH?", "WARNING")
            except subprocess.TimeoutExpired:
                self.log_output("Timeout trying to find ollama location.", "WARNING")
            except Exception as loc_err:
                self.log_output(f"Error finding ollama location: {loc_err}", "WARNING")
        # Execute the command
        # Combine stdout and stderr to capture all output, especially errors
        # Set startupinfo for Windows to prevent console window pop-up for non-interactive commands
        startupinfo = None
        if platform.system() == "Windows":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            startupinfo.wShowWindow = subprocess.SW_HIDE  # Hide console window
        process = subprocess.run(
            command_args,
            capture_output=True,
            text=True,  # Decode output as text using default encoding
            check=False,  # Don't raise exception on non-zero exit code
            encoding='utf-8',  # Explicitly specify utf-8
            errors='replace',  # Handle potential decoding errors
            startupinfo=startupinfo  # Hide window on Windows (None elsewhere)
            # timeout=300 # Optional: Add a timeout (e.g., 5 minutes) for commands
        )
        # Log output regardless of success/failure (useful for debugging)
        # Use DEBUG level for full output to avoid cluttering INFO logs
        if process.stdout:
            self.log_output(f"Command stdout:\n---\n{process.stdout.strip()}\n---", "DEBUG")
        # Stderr is captured separately with capture_output=True
        if process.stderr:
            self.log_output(f"Command stderr:\n---\n{process.stderr.strip()}\n---", "DEBUG")
        # Check return code
        if process.returncode != 0:
            # Log the error prominently at ERROR level
            error_message = f"Command failed with exit code {process.returncode}: {command_str}\n"
            # Combine stdout and stderr for the error message context
            output = (process.stdout.strip() + "\n" + process.stderr.strip()).strip()
            if not output:
                output = "(No output captured)"
            error_message += f"Output:\n{output}"
            self.log_output(error_message, "ERROR")
            # Return a combined string indicating failure for simple checks
            # (callers look for "error"/"failed" substrings rather than None)
            return f"Error (code {process.returncode}):\n{output}"
        # Success case: return stdout (prefer stdout over stderr if both exist on success)
        return process.stdout if process.stdout else ""  # Return empty string if no stdout
    except FileNotFoundError:
        msg = f"Error: Command '{command_args[0]}' not found. Please ensure it is installed and in your system's PATH."
        self.log_output(msg, "ERROR")
        self.root.after(0, lambda: self.status_var.set(f"Error: {command_args[0]} not found"))
        return None  # Indicate critical failure
    except subprocess.TimeoutExpired:
        # NOTE(review): only reachable if the commented-out timeout above is enabled
        msg = f"Error: Command timed out after specified duration: {command_str}"
        self.log_output(msg, "ERROR")
        self.root.after(0, lambda: self.status_var.set("Error: Command timed out"))
        return "Error: Command timed out"
    except OSError as e:
        # Catch other OS errors like permission denied
        msg = f"OS error running command '{command_str}': {e}"
        self.log_output(msg, "ERROR", exc_info=True)
        self.root.after(0, lambda: self.status_var.set(f"OS Error running command"))
        return f"OS Error: {e}"
    except Exception as e:
        # Catch other potential exceptions
        msg = f"Unexpected error running command '{command_str}': {str(e)}"
        self.log_output(msg, "ERROR", exc_info=True)
        self.root.after(0, lambda: self.status_var.set(f"Unexpected error running command"))
        return f"Unexpected Error: {str(e)}"
# --- Main execution ---
def main():
    """Create the Tk root window, instantiate the app, and run the event loop."""
    # FIX: the previous Linux-only branch called tk.PhotoImage(file='')
    # (TclError: empty filename cannot be opened) and root.wm_class(...),
    # a method that does not exist in tkinter (AttributeError) — both
    # crashed startup on Linux. WM_CLASS can only be set at window-creation
    # time, via the className argument (harmless on other platforms).
    root = tk.Tk(className="OllamaModelManager")
    # Optional: set an application icon here with root.iconbitmap("icon.ico")
    # on Windows or wm_iconphoto(tk.PhotoImage(file="icon.png")) elsewhere.
    app = OllamaModelManager(root)

    def on_closing():
        # Add cleanup here if needed (e.g., stop running threads).
        logging.info("Ollama Model Manager closing.")
        root.destroy()

    root.protocol("WM_DELETE_WINDOW", on_closing)
    root.mainloop()
if __name__ == "__main__":
    # Configure console logging when run directly. This is a no-op if the
    # module-level logging.basicConfig at the top of the file already
    # installed handlers on the root logger.
    if not logging.getLogger().hasHandlers():
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    main()