Sorting single-select options without deleting/recreating them — preserving option IDs via modify_column_data

Hi everyone,

I wanted to share a Python script that alphabetically sorts the options of a single-select or multiple-select column without deleting and recreating them.

The goal was to reproduce the same behavior as manually reordering select options via drag & drop in the SeaTable UI:

  • preserve existing option IDs

  • preserve colors and text colors

  • do not delete options

  • do not recreate options

  • do not read or modify row values

  • only change the order of column.data.options

Background

I first tried the obvious documented REST approaches:

  1. Updating the column via /columns/ and passing updated column_data

  2. Updating options via /column-options/

However, in my test environment:

  • /columns/ did not accept the column_data update for this use case.

  • /column-options/ accepted the payload, but did not persist the changed order.

  • The actual UI behavior appears to use an internal modify_column_data operation.

The working approach is therefore to send an internal DTable operation:

{
  "op_type": "modify_column_data",
  "table_id": "0000",
  "column_key": "...",
  "column_data": {
    "options": [
      ...
    ]
  }
}
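A minimal sketch of how such an operation could be assembled from a column's metadata, assuming the metadata has the shape the script below reads (a `key` plus `data.options`). The helper name `build_reorder_operation` and the sample column are hypothetical and only for illustration; the operation fields are the ones shown above.

```python
import copy
import json


def build_reorder_operation(table_id, column, sort_key):
    """Build a modify_column_data operation that only reorders the options."""
    # Deep copy so the metadata that was read is never mutated.
    column_data = copy.deepcopy(column.get("data") or {})
    options = column_data.get("options") or []
    # sorted() returns the same option dicts in a new order; IDs and colors are untouched.
    column_data["options"] = sorted(options, key=sort_key)
    return {
        "op_type": "modify_column_data",
        "table_id": table_id,
        "column_key": column["key"],
        "column_data": column_data,
    }


# Hypothetical column metadata, for illustration only.
column = {
    "key": "abcd",
    "type": "single-select",
    "data": {
        "options": [
            {"id": "111", "name": "Zwei", "color": "#FFFFFF"},
            {"id": "222", "name": "Eins", "color": "#000000"},
        ]
    },
}

operation = build_reorder_operation("0000", column, lambda option: option["name"].casefold())
print(json.dumps(operation, ensure_ascii=False, indent=2))
```

Passing the whole `column_data` back (not just `options`) mirrors what the script does, so any other keys stored on the column are sent along unchanged.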

The script sends this operation as an update-dtable event over Engine.IO long-polling, similar to how the web client updates the base.
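To make the framing concrete, here is a small sketch of the text packets involved, based on the prefixes the script below uses: `42<id>` for an event that expects an acknowledgement, `43<id>` for the acknowledgement itself, and the `\x1e` record separator between packets in one polling response. The helper names are hypothetical, and the event arguments are placeholders rather than real values.

```python
import json

RECORD_SEPARATOR = "\x1e"  # separates several packets in one polling response


def encode_event(ack_id, event, *args):
    """Frame an event: '4' (Engine.IO message) + '2' (Socket.IO event) + ack id + JSON array."""
    return "42{}{}".format(ack_id, json.dumps([event, *args], separators=(",", ":")))


def split_packets(raw_text):
    """Split one polling response body into individual packets."""
    return [part for part in raw_text.split(RECORD_SEPARATOR) if part]


def decode_ack(packet, ack_id):
    """Return the ACK payload for ack_id, or None if the packet is something else."""
    prefix = "43{}".format(ack_id)
    if not packet.startswith(prefix):
        return None
    rest = packet[len(prefix):]
    # The payload is a JSON array; a leading digit would mean a longer ack id (10 vs. 1).
    if rest and not rest.startswith("["):
        return None
    return json.loads(rest or "[]")


# Placeholder arguments: a base UUID and the operation serialized as a JSON string.
frame = encode_event(1, "update-dtable", "base-uuid", "{}")
print(frame)

for packet in split_packets('2\x1e431[{"status":1}]'):
    print(repr(packet), "->", decode_ack(packet, 1))
```

Checking that the ACK payload starts with `[` is an extra guard that the script does not need, since it only ever uses ack IDs 0 and 1.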

Safety measures included

The script includes several safeguards:

  • DRY_RUN

  • explicit confirmation flag for the undocumented internal operation

  • before/after JSON output

  • verification that option IDs and metadata remain unchanged

  • verification that the saved option order actually changed after the update

  • no row reads or row updates
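The two verification checks can be sketched roughly as follows, assuming every option is a JSON-serializable dict with an `id` field, as in the script below. The function name `is_pure_reorder` and the sample options are made up for illustration.

```python
import json


def canonical(option):
    """Stable string form of an option, independent of key order."""
    return json.dumps(option, sort_keys=True, ensure_ascii=False, separators=(",", ":"))


def is_pure_reorder(before, after):
    """True if 'after' holds exactly the options of 'before', but in a different order."""
    before_by_id = {option["id"]: canonical(option) for option in before}
    after_by_id = {option["id"]: canonical(option) for option in after}
    # Same IDs, same metadata per ID, and no option duplicated or dropped.
    same_content = (
        before_by_id == after_by_id
        and len(before) == len(after) == len(before_by_id)
    )
    order_changed = [o["id"] for o in before] != [o["id"] for o in after]
    return same_content and order_changed


# Hypothetical sample options.
before = [
    {"id": "1", "name": "Eins", "color": "#FF0000"},
    {"id": "2", "name": "Drei", "color": "#00FF00"},
]
reordered = [before[1], before[0]]
recolored = [{"id": "2", "name": "Drei", "color": "#0000FF"}, before[0]]

print(is_pure_reorder(before, reordered))  # same options, new order
print(is_pure_reorder(before, recolored))  # metadata of option 2 differs
```

Comparing the full serialized option, not just the ID and name, is what catches an accidental change to a color or to any other key stored on the option.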

Tested result

In my test table, the original order was:

Eins
Zwei
Drei
Vier

After running the script, the saved order became:

Drei
Eins
Vier
Zwei

The option IDs, colors and text colors were preserved.

The server acknowledged the update and increased the dtable_version.
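For reference, the ordering above can be reproduced with a standalone sort key along the lines of the one in the script. This is a simplified sketch: `GERMAN_FOLDS` and `sort_key` are illustrative names, and the umlaut example is an assumption added to show why the folding matters, not part of my test table.

```python
import unicodedata

# Umlaut folding as commonly used for German alphabetical sorting.
GERMAN_FOLDS = {
    "Ä": "Ae", "Ö": "Oe", "Ü": "Ue",
    "ä": "ae", "ö": "oe", "ü": "ue",
    "ß": "ss",
}


def sort_key(name):
    """Case-insensitive key with German umlaut folding and accents stripped."""
    for src, target in GERMAN_FOLDS.items():
        name = name.replace(src, target)
    decomposed = unicodedata.normalize("NFKD", name)
    stripped = "".join(char for char in decomposed if not unicodedata.combining(char))
    return stripped.casefold()


names = ["Eins", "Zwei", "Drei", "Vier"]
print(sorted(names, key=sort_key))

# Without folding, "Ä" sorts after "Z" because of its code point.
with_umlaut = ["Zebra", "Äpfel", "Apfel"]
print(sorted(with_umlaut))
print(sorted(with_umlaut, key=sort_key))
```

The first call prints the same order as the saved result above (Drei, Eins, Vier, Zwei).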

Important caveat

This uses an internal, undocumented modify_column_data operation. It works in my environment, but I would not treat it as an officially supported API contract.

I am posting this for two reasons:

  1. It may be useful for others who need to reorder select options without destroying option IDs.

  2. I would be interested to know whether there is an officially supported API endpoint for this use case, or whether SeaTable plans to expose one.

Script below.

# -*- coding: utf-8 -*-
"""
SeaTable/BwTable: sort select options without deleting or recreating them.
Final version: Engine.IO polling, no python-socketio dependency.

Goal:
- Sort the order of single-/multiple-select options alphabetically
- Do NOT delete options
- Do NOT recreate options
- Do NOT read or modify cell values
- Preserve option IDs, colors, text colors and any other option metadata

Technical approach:
- SeaTable stores the option list in column.data.options.
- The web client changes this metadata internally via the operation:
    op_type = "modify_column_data"
- This operation is sent to update-dtable via Socket.IO / Engine.IO.
- The public REST endpoint /column-options/ does not reliably change the stored order.

Safety model:
1. Run with DRY_RUN = True first.
2. Check the output.
3. For a real change, set:
       DRY_RUN = False
       CONFIRM_UNDOCUMENTED_SOCKET_OPERATION = True

Note:
This solution uses an internal web client mechanism. It works like drag & drop,
but is not documented as an official, stable REST endpoint.
"""

import copy
import json
import time
import unicodedata

import requests
from seatable_api import context


# ============================================================
# CONFIGURATION
# ============================================================

# Default: current table from the SeaTable script context.
# Alternatively set it explicitly, e.g. TABLE_NAME = "Table1"
TABLE_NAME = context.current_table

# Name of the single-/multiple-select column
COLUMN_NAME = "Status"

# Leave as True at first. Set to False for a real change.
DRY_RUN = True

# Hard safety brake for the undocumented internal socket mechanism.
# Deliberately set to True for a real change.
CONFIRM_UNDOCUMENTED_SOCKET_OPERATION = False

# "asc" or "desc"
SORT_DIRECTION = "asc"

# German sorting: Ä->Ae, Ö->Oe, Ü->Ue, ß->ss
GERMAN_SORTING = True

# Keep empty option names at the end.
EMPTY_NAMES_LAST = True

# Technical parameters
REQUEST_TIMEOUT = 30
VERIFY_RETRIES = 10
VERIFY_SLEEP_SECONDS = 0.5
SOCKET_LANG = "de"

# Diagnostic output
PRINT_BACKUP_JSON = True
PRINT_OPERATION_JSON = True


# ============================================================
# JSON / OUTPUT
# ============================================================

def pretty(value):
    return json.dumps(value, ensure_ascii=False, indent=2, sort_keys=False)


def compact_json(value):
    return json.dumps(value, ensure_ascii=False, separators=(",", ":"))


def print_section(title):
    print("")
    print(title)
    print("-" * len(title))


def mask_sensitive(value):
    """
    Prevents tokens from being printed accidentally in diagnostic objects.
    """
    sensitive_keys = ("token", "authorization", "cookie", "jwt", "secret")

    if isinstance(value, dict):
        result = {}
        for key, val in value.items():
            key_text = str(key).lower()
            if any(marker in key_text for marker in sensitive_keys):
                result[key] = "***"
            else:
                result[key] = mask_sensitive(val)
        return result

    if isinstance(value, list):
        return [mask_sensitive(item) for item in value]

    return value


# ============================================================
# SORTING / OPTION VALIDATION
# ============================================================

def normalize_sort_value(value):
    value = str(value or "").strip()

    if GERMAN_SORTING:
        replacements = {
            "Ä": "Ae", "Ö": "Oe", "Ü": "Ue",
            "ä": "ae", "ö": "oe", "ü": "ue",
            "ß": "ss",
        }
        for src, target in replacements.items():
            value = value.replace(src, target)

    value = unicodedata.normalize("NFKD", value)
    value = "".join(char for char in value if not unicodedata.combining(char))

    return value.casefold()


def option_sort_key(option):
    return normalize_sort_value(option.get("name", ""))


def sort_options(options):
    direction = SORT_DIRECTION.lower().strip()
    if direction not in ("asc", "desc"):
        raise Exception("SORT_DIRECTION must be 'asc' or 'desc': {}".format(SORT_DIRECTION))

    reverse = direction == "desc"

    # Important: with reverse=True, empty names should still stay at the end.
    if EMPTY_NAMES_LAST:
        named = []
        empty = []
        for option in options:
            name = str(option.get("name", "") or "").strip()
            if name:
                named.append(option)
            else:
                empty.append(option)

        return sorted(named, key=option_sort_key, reverse=reverse) + empty

    return sorted(options, key=option_sort_key, reverse=reverse)


def option_id(option):
    value = option.get("id")
    if value is None or str(value).strip() == "":
        return ""
    return str(value)


def option_ids(options):
    return ["id:{}".format(option_id(option)) for option in options]


def option_identity_minimal(option):
    """
    Compact display object for logs.
    """
    result = {
        "id": option.get("id"),
        "name": option.get("name", ""),
    }
    if "color" in option:
        result["color"] = option.get("color")
    if "textColor" in option:
        result["textColor"] = option.get("textColor")
    return result


def canonical_option(option):
    """
    Complete, stable representation of an option.
    Used to check that only the order is changed.
    """
    return json.dumps(option, ensure_ascii=False, sort_keys=True, separators=(",", ":"))


def validate_options(options):
    if not options:
        raise Exception("No select options found.")

    ids = []
    for index, option in enumerate(options, start=1):
        oid = option_id(option)
        if not oid:
            raise Exception("Option at position {} has no ID: {}".format(index, pretty(option)))
        ids.append(oid)

    duplicate_ids = sorted({oid for oid in ids if ids.count(oid) > 1})
    if duplicate_ids:
        raise Exception("Duplicate option IDs found: {}".format(pretty(duplicate_ids)))


def assert_same_options_before_after(before, after):
    before_by_id = {option_id(option): canonical_option(option) for option in before}
    after_by_id = {option_id(option): canonical_option(option) for option in after}

    if set(before_by_id.keys()) != set(after_by_id.keys()):
        raise Exception(
            "Aborting: option set/IDs differ.\nBefore:\n{}\nAfter:\n{}".format(
                pretty(sorted(before_by_id.keys())),
                pretty(sorted(after_by_id.keys())),
            )
        )

    changed_payloads = []
    for oid in sorted(before_by_id.keys()):
        if before_by_id[oid] != after_by_id[oid]:
            changed_payloads.append(oid)

    if changed_payloads:
        raise Exception(
            "Aborting: at least one option was changed in content, not just moved: {}".format(
                pretty(changed_payloads)
            )
        )


def print_options(title, options):
    print_section(title)
    for i, option in enumerate(options, start=1):
        color = option.get("color")
        text_color = option.get("textColor")
        suffix_parts = []
        if option.get("id") is not None:
            suffix_parts.append("id: {}".format(option.get("id")))
        if color:
            suffix_parts.append("color: {}".format(color))
        if text_color:
            suffix_parts.append("textColor: {}".format(text_color))

        suffix = " [{}]".format(", ".join(suffix_parts)) if suffix_parts else ""
        print("{:>3}. {}{}".format(i, option.get("name", ""), suffix))


# ============================================================
# REST API: APP ACCESS TOKEN / METADATA
# ============================================================

def get_base_access():
    server_url = context.server_url.rstrip("/")
    url = "{}/api/v2.1/dtable/app-access-token/".format(server_url)

    response = requests.get(
        url,
        headers={
            "Authorization": "Bearer {}".format(context.api_token),
            "Accept": "application/json",
        },
        timeout=REQUEST_TIMEOUT,
    )

    if not response.ok:
        raise Exception(
            "Base token error: HTTP {} {}".format(
                response.status_code,
                response.text[:1000],
            )
        )

    data = response.json()

    if not data.get("dtable_uuid") or not data.get("access_token"):
        raise Exception("Unexpected app-access-token response: {}".format(pretty(mask_sensitive(data))))

    return {
        "server_url": server_url,
        "api_root": "{}/api-gateway".format(server_url),
        "base_uuid": data["dtable_uuid"],
        "base_token": data["access_token"],
    }


def api_headers(access):
    return {
        "Authorization": "Bearer {}".format(access["base_token"]),
        "Accept": "application/json",
    }


def api_url(access, path):
    return "{}/api/v2/dtables/{}{}".format(
        access["api_root"],
        access["base_uuid"],
        path,
    )


def api_get(access, path, params=None):
    response = requests.get(
        api_url(access, path),
        headers=api_headers(access),
        params=params,
        timeout=REQUEST_TIMEOUT,
    )

    if not response.ok:
        raise Exception(
            "GET error {}: HTTP {} {}".format(
                path,
                response.status_code,
                response.text[:1000],
            )
        )

    return response.json()


def get_metadata(access):
    data = api_get(access, "/metadata/")
    return data.get("metadata", data)


def find_table(metadata):
    for table in metadata.get("tables", []):
        if table.get("name") == TABLE_NAME:
            return table

    available = [table.get("name") for table in metadata.get("tables", [])]
    raise Exception(
        "Table not found: {}\nAvailable tables: {}".format(
            TABLE_NAME,
            pretty(available),
        )
    )


def find_column(table):
    for column in table.get("columns", []):
        if column.get("name") == COLUMN_NAME:
            return column

    available = [column.get("name") for column in table.get("columns", [])]
    raise Exception(
        "Column not found: {}\nAvailable columns: {}".format(
            COLUMN_NAME,
            pretty(available),
        )
    )


def fetch_current_column(access):
    metadata = get_metadata(access)
    table = find_table(metadata)
    column = find_column(table)
    return table, column


# ============================================================
# ENGINE.IO / SOCKET.IO POLLING
# ============================================================

def parse_engineio_packets(raw_text):
    """
    Engine.IO v4 polling can deliver several packets separated by the record separator \x1e.
    """
    if not raw_text:
        return []

    if "\x1e" in raw_text:
        return [part for part in raw_text.split("\x1e") if part]

    return [raw_text]


def parse_engineio_open_packet(raw_text):
    packets = parse_engineio_packets(raw_text)
    for packet in packets:
        if packet.startswith("0"):
            return json.loads(packet[1:])

    raise Exception("No Engine.IO open packet received: {}".format(raw_text[:500]))


def polling_get(session, url, params):
    response = session.get(url, params=params, timeout=REQUEST_TIMEOUT)
    if not response.ok:
        raise Exception(
            "Socket.IO GET error: HTTP {} {}".format(
                response.status_code,
                response.text[:1000],
            )
        )
    return parse_engineio_packets(response.text)


def polling_post(session, url, params, payload):
    response = session.post(
        url,
        params=params,
        data=payload,
        headers={"Content-Type": "text/plain;charset=UTF-8"},
        timeout=REQUEST_TIMEOUT,
    )

    if not response.ok:
        raise Exception(
            "Socket.IO POST error: HTTP {} {}".format(
                response.status_code,
                response.text[:1000],
            )
        )

    return response.text


def wait_for_socket_ack(session, url, params, ack_id, label):
    """
    Waits for a Socket.IO ACK.

    Engine.IO message packet: 4
    Socket.IO ack packet:    3
    ACK ID 0 example:        430[{"status":1,...}]
    ACK ID 1 example:        431[{"status":1,...}]
    """
    expected_prefix = "43{}".format(ack_id)
    deadline = time.time() + REQUEST_TIMEOUT
    last_packets = []

    while time.time() < deadline:
        packets = polling_get(session, url, params)
        last_packets = packets

        for packet in packets:
            if packet == "2":
                # Engine.IO ping -> pong
                polling_post(session, url, params, "3")
                continue

            if packet.startswith("40"):
                # Socket.IO namespace connected
                continue

            if packet.startswith(expected_prefix):
                payload = packet[len(expected_prefix):] or "[]"
                try:
                    return json.loads(payload)
                except Exception:
                    raise Exception("ACK {} could not be parsed as JSON: {}".format(label, packet))

            if packet.startswith("44"):
                raise Exception("Socket.IO error packet during {}: {}".format(label, packet))

        time.sleep(0.05)

    raise Exception(
        "Timed out waiting for Socket.IO ACK: {}\nLast packets:\n{}".format(
            label,
            pretty(last_packets),
        )
    )


def assert_ack_ok(ack, label):
    main = ack[0] if isinstance(ack, list) and ack else ack

    if not isinstance(main, dict):
        raise Exception("Unexpected ACK format for {}: {}".format(label, pretty(mask_sensitive(ack))))

    # SeaTable typically returns status as 1 or true.
    if not main.get("status"):
        raise Exception("Socket ACK failed for {}: {}".format(label, pretty(mask_sensitive(ack))))


def send_operation_via_engineio_polling(access, operation):
    """
    Minimal Socket.IO/Engine.IO v4 polling client using only requests.
    No external socketio library required.
    """
    session = requests.Session()
    socket_url = "{}/api-gateway/socket.io/".format(access["server_url"])

    base_params = {
        "EIO": "4",
        "transport": "polling",
        "dtable_uuid": access["base_uuid"],
        "lang": SOCKET_LANG,
        "t": str(int(time.time() * 1000)),
    }

    # 1) Open the Engine.IO session
    open_response = session.get(socket_url, params=base_params, timeout=REQUEST_TIMEOUT)
    if not open_response.ok:
        raise Exception(
            "Socket.IO open error: HTTP {} {}".format(
                open_response.status_code,
                open_response.text[:1000],
            )
        )

    open_packet = parse_engineio_open_packet(open_response.text)
    sid = open_packet.get("sid")
    if not sid:
        raise Exception("Socket.IO SID missing from open packet: {}".format(pretty(open_packet)))

    params = dict(base_params)
    params["sid"] = sid

    # 2) Connect the Socket.IO namespace
    polling_post(session, socket_url, params, "40")

    # 3) join-room(base_uuid, base_token) with ACK ID 0
    join_payload = "420{}".format(compact_json([
        "join-room",
        access["base_uuid"],
        access["base_token"],
    ]))
    polling_post(session, socket_url, params, join_payload)
    join_ack = wait_for_socket_ack(session, socket_url, params, 0, "join-room")
    assert_ack_ok(join_ack, "join-room")

    # 4) update-dtable(base_uuid, operation_as_json_string) with ACK ID 1
    update_payload = "421{}".format(compact_json([
        "update-dtable",
        access["base_uuid"],
        compact_json(operation),
    ]))
    polling_post(session, socket_url, params, update_payload)
    update_ack = wait_for_socket_ack(session, socket_url, params, 1, "update-dtable")
    assert_ack_ok(update_ack, "update-dtable")

    # 5) Clean disconnect. Errors here are irrelevant because update_ack was already ok.
    try:
        polling_post(session, socket_url, params, "41")
    except Exception:
        pass

    return {
        "mode": "engineio-polling",
        "join_ack": mask_sensitive(join_ack),
        "update_ack": mask_sensitive(update_ack),
    }


# ============================================================
# VERIFICATION
# ============================================================

def verify_result(access, expected_sorted_options, original_options):
    expected_ids = option_ids(expected_sorted_options)
    last_actual_ids = None
    last_options = None

    for _ in range(VERIFY_RETRIES):
        _, current_column = fetch_current_column(access)
        current_data = current_column.get("data") or {}
        current_options = current_data.get("options") or []
        actual_ids = option_ids(current_options)

        last_actual_ids = actual_ids
        last_options = current_options

        if actual_ids == expected_ids:
            assert_same_options_before_after(original_options, current_options)
            return current_options

        time.sleep(VERIFY_SLEEP_SECONDS)

    raise Exception(
        "Order was not saved as expected.\nExpected:\n{}\nActual:\n{}\nLast options:\n{}".format(
            pretty(expected_ids),
            pretty(last_actual_ids),
            pretty([option_identity_minimal(option) for option in (last_options or [])]),
        )
    )


# ============================================================
# MAIN
# ============================================================

def main():
    print("Sorting select options without deleting/recreating them via modify_column_data.")
    print("Table:", TABLE_NAME)
    print("Column:", COLUMN_NAME)
    print("DRY_RUN:", DRY_RUN)
    print("CONFIRM_UNDOCUMENTED_SOCKET_OPERATION:", CONFIRM_UNDOCUMENTED_SOCKET_OPERATION)
    print("Sort direction:", SORT_DIRECTION)

    access = get_base_access()

    table, column = fetch_current_column(access)

    column_type = column.get("type")
    if column_type not in ("single-select", "multiple-select"):
        raise Exception("Column is not a single-/multiple-select column: {}".format(column_type))

    table_id = table.get("_id") or table.get("id")
    if not table_id:
        raise Exception("Table ID missing in metadata for table: {}".format(TABLE_NAME))

    column_key = column.get("key")
    if not column_key:
        raise Exception("Column key missing in metadata for column: {}".format(COLUMN_NAME))

    column_data = copy.deepcopy(column.get("data") or {})
    options = column_data.get("options") or []

    validate_options(options)

    sorted_options = sort_options(copy.deepcopy(options))
    validate_options(sorted_options)
    assert_same_options_before_after(options, sorted_options)

    changed = option_ids(options) != option_ids(sorted_options)

    print_options("Current order", options)
    print_options("New order", sorted_options)

    print("")
    print("Table ID:", table_id)
    print("Column key:", column_key)
    print("Column type:", column_type)
    print("Total options:", len(options))
    print("Order changes:", changed)
    print("Option set/IDs/metadata verified unchanged: True")
    print("Cell values are neither read nor modified.")

    backup = {
        "table_name": TABLE_NAME,
        "table_id": table_id,
        "column_name": COLUMN_NAME,
        "column_key": column_key,
        "column_type": column_type,
        "sort_direction": SORT_DIRECTION,
        "options_before": options,
        "options_after": sorted_options,
    }

    if PRINT_BACKUP_JSON:
        print_section("Backup / before-after JSON")
        print(pretty(backup))

    if not changed:
        print("")
        print("Already sorted. No change needed.")
        return

    new_column_data = copy.deepcopy(column_data)
    new_column_data["options"] = sorted_options

    operation = {
        "op_type": "modify_column_data",
        "table_id": table_id,
        "column_key": column_key,
        "column_data": new_column_data,
    }

    if PRINT_OPERATION_JSON:
        print_section("Planned operation")
        print(pretty({
            "op_type": operation["op_type"],
            "table_id": operation["table_id"],
            "column_key": operation["column_key"],
            "column_data": {
                "options": [option_identity_minimal(option) for option in operation["column_data"].get("options", [])],
            },
        }))

    if DRY_RUN:
        print("")
        print("DRY_RUN = True. Nothing was changed.")
        return

    if not CONFIRM_UNDOCUMENTED_SOCKET_OPERATION:
        raise Exception(
            "Aborting: CONFIRM_UNDOCUMENTED_SOCKET_OPERATION is False.\n"
            "For a real change, deliberately set:\n"
            "  DRY_RUN = False\n"
            "  CONFIRM_UNDOCUMENTED_SOCKET_OPERATION = True"
        )

    print("")
    print("Sending modify_column_data via Engine.IO polling / update-dtable...")
    result = send_operation_via_engineio_polling(access, operation)

    print_section("Socket result")
    print(pretty(result))

    print("")
    print("Checking saved option order...")
    verified_options = verify_result(access, sorted_options, options)

    print_options("Saved order after update", verified_options)

    print("")
    print("Done. Options were sorted without deleting/recreating them.")


main()

Thanks for reporting the issue and for the feature request. We will look into it.