| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416 |
#!/usr/bin/env bash
set -euo pipefail

# Tables involved in the migration; every name can be overridden from the
# environment so the same script works against other databases/tables.
DB_NAME="${CLICKHOUSE_DATABASE:-omsdb}"
SRC_TABLE="${CLICKHOUSE_SRC_TABLE:-events}"
DST_TABLE="${CLICKHOUSE_DST_TABLE:-events_v2}"
BACKUP_TABLE="${CLICKHOUSE_BACKUP_TABLE:-events_backup}"

# PM2 process that is paused during the cutover window.
INGESTOR_PM2_NAME="${INGESTOR_PM2_NAME:-ingestor-service}"

# Month (YYYYMM) copied during the pause; defaults to the host's current month.
CURRENT_MONTH="${CURRENT_MONTH:-$(date +%Y%m)}"

# Command-line switches (1 = on). Defaults are the safe choices: dry-run,
# PM2 managed by the script, refuse existing tables, full migration.
DRY_RUN=1 SKIP_PM2=0 FORCE=0 RECONCILE_ONLY=0
#######################################
# Print the command-line help text.
# Outputs: help text on stdout (callers redirect to stderr on usage errors).
#######################################
usage() {
  cat <<'EOF'
Usage:
  bash oms/scripts/migrate-clickhouse-events-partition.sh [options]

Options:
  --execute               Actually run the migration. Without this flag, the script is a dry-run.
  --dry-run               Only print the plan and SQL (this is the default).
  --skip-pm2              Do not stop/start PM2 ingestor-service during cutover.
  --force                 Allow destination/backup tables to already exist.
  --reconcile-only        Only reconcile missing rows from source into destination, without create/rename.
  --current-month YYYYMM  Override the current month used for cutover backfill.
  --help, -h              Show this help message.

Environment variables:
  CLICKHOUSE_HOST          Default: localhost
  CLICKHOUSE_PORT          Default: 9000
  CLICKHOUSE_USER          Optional
  CLICKHOUSE_PASSWORD      Optional
  CLICKHOUSE_CONTAINER     Default: clickhouse
  CLICKHOUSE_USE_DOCKER    Default: auto (prefer docker, fallback to native client)
  CLICKHOUSE_DATABASE      Default: omsdb
  CLICKHOUSE_SRC_TABLE     Default: events
  CLICKHOUSE_DST_TABLE     Default: events_v2
  CLICKHOUSE_BACKUP_TABLE  Default: events_backup
  INGESTOR_PM2_NAME        Default: ingestor-service
  CURRENT_MONTH            Default: current system month in YYYYMM

Notes:
  - The script assumes ClickHouse writes come only from ingestor-service.
  - By default it runs clickhouse-client inside the Docker container named by CLICKHOUSE_CONTAINER.
  - event-api can stay online; RabbitMQ will buffer events while ingestor-service is paused.
  - Review the printed plan first, then rerun with --execute.
EOF
}
#######################################
# Parse command-line flags into the global option variables.
# Globals:   DRY_RUN, SKIP_PM2, FORCE, RECONCILE_ONLY, CURRENT_MONTH (written)
# Arguments: the script's command-line arguments
# Exits:     0 after --help; 1 on an unknown flag or a missing option value
#######################################
parse_args() {
  while [[ $# -gt 0 ]]; do
    case "$1" in
      --execute)
        DRY_RUN=0
        ;;
      --dry-run)
        DRY_RUN=1
        ;;
      --skip-pm2)
        SKIP_PM2=1
        ;;
      --force)
        FORCE=1
        ;;
      --reconcile-only)
        RECONCILE_ONLY=1
        ;;
      --current-month)
        # Without this guard a trailing `--current-month` dies on `set -u`
        # with an opaque "$2: unbound variable" message.
        if [[ $# -lt 2 ]]; then
          printf '%s\n' "--current-month requires a value in YYYYMM format" >&2
          usage >&2
          exit 1
        fi
        CURRENT_MONTH="$2"
        shift
        ;;
      --current-month=*)
        CURRENT_MONTH="${1#*=}"
        ;;
      --help|-h)
        usage
        exit 0
        ;;
      *)
        echo "Unknown argument: $1" >&2
        usage >&2
        exit 1
        ;;
    esac
    shift
  done
}

parse_args "$@"
# CURRENT_MONTH drives both the cutover date literal and the historical/current
# split, so reject anything that is not a real calendar month (01-12). The
# pattern lives in a variable so Bash treats it as a regex, not a literal.
yyyymm_pattern='^[0-9]{4}(0[1-9]|1[0-2])$'
if [[ ! "$CURRENT_MONTH" =~ $yyyymm_pattern ]]; then
  echo "CURRENT_MONTH must be in YYYYMM format (month 01-12), got: $CURRENT_MONTH" >&2
  exit 1
fi
# Connection settings, defaulted in place when unset or empty.
: "${CLICKHOUSE_HOST:=localhost}"
: "${CLICKHOUSE_PORT:=9000}"
: "${CLICKHOUSE_USER:=}"
: "${CLICKHOUSE_PASSWORD:=}"
: "${CLICKHOUSE_CONTAINER:=clickhouse}"
: "${CLICKHOUSE_USE_DOCKER:=auto}"

# Arguments shared by every clickhouse-client invocation; credentials are
# appended only when they were actually provided.
CH_ARGS=(--host "$CLICKHOUSE_HOST" --port "$CLICKHOUSE_PORT" --database "$DB_NAME" --multiquery)
[[ -z "$CLICKHOUSE_USER" ]] || CH_ARGS+=(--user "$CLICKHOUSE_USER")
[[ -z "$CLICKHOUSE_PASSWORD" ]] || CH_ARGS+=(--password "$CLICKHOUSE_PASSWORD")
#######################################
# Abort the script unless the named executable is on PATH.
# Arguments: $1 - command name
# Exits:     1 with a message on stderr when the command is missing
#######################################
require_command() {
  command -v "$1" >/dev/null 2>&1 && return 0
  printf 'Required command not found: %s\n' "$1" >&2
  exit 1
}
# Decide how clickhouse-client is invoked:
#   "docker" - inside the container named by CLICKHOUSE_CONTAINER
#   "native" - the host's clickhouse-client binary
# CLICKHOUSE_USE_DOCKER=auto prefers Docker but, as the usage text promises,
# falls back to the native client. This includes the case where Docker is
# installed but the ClickHouse container does not exist on this host.
docker_available=0
native_clickhouse_available=0
if command -v docker >/dev/null 2>&1; then
  docker_available=1
fi
if command -v clickhouse-client >/dev/null 2>&1; then
  native_clickhouse_available=1
fi

# Succeeds when Docker knows a container named CLICKHOUSE_CONTAINER.
clickhouse_container_exists() {
  docker inspect "$CLICKHOUSE_CONTAINER" >/dev/null 2>&1
}

case "$CLICKHOUSE_USE_DOCKER" in
  auto)
    if [[ $docker_available -eq 1 ]] && clickhouse_container_exists; then
      CLICKHOUSE_EXEC_MODE="docker"
    elif [[ $native_clickhouse_available -eq 1 ]]; then
      CLICKHOUSE_EXEC_MODE="native"
    elif [[ $docker_available -eq 1 ]]; then
      # No native client to fall back to: stay on Docker so the check below
      # reports the missing container by name.
      CLICKHOUSE_EXEC_MODE="docker"
    else
      echo "Neither docker nor clickhouse-client is available on this host." >&2
      exit 1
    fi
    ;;
  1|true|yes|docker)
    if [[ $docker_available -eq 0 ]]; then
      echo "Docker is required but not available." >&2
      exit 1
    fi
    CLICKHOUSE_EXEC_MODE="docker"
    ;;
  0|false|no|native)
    if [[ $native_clickhouse_available -eq 0 ]]; then
      echo "clickhouse-client is required but not available." >&2
      exit 1
    fi
    CLICKHOUSE_EXEC_MODE="native"
    ;;
  *)
    echo "Unsupported CLICKHOUSE_USE_DOCKER value: $CLICKHOUSE_USE_DOCKER" >&2
    exit 1
    ;;
esac

if [[ "$CLICKHOUSE_EXEC_MODE" == "docker" ]] && ! clickhouse_container_exists; then
  echo "ClickHouse container not found: $CLICKHOUSE_CONTAINER" >&2
  exit 1
fi
# pm2 is only needed when the script is allowed to pause/resume the ingestor.
if (( SKIP_PM2 == 0 )); then
  require_command pm2
fi
#######################################
# Run one SQL string through clickhouse-client, either inside the Docker
# container or with the host binary, depending on CLICKHOUSE_EXEC_MODE.
# Globals:   CLICKHOUSE_EXEC_MODE, CLICKHOUSE_CONTAINER, CH_ARGS (read)
# Arguments: $1 - SQL text passed via --query
# Outputs:   clickhouse-client output on stdout
# Returns:   clickhouse-client's exit status
#######################################
run_sql() {
  local query="$1"
  case "$CLICKHOUSE_EXEC_MODE" in
    docker)
      docker exec "$CLICKHOUSE_CONTAINER" clickhouse-client "${CH_ARGS[@]}" --query "$query"
      ;;
    *)
      # Native client: detach stdin so it cannot consume the caller's input.
      clickhouse-client "${CH_ARGS[@]}" --query "$query" </dev/null
      ;;
  esac
}
#######################################
# Print a titled SQL statement for the dry-run plan (never executes it).
# Arguments: $1 - title shown in brackets, $2 - SQL text printed verbatim
#######################################
print_sql_block() {
  printf '\n[%s]\n' "$1"
  printf '%s\n' "$2"
}
#######################################
# Announce a SQL step, echo its text, and run it unless in dry-run mode.
# Globals:   DRY_RUN (read)
# Arguments: $1 - step title, $2 - SQL text
# Returns:   run_sql's status when executed, otherwise 0
#######################################
run_sql_step() {
  local step_title="$1" step_sql="$2"
  printf '\n== %s ==\nSQL:\n%s\n' "$step_title" "$step_sql"
  if (( DRY_RUN == 0 )); then
    run_sql "$step_sql"
  fi
}
#######################################
# Announce a shell command step, echo it, and run it unless in dry-run mode.
# Globals:   DRY_RUN (read)
# Arguments: $1 - step title, remaining args - the command and its arguments
# Returns:   the command's status when executed, otherwise 0
#######################################
run_cmd_step() {
  local heading="$1"
  shift
  printf '\n== %s ==\nCMD: %s\n' "$heading" "$*"
  if (( DRY_RUN == 0 )); then
    "$@"
  fi
}
# Pre-flight: the source must exist. A full migration must not clobber an
# existing destination/backup unless --force is given. --reconcile-only copies
# into an existing destination, so that table must already be there.
source_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${SRC_TABLE} FORMAT TabSeparatedRaw")"
if [[ "$source_exists" != "1" ]]; then
  echo "Source table ${DB_NAME}.${SRC_TABLE} does not exist." >&2
  exit 1
fi
dst_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${DST_TABLE} FORMAT TabSeparatedRaw")"
backup_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${BACKUP_TABLE} FORMAT TabSeparatedRaw")"
if [[ $RECONCILE_ONLY -eq 1 ]]; then
  # Every reconcile query reads DST_TABLE. Fail here with a clear message
  # instead of on a raw ClickHouse "table doesn't exist" error later.
  if [[ "$dst_exists" != "1" ]]; then
    echo "Destination table ${DB_NAME}.${DST_TABLE} does not exist; --reconcile-only requires an existing destination." >&2
    exit 1
  fi
elif [[ $FORCE -eq 0 ]]; then
  if [[ "$dst_exists" == "1" ]]; then
    echo "Destination table ${DB_NAME}.${DST_TABLE} already exists. Use --force only if you know it is safe." >&2
    exit 1
  fi
  if [[ "$backup_exists" == "1" ]]; then
    echo "Backup table ${DB_NAME}.${BACKUP_TABLE} already exists. Use --force only if you know it is safe." >&2
    exit 1
  fi
fi

# Months (YYYYMM, ascending) present in the source; an empty source is an error.
month_list="$(run_sql "SELECT DISTINCT toYYYYMM(time) AS ym FROM ${DB_NAME}.${SRC_TABLE} ORDER BY ym FORMAT TSV")"
if [[ -z "$month_list" ]]; then
  echo "No data found in ${DB_NAME}.${SRC_TABLE}." >&2
  exit 1
fi
#######################################
# Print (one per line) the months that must be backfilled before the cutover:
# only months strictly EARLIER than the cutover month. The cutover month and
# any later (future-dated) months are copied by the single
# `time >= current_month_start` statement. Including them here would insert
# those rows twice and trip the row-count check.
# Arguments: $1 - cutover month YYYYMM, $2 - newline-separated YYYYMM list
# Outputs:   qualifying months on stdout, in input order
#######################################
collect_historical_months() {
  local current="$1" month
  while IFS= read -r month; do
    [[ -z "$month" ]] && continue
    # Fixed-width digit strings: lexical order equals numeric order.
    if [[ "$month" < "$current" ]]; then
      printf '%s\n' "$month"
    fi
  done <<< "$2"
}

historical_months=()
while IFS= read -r ym; do
  historical_months+=("$ym")
done < <(collect_historical_months "$CURRENT_MONTH" "$month_list")
# DDL for the month-partitioned destination table (same columns as the source,
# partitioned by toYYYYMM(time)). IF NOT EXISTS lets a --force rerun reuse it.
create_sql="CREATE TABLE IF NOT EXISTS ${DB_NAME}.${DST_TABLE}
(
log_id String,
uid String,
project UInt8,
os Nullable(String),
version Nullable(String),
event String,
time DateTime,
res Nullable(String),
\`from\` Nullable(String),
position Nullable(Int32),
duration Nullable(UInt32),
ad_type Nullable(String),
ad_src Nullable(String),
revenue Nullable(Float64),
cc Nullable(String),
raw_json_data String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(time)
ORDER BY (uid, time, event)
PRIMARY KEY (uid, time)
TTL time + INTERVAL 6 MONTH"
# Summarise the run before any SQL is printed or executed.
if [[ $SKIP_PM2 -eq 0 ]]; then plan_pm2=enabled; else plan_pm2=skipped; fi
if [[ $DRY_RUN -eq 0 ]]; then plan_mode=execute; else plan_mode=dry-run; fi
if [[ $RECONCILE_ONLY -eq 1 ]]; then plan_reconcile=yes; else plan_reconcile=no; fi

printf 'Migration Plan\n'
# printf reuses the format for each label/value pair.
printf '%s: %s\n' \
  'Database' "$DB_NAME" \
  'Source table' "$SRC_TABLE" \
  'Destination table' "$DST_TABLE" \
  'Backup table' "$BACKUP_TABLE" \
  'Current month for cutover' "$CURRENT_MONTH" \
  'Historical months to backfill' "${historical_months[*]:-(none)}" \
  'PM2 control' "$plan_pm2" \
  'ClickHouse execution mode' "$CLICKHOUSE_EXEC_MODE"
if [[ "$CLICKHOUSE_EXEC_MODE" == "docker" ]]; then
  printf '%s: %s\n' 'ClickHouse container' "$CLICKHOUSE_CONTAINER"
fi
printf '%s: %s\n' \
  'Mode' "$plan_mode" \
  'Reconcile only' "$plan_reconcile"
# Per-month row counts of source vs destination, used to spot gaps.
src_month_counts_sql="SELECT toYYYYMM(time) AS ym, count() AS rows FROM ${DB_NAME}.${SRC_TABLE} GROUP BY ym"
dst_month_counts_sql="SELECT toYYYYMM(time) AS ym, count() AS rows FROM ${DB_NAME}.${DST_TABLE} GROUP BY ym"
diff_sql="SELECT src.ym, src.rows AS source_rows, ifNull(dst.rows, 0) AS destination_rows, src.rows - ifNull(dst.rows, 0) AS missing_rows FROM (${src_month_counts_sql}) AS src LEFT JOIN (${dst_month_counts_sql}) AS dst USING (ym) ORDER BY ym FORMAT PrettyCompact"

# First instant of the cutover month as a ClickHouse DateTime literal.
printf -v current_month_start '%s-%s-01 00:00:00' "${CURRENT_MONTH:0:4}" "${CURRENT_MONTH:4:2}"
#######################################
# Emit an INSERT that copies one month's source rows whose log_id is not yet
# present in the destination.
# Uses an anti-join via NOT IN rather than LEFT JOIN + empty(dst.log_id).
# With join_use_nulls=1 the unmatched dst.log_id would be NULL, empty(NULL)
# would be NULL, and the filter would silently copy nothing.
# The subquery is limited to the same month. Rows are copied with SELECT *,
# so they keep their `time`, and this avoids building a set of every
# destination log_id for each month.
# NOTE(review): assumes a given log_id always carries the same `time`.
# Globals:   DB_NAME, SRC_TABLE, DST_TABLE (read)
# Arguments: $1 - month as YYYYMM
# Outputs:   the SQL statement on stdout
#######################################
build_reconcile_sql_for_month() {
  local ym="$1"
  local month_start="${ym:0:4}-${ym:4:2}-01 00:00:00"
  local month_filter="time >= toDateTime('${month_start}') AND time < addMonths(toDateTime('${month_start}'), 1)"
  cat <<EOF
INSERT INTO ${DB_NAME}.${DST_TABLE}
SELECT *
FROM ${DB_NAME}.${SRC_TABLE}
WHERE ${month_filter}
AND log_id NOT IN (
SELECT log_id
FROM ${DB_NAME}.${DST_TABLE}
WHERE ${month_filter}
)
EOF
}
print_sql_block "Per-month diff" "$diff_sql"

# EXIT-trap handler armed while the ingestor is paused for reconciliation.
# Without it, any failing step (set -e) would leave ingestor-service stopped.
# The source table is still the live table, so restarting is safe.
restart_ingestor_after_failed_reconcile() {
  printf 'Reconcile did not finish; restarting %s.\n' "$INGESTOR_PM2_NAME" >&2
  # Best effort: report a failed restart instead of masking the original error.
  pm2 start "$INGESTOR_PM2_NAME" || printf 'Could not restart %s; start it manually.\n' "$INGESTOR_PM2_NAME" >&2
}

# --reconcile-only: copy rows missing from the destination for every month
# where the destination has fewer rows than the source; no create/rename.
if [[ $RECONCILE_ONLY -eq 1 ]]; then
  reconcile_months="$(run_sql "SELECT src.ym FROM (SELECT toYYYYMM(time) AS ym, count() AS rows FROM ${DB_NAME}.${SRC_TABLE} GROUP BY ym) AS src LEFT JOIN (SELECT toYYYYMM(time) AS ym, count() AS rows FROM ${DB_NAME}.${DST_TABLE} GROUP BY ym) AS dst USING (ym) WHERE src.rows > ifNull(dst.rows, 0) ORDER BY ym FORMAT TSV")"
  if [[ -z "$reconcile_months" ]]; then
    printf '\nNo missing months detected. Source and destination are already aligned by month.\n'
    exit 0
  fi
  while IFS= read -r ym; do
    [[ -z "$ym" ]] && continue
    reconcile_sql="$(build_reconcile_sql_for_month "$ym")"
    print_sql_block "Reconcile month ${ym} missing rows" "$reconcile_sql"
  done <<< "$reconcile_months"
  if [[ $DRY_RUN -eq 1 ]]; then
    printf '\nDry-run only. Re-run with --execute --reconcile-only to perform reconciliation.\n'
    exit 0
  fi

  if [[ $SKIP_PM2 -eq 0 ]]; then
    run_cmd_step "Stop PM2 ingestor" pm2 stop "${INGESTOR_PM2_NAME}"
    trap restart_ingestor_after_failed_reconcile EXIT
  fi
  printf '\nPer-month diff before reconcile:\n'
  run_sql "$diff_sql"
  while IFS= read -r ym; do
    [[ -z "$ym" ]] && continue
    reconcile_sql="$(build_reconcile_sql_for_month "$ym")"
    run_sql_step "Reconcile month ${ym} missing rows" "$reconcile_sql"
  done <<< "$reconcile_months"
  old_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${SRC_TABLE} FORMAT TabSeparatedRaw")"
  new_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${DST_TABLE} FORMAT TabSeparatedRaw")"
  printf '\nRow count check after reconcile: source=%s destination=%s\n' "$old_rows" "$new_rows"
  printf '\nPer-month diff after reconcile:\n'
  run_sql "$diff_sql"
  if [[ $SKIP_PM2 -eq 0 ]]; then
    # All steps succeeded: disarm the failure handler before the normal restart.
    trap - EXIT
    run_cmd_step "Start PM2 ingestor" pm2 start "${INGESTOR_PM2_NAME}"
  fi
  exit 0
fi
# Print every statement the full migration would run, so a dry-run shows
# exactly what --execute will do. Stop here unless --execute was given.
print_sql_block "Create destination table" "$create_sql"

# ${arr[@]+"${arr[@]}"} expands to nothing for an empty array. A bare
# "${arr[@]}" aborts under `set -u` on bash < 4.4 when every row belongs to
# the cutover month (no historical months).
for ym in ${historical_months[@]+"${historical_months[@]}"}; do
  month_start="${ym:0:4}-${ym:4:2}-01 00:00:00"
  insert_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${month_start}') AND time < addMonths(toDateTime('${month_start}'), 1)"
  print_sql_block "Backfill month ${ym}" "$insert_sql"
done

# Open-ended on purpose: copies the cutover month and anything newer.
current_month_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${current_month_start}')"
print_sql_block "Cutover current month backfill" "$current_month_sql"

# Both renames happen in one RENAME TABLE statement.
rename_sql="RENAME TABLE ${DB_NAME}.${SRC_TABLE} TO ${DB_NAME}.${BACKUP_TABLE}, ${DB_NAME}.${DST_TABLE} TO ${DB_NAME}.${SRC_TABLE}"
print_sql_block "Atomic rename" "$rename_sql"
rollback_sql="RENAME TABLE ${DB_NAME}.${SRC_TABLE} TO ${DB_NAME}.${DST_TABLE}_bad, ${DB_NAME}.${BACKUP_TABLE} TO ${DB_NAME}.${SRC_TABLE}"
print_sql_block "Rollback if needed" "$rollback_sql"

if [[ $DRY_RUN -eq 1 ]]; then
  printf '\nDry-run only. Re-run with --execute to perform the migration.\n'
  exit 0
fi
# --execute path: build the partitioned copy, pause ingestion only for the
# final catch-up, verify row counts, then swap the tables in one RENAME.
run_sql_step "Create partitioned destination table" "$create_sql"

# Closed months are copied while ingestor-service keeps running, so the pause
# only covers the cutover month. The ${arr[@]+...} form keeps `set -u` on
# bash < 4.4 from aborting when there are no historical months.
for ym in ${historical_months[@]+"${historical_months[@]}"}; do
  month_start="${ym:0:4}-${ym:4:2}-01 00:00:00"
  insert_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${month_start}') AND time < addMonths(toDateTime('${month_start}'), 1)"
  run_sql_step "Backfill month ${ym}" "$insert_sql"
done

# EXIT-trap handler armed between stopping the ingestor and the rename.
# Previously only the row-count-mismatch branch restarted it; any other
# failure (set -e) left ingestor-service stopped. Before the rename the
# source is still the live table, so restarting is safe.
restart_ingestor_on_failure() {
  printf 'Migration did not complete; restarting %s.\n' "$INGESTOR_PM2_NAME" >&2
  # Best effort: report a failed restart instead of masking the original error.
  pm2 start "$INGESTOR_PM2_NAME" || printf 'Could not restart %s; start it manually.\n' "$INGESTOR_PM2_NAME" >&2
}

if [[ $SKIP_PM2 -eq 0 ]]; then
  run_cmd_step "Stop PM2 ingestor" pm2 stop "${INGESTOR_PM2_NAME}"
  trap restart_ingestor_on_failure EXIT
fi

run_sql_step "Backfill current month ${CURRENT_MONTH}" "$current_month_sql"
old_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${SRC_TABLE} FORMAT TabSeparatedRaw")"
new_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${DST_TABLE} FORMAT TabSeparatedRaw")"
printf '\nRow count check: source=%s destination=%s\n' "$old_rows" "$new_rows"
if [[ "$old_rows" != "$new_rows" ]]; then
  echo "Row count mismatch detected. Migration aborted before rename." >&2
  printf '\nPer-month diff:\n'
  run_sql "$diff_sql"
  # When PM2 control is enabled, the EXIT trap restarts the ingestor.
  exit 1
fi

run_sql_step "Atomic rename" "$rename_sql"
if [[ $SKIP_PM2 -eq 0 ]]; then
  # Rename succeeded: disarm the failure handler before the normal restart.
  trap - EXIT
  run_cmd_step "Start PM2 ingestor" pm2 start "${INGESTOR_PM2_NAME}"
fi

verify_sql="SELECT partition, sum(rows) AS rows, formatReadableSize(sum(data_compressed_bytes)) AS compressed_size, count() AS parts FROM system.parts WHERE database = '${DB_NAME}' AND \`table\` = '${SRC_TABLE}' AND active = 1 GROUP BY partition ORDER BY partition"
run_sql_step "Verify partitions" "$verify_sql"
printf '\nMigration completed. Keep %s.%s for rollback until you are satisfied.\n' "$DB_NAME" "$BACKUP_TABLE"
|