migrate-clickhouse-events-partition.sh 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. #!/usr/bin/env bash
  2. set -euo pipefail
  3. DB_NAME="${CLICKHOUSE_DATABASE:-omsdb}"
  4. SRC_TABLE="${CLICKHOUSE_SRC_TABLE:-events}"
  5. DST_TABLE="${CLICKHOUSE_DST_TABLE:-events_v2}"
  6. BACKUP_TABLE="${CLICKHOUSE_BACKUP_TABLE:-events_backup}"
  7. INGESTOR_PM2_NAME="${INGESTOR_PM2_NAME:-ingestor-service}"
  8. CURRENT_MONTH="${CURRENT_MONTH:-$(date +%Y%m)}"
  9. DRY_RUN=1
  10. SKIP_PM2=0
  11. FORCE=0
  12. usage() {
  13. cat <<'EOF'
  14. Usage:
  15. bash oms/scripts/migrate-clickhouse-events-partition.sh [options]
  16. Options:
  17. --execute Actually run the migration. Without this flag, the script is a dry-run.
  18. --skip-pm2 Do not stop/start PM2 ingestor-service during cutover.
  19. --force Allow destination/backup tables to already exist.
  20. --current-month Override the current month used for cutover backfill, format YYYYMM.
  21. --help Show this help message.
  22. Environment variables:
  23. CLICKHOUSE_HOST Default: localhost
  24. CLICKHOUSE_PORT Default: 9000
  25. CLICKHOUSE_USER Optional
  26. CLICKHOUSE_PASSWORD Optional
  27. CLICKHOUSE_CONTAINER Default: clickhouse
  28. CLICKHOUSE_USE_DOCKER Default: auto (prefer docker, fallback to native client)
  29. CLICKHOUSE_DATABASE Default: omsdb
  30. CLICKHOUSE_SRC_TABLE Default: events
  31. CLICKHOUSE_DST_TABLE Default: events_v2
  32. CLICKHOUSE_BACKUP_TABLE Default: events_backup
  33. INGESTOR_PM2_NAME Default: ingestor-service
  34. CURRENT_MONTH Default: current system month in YYYYMM
  35. Notes:
  36. - The script assumes ClickHouse writes come only from ingestor-service.
  37. - By default it runs clickhouse-client inside the Docker container named by CLICKHOUSE_CONTAINER.
  38. - event-api can stay online; RabbitMQ will buffer events while ingestor-service is paused.
  39. - Review the printed plan first, then rerun with --execute.
  40. EOF
  41. }
  42. while [[ $# -gt 0 ]]; do
  43. case "$1" in
  44. --execute)
  45. DRY_RUN=0
  46. shift
  47. ;;
  48. --dry-run)
  49. DRY_RUN=1
  50. shift
  51. ;;
  52. --skip-pm2)
  53. SKIP_PM2=1
  54. shift
  55. ;;
  56. --force)
  57. FORCE=1
  58. shift
  59. ;;
  60. --current-month)
  61. CURRENT_MONTH="$2"
  62. shift 2
  63. ;;
  64. --help|-h)
  65. usage
  66. exit 0
  67. ;;
  68. *)
  69. echo "Unknown argument: $1" >&2
  70. usage
  71. exit 1
  72. ;;
  73. esac
  74. done
  75. if [[ ! "$CURRENT_MONTH" =~ ^[0-9]{6}$ ]]; then
  76. echo "CURRENT_MONTH must be in YYYYMM format, got: $CURRENT_MONTH" >&2
  77. exit 1
  78. fi
  79. CLICKHOUSE_HOST="${CLICKHOUSE_HOST:-localhost}"
  80. CLICKHOUSE_PORT="${CLICKHOUSE_PORT:-9000}"
  81. CLICKHOUSE_USER="${CLICKHOUSE_USER:-}"
  82. CLICKHOUSE_PASSWORD="${CLICKHOUSE_PASSWORD:-}"
  83. CLICKHOUSE_CONTAINER="${CLICKHOUSE_CONTAINER:-clickhouse}"
  84. CLICKHOUSE_USE_DOCKER="${CLICKHOUSE_USE_DOCKER:-auto}"
  85. CH_ARGS=(
  86. --host "$CLICKHOUSE_HOST"
  87. --port "$CLICKHOUSE_PORT"
  88. --database "$DB_NAME"
  89. --multiquery
  90. )
  91. if [[ -n "$CLICKHOUSE_USER" ]]; then
  92. CH_ARGS+=(--user "$CLICKHOUSE_USER")
  93. fi
  94. if [[ -n "$CLICKHOUSE_PASSWORD" ]]; then
  95. CH_ARGS+=(--password "$CLICKHOUSE_PASSWORD")
  96. fi
  97. require_command() {
  98. if ! command -v "$1" >/dev/null 2>&1; then
  99. echo "Required command not found: $1" >&2
  100. exit 1
  101. fi
  102. }
  103. docker_available=0
  104. native_clickhouse_available=0
  105. if command -v docker >/dev/null 2>&1; then
  106. docker_available=1
  107. fi
  108. if command -v clickhouse-client >/dev/null 2>&1; then
  109. native_clickhouse_available=1
  110. fi
  111. case "$CLICKHOUSE_USE_DOCKER" in
  112. auto)
  113. if [[ $docker_available -eq 1 ]]; then
  114. CLICKHOUSE_EXEC_MODE="docker"
  115. elif [[ $native_clickhouse_available -eq 1 ]]; then
  116. CLICKHOUSE_EXEC_MODE="native"
  117. else
  118. echo "Neither docker nor clickhouse-client is available on this host." >&2
  119. exit 1
  120. fi
  121. ;;
  122. 1|true|yes|docker)
  123. if [[ $docker_available -eq 0 ]]; then
  124. echo "Docker is required but not available." >&2
  125. exit 1
  126. fi
  127. CLICKHOUSE_EXEC_MODE="docker"
  128. ;;
  129. 0|false|no|native)
  130. if [[ $native_clickhouse_available -eq 0 ]]; then
  131. echo "clickhouse-client is required but not available." >&2
  132. exit 1
  133. fi
  134. CLICKHOUSE_EXEC_MODE="native"
  135. ;;
  136. *)
  137. echo "Unsupported CLICKHOUSE_USE_DOCKER value: $CLICKHOUSE_USE_DOCKER" >&2
  138. exit 1
  139. ;;
  140. esac
  141. if [[ "$CLICKHOUSE_EXEC_MODE" == "docker" ]]; then
  142. if ! docker inspect "$CLICKHOUSE_CONTAINER" >/dev/null 2>&1; then
  143. echo "ClickHouse container not found: $CLICKHOUSE_CONTAINER" >&2
  144. exit 1
  145. fi
  146. fi
  147. if [[ $SKIP_PM2 -eq 0 ]]; then
  148. require_command pm2
  149. fi
  150. run_sql() {
  151. local sql="$1"
  152. if [[ "$CLICKHOUSE_EXEC_MODE" == "docker" ]]; then
  153. docker exec -i "$CLICKHOUSE_CONTAINER" clickhouse-client "${CH_ARGS[@]}" --query "$sql"
  154. else
  155. clickhouse-client "${CH_ARGS[@]}" --query "$sql"
  156. fi
  157. }
  158. print_sql_block() {
  159. local title="$1"
  160. local sql="$2"
  161. printf '\n[%s]\n%s\n' "$title" "$sql"
  162. }
  163. run_step() {
  164. local title="$1"
  165. local command="$2"
  166. printf '\n== %s ==\n' "$title"
  167. printf '%s\n' "$command"
  168. if [[ $DRY_RUN -eq 0 ]]; then
  169. eval "$command"
  170. fi
  171. }
  172. source_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${SRC_TABLE} FORMAT TabSeparatedRaw")"
  173. if [[ "$source_exists" != "1" ]]; then
  174. echo "Source table ${DB_NAME}.${SRC_TABLE} does not exist." >&2
  175. exit 1
  176. fi
  177. dst_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${DST_TABLE} FORMAT TabSeparatedRaw")"
  178. backup_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${BACKUP_TABLE} FORMAT TabSeparatedRaw")"
  179. if [[ $FORCE -eq 0 && "$dst_exists" == "1" ]]; then
  180. echo "Destination table ${DB_NAME}.${DST_TABLE} already exists. Use --force only if you know it is safe." >&2
  181. exit 1
  182. fi
  183. if [[ $FORCE -eq 0 && "$backup_exists" == "1" ]]; then
  184. echo "Backup table ${DB_NAME}.${BACKUP_TABLE} already exists. Use --force only if you know it is safe." >&2
  185. exit 1
  186. fi
  187. month_list="$(run_sql "SELECT DISTINCT toYYYYMM(time) AS ym FROM ${DB_NAME}.${SRC_TABLE} ORDER BY ym FORMAT TSV")"
  188. if [[ -z "$month_list" ]]; then
  189. echo "No data found in ${DB_NAME}.${SRC_TABLE}." >&2
  190. exit 1
  191. fi
  192. historical_months=()
  193. while IFS= read -r ym; do
  194. [[ -z "$ym" ]] && continue
  195. if [[ "$ym" != "$CURRENT_MONTH" ]]; then
  196. historical_months+=("$ym")
  197. fi
  198. done <<< "$month_list"
  199. create_sql=$(cat <<EOF
  200. CREATE TABLE IF NOT EXISTS ${DB_NAME}.${DST_TABLE}
  201. (
  202. log_id String,
  203. uid String,
  204. project UInt8,
  205. os Nullable(String),
  206. version Nullable(String),
  207. event String,
  208. time DateTime,
  209. res Nullable(String),
  210. \`from\` Nullable(String),
  211. position Nullable(Int32),
  212. duration Nullable(UInt32),
  213. ad_type Nullable(String),
  214. ad_src Nullable(String),
  215. revenue Nullable(Float64),
  216. cc Nullable(String),
  217. raw_json_data String
  218. )
  219. ENGINE = MergeTree
  220. PARTITION BY toYYYYMM(time)
  221. ORDER BY (uid, time, event)
  222. PRIMARY KEY (uid, time)
  223. TTL time + INTERVAL 6 MONTH
  224. EOF
  225. )
  226. printf 'Migration Plan\n'
  227. printf 'Database: %s\n' "$DB_NAME"
  228. printf 'Source table: %s\n' "$SRC_TABLE"
  229. printf 'Destination table: %s\n' "$DST_TABLE"
  230. printf 'Backup table: %s\n' "$BACKUP_TABLE"
  231. printf 'Current month for cutover: %s\n' "$CURRENT_MONTH"
  232. printf 'Historical months to backfill: %s\n' "${historical_months[*]:-(none)}"
  233. printf 'PM2 control: %s\n' "$([[ $SKIP_PM2 -eq 0 ]] && echo enabled || echo skipped)"
  234. printf 'ClickHouse execution mode: %s\n' "$CLICKHOUSE_EXEC_MODE"
  235. if [[ "$CLICKHOUSE_EXEC_MODE" == "docker" ]]; then
  236. printf 'ClickHouse container: %s\n' "$CLICKHOUSE_CONTAINER"
  237. fi
  238. printf 'Mode: %s\n' "$([[ $DRY_RUN -eq 0 ]] && echo execute || echo dry-run)"
  239. print_sql_block "Create destination table" "$create_sql"
  240. for ym in "${historical_months[@]}"; do
  241. month_start="${ym:0:4}-${ym:4:2}-01 00:00:00"
  242. insert_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${month_start}') AND time < addMonths(toDateTime('${month_start}'), 1)"
  243. print_sql_block "Backfill month ${ym}" "$insert_sql"
  244. done
  245. current_month_start="${CURRENT_MONTH:0:4}-${CURRENT_MONTH:4:2}-01 00:00:00"
  246. current_month_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${current_month_start}') AND time < now()"
  247. print_sql_block "Cutover current month backfill" "$current_month_sql"
  248. rename_sql="RENAME TABLE ${DB_NAME}.${SRC_TABLE} TO ${DB_NAME}.${BACKUP_TABLE}, ${DB_NAME}.${DST_TABLE} TO ${DB_NAME}.${SRC_TABLE}"
  249. print_sql_block "Atomic rename" "$rename_sql"
  250. rollback_sql="RENAME TABLE ${DB_NAME}.${SRC_TABLE} TO ${DB_NAME}.${DST_TABLE}_bad, ${DB_NAME}.${BACKUP_TABLE} TO ${DB_NAME}.${SRC_TABLE}"
  251. print_sql_block "Rollback if needed" "$rollback_sql"
  252. if [[ $DRY_RUN -eq 1 ]]; then
  253. printf '\nDry-run only. Re-run with --execute to perform the migration.\n'
  254. exit 0
  255. fi
  256. run_step "Create partitioned destination table" "run_sql \"$create_sql\""
  257. for ym in "${historical_months[@]}"; do
  258. month_start="${ym:0:4}-${ym:4:2}-01 00:00:00"
  259. insert_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${month_start}') AND time < addMonths(toDateTime('${month_start}'), 1)"
  260. run_step "Backfill month ${ym}" "run_sql \"$insert_sql\""
  261. done
  262. if [[ $SKIP_PM2 -eq 0 ]]; then
  263. run_step "Stop PM2 ingestor" "pm2 stop ${INGESTOR_PM2_NAME}"
  264. fi
  265. run_step "Backfill current month ${CURRENT_MONTH}" "run_sql \"$current_month_sql\""
  266. old_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${SRC_TABLE} FORMAT TabSeparatedRaw")"
  267. new_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${DST_TABLE} FORMAT TabSeparatedRaw")"
  268. printf '\nRow count check: source=%s destination=%s\n' "$old_rows" "$new_rows"
  269. if [[ "$old_rows" != "$new_rows" ]]; then
  270. echo "Row count mismatch detected. Migration aborted before rename." >&2
  271. if [[ $SKIP_PM2 -eq 0 ]]; then
  272. pm2 start "$INGESTOR_PM2_NAME"
  273. fi
  274. exit 1
  275. fi
  276. run_step "Atomic rename" "run_sql \"$rename_sql\""
  277. if [[ $SKIP_PM2 -eq 0 ]]; then
  278. run_step "Start PM2 ingestor" "pm2 start ${INGESTOR_PM2_NAME}"
  279. fi
  280. verify_sql="SELECT partition, sum(rows) AS rows, formatReadableSize(sum(data_compressed_bytes)) AS compressed_size, count() AS parts FROM system.parts WHERE database = '${DB_NAME}' AND \`table\` = '${SRC_TABLE}' AND active = 1 GROUP BY partition ORDER BY partition"
  281. run_step "Verify partitions" "run_sql \"$verify_sql\""
  282. printf '\nMigration completed. Keep %s.%s for rollback until you are satisfied.\n' "$DB_NAME" "$BACKUP_TABLE"