#define _LARGEFILE_SOURCE
#define _FILE_OFFSET_BITS 64

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>
#include <assert.h>

typedef uint32_t hashvalue_t;    // ハッシュ値用の型
typedef uint64_t rowid_t;        // 行番号の型

#define TEXT_LENGTH_MAX (119)    // テキストデータの1行の最大文字数(改行込み)
#define HASH_TABLES_MAX (8192)   // 作成するハッシュテーブルの数
#define INDEX_READ_CACHE (2048)  // 結合時一度に読み込む索引のレコード数
#define HASH_READ_CACHE (256)    // 重複行確認の時一度に読み込むハッシュテーブルのレコード数(スレッド単位)

static char WORKDIR[32];         // ハッシュテーブルの作成先
static int  index_fd = -1;       // 索引用fd

/* スレッド関係 */
#define INSERT_THREADS_MAX (100) // 同時起動スレッド最大数
static pthread_mutex_t hash_mutex[ HASH_TABLES_MAX ]; // flockの代わり
static struct 
{
  pthread_mutex_t mutex;        // メンバ書き換え時の排他制御
  pthread_cond_t  cond_end;     // スレッド終了通知用
  int             now_threads;  // 現在起動中のスレッド数
  rowid_t         now_rowid;    // 現在読み込み中の行番号(行数)
} tc = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

// ハッシュテーブルに格納する1レコード
typedef struct tagHASH_RECORD
{
  rowid_t rowid;  // 行番号
  size_t  length; // 文字数
  char    text[ TEXT_LENGTH_MAX + 1 ];  // データ
} HASH_RECORD;

// アクセスパス
typedef struct tagRAW_INDEX
{
  rowid_t     linenum;    // 行番号
  hashvalue_t hash_value; // ハッシュ値
  off_t       seek;       // ハッシュテーブル内での位置
} RAW_INDEX;

// 文字列からハッシュ値を計算して返す
static hashvalue_t calcHashValue(const char *s, const size_t bufsize)
{
  const uint32_t    *source        = (const hashvalue_t *)s;
  const size_t      source_bufsize = bufsize/sizeof(hashvalue_t); // 細かいことは気にしない^^;
  hashvalue_t       hash_value     = 0;

  for( size_t N=0; N<source_bufsize; N ++) { hash_value += *source ++; }

  return hash_value % HASH_TABLES_MAX;
}

static size_t hist_cursor = 0;
static struct 
{
  hashvalue_t val;
  int         fd;
} fd_history[ 0x80 ];

// ヒストリからハッシュテーブルを検索無ければオープン
static int history_fd_open(hashvalue_t hash_value, int use_write )
{
  char table_name[ sizeof(WORKDIR) + 32 ];
  int    table_fd = -1;
  size_t N        = 0;

  for( N=0; N<0x80; N++ )
  {
    if( fd_history[N].val == hash_value )
    {
      table_fd = fd_history[N].fd;
      break;
    }
  }
  if( table_fd == -1 )
  {
    // ヒストリ内になかったのでヒストリの置き換え
    snprintf(table_name, sizeof(table_name), "%s/HASH_%lu",WORKDIR, hash_value);
    if( use_write == 0 ) {
      if( (table_fd = open(table_name, O_RDONLY)) == -1 ){ return -1; }
    } else {
      if( (table_fd = open(table_name, O_RDWR | O_CREAT, 0600)) == -1 ){ return -1; }
    }
    if( fd_history[hist_cursor].fd != -1 ) close( fd_history[hist_cursor].fd );
    fd_history[hist_cursor].val= hash_value;
    fd_history[hist_cursor].fd = table_fd;
    hist_cursor++; hist_cursor &= 0x7f;
  }
  return table_fd;
}

static int history_fd_close(void)
{
  for(size_t N=0; N<0x80; N++ )
  {
    if( fd_history[N].fd != -1 ) close( fd_history[N].fd );
    fd_history[N].val = (hashvalue_t)-1;
    fd_history[N].fd = -1;
  }
  return 0;
}

// 索引(アクセスパス)を作成する
static int index_insert(const RAW_INDEX item, rowid_t replace)
{
  static pthread_mutex_t fd_mutex  = PTHREAD_MUTEX_INITIALIZER;
  static const RAW_INDEX dummy[INSERT_THREADS_MAX] = {{0,0,0}};
  static rowid_t         rowid_max = 0;

  if( index_fd == -1 ) return 1;

  pthread_mutex_lock(&fd_mutex);
    if( rowid_max < item.linenum )
    {
      // 索引行(出力先)がないので追加
      lseek(index_fd, 0, SEEK_END);
      while( rowid_max < item.linenum )
      {
        write( index_fd, dummy, sizeof(dummy) );
        rowid_max += INSERT_THREADS_MAX;
      }
      fdatasync( index_fd );
    }

    if( replace > 0 )
    {
      // 重複データの無効化
      lseek(index_fd, (replace-1) * sizeof(RAW_INDEX) , SEEK_SET);
      write(index_fd, &dummy[0], sizeof(RAW_INDEX));
    }
    // 索引行の上書き
    lseek(index_fd, (item.linenum-1) * sizeof(RAW_INDEX), SEEK_SET);
    assert( write(index_fd, &item, sizeof(RAW_INDEX)) == sizeof(RAW_INDEX) );
  pthread_mutex_unlock(&fd_mutex);

  return 0;
}

// ハッシュテーブルへ一行追加
static int hash_insert(const HASH_RECORD ins_item)
{
  RAW_INDEX   index      = {0,0,0};
  hashvalue_t hash_value = 0;

  int         table_fd   = -1;
  off_t       seek       = 0;
  HASH_RECORD sel_buf[ HASH_READ_CACHE ], *sel_item  = NULL;
  ssize_t     read_size = 0;
  size_t      N,nblock  = 0;
  rowid_t     repline   = 0;

  /* ハッシュテーブルへ追加 */
  hash_value = calcHashValue(ins_item.text, sizeof(ins_item.text));

  pthread_mutex_lock( &hash_mutex[hash_value] );
    pthread_mutex_lock( &tc.mutex );
      table_fd = history_fd_open(hash_value, 1);
    pthread_mutex_unlock( &tc.mutex );
    lseek(table_fd, 0, SEEK_SET); seek = 0;
    while( (read_size = read( table_fd, sel_buf, sizeof(sel_buf) ) ) > 0 )
    {
      nblock   = read_size/sizeof(HASH_RECORD);
      sel_item = sel_buf;
      for(N=0; N<nblock; N++, sel_item ++)
      {
        // 一致するデータがあれば追加しない
        if( sel_item->length == ins_item.length )
        {
          if( strncmp( sel_item->text, ins_item.text, ins_item.length ) == 0 )
          {
            repline = sel_item->rowid;
            seek = lseek(table_fd, seek + N * sizeof(HASH_RECORD), SEEK_SET);
            break;
          }
        }
      }
      seek = lseek(table_fd, 0, SEEK_CUR);
    }
    assert( (write(table_fd, &ins_item, sizeof(HASH_RECORD))) == sizeof(HASH_RECORD) );
  pthread_mutex_unlock( &hash_mutex[hash_value] );

  /* 索引の追加 */
  index.linenum    = ins_item.rowid;
  index.hash_value = hash_value;
  index.seek       = seek;
  if( index_insert( index, repline) != 0 ) { return 1; };

  return 0;
}

static void *hash_insert_async(void *p)
{
  HASH_RECORD    item = *(HASH_RECORD *)p;

  free(p); // 内容のコピーがすめば用済み
  assert( hash_insert(item) == 0 );

  pthread_mutex_lock(&tc.mutex);
  assert( tc.now_threads > 0 );
  tc.now_threads --;
  pthread_cond_signal(&tc.cond_end);
  pthread_mutex_unlock(&tc.mutex);

  return NULL;
}

/* 入力ファイルからハッシュテーブルを作成 */
int analyze(const char *filename)
{
  FILE            *in    = NULL;
  HASH_RECORD     *item  = NULL;
  int             result = 0;

  pthread_t       thread;
  pthread_attr_t  attr;
  struct timespec ts;

  if( index_fd == -1 ) { return 1; }
  if( (in = fopen(filename, "r")) == NULL ){ return 1; }

  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  lseek(index_fd, 0, SEEK_SET);
  while( ! feof(in) )
  {
    item = (HASH_RECORD *)calloc(1, sizeof(HASH_RECORD) ); /* 開放は hash_insert_async()内 */
    if( item == NULL ) break;
    if( fgets(item->text, TEXT_LENGTH_MAX, in) != NULL )
    {
      pthread_mutex_lock(&tc.mutex);
        item->rowid  = ++tc.now_rowid;
        item->length = strlen( item->text );
        while( tc.now_threads >= INSERT_THREADS_MAX )
        {
          /* スレッド枠のあきまち*/
          ts.tv_sec = time(NULL) + 2;
          ts.tv_nsec= 0;
          pthread_cond_timedwait(&tc.cond_end, &tc.mutex, &ts);
        }
        assert( tc.now_threads < INSERT_THREADS_MAX );
        tc.now_threads ++;
        result = pthread_create(&thread, &attr, (void *(*)(void*))hash_insert_async, (void *)item);
        assert( result == 0 );
        if( result != 0 ) tc.now_threads --;
      pthread_mutex_unlock(&tc.mutex);
    }
  }
  /* 放置したスレッドの回収 */
  while( tc.now_threads > 0 )
  {
    ts.tv_sec = time(NULL) + 2;
    ts.tv_nsec= 0;
    pthread_mutex_lock(&tc.mutex);
      pthread_cond_timedwait(&tc.cond_end, &tc.mutex, &ts);
    pthread_mutex_unlock(&tc.mutex);
  }

  fsync(index_fd);
  history_fd_close();

  fclose(in); in = NULL;
  return 0;
}

/* ハッシュテーブルを結合して出力 */
int hash_join(const char *filename)
{
  int  out_fd   = -1;
  int  table_fd = -1;

  HASH_RECORD item             = {0,0,{'\0'}};
  hashvalue_t store_hash_value = (hashvalue_t)-1;

  RAW_INDEX   indexbuf[ INDEX_READ_CACHE ], *index = NULL;
  rowid_t     in_count  = 0,out_count = 0;
  ssize_t     read_size = -1;
  int         nblock    = 0;

  if( index_fd == -1 ) { return 1; }

  // 出力ファイルのオープン
  if( ( out_fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644) ) == -1 )
  {
    return 1;
  }

  // ハッシュテーブルを参照してデータ結合
  lseek(index_fd, 0, SEEK_SET);
  while( in_count < tc.now_rowid )
  {
    read_size = read( index_fd, indexbuf, sizeof(indexbuf) );
    if( read_size < 1 ) break;
    nblock = read_size / sizeof(RAW_INDEX);
    index  = indexbuf;
    for( ; nblock > 0; nblock--, index ++ )
    {
      in_count ++;
      if( index->linenum < 1 ) continue; // あとで上書きされてるので無視
      if( index->hash_value != store_hash_value )
      {
        table_fd = history_fd_open(index->hash_value, 0);
        store_hash_value = index->hash_value;
      }
      assert( table_fd > 0 );
      // ハッシュテーブルから出力ファイルへ1レコード出力
      lseek(table_fd, index->seek, SEEK_SET);
      assert( read(table_fd, &item, sizeof(HASH_RECORD)) == sizeof(HASH_RECORD));
      assert( write(out_fd, item.text, item.length) == item.length );
      out_count ++;
      if( in_count == tc.now_rowid ) break;
    }
    usleep(1000);
  }
  history_fd_close();
  close( out_fd );   out_fd   = -1;
  printf("input:%lld, output:%lld\n", in_count, out_count);
  return 0;
}

int setup(void)
{
  char cmd[256];
  char index_name [ sizeof(WORKDIR) + 32 ];

  snprintf(WORKDIR, sizeof(WORKDIR), "work.%d", getpid());
  snprintf(cmd, sizeof(cmd),"/usr/bin/mkdir -m 700 %s", WORKDIR);
  system(cmd);

  snprintf(index_name, sizeof(index_name), "%s/rawindex", WORKDIR);
  if( ( index_fd = open(index_name, O_RDWR | O_CREAT, 0600) ) == -1 ) { return 1; }

  for(size_t N=0; N<HASH_TABLES_MAX; N++){ pthread_mutex_init( &hash_mutex[N], NULL ); }
  for(size_t N=0; N<0x80; N++ )
  {
    fd_history[N].val = (hashvalue_t)-1;
    fd_history[N].fd = -1;
  }

  return 0;
}

int teardown(void)
{
  char cmd[256];

  assert( tc.now_threads == 0 );
 
  close( index_fd ); index_fd   = -1;
  snprintf(cmd, sizeof(cmd),"/usr/bin/rm -rf %s", WORKDIR);
  system(cmd);
  return 0;
}

int main( int argc, char *argv[] )
{
  int result = 1;
  if( argc < 3 ) printf("usage: %s in_file out_file\n", argv[0]);
  result = setup();
  result = analyze( argv[1] );
  result = hash_join( argv[2] );
  result = teardown();

printf("size of HASH_RECORD = %u\n", sizeof(HASH_RECORD) );
printf("size of RAW_INDEX   = %u\n", sizeof(RAW_INDEX) );

  return result;
}
