migrate-clickhouse-events-partition.sh 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. #!/usr/bin/env bash
  2. set -euo pipefail
  3. DB_NAME="${CLICKHOUSE_DATABASE:-omsdb}"
  4. SRC_TABLE="${CLICKHOUSE_SRC_TABLE:-events}"
  5. DST_TABLE="${CLICKHOUSE_DST_TABLE:-events_v2}"
  6. BACKUP_TABLE="${CLICKHOUSE_BACKUP_TABLE:-events_backup}"
  7. INGESTOR_PM2_NAME="${INGESTOR_PM2_NAME:-ingestor-service}"
  8. CURRENT_MONTH="${CURRENT_MONTH:-$(date +%Y%m)}"
  9. DRY_RUN=1
  10. SKIP_PM2=0
  11. FORCE=0
  12. RECONCILE_ONLY=0
  13. usage() {
  14. cat <<'EOF'
  15. Usage:
  16. bash oms/scripts/migrate-clickhouse-events-partition.sh [options]
  17. Options:
  18. --execute Actually run the migration. Without this flag, the script is a dry-run.
  19. --skip-pm2 Do not stop/start PM2 ingestor-service during cutover.
  20. --force Allow destination/backup tables to already exist.
  21. --reconcile-only Only reconcile missing rows from source into destination, without create/rename.
  22. --current-month Override the current month used for cutover backfill, format YYYYMM.
  23. --help Show this help message.
  24. Environment variables:
  25. CLICKHOUSE_HOST Default: localhost
  26. CLICKHOUSE_PORT Default: 9000
  27. CLICKHOUSE_USER Optional
  28. CLICKHOUSE_PASSWORD Optional
  29. CLICKHOUSE_CONTAINER Default: clickhouse
  30. CLICKHOUSE_USE_DOCKER Default: auto (prefer docker, fallback to native client)
  31. CLICKHOUSE_DATABASE Default: omsdb
  32. CLICKHOUSE_SRC_TABLE Default: events
  33. CLICKHOUSE_DST_TABLE Default: events_v2
  34. CLICKHOUSE_BACKUP_TABLE Default: events_backup
  35. INGESTOR_PM2_NAME Default: ingestor-service
  36. CURRENT_MONTH Default: current system month in YYYYMM
  37. Notes:
  38. - The script assumes ClickHouse writes come only from ingestor-service.
  39. - By default it runs clickhouse-client inside the Docker container named by CLICKHOUSE_CONTAINER.
  40. - event-api can stay online; RabbitMQ will buffer events while ingestor-service is paused.
  41. - Review the printed plan first, then rerun with --execute.
  42. EOF
  43. }
  44. while [[ $# -gt 0 ]]; do
  45. case "$1" in
  46. --execute)
  47. DRY_RUN=0
  48. shift
  49. ;;
  50. --dry-run)
  51. DRY_RUN=1
  52. shift
  53. ;;
  54. --skip-pm2)
  55. SKIP_PM2=1
  56. shift
  57. ;;
  58. --force)
  59. FORCE=1
  60. shift
  61. ;;
  62. --reconcile-only)
  63. RECONCILE_ONLY=1
  64. shift
  65. ;;
  66. --current-month)
  67. CURRENT_MONTH="$2"
  68. shift 2
  69. ;;
  70. --help|-h)
  71. usage
  72. exit 0
  73. ;;
  74. *)
  75. echo "Unknown argument: $1" >&2
  76. usage
  77. exit 1
  78. ;;
  79. esac
  80. done
  81. if [[ ! "$CURRENT_MONTH" =~ ^[0-9]{6}$ ]]; then
  82. echo "CURRENT_MONTH must be in YYYYMM format, got: $CURRENT_MONTH" >&2
  83. exit 1
  84. fi
  85. CLICKHOUSE_HOST="${CLICKHOUSE_HOST:-localhost}"
  86. CLICKHOUSE_PORT="${CLICKHOUSE_PORT:-9000}"
  87. CLICKHOUSE_USER="${CLICKHOUSE_USER:-}"
  88. CLICKHOUSE_PASSWORD="${CLICKHOUSE_PASSWORD:-}"
  89. CLICKHOUSE_CONTAINER="${CLICKHOUSE_CONTAINER:-clickhouse}"
  90. CLICKHOUSE_USE_DOCKER="${CLICKHOUSE_USE_DOCKER:-auto}"
  91. CH_ARGS=(
  92. --host "$CLICKHOUSE_HOST"
  93. --port "$CLICKHOUSE_PORT"
  94. --database "$DB_NAME"
  95. --multiquery
  96. )
  97. if [[ -n "$CLICKHOUSE_USER" ]]; then
  98. CH_ARGS+=(--user "$CLICKHOUSE_USER")
  99. fi
  100. if [[ -n "$CLICKHOUSE_PASSWORD" ]]; then
  101. CH_ARGS+=(--password "$CLICKHOUSE_PASSWORD")
  102. fi
  103. require_command() {
  104. if ! command -v "$1" >/dev/null 2>&1; then
  105. echo "Required command not found: $1" >&2
  106. exit 1
  107. fi
  108. }
  109. docker_available=0
  110. native_clickhouse_available=0
  111. if command -v docker >/dev/null 2>&1; then
  112. docker_available=1
  113. fi
  114. if command -v clickhouse-client >/dev/null 2>&1; then
  115. native_clickhouse_available=1
  116. fi
  117. case "$CLICKHOUSE_USE_DOCKER" in
  118. auto)
  119. if [[ $docker_available -eq 1 ]]; then
  120. CLICKHOUSE_EXEC_MODE="docker"
  121. elif [[ $native_clickhouse_available -eq 1 ]]; then
  122. CLICKHOUSE_EXEC_MODE="native"
  123. else
  124. echo "Neither docker nor clickhouse-client is available on this host." >&2
  125. exit 1
  126. fi
  127. ;;
  128. 1|true|yes|docker)
  129. if [[ $docker_available -eq 0 ]]; then
  130. echo "Docker is required but not available." >&2
  131. exit 1
  132. fi
  133. CLICKHOUSE_EXEC_MODE="docker"
  134. ;;
  135. 0|false|no|native)
  136. if [[ $native_clickhouse_available -eq 0 ]]; then
  137. echo "clickhouse-client is required but not available." >&2
  138. exit 1
  139. fi
  140. CLICKHOUSE_EXEC_MODE="native"
  141. ;;
  142. *)
  143. echo "Unsupported CLICKHOUSE_USE_DOCKER value: $CLICKHOUSE_USE_DOCKER" >&2
  144. exit 1
  145. ;;
  146. esac
  147. if [[ "$CLICKHOUSE_EXEC_MODE" == "docker" ]]; then
  148. if ! docker inspect "$CLICKHOUSE_CONTAINER" >/dev/null 2>&1; then
  149. echo "ClickHouse container not found: $CLICKHOUSE_CONTAINER" >&2
  150. exit 1
  151. fi
  152. fi
  153. if [[ $SKIP_PM2 -eq 0 ]]; then
  154. require_command pm2
  155. fi
  156. run_sql() {
  157. local sql="$1"
  158. if [[ "$CLICKHOUSE_EXEC_MODE" == "docker" ]]; then
  159. docker exec "$CLICKHOUSE_CONTAINER" clickhouse-client "${CH_ARGS[@]}" --query "$sql"
  160. else
  161. clickhouse-client "${CH_ARGS[@]}" --query "$sql" </dev/null
  162. fi
  163. }
  164. print_sql_block() {
  165. local title="$1"
  166. local sql="$2"
  167. printf '\n[%s]\n%s\n' "$title" "$sql"
  168. }
  169. run_sql_step() {
  170. local title="$1"
  171. local sql="$2"
  172. printf '\n== %s ==\n' "$title"
  173. printf 'SQL:\n%s\n' "$sql"
  174. if [[ $DRY_RUN -eq 0 ]]; then
  175. run_sql "$sql"
  176. fi
  177. }
  178. run_cmd_step() {
  179. local title="$1"
  180. shift
  181. printf '\n== %s ==\n' "$title"
  182. printf 'CMD: %s\n' "$*"
  183. if [[ $DRY_RUN -eq 0 ]]; then
  184. "$@"
  185. fi
  186. }
  187. source_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${SRC_TABLE} FORMAT TabSeparatedRaw")"
  188. if [[ "$source_exists" != "1" ]]; then
  189. echo "Source table ${DB_NAME}.${SRC_TABLE} does not exist." >&2
  190. exit 1
  191. fi
  192. dst_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${DST_TABLE} FORMAT TabSeparatedRaw")"
  193. backup_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${BACKUP_TABLE} FORMAT TabSeparatedRaw")"
  194. if [[ $FORCE -eq 0 && "$dst_exists" == "1" ]]; then
  195. if [[ $RECONCILE_ONLY -eq 0 ]]; then
  196. echo "Destination table ${DB_NAME}.${DST_TABLE} already exists. Use --force only if you know it is safe." >&2
  197. exit 1
  198. fi
  199. fi
  200. if [[ $FORCE -eq 0 && "$backup_exists" == "1" ]]; then
  201. if [[ $RECONCILE_ONLY -eq 0 ]]; then
  202. echo "Backup table ${DB_NAME}.${BACKUP_TABLE} already exists. Use --force only if you know it is safe." >&2
  203. exit 1
  204. fi
  205. fi
  206. month_list="$(run_sql "SELECT DISTINCT toYYYYMM(time) AS ym FROM ${DB_NAME}.${SRC_TABLE} ORDER BY ym FORMAT TSV")"
  207. if [[ -z "$month_list" ]]; then
  208. echo "No data found in ${DB_NAME}.${SRC_TABLE}." >&2
  209. exit 1
  210. fi
  211. historical_months=()
  212. while IFS= read -r ym; do
  213. [[ -z "$ym" ]] && continue
  214. if [[ "$ym" != "$CURRENT_MONTH" ]]; then
  215. historical_months+=("$ym")
  216. fi
  217. done <<< "$month_list"
  218. create_sql=$(cat <<EOF
  219. CREATE TABLE IF NOT EXISTS ${DB_NAME}.${DST_TABLE}
  220. (
  221. log_id String,
  222. uid String,
  223. project UInt8,
  224. os Nullable(String),
  225. version Nullable(String),
  226. event String,
  227. time DateTime,
  228. res Nullable(String),
  229. \`from\` Nullable(String),
  230. position Nullable(Int32),
  231. duration Nullable(UInt32),
  232. ad_type Nullable(String),
  233. ad_src Nullable(String),
  234. revenue Nullable(Float64),
  235. cc Nullable(String),
  236. raw_json_data String
  237. )
  238. ENGINE = MergeTree
  239. PARTITION BY toYYYYMM(time)
  240. ORDER BY (uid, time, event)
  241. PRIMARY KEY (uid, time)
  242. TTL time + INTERVAL 6 MONTH
  243. EOF
  244. )
  245. printf 'Migration Plan\n'
  246. printf 'Database: %s\n' "$DB_NAME"
  247. printf 'Source table: %s\n' "$SRC_TABLE"
  248. printf 'Destination table: %s\n' "$DST_TABLE"
  249. printf 'Backup table: %s\n' "$BACKUP_TABLE"
  250. printf 'Current month for cutover: %s\n' "$CURRENT_MONTH"
  251. printf 'Historical months to backfill: %s\n' "${historical_months[*]:-(none)}"
  252. printf 'PM2 control: %s\n' "$([[ $SKIP_PM2 -eq 0 ]] && echo enabled || echo skipped)"
  253. printf 'ClickHouse execution mode: %s\n' "$CLICKHOUSE_EXEC_MODE"
  254. if [[ "$CLICKHOUSE_EXEC_MODE" == "docker" ]]; then
  255. printf 'ClickHouse container: %s\n' "$CLICKHOUSE_CONTAINER"
  256. fi
  257. printf 'Mode: %s\n' "$([[ $DRY_RUN -eq 0 ]] && echo execute || echo dry-run)"
  258. printf 'Reconcile only: %s\n' "$([[ $RECONCILE_ONLY -eq 1 ]] && echo yes || echo no)"
  259. diff_sql="SELECT src.ym, src.rows AS source_rows, ifNull(dst.rows, 0) AS destination_rows, src.rows - ifNull(dst.rows, 0) AS missing_rows FROM (SELECT toYYYYMM(time) AS ym, count() AS rows FROM ${DB_NAME}.${SRC_TABLE} GROUP BY ym) AS src LEFT JOIN (SELECT toYYYYMM(time) AS ym, count() AS rows FROM ${DB_NAME}.${DST_TABLE} GROUP BY ym) AS dst USING (ym) ORDER BY ym FORMAT PrettyCompact"
  260. current_month_start="${CURRENT_MONTH:0:4}-${CURRENT_MONTH:4:2}-01 00:00:00"
  261. build_reconcile_sql_for_month() {
  262. local ym="$1"
  263. local month_start="${ym:0:4}-${ym:4:2}-01 00:00:00"
  264. cat <<EOF
  265. INSERT INTO ${DB_NAME}.${DST_TABLE}
  266. SELECT src.*
  267. FROM ${DB_NAME}.${SRC_TABLE} AS src
  268. LEFT JOIN ${DB_NAME}.${DST_TABLE} AS dst ON src.log_id = dst.log_id
  269. WHERE src.time >= toDateTime('${month_start}')
  270. AND src.time < addMonths(toDateTime('${month_start}'), 1)
  271. AND empty(dst.log_id)
  272. EOF
  273. }
  274. print_sql_block "Per-month diff" "$diff_sql"
  275. if [[ $RECONCILE_ONLY -eq 1 ]]; then
  276. reconcile_months="$(run_sql "SELECT src.ym FROM (SELECT toYYYYMM(time) AS ym, count() AS rows FROM ${DB_NAME}.${SRC_TABLE} GROUP BY ym) AS src LEFT JOIN (SELECT toYYYYMM(time) AS ym, count() AS rows FROM ${DB_NAME}.${DST_TABLE} GROUP BY ym) AS dst USING (ym) WHERE src.rows > ifNull(dst.rows, 0) ORDER BY ym FORMAT TSV")"
  277. if [[ -z "$reconcile_months" ]]; then
  278. printf '\nNo missing months detected. Source and destination are already aligned by month.\n'
  279. exit 0
  280. fi
  281. while IFS= read -r ym; do
  282. [[ -z "$ym" ]] && continue
  283. reconcile_sql="$(build_reconcile_sql_for_month "$ym")"
  284. print_sql_block "Reconcile month ${ym} missing rows" "$reconcile_sql"
  285. done <<< "$reconcile_months"
  286. if [[ $DRY_RUN -eq 1 ]]; then
  287. printf '\nDry-run only. Re-run with --execute --reconcile-only to perform reconciliation.\n'
  288. exit 0
  289. fi
  290. if [[ $SKIP_PM2 -eq 0 ]]; then
  291. run_cmd_step "Stop PM2 ingestor" pm2 stop "${INGESTOR_PM2_NAME}"
  292. fi
  293. printf '\nPer-month diff before reconcile:\n'
  294. run_sql "$diff_sql"
  295. while IFS= read -r ym; do
  296. [[ -z "$ym" ]] && continue
  297. reconcile_sql="$(build_reconcile_sql_for_month "$ym")"
  298. run_sql_step "Reconcile month ${ym} missing rows" "$reconcile_sql"
  299. done <<< "$reconcile_months"
  300. old_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${SRC_TABLE} FORMAT TabSeparatedRaw")"
  301. new_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${DST_TABLE} FORMAT TabSeparatedRaw")"
  302. printf '\nRow count check after reconcile: source=%s destination=%s\n' "$old_rows" "$new_rows"
  303. printf '\nPer-month diff after reconcile:\n'
  304. run_sql "$diff_sql"
  305. if [[ $SKIP_PM2 -eq 0 ]]; then
  306. run_cmd_step "Start PM2 ingestor" pm2 start "${INGESTOR_PM2_NAME}"
  307. fi
  308. exit 0
  309. fi
  310. print_sql_block "Create destination table" "$create_sql"
  311. for ym in "${historical_months[@]}"; do
  312. month_start="${ym:0:4}-${ym:4:2}-01 00:00:00"
  313. insert_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${month_start}') AND time < addMonths(toDateTime('${month_start}'), 1)"
  314. print_sql_block "Backfill month ${ym}" "$insert_sql"
  315. done
  316. current_month_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${current_month_start}')"
  317. print_sql_block "Cutover current month backfill" "$current_month_sql"
  318. rename_sql="RENAME TABLE ${DB_NAME}.${SRC_TABLE} TO ${DB_NAME}.${BACKUP_TABLE}, ${DB_NAME}.${DST_TABLE} TO ${DB_NAME}.${SRC_TABLE}"
  319. print_sql_block "Atomic rename" "$rename_sql"
  320. rollback_sql="RENAME TABLE ${DB_NAME}.${SRC_TABLE} TO ${DB_NAME}.${DST_TABLE}_bad, ${DB_NAME}.${BACKUP_TABLE} TO ${DB_NAME}.${SRC_TABLE}"
  321. print_sql_block "Rollback if needed" "$rollback_sql"
  322. if [[ $DRY_RUN -eq 1 ]]; then
  323. printf '\nDry-run only. Re-run with --execute to perform the migration.\n'
  324. exit 0
  325. fi
  326. run_sql_step "Create partitioned destination table" "$create_sql"
  327. for ym in "${historical_months[@]}"; do
  328. month_start="${ym:0:4}-${ym:4:2}-01 00:00:00"
  329. insert_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${month_start}') AND time < addMonths(toDateTime('${month_start}'), 1)"
  330. run_sql_step "Backfill month ${ym}" "$insert_sql"
  331. done
  332. if [[ $SKIP_PM2 -eq 0 ]]; then
  333. run_cmd_step "Stop PM2 ingestor" pm2 stop "${INGESTOR_PM2_NAME}"
  334. fi
  335. run_sql_step "Backfill current month ${CURRENT_MONTH}" "$current_month_sql"
  336. old_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${SRC_TABLE} FORMAT TabSeparatedRaw")"
  337. new_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${DST_TABLE} FORMAT TabSeparatedRaw")"
  338. printf '\nRow count check: source=%s destination=%s\n' "$old_rows" "$new_rows"
  339. if [[ "$old_rows" != "$new_rows" ]]; then
  340. echo "Row count mismatch detected. Migration aborted before rename." >&2
  341. printf '\nPer-month diff:\n'
  342. run_sql "$diff_sql"
  343. if [[ $SKIP_PM2 -eq 0 ]]; then
  344. pm2 start "$INGESTOR_PM2_NAME"
  345. fi
  346. exit 1
  347. fi
  348. run_sql_step "Atomic rename" "$rename_sql"
  349. if [[ $SKIP_PM2 -eq 0 ]]; then
  350. run_cmd_step "Start PM2 ingestor" pm2 start "${INGESTOR_PM2_NAME}"
  351. fi
  352. verify_sql="SELECT partition, sum(rows) AS rows, formatReadableSize(sum(data_compressed_bytes)) AS compressed_size, count() AS parts FROM system.parts WHERE database = '${DB_NAME}' AND \`table\` = '${SRC_TABLE}' AND active = 1 GROUP BY partition ORDER BY partition"
  353. run_sql_step "Verify partitions" "$verify_sql"
  354. printf '\nMigration completed. Keep %s.%s for rollback until you are satisfied.\n' "$DB_NAME" "$BACKUP_TABLE"